DPDK-dev Archive on lore.kernel.org
 help / color / mirror / Atom feed
* Re: [PATCH v1 2/2] Test cases for rte_memcmp functions
From: Thomas Monjalon @ 2017-01-02 20:41 UTC (permalink / raw)
  To: Wang, Zhihong, Ravi Kerur; +Cc: dev
In-Reply-To: <8F6C2BD409508844A0EFC19955BE094110759EFE@SHSMSX103.ccr.corp.intel.com>

2016-06-07 11:09, Wang, Zhihong:
> From: Ravi Kerur [mailto:rkerur@gmail.com]
> > Zhilong, Thomas,
> > 
> > If there is enough interest within DPDK community I can work on adding support
> > for 'unaligned access' and 'test cases' for it. Please let me know either way.
> 
> Hi Ravi,
> 
> This rte_memcmp is proved with better performance than glibc's in aligned
> cases, I think it has good value to DPDK lib.
> 
> Though we don't have memcmp in critical pmd data path, it offers a better
> choice for applications who do.

Re-thinking about this series, could it be some values to have a rte_memcmp
implementation?
What is the value compared to glibc one? Why not working on glibc?

^ permalink raw reply

* Re: DPDK Acceleartion Enhancement - compression
From: Stephen Hemminger @ 2017-01-02 22:10 UTC (permalink / raw)
  To: Ant loves honey; +Cc: Dev
In-Reply-To: <1815632810.2503307.1482854884965@mail.yahoo.com>

On Tue, 27 Dec 2016 16:08:04 +0000 (UTC)
Ant loves honey <ant_love_honey@yahoo.com> wrote:

> Is this the correct forum to ask question about adding code for DPDK Acceleration Enhancement?  Or this is strictly for code review? If so, please direct me to the correct forum.
> I have already read these 2 documents:
>    
>    - http://dpdk.org/doc/guides/ cryptodevs/qat.html
>    - http://dpdk.org/doc/guides/ sample_app_ug/intel_ quickassist.html
> 
> I am new to DPDK and would like to do some work but is hitting a road block.  I am interested in making DPDK able to to use the Intel QuickAssist Technology for data compression. I think this will help IP payload compression and save bandwidth.
> Any help to get me going is greatly appreciated.
> Anthony.

You will need to use a real email address and name when submitting patches in order to comply
with the Developer's Certificate of Origin.

^ permalink raw reply

* [PATCH 0/6] VMBUS and driver infrastructure
From: Stephen Hemminger @ 2017-01-02 23:08 UTC (permalink / raw)
  To: dev; +Cc: Stephen Hemminger

These are the patches to allow building drivers on VMBUS with
DPDK. The Hyper-V PMD (later patch set) builds on this.

Most of the infrastructure for the bus and device model is already
done, the one missing piece was that the eth_driver structure still
assumed all devices were PCI.


Stephen Hemminger (6):
  ethdev: increase length ethernet device internal name
  i40e: don't refer to eth_dev->pci_dev
  vmxnet3: don't refer to eth_dev->pci_dev
  cxgbe: don't refer to eth_dev->pci_dev
  ethdev: break ethernet driver and pci_driver connection
  eal: VMBUS infrastructure

 app/test/virtual_pmd.c                      |  22 +-
 doc/guides/rel_notes/deprecation.rst        |   3 +
 drivers/net/bnx2x/bnx2x_ethdev.c            |  16 +-
 drivers/net/bnxt/bnxt_ethdev.c              |  22 +-
 drivers/net/cxgbe/cxgbe_ethdev.c            |   8 +-
 drivers/net/cxgbe/sge.c                     |   9 +-
 drivers/net/e1000/em_ethdev.c               |  10 +-
 drivers/net/e1000/igb_ethdev.c              |  20 +-
 drivers/net/ena/ena_ethdev.c                |   8 +-
 drivers/net/enic/enic_ethdev.c              |   8 +-
 drivers/net/fm10k/fm10k_ethdev.c            |  10 +-
 drivers/net/i40e/i40e_ethdev.c              |  10 +-
 drivers/net/i40e/i40e_ethdev_vf.c           |  10 +-
 drivers/net/i40e/i40e_fdir.c                |   3 +-
 drivers/net/ixgbe/ixgbe_ethdev.c            |  20 +-
 drivers/net/mlx4/mlx4.c                     |   8 +-
 drivers/net/mlx5/mlx5.c                     |   8 +-
 drivers/net/nfp/nfp_net.c                   |   8 +-
 drivers/net/qede/qede_ethdev.c              |  42 +-
 drivers/net/szedata2/rte_eth_szedata2.c     |  10 +-
 drivers/net/thunderx/nicvf_ethdev.c         |   8 +-
 drivers/net/virtio/virtio_ethdev.c          |  10 +-
 drivers/net/vmxnet3/vmxnet3_ethdev.c        |  10 +-
 drivers/net/vmxnet3/vmxnet3_rxtx.c          |   5 +-
 lib/librte_eal/common/Makefile              |   2 +-
 lib/librte_eal/common/eal_common_devargs.c  |   7 +
 lib/librte_eal/common/eal_common_options.c  |  38 ++
 lib/librte_eal/common/eal_internal_cfg.h    |   3 +-
 lib/librte_eal/common/eal_options.h         |   6 +
 lib/librte_eal/common/eal_private.h         |   5 +
 lib/librte_eal/common/include/rte_devargs.h |   8 +
 lib/librte_eal/common/include/rte_vmbus.h   | 249 ++++++++
 lib/librte_eal/linuxapp/eal/Makefile        |   6 +
 lib/librte_eal/linuxapp/eal/eal.c           |  11 +
 lib/librte_eal/linuxapp/eal/eal_vmbus.c     | 907 ++++++++++++++++++++++++++++
 lib/librte_ether/rte_ethdev.c               |  99 ++-
 lib/librte_ether/rte_ethdev.h               |  49 +-
 mk/rte.app.mk                               |   1 +
 38 files changed, 1542 insertions(+), 137 deletions(-)
 create mode 100644 lib/librte_eal/common/include/rte_vmbus.h
 create mode 100644 lib/librte_eal/linuxapp/eal/eal_vmbus.c

-- 
2.11.0

^ permalink raw reply

* [PATCH 1/6] ethdev: increase length ethernet device internal name
From: Stephen Hemminger @ 2017-01-02 23:08 UTC (permalink / raw)
  To: dev; +Cc: Stephen Hemminger
In-Reply-To: <20170102230850.32610-1-sthemmin@microsoft.com>

Allow sufficicent space for UUID in string form (36+1) which
is necessary with VMBUS. 

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
---
 doc/guides/rel_notes/deprecation.rst | 3 +++
 lib/librte_ether/rte_ethdev.h        | 6 +++++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/doc/guides/rel_notes/deprecation.rst b/doc/guides/rel_notes/deprecation.rst
index 1438c777..69669e44 100644
--- a/doc/guides/rel_notes/deprecation.rst
+++ b/doc/guides/rel_notes/deprecation.rst
@@ -58,6 +58,9 @@ Deprecation Notices
   ``port`` field, may be moved or removed as part of this mbuf work. A
   ``timestamp`` will also be added.
 
+* ethdev: for 17.02 the size of internal device name will be increased
+  to 40 characters to allow for storing UUID.
+
 * The mbuf flags PKT_RX_VLAN_PKT and PKT_RX_QINQ_PKT are deprecated and
   are respectively replaced by PKT_RX_VLAN_STRIPPED and
   PKT_RX_QINQ_STRIPPED, that are better described. The old flags and
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index fb517544..09dd744d 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1649,7 +1649,11 @@ struct rte_eth_dev_sriov {
 };
 #define RTE_ETH_DEV_SRIOV(dev)         ((dev)->data->sriov)
 
-#define RTE_ETH_NAME_MAX_LEN (32)
+/*
+ * Internal identifier length
+ * Sufficiently large to allow for UUID or PCI address
+ */
+#define RTE_ETH_NAME_MAX_LEN 40
 
 /**
  * @internal
-- 
2.11.0

^ permalink raw reply related

* [PATCH 2/6] i40e: don't refer to eth_dev->pci_dev
From: Stephen Hemminger @ 2017-01-02 23:08 UTC (permalink / raw)
  To: dev; +Cc: Stephen Hemminger
In-Reply-To: <20170102230850.32610-1-sthemmin@microsoft.com>

Later patches remove pci_dev from the ethernet device structure.
Fix the i40e code to just use it's own name when forming zone name.

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
---
 drivers/net/i40e/i40e_fdir.c | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/drivers/net/i40e/i40e_fdir.c b/drivers/net/i40e/i40e_fdir.c
index 335bf15c..68a2523c 100644
--- a/drivers/net/i40e/i40e_fdir.c
+++ b/drivers/net/i40e/i40e_fdir.c
@@ -250,8 +250,7 @@ i40e_fdir_setup(struct i40e_pf *pf)
 	}
 
 	/* reserve memory for the fdir programming packet */
-	snprintf(z_name, sizeof(z_name), "%s_%s_%d",
-			eth_dev->driver->pci_drv.driver.name,
+	snprintf(z_name, sizeof(z_name), "i40e_%s_%d",
 			I40E_FDIR_MZ_NAME,
 			eth_dev->data->port_id);
 	mz = i40e_memzone_reserve(z_name, I40E_FDIR_PKT_LEN, SOCKET_ID_ANY);
-- 
2.11.0

^ permalink raw reply related

* [PATCH 3/6] vmxnet3: don't refer to eth_dev->pci_dev
From: Stephen Hemminger @ 2017-01-02 23:08 UTC (permalink / raw)
  To: dev; +Cc: Stephen Hemminger
In-Reply-To: <20170102230850.32610-1-sthemmin@microsoft.com>

Fix the vmxnet3 code to just use it's own name when forming zone name.

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
---
 drivers/net/vmxnet3/vmxnet3_rxtx.c | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/drivers/net/vmxnet3/vmxnet3_rxtx.c b/drivers/net/vmxnet3/vmxnet3_rxtx.c
index b1091688..772cdf18 100644
--- a/drivers/net/vmxnet3/vmxnet3_rxtx.c
+++ b/drivers/net/vmxnet3/vmxnet3_rxtx.c
@@ -793,9 +793,8 @@ ring_dma_zone_reserve(struct rte_eth_dev *dev, const char *ring_name,
 	char z_name[RTE_MEMZONE_NAMESIZE];
 	const struct rte_memzone *mz;
 
-	snprintf(z_name, sizeof(z_name), "%s_%s_%d_%d",
-		 dev->driver->pci_drv.driver.name, ring_name,
-		 dev->data->port_id, queue_id);
+	snprintf(z_name, sizeof(z_name), "vmxnet3_%s_%d_%d",
+		 ring_name, dev->data->port_id, queue_id);
 
 	mz = rte_memzone_lookup(z_name);
 	if (mz)
-- 
2.11.0

^ permalink raw reply related

* [PATCH 4/6] cxgbe: don't refer to eth_dev->pci_dev
From: Stephen Hemminger @ 2017-01-02 23:08 UTC (permalink / raw)
  To: dev; +Cc: Stephen Hemminger
In-Reply-To: <20170102230850.32610-1-sthemmin@microsoft.com>

Later patches remove pci_dev from the ethernet device structure.
Fix the cxgbe code to just use it's own name when forming zone name.

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
---
 drivers/net/cxgbe/sge.c | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/drivers/net/cxgbe/sge.c b/drivers/net/cxgbe/sge.c
index 736f08ce..e935dc42 100644
--- a/drivers/net/cxgbe/sge.c
+++ b/drivers/net/cxgbe/sge.c
@@ -1644,8 +1644,7 @@ int t4_sge_alloc_rxq(struct adapter *adap, struct sge_rspq *iq, bool fwevtq,
 	/* Size needs to be multiple of 16, including status entry. */
 	iq->size = cxgbe_roundup(iq->size, 16);
 
-	snprintf(z_name, sizeof(z_name), "%s_%s_%d_%d",
-		 eth_dev->driver->pci_drv.driver.name,
+	snprintf(z_name, sizeof(z_name), "cxgbe_%s_%d_%d",
 		 fwevtq ? "fwq_ring" : "rx_ring",
 		 eth_dev->data->port_id, queue_id);
 	snprintf(z_name_sw, sizeof(z_name_sw), "%s_sw_ring", z_name);
@@ -1697,8 +1696,7 @@ int t4_sge_alloc_rxq(struct adapter *adap, struct sge_rspq *iq, bool fwevtq,
 			fl->size = s->fl_starve_thres - 1 + 2 * 8;
 		fl->size = cxgbe_roundup(fl->size, 8);
 
-		snprintf(z_name, sizeof(z_name), "%s_%s_%d_%d",
-			 eth_dev->driver->pci_drv.driver.name,
+		snprintf(z_name, sizeof(z_name), "cxgbe_%s_%d_%d",
 			 fwevtq ? "fwq_ring" : "fl_ring",
 			 eth_dev->data->port_id, queue_id);
 		snprintf(z_name_sw, sizeof(z_name_sw), "%s_sw_ring", z_name);
@@ -1893,8 +1891,7 @@ int t4_sge_alloc_eth_txq(struct adapter *adap, struct sge_eth_txq *txq,
 	/* Add status entries */
 	nentries = txq->q.size + s->stat_len / sizeof(struct tx_desc);
 
-	snprintf(z_name, sizeof(z_name), "%s_%s_%d_%d",
-		 eth_dev->driver->pci_drv.driver.name, "tx_ring",
+	snprintf(z_name, sizeof(z_name), "cxgbe_tx_ring_%d_%d",
 		 eth_dev->data->port_id, queue_id);
 	snprintf(z_name_sw, sizeof(z_name_sw), "%s_sw_ring", z_name);
 
-- 
2.11.0

^ permalink raw reply related

* [PATCH 5/6] ethdev: break ethernet driver and pci_driver connection
From: Stephen Hemminger @ 2017-01-02 23:08 UTC (permalink / raw)
  To: dev; +Cc: Stephen Hemminger
In-Reply-To: <20170102230850.32610-1-sthemmin@microsoft.com>

There are multiple buses and device types now. Therefore it no longer
makes sense that PCI driver information is part of the Ethernet driver
structure.

This patch removes pci_driver from eth_driver and introduces a
new combined structure for use in all existing PMD's. The rationale
is that although all existing PCI drivers are Ethernet drivers,
it make sense that future projects may want to support PCI devices
that are not Ethernet.

It also removes the requirement that driver is first element in
PCI driver structure.

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
---
 app/test/virtual_pmd.c                  | 22 ++++++++---------
 drivers/net/bnx2x/bnx2x_ethdev.c        | 16 ++++++++-----
 drivers/net/bnxt/bnxt_ethdev.c          | 22 +++++++++--------
 drivers/net/cxgbe/cxgbe_ethdev.c        |  8 ++++---
 drivers/net/e1000/em_ethdev.c           | 10 ++++----
 drivers/net/e1000/igb_ethdev.c          | 20 +++++++++-------
 drivers/net/ena/ena_ethdev.c            |  8 ++++---
 drivers/net/enic/enic_ethdev.c          |  8 ++++---
 drivers/net/fm10k/fm10k_ethdev.c        | 10 ++++----
 drivers/net/i40e/i40e_ethdev.c          | 10 ++++----
 drivers/net/i40e/i40e_ethdev_vf.c       | 10 ++++----
 drivers/net/ixgbe/ixgbe_ethdev.c        | 20 +++++++++-------
 drivers/net/mlx4/mlx4.c                 |  8 ++++---
 drivers/net/mlx5/mlx5.c                 |  8 ++++---
 drivers/net/nfp/nfp_net.c               |  8 ++++---
 drivers/net/qede/qede_ethdev.c          | 42 +++++++++++++++++----------------
 drivers/net/szedata2/rte_eth_szedata2.c | 10 ++++----
 drivers/net/thunderx/nicvf_ethdev.c     |  8 ++++---
 drivers/net/virtio/virtio_ethdev.c      | 10 ++++----
 drivers/net/vmxnet3/vmxnet3_ethdev.c    | 10 ++++----
 lib/librte_ether/rte_ethdev.c           |  9 +++----
 lib/librte_ether/rte_ethdev.h           | 18 +++++++++-----
 22 files changed, 172 insertions(+), 123 deletions(-)

diff --git a/app/test/virtual_pmd.c b/app/test/virtual_pmd.c
index 6e4dcd8f..e7f56527 100644
--- a/app/test/virtual_pmd.c
+++ b/app/test/virtual_pmd.c
@@ -533,7 +533,7 @@ virtual_ethdev_create(const char *name, struct ether_addr *mac_addr,
 	struct rte_pci_device *pci_dev = NULL;
 	struct rte_eth_dev *eth_dev = NULL;
 	struct eth_driver *eth_drv = NULL;
-	struct rte_pci_driver *pci_drv = NULL;
+	struct rte_pci_eth_driver *pci_eth_drv = NULL;
 	struct rte_pci_id *id_table = NULL;
 	struct virtual_ethdev_private *dev_private = NULL;
 	char name_buf[RTE_RING_NAMESIZE];
@@ -554,8 +554,8 @@ virtual_ethdev_create(const char *name, struct ether_addr *mac_addr,
 	if (eth_drv == NULL)
 		goto err;
 
-	pci_drv = rte_zmalloc_socket(name, sizeof(*pci_drv), 0, socket_id);
-	if (pci_drv == NULL)
+	pci_eth_drv = rte_zmalloc_socket(name, sizeof(*pci_eth_drv), 0, socket_id);
+	if (pci_eth_drv == NULL)
 		goto err;
 
 	id_table = rte_zmalloc_socket(name, sizeof(*id_table), 0, socket_id);
@@ -585,17 +585,15 @@ virtual_ethdev_create(const char *name, struct ether_addr *mac_addr,
 		goto err;
 
 	pci_dev->device.numa_node = socket_id;
-	pci_drv->driver.name = virtual_ethdev_driver_name;
-	pci_drv->id_table = id_table;
+	pci_eth_drv->pci_drv.driver.name = virtual_ethdev_driver_name;
+	pci_eth_drv->pci_drv.id_table = id_table;
 
 	if (isr_support)
-		pci_drv->drv_flags |= RTE_PCI_DRV_INTR_LSC;
+		pci_eth_drv->pci_drv.drv_flags |= RTE_PCI_DRV_INTR_LSC;
 	else
-		pci_drv->drv_flags &= ~RTE_PCI_DRV_INTR_LSC;
+		pci_eth_drv->pci_drv.drv_flags &= ~RTE_PCI_DRV_INTR_LSC;
 
-
-	eth_drv->pci_drv = (struct rte_pci_driver)(*pci_drv);
-	eth_dev->driver = eth_drv;
+	eth_dev->driver = &pci_eth_drv->eth_drv;
 
 	eth_dev->data->nb_rx_queues = (uint16_t)1;
 	eth_dev->data->nb_tx_queues = (uint16_t)1;
@@ -622,7 +620,7 @@ virtual_ethdev_create(const char *name, struct ether_addr *mac_addr,
 	dev_private->dev_ops = virtual_ethdev_default_dev_ops;
 	eth_dev->dev_ops = &dev_private->dev_ops;
 
-	pci_dev->device.driver = &eth_drv->pci_drv.driver;
+	pci_dev->device.driver = &pci_eth_drv->pci_drv.driver;
 	eth_dev->device = &pci_dev->device;
 
 	eth_dev->rx_pkt_burst = virtual_ethdev_rx_burst_success;
@@ -632,7 +630,7 @@ virtual_ethdev_create(const char *name, struct ether_addr *mac_addr,
 
 err:
 	rte_free(pci_dev);
-	rte_free(pci_drv);
+	rte_free(pci_eth_drv);
 	rte_free(eth_drv);
 	rte_free(id_table);
 	rte_free(dev_private);
diff --git a/drivers/net/bnx2x/bnx2x_ethdev.c b/drivers/net/bnx2x/bnx2x_ethdev.c
index 2735fd0f..296855dd 100644
--- a/drivers/net/bnx2x/bnx2x_ethdev.c
+++ b/drivers/net/bnx2x/bnx2x_ethdev.c
@@ -617,29 +617,33 @@ eth_bnx2xvf_dev_init(struct rte_eth_dev *eth_dev)
 	return bnx2x_common_dev_init(eth_dev, 1);
 }
 
-static struct eth_driver rte_bnx2x_pmd = {
+static struct rte_pci_eth_driver rte_bnx2x_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_bnx2x_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_bnx2x_dev_init,
-	.dev_private_size = sizeof(struct bnx2x_softc),
+	eth_drv = {
+		.eth_dev_init = eth_bnx2x_dev_init,
+		.dev_private_size = sizeof(struct bnx2x_softc),
+	},
 };
 
 /*
  * virtual function driver struct
  */
-static struct eth_driver rte_bnx2xvf_pmd = {
+static struct rte_pci_eth_driver rte_bnx2xvf_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_bnx2xvf_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_bnx2xvf_dev_init,
-	.dev_private_size = sizeof(struct bnx2x_softc),
+	eth_drv = {
+		.eth_dev_init = eth_bnx2xvf_dev_init,
+		.dev_private_size = sizeof(struct bnx2x_softc),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_bnx2x, rte_bnx2x_pmd.pci_drv);
diff --git a/drivers/net/bnxt/bnxt_ethdev.c b/drivers/net/bnxt/bnxt_ethdev.c
index 7518b6b7..9017825b 100644
--- a/drivers/net/bnxt/bnxt_ethdev.c
+++ b/drivers/net/bnxt/bnxt_ethdev.c
@@ -1164,17 +1164,19 @@ bnxt_dev_uninit(struct rte_eth_dev *eth_dev) {
 	return rc;
 }
 
-static struct eth_driver bnxt_rte_pmd = {
+static struct rte_pci_eth_driver bnxt_rte_pmd = {
 	.pci_drv = {
-		    .id_table = bnxt_pci_id_map,
-		    .drv_flags = RTE_PCI_DRV_NEED_MAPPING |
-			    RTE_PCI_DRV_DETACHABLE | RTE_PCI_DRV_INTR_LSC,
-		    .probe = rte_eth_dev_pci_probe,
-		    .remove = rte_eth_dev_pci_remove
-		    },
-	.eth_dev_init = bnxt_dev_init,
-	.eth_dev_uninit = bnxt_dev_uninit,
-	.dev_private_size = sizeof(struct bnxt),
+		.id_table = bnxt_pci_id_map,
+		.drv_flags = RTE_PCI_DRV_NEED_MAPPING |
+			     RTE_PCI_DRV_DETACHABLE | RTE_PCI_DRV_INTR_LSC,
+		.probe = rte_eth_dev_pci_probe,
+		.remove = rte_eth_dev_pci_remove
+	},
+	.eth_drv = {
+		.eth_dev_init = bnxt_dev_init,
+		.eth_dev_uninit = bnxt_dev_uninit,
+		.dev_private_size = sizeof(struct bnxt),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_bnxt, bnxt_rte_pmd.pci_drv);
diff --git a/drivers/net/cxgbe/cxgbe_ethdev.c b/drivers/net/cxgbe/cxgbe_ethdev.c
index 64345e37..ccf93904 100644
--- a/drivers/net/cxgbe/cxgbe_ethdev.c
+++ b/drivers/net/cxgbe/cxgbe_ethdev.c
@@ -1039,15 +1039,17 @@ static int eth_cxgbe_dev_init(struct rte_eth_dev *eth_dev)
 	return err;
 }
 
-static struct eth_driver rte_cxgbe_pmd = {
+static struct rte_pci_eth_driver rte_cxgbe_pmd = {
 	.pci_drv = {
 		.id_table = cxgb4_pci_tbl,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_cxgbe_dev_init,
-	.dev_private_size = sizeof(struct port_info),
+	.eth_drv = {
+		.eth_dev_init = eth_cxgbe_dev_init,
+		.dev_private_size = sizeof(struct port_info),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_cxgbe, rte_cxgbe_pmd.pci_drv);
diff --git a/drivers/net/e1000/em_ethdev.c b/drivers/net/e1000/em_ethdev.c
index 436acbb5..f5bb0764 100644
--- a/drivers/net/e1000/em_ethdev.c
+++ b/drivers/net/e1000/em_ethdev.c
@@ -388,7 +388,7 @@ eth_em_dev_uninit(struct rte_eth_dev *eth_dev)
 	return 0;
 }
 
-static struct eth_driver rte_em_pmd = {
+static struct rte_pci_eth_driver rte_em_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_em_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC |
@@ -396,9 +396,11 @@ static struct eth_driver rte_em_pmd = {
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_em_dev_init,
-	.eth_dev_uninit = eth_em_dev_uninit,
-	.dev_private_size = sizeof(struct e1000_adapter),
+	.eth_drv = {
+		.eth_dev_init = eth_em_dev_init,
+		.eth_dev_uninit = eth_em_dev_uninit,
+		.dev_private_size = sizeof(struct e1000_adapter),
+	},
 };
 
 static int
diff --git a/drivers/net/e1000/igb_ethdev.c b/drivers/net/e1000/igb_ethdev.c
index 4a154479..a8e769c1 100644
--- a/drivers/net/e1000/igb_ethdev.c
+++ b/drivers/net/e1000/igb_ethdev.c
@@ -1078,7 +1078,7 @@ eth_igbvf_dev_uninit(struct rte_eth_dev *eth_dev)
 	return 0;
 }
 
-static struct eth_driver rte_igb_pmd = {
+static struct rte_pci_eth_driver rte_igb_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_igb_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC |
@@ -1086,24 +1086,28 @@ static struct eth_driver rte_igb_pmd = {
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_igb_dev_init,
-	.eth_dev_uninit = eth_igb_dev_uninit,
-	.dev_private_size = sizeof(struct e1000_adapter),
+	.eth_drv = {
+		.eth_dev_init = eth_igb_dev_init,
+		.eth_dev_uninit = eth_igb_dev_uninit,
+		.dev_private_size = sizeof(struct e1000_adapter),
+	},
 };
 
 /*
  * virtual function driver struct
  */
-static struct eth_driver rte_igbvf_pmd = {
+static struct rte_pci_eth_driver rte_igbvf_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_igbvf_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_DETACHABLE,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_igbvf_dev_init,
-	.eth_dev_uninit = eth_igbvf_dev_uninit,
-	.dev_private_size = sizeof(struct e1000_adapter),
+	.eth_drv = {
+		.eth_dev_init = eth_igbvf_dev_init,
+		.eth_dev_uninit = eth_igbvf_dev_uninit,
+		.dev_private_size = sizeof(struct e1000_adapter),
+	},
 };
 
 static void
diff --git a/drivers/net/ena/ena_ethdev.c b/drivers/net/ena/ena_ethdev.c
index dcee8ed0..89f6bd6d 100644
--- a/drivers/net/ena/ena_ethdev.c
+++ b/drivers/net/ena/ena_ethdev.c
@@ -1705,15 +1705,17 @@ static uint16_t eth_ena_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
 	return sent_idx;
 }
 
-static struct eth_driver rte_ena_pmd = {
+static struct rte_pci_eth_driver rte_ena_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_ena_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_ena_dev_init,
-	.dev_private_size = sizeof(struct ena_adapter),
+	.eth_drv = {
+		.eth_dev_init = eth_ena_dev_init,
+		.dev_private_size = sizeof(struct ena_adapter),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_ena, rte_ena_pmd.pci_drv);
diff --git a/drivers/net/enic/enic_ethdev.c b/drivers/net/enic/enic_ethdev.c
index e5ceb98e..b47975d1 100644
--- a/drivers/net/enic/enic_ethdev.c
+++ b/drivers/net/enic/enic_ethdev.c
@@ -633,15 +633,17 @@ static int eth_enicpmd_dev_init(struct rte_eth_dev *eth_dev)
 	return enic_probe(enic);
 }
 
-static struct eth_driver rte_enic_pmd = {
+static struct rte_pci_eth_driver rte_enic_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_enic_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_enicpmd_dev_init,
-	.dev_private_size = sizeof(struct enic),
+	.eth_drv = {
+		.eth_dev_init = eth_enicpmd_dev_init,
+		.dev_private_size = sizeof(struct enic),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_enic, rte_enic_pmd.pci_drv);
diff --git a/drivers/net/fm10k/fm10k_ethdev.c b/drivers/net/fm10k/fm10k_ethdev.c
index b8257e46..cbd27cbc 100644
--- a/drivers/net/fm10k/fm10k_ethdev.c
+++ b/drivers/net/fm10k/fm10k_ethdev.c
@@ -3069,7 +3069,7 @@ static const struct rte_pci_id pci_id_fm10k_map[] = {
 	{ .vendor_id = 0, /* sentinel */ },
 };
 
-static struct eth_driver rte_pmd_fm10k = {
+static struct rte_pci_eth_driver rte_pmd_fm10k = {
 	.pci_drv = {
 		.id_table = pci_id_fm10k_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC |
@@ -3077,9 +3077,11 @@ static struct eth_driver rte_pmd_fm10k = {
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_fm10k_dev_init,
-	.eth_dev_uninit = eth_fm10k_dev_uninit,
-	.dev_private_size = sizeof(struct fm10k_adapter),
+	.eth_drv = {
+		.eth_dev_init = eth_fm10k_dev_init,
+		.eth_dev_uninit = eth_fm10k_dev_uninit,
+		.dev_private_size = sizeof(struct fm10k_adapter),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_fm10k, rte_pmd_fm10k.pci_drv);
diff --git a/drivers/net/i40e/i40e_ethdev.c b/drivers/net/i40e/i40e_ethdev.c
index 8f63044b..efb641b7 100644
--- a/drivers/net/i40e/i40e_ethdev.c
+++ b/drivers/net/i40e/i40e_ethdev.c
@@ -668,7 +668,7 @@ static const struct rte_i40e_xstats_name_off rte_i40e_txq_prio_strings[] = {
 #define I40E_NB_TXQ_PRIO_XSTATS (sizeof(rte_i40e_txq_prio_strings) / \
 		sizeof(rte_i40e_txq_prio_strings[0]))
 
-static struct eth_driver rte_i40e_pmd = {
+static struct rte_pci_eth_driver rte_i40e_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_i40e_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC |
@@ -676,9 +676,11 @@ static struct eth_driver rte_i40e_pmd = {
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_i40e_dev_init,
-	.eth_dev_uninit = eth_i40e_dev_uninit,
-	.dev_private_size = sizeof(struct i40e_adapter),
+	.eth_drv = {
+		.eth_dev_init = eth_i40e_dev_init,
+		.eth_dev_uninit = eth_i40e_dev_uninit,
+		.dev_private_size = sizeof(struct i40e_adapter),
+	},
 };
 
 static inline int
diff --git a/drivers/net/i40e/i40e_ethdev_vf.c b/drivers/net/i40e/i40e_ethdev_vf.c
index 0dc0af52..6dbcc88c 100644
--- a/drivers/net/i40e/i40e_ethdev_vf.c
+++ b/drivers/net/i40e/i40e_ethdev_vf.c
@@ -1526,16 +1526,18 @@ i40evf_dev_uninit(struct rte_eth_dev *eth_dev)
 /*
  * virtual function driver struct
  */
-static struct eth_driver rte_i40evf_pmd = {
+static struct rte_pci_eth_driver rte_i40evf_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_i40evf_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_DETACHABLE,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = i40evf_dev_init,
-	.eth_dev_uninit = i40evf_dev_uninit,
-	.dev_private_size = sizeof(struct i40e_adapter),
+	.eth_drv = {
+		.eth_dev_init = i40evf_dev_init,
+		.eth_dev_uninit = i40evf_dev_uninit,
+		.dev_private_size = sizeof(struct i40e_adapter),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_i40e_vf, rte_i40evf_pmd.pci_drv);
diff --git a/drivers/net/ixgbe/ixgbe_ethdev.c b/drivers/net/ixgbe/ixgbe_ethdev.c
index ec2edade..dbad48dc 100644
--- a/drivers/net/ixgbe/ixgbe_ethdev.c
+++ b/drivers/net/ixgbe/ixgbe_ethdev.c
@@ -1560,7 +1560,7 @@ eth_ixgbevf_dev_uninit(struct rte_eth_dev *eth_dev)
 	return 0;
 }
 
-static struct eth_driver rte_ixgbe_pmd = {
+static struct rte_pci_eth_driver rte_ixgbe_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_ixgbe_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC |
@@ -1568,24 +1568,28 @@ static struct eth_driver rte_ixgbe_pmd = {
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_ixgbe_dev_init,
-	.eth_dev_uninit = eth_ixgbe_dev_uninit,
-	.dev_private_size = sizeof(struct ixgbe_adapter),
+	.eth_drv = {
+		.eth_dev_init = eth_ixgbe_dev_init,
+		.eth_dev_uninit = eth_ixgbe_dev_uninit,
+		.dev_private_size = sizeof(struct ixgbe_adapter),
+	},
 };
 
 /*
  * virtual function driver struct
  */
-static struct eth_driver rte_ixgbevf_pmd = {
+static struct rte_pci_eth_driver rte_ixgbevf_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_ixgbevf_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_DETACHABLE,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_ixgbevf_dev_init,
-	.eth_dev_uninit = eth_ixgbevf_dev_uninit,
-	.dev_private_size = sizeof(struct ixgbe_adapter),
+	.eth_drv = {
+		.eth_dev_init = eth_ixgbevf_dev_init,
+		.eth_dev_uninit = eth_ixgbevf_dev_uninit,
+		.dev_private_size = sizeof(struct ixgbe_adapter),
+	},
 };
 
 static int
diff --git a/drivers/net/mlx4/mlx4.c b/drivers/net/mlx4/mlx4.c
index eb06f56a..7b184019 100644
--- a/drivers/net/mlx4/mlx4.c
+++ b/drivers/net/mlx4/mlx4.c
@@ -5524,7 +5524,7 @@ priv_dev_interrupt_handler_install(struct priv *priv, struct rte_eth_dev *dev)
 	}
 }
 
-static struct eth_driver mlx4_driver;
+static struct rte_pci_eth_driver mlx4_driver;
 
 /**
  * DPDK callback to register a PCI device.
@@ -5903,7 +5903,7 @@ static const struct rte_pci_id mlx4_pci_id_map[] = {
 	}
 };
 
-static struct eth_driver mlx4_driver = {
+static struct rte_pci_eth_driver mlx4_driver = {
 	.pci_drv = {
 		.driver = {
 			.name = MLX4_DRIVER_NAME
@@ -5912,7 +5912,9 @@ static struct eth_driver mlx4_driver = {
 		.probe = mlx4_pci_probe,
 		.drv_flags = RTE_PCI_DRV_INTR_LSC,
 	},
-	.dev_private_size = sizeof(struct priv)
+	.eth_drv = {
+		.dev_private_size = sizeof(struct priv),
+	},
 };
 
 /**
diff --git a/drivers/net/mlx5/mlx5.c b/drivers/net/mlx5/mlx5.c
index b97b6d16..efc0430c 100644
--- a/drivers/net/mlx5/mlx5.c
+++ b/drivers/net/mlx5/mlx5.c
@@ -338,7 +338,7 @@ mlx5_args(struct priv *priv, struct rte_devargs *devargs)
 	return 0;
 }
 
-static struct eth_driver mlx5_driver;
+static struct rte_pci_eth_driver mlx5_driver;
 
 /**
  * DPDK callback to register a PCI device.
@@ -723,7 +723,7 @@ static const struct rte_pci_id mlx5_pci_id_map[] = {
 	}
 };
 
-static struct eth_driver mlx5_driver = {
+static struct rte_pci_eth_driver mlx5_driver = {
 	.pci_drv = {
 		.driver = {
 			.name = MLX5_DRIVER_NAME
@@ -732,7 +732,9 @@ static struct eth_driver mlx5_driver = {
 		.probe = mlx5_pci_probe,
 		.drv_flags = RTE_PCI_DRV_INTR_LSC,
 	},
-	.dev_private_size = sizeof(struct priv)
+	.eth_drv = {
+		.dev_private_size = sizeof(struct priv),
+	},
 };
 
 /**
diff --git a/drivers/net/nfp/nfp_net.c b/drivers/net/nfp/nfp_net.c
index e85315f1..eb4006f9 100644
--- a/drivers/net/nfp/nfp_net.c
+++ b/drivers/net/nfp/nfp_net.c
@@ -2472,7 +2472,7 @@ static struct rte_pci_id pci_id_nfp_net_map[] = {
 	},
 };
 
-static struct eth_driver rte_nfp_net_pmd = {
+static struct rte_pci_eth_driver rte_nfp_net_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_nfp_net_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC |
@@ -2480,8 +2480,10 @@ static struct eth_driver rte_nfp_net_pmd = {
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = nfp_net_init,
-	.dev_private_size = sizeof(struct nfp_net_adapter),
+	.eth_drv = {
+		.eth_dev_init = nfp_net_init,
+		.dev_private_size = sizeof(struct nfp_net_adapter),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_nfp, rte_nfp_net_pmd.pci_drv);
diff --git a/drivers/net/qede/qede_ethdev.c b/drivers/net/qede/qede_ethdev.c
index e91e627c..beb10157 100644
--- a/drivers/net/qede/qede_ethdev.c
+++ b/drivers/net/qede/qede_ethdev.c
@@ -1641,30 +1641,32 @@ static struct rte_pci_id pci_id_qede_map[] = {
 	{.vendor_id = 0,}
 };
 
-static struct eth_driver rte_qedevf_pmd = {
+static struct rte_pci_eth_driver rte_qedevf_pmd = {
 	.pci_drv = {
-		    .id_table = pci_id_qedevf_map,
-		    .drv_flags =
-		    RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC,
-		    .probe = rte_eth_dev_pci_probe,
-		    .remove = rte_eth_dev_pci_remove,
-		   },
-	.eth_dev_init = qedevf_eth_dev_init,
-	.eth_dev_uninit = qedevf_eth_dev_uninit,
-	.dev_private_size = sizeof(struct qede_dev),
+		.id_table = pci_id_qedevf_map,
+		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC,
+		.probe = rte_eth_dev_pci_probe,
+		.remove = rte_eth_dev_pci_remove,
+	},
+	.eth_drv = {
+		.eth_dev_init = qedevf_eth_dev_init,
+		.eth_dev_uninit = qedevf_eth_dev_uninit,
+		.dev_private_size = sizeof(struct qede_dev),
+	},
 };
 
-static struct eth_driver rte_qede_pmd = {
+static struct rte_pci_eth_driver rte_qede_pmd = {
 	.pci_drv = {
-		    .id_table = pci_id_qede_map,
-		    .drv_flags =
-		    RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC,
-		    .probe = rte_eth_dev_pci_probe,
-		    .remove = rte_eth_dev_pci_remove,
-		   },
-	.eth_dev_init = qede_eth_dev_init,
-	.eth_dev_uninit = qede_eth_dev_uninit,
-	.dev_private_size = sizeof(struct qede_dev),
+		.id_table = pci_id_qede_map,
+		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC,
+		.probe = rte_eth_dev_pci_probe,
+		.remove = rte_eth_dev_pci_remove,
+	},
+	.eth_drv = {
+		.eth_dev_init = qede_eth_dev_init,
+		.eth_dev_uninit = qede_eth_dev_uninit,
+		.dev_private_size = sizeof(struct qede_dev),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_qede, rte_qede_pmd.pci_drv);
diff --git a/drivers/net/szedata2/rte_eth_szedata2.c b/drivers/net/szedata2/rte_eth_szedata2.c
index fe7a6b3b..b9054671 100644
--- a/drivers/net/szedata2/rte_eth_szedata2.c
+++ b/drivers/net/szedata2/rte_eth_szedata2.c
@@ -1587,15 +1587,17 @@ static const struct rte_pci_id rte_szedata2_pci_id_table[] = {
 	}
 };
 
-static struct eth_driver szedata2_eth_driver = {
+static struct rte_pci_eth_driver szedata2_eth_driver = {
 	.pci_drv = {
 		.id_table = rte_szedata2_pci_id_table,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init     = rte_szedata2_eth_dev_init,
-	.eth_dev_uninit   = rte_szedata2_eth_dev_uninit,
-	.dev_private_size = sizeof(struct pmd_internals),
+	.eth_drv = {
+		.eth_dev_init     = rte_szedata2_eth_dev_init,
+		.eth_dev_uninit   = rte_szedata2_eth_dev_uninit,
+		.dev_private_size = sizeof(struct pmd_internals),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(RTE_SZEDATA2_DRIVER_NAME, szedata2_eth_driver.pci_drv);
diff --git a/drivers/net/thunderx/nicvf_ethdev.c b/drivers/net/thunderx/nicvf_ethdev.c
index 10603197..f13fad90 100644
--- a/drivers/net/thunderx/nicvf_ethdev.c
+++ b/drivers/net/thunderx/nicvf_ethdev.c
@@ -2111,15 +2111,17 @@ static const struct rte_pci_id pci_id_nicvf_map[] = {
 	},
 };
 
-static struct eth_driver rte_nicvf_pmd = {
+static struct rte_pci_eth_driver rte_nicvf_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_nicvf_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_INTR_LSC,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = nicvf_eth_dev_init,
-	.dev_private_size = sizeof(struct nicvf),
+	.eth_drv = {
+		.eth_dev_init = nicvf_eth_dev_init,
+		.dev_private_size = sizeof(struct nicvf),
+	},
 };
 
 RTE_PMD_REGISTER_PCI(net_thunderx, rte_nicvf_pmd.pci_drv);
diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
index b11bee63..dc68dbb9 100644
--- a/drivers/net/virtio/virtio_ethdev.c
+++ b/drivers/net/virtio/virtio_ethdev.c
@@ -1375,7 +1375,7 @@ eth_virtio_dev_uninit(struct rte_eth_dev *eth_dev)
 	return 0;
 }
 
-static struct eth_driver rte_virtio_pmd = {
+static struct rte_pci_eth_driver rte_virtio_pmd = {
 	.pci_drv = {
 		.driver = {
 			.name = "net_virtio",
@@ -1385,9 +1385,11 @@ static struct eth_driver rte_virtio_pmd = {
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_virtio_dev_init,
-	.eth_dev_uninit = eth_virtio_dev_uninit,
-	.dev_private_size = sizeof(struct virtio_hw),
+	.eth_drv = {
+		.eth_dev_init = eth_virtio_dev_init,
+		.eth_dev_uninit = eth_virtio_dev_uninit,
+		.dev_private_size = sizeof(struct virtio_hw),
+	},
 };
 
 RTE_INIT(rte_virtio_pmd_init);
diff --git a/drivers/net/vmxnet3/vmxnet3_ethdev.c b/drivers/net/vmxnet3/vmxnet3_ethdev.c
index 9c4d93c1..422461e9 100644
--- a/drivers/net/vmxnet3/vmxnet3_ethdev.c
+++ b/drivers/net/vmxnet3/vmxnet3_ethdev.c
@@ -333,16 +333,18 @@ eth_vmxnet3_dev_uninit(struct rte_eth_dev *eth_dev)
 	return 0;
 }
 
-static struct eth_driver rte_vmxnet3_pmd = {
+static struct rte_pci_eth_driver rte_vmxnet3_pmd = {
 	.pci_drv = {
 		.id_table = pci_id_vmxnet3_map,
 		.drv_flags = RTE_PCI_DRV_NEED_MAPPING | RTE_PCI_DRV_DETACHABLE,
 		.probe = rte_eth_dev_pci_probe,
 		.remove = rte_eth_dev_pci_remove,
 	},
-	.eth_dev_init = eth_vmxnet3_dev_init,
-	.eth_dev_uninit = eth_vmxnet3_dev_uninit,
-	.dev_private_size = sizeof(struct vmxnet3_hw),
+	.eth_drv = {
+		.eth_dev_init = eth_vmxnet3_dev_init,
+		.eth_dev_uninit = eth_vmxnet3_dev_uninit,
+		.dev_private_size = sizeof(struct vmxnet3_hw),
+	},
 };
 
 static int
diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index 280f0db1..acc0e556 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -239,13 +239,14 @@ int
 rte_eth_dev_pci_probe(struct rte_pci_driver *pci_drv,
 		      struct rte_pci_device *pci_dev)
 {
-	struct eth_driver    *eth_drv;
+	const struct rte_pci_eth_driver *pci_eth_drv;
+	const struct eth_driver *eth_drv;
 	struct rte_eth_dev *eth_dev;
 	char ethdev_name[RTE_ETH_NAME_MAX_LEN];
-
 	int diag;
 
-	eth_drv = (struct eth_driver *)pci_drv;
+	pci_eth_drv = container_of(pci_drv, struct rte_pci_eth_driver, pci_drv);
+	eth_drv = &pci_eth_drv->eth_drv;
 
 	rte_eal_pci_device_name(&pci_dev->addr, ethdev_name,
 			sizeof(ethdev_name));
@@ -263,7 +264,7 @@ rte_eth_dev_pci_probe(struct rte_pci_driver *pci_drv,
 	}
 	eth_dev->device = &pci_dev->device;
 	eth_dev->intr_handle = &pci_dev->intr_handle;
-	eth_dev->driver = eth_drv;
+	eth_dev->driver = &pci_eth_drv->eth_drv;
 
 	/* Invoke PMD device initialization function */
 	diag = (*eth_drv->eth_dev_init)(eth_dev);
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 09dd744d..d310541a 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1851,25 +1851,31 @@ typedef int (*eth_dev_uninit_t)(struct rte_eth_dev *eth_dev);
  * @internal
  * The structure associated with a PMD Ethernet driver.
  *
- * Each Ethernet driver acts as a PCI driver and is represented by a generic
+ * Each Ethernet driver acts is represented by a generic
  * *eth_driver* structure that holds:
  *
- * - An *rte_pci_driver* structure (which must be the first field).
+ * - The *eth_dev_init* function invoked for each matching device.
  *
- * - The *eth_dev_init* function invoked for each matching PCI device.
- *
- * - The *eth_dev_uninit* function invoked for each matching PCI device.
+ * - The *eth_dev_uninit* function invoked for each matching device.
  *
  * - The size of the private data to allocate for each matching device.
  */
 struct eth_driver {
-	struct rte_pci_driver pci_drv;    /**< The PMD is also a PCI driver. */
 	eth_dev_init_t eth_dev_init;      /**< Device init function. */
 	eth_dev_uninit_t eth_dev_uninit;  /**< Device uninit function. */
 	unsigned int dev_private_size;    /**< Size of device private data. */
 };
 
 /**
+ * @internal
+ * The structure associated with a PMD PCI Ethernet driver.
+ */
+struct rte_pci_eth_driver {
+	struct rte_pci_driver	pci_drv;	/**< Underlying PCI driver. */
+	struct eth_driver	eth_drv;	/**< Ethernet driver. */
+};
+
+/**
  * Convert a numerical speed in Mbps to a bitmap flag that can be used in
  * the bitmap link_speeds of the struct rte_eth_conf
  *
-- 
2.11.0

^ permalink raw reply related

* [PATCH 6/6] eal: VMBUS infrastructure
From: Stephen Hemminger @ 2017-01-02 23:08 UTC (permalink / raw)
  To: dev; +Cc: Stephen Hemminger
In-Reply-To: <20170102230850.32610-1-sthemmin@microsoft.com>

Add support for VMBUS on Hyper-V/Azure. VMBUS is similar to PCI
but has different addressing and internal API's.

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
---
 lib/librte_eal/common/Makefile              |   2 +-
 lib/librte_eal/common/eal_common_devargs.c  |   7 +
 lib/librte_eal/common/eal_common_options.c  |  38 ++
 lib/librte_eal/common/eal_internal_cfg.h    |   3 +-
 lib/librte_eal/common/eal_options.h         |   6 +
 lib/librte_eal/common/eal_private.h         |   5 +
 lib/librte_eal/common/include/rte_devargs.h |   8 +
 lib/librte_eal/common/include/rte_vmbus.h   | 249 ++++++++
 lib/librte_eal/linuxapp/eal/Makefile        |   6 +
 lib/librte_eal/linuxapp/eal/eal.c           |  11 +
 lib/librte_eal/linuxapp/eal/eal_vmbus.c     | 907 ++++++++++++++++++++++++++++
 lib/librte_ether/rte_ethdev.c               |  90 +++
 lib/librte_ether/rte_ethdev.h               |  25 +
 mk/rte.app.mk                               |   1 +
 14 files changed, 1356 insertions(+), 2 deletions(-)
 create mode 100644 lib/librte_eal/common/include/rte_vmbus.h
 create mode 100644 lib/librte_eal/linuxapp/eal/eal_vmbus.c

diff --git a/lib/librte_eal/common/Makefile b/lib/librte_eal/common/Makefile
index a92c984b..9254bae4 100644
--- a/lib/librte_eal/common/Makefile
+++ b/lib/librte_eal/common/Makefile
@@ -33,7 +33,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
 
 INC := rte_branch_prediction.h rte_common.h
 INC += rte_debug.h rte_eal.h rte_errno.h rte_launch.h rte_lcore.h
-INC += rte_log.h rte_memory.h rte_memzone.h rte_pci.h
+INC += rte_log.h rte_memory.h rte_memzone.h rte_pci.h rte_vmbus.h
 INC += rte_per_lcore.h rte_random.h
 INC += rte_tailq.h rte_interrupts.h rte_alarm.h
 INC += rte_string_fns.h rte_version.h
diff --git a/lib/librte_eal/common/eal_common_devargs.c b/lib/librte_eal/common/eal_common_devargs.c
index e403717b..934ca840 100644
--- a/lib/librte_eal/common/eal_common_devargs.c
+++ b/lib/librte_eal/common/eal_common_devargs.c
@@ -113,6 +113,13 @@ rte_eal_devargs_add(enum rte_devtype devtype, const char *devargs_str)
 			goto fail;
 
 		break;
+	case RTE_DEVTYPE_WHITELISTED_VMBUS:
+	case RTE_DEVTYPE_BLACKLISTED_VMBUS:
+#ifdef RTE_LIBRTE_HV_PMD
+		if (uuid_parse(buf, devargs->uuid) == 0)
+			break;
+#endif
+		goto fail;
 	}
 
 	free(buf);
diff --git a/lib/librte_eal/common/eal_common_options.c b/lib/librte_eal/common/eal_common_options.c
index 611e581c..89dc9de7 100644
--- a/lib/librte_eal/common/eal_common_options.c
+++ b/lib/librte_eal/common/eal_common_options.c
@@ -95,6 +95,11 @@ eal_long_options[] = {
 	{OPT_VFIO_INTR,         1, NULL, OPT_VFIO_INTR_NUM        },
 	{OPT_VMWARE_TSC_MAP,    0, NULL, OPT_VMWARE_TSC_MAP_NUM   },
 	{OPT_XEN_DOM0,          0, NULL, OPT_XEN_DOM0_NUM         },
+#ifdef RTE_LIBRTE_HV_PMD
+	{OPT_NO_VMBUS,          0, NULL, OPT_NO_VMBUS_NUM         },
+	{OPT_VMBUS_BLACKLIST,   1, NULL, OPT_VMBUS_BLACKLIST_NUM  },
+	{OPT_VMBUS_WHITELIST,   1, NULL, OPT_VMBUS_WHITELIST_NUM  },
+#endif
 	{0,                     0, NULL, 0                        }
 };
 
@@ -858,6 +863,21 @@ eal_parse_common_option(int opt, const char *optarg,
 		conf->no_pci = 1;
 		break;
 
+#ifdef RTE_LIBRTE_HV_PMD
+	case OPT_NO_VMBUS_NUM:
+		conf->no_vmbus = 1;
+		break;
+	case OPT_VMBUS_BLACKLIST_NUM:
+		if (rte_eal_devargs_add(RTE_DEVTYPE_BLACKLISTED_VMBUS,
+					optarg) < 0)
+			return -1;
+		break;
+	case OPT_VMBUS_WHITELIST_NUM:
+		if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_VMBUS,
+				optarg) < 0)
+			return -1;
+		break;
+#endif
 	case OPT_NO_HPET_NUM:
 		conf->no_hpet = 1;
 		break;
@@ -1017,6 +1037,14 @@ eal_check_common_options(struct internal_config *internal_cfg)
 		return -1;
 	}
 
+#ifdef RTE_LIBRTE_HV_PMD
+	if (rte_eal_devargs_type_count(RTE_DEVTYPE_WHITELISTED_VMBUS) != 0 &&
+		rte_eal_devargs_type_count(RTE_DEVTYPE_BLACKLISTED_VMBUS) != 0) {
+		RTE_LOG(ERR, EAL, "Options vmbus blacklist and whitelist "
+			"cannot be used at the same time\n");
+		return -1;
+	}
+#endif
 	return 0;
 }
 
@@ -1066,5 +1094,15 @@ eal_common_usage(void)
 	       "  --"OPT_NO_PCI"            Disable PCI\n"
 	       "  --"OPT_NO_HPET"           Disable HPET\n"
 	       "  --"OPT_NO_SHCONF"         No shared config (mmap'd files)\n"
+#ifdef RTE_LIBRTE_HV_PMD
+	       "  --"OPT_NO_VMBUS"          Disable VMBUS\n"
+	       "  --"OPT_VMBUS_BLACKLIST" Add a VMBUS device to black list.\n"
+	       "                      Prevent EAL from using this PCI device. The argument\n"
+	       "                      format is device UUID.\n"
+	       "  --"OPT_VMBUS_WHITELIST" Add a VMBUS device to white list.\n"
+	       "                      Only use the specified VMBUS devices. The argument format\n"
+	       "                      is device UUID This option can be present\n"
+	       "                      several times (once per device).\n"
+#endif
 	       "\n", RTE_MAX_LCORE);
 }
diff --git a/lib/librte_eal/common/eal_internal_cfg.h b/lib/librte_eal/common/eal_internal_cfg.h
index 5f1367eb..18271943 100644
--- a/lib/librte_eal/common/eal_internal_cfg.h
+++ b/lib/librte_eal/common/eal_internal_cfg.h
@@ -69,7 +69,8 @@ struct internal_config {
 	volatile unsigned no_pci;         /**< true to disable PCI */
 	volatile unsigned no_hpet;        /**< true to disable HPET */
 	volatile unsigned vmware_tsc_map; /**< true to use VMware TSC mapping
-										* instead of native TSC */
+					   * instead of native TSC */
+	volatile unsigned no_vmbus;       /**< true to disable VMBUS */
 	volatile unsigned no_shconf;      /**< true if there is no shared config */
 	volatile unsigned create_uio_dev; /**< true to create /dev/uioX devices */
 	volatile enum rte_proc_type_t process_type; /**< multi-process proc type */
diff --git a/lib/librte_eal/common/eal_options.h b/lib/librte_eal/common/eal_options.h
index a881c62e..156727e7 100644
--- a/lib/librte_eal/common/eal_options.h
+++ b/lib/librte_eal/common/eal_options.h
@@ -83,6 +83,12 @@ enum {
 	OPT_VMWARE_TSC_MAP_NUM,
 #define OPT_XEN_DOM0          "xen-dom0"
 	OPT_XEN_DOM0_NUM,
+#define OPT_NO_VMBUS          "no-vmbus"
+	OPT_NO_VMBUS_NUM,
+#define OPT_VMBUS_BLACKLIST   "vmbus-blacklist"
+	OPT_VMBUS_BLACKLIST_NUM,
+#define OPT_VMBUS_WHITELIST   "vmbus-whitelist"
+	OPT_VMBUS_WHITELIST_NUM,
 	OPT_LONG_MAX_NUM
 };
 
diff --git a/lib/librte_eal/common/eal_private.h b/lib/librte_eal/common/eal_private.h
index 9e7d8f6b..c856c63e 100644
--- a/lib/librte_eal/common/eal_private.h
+++ b/lib/librte_eal/common/eal_private.h
@@ -210,6 +210,11 @@ int pci_uio_map_resource_by_index(struct rte_pci_device *dev, int res_idx,
 		struct mapped_pci_resource *uio_res, int map_idx);
 
 /**
+ * VMBUS related functions and structures
+ */
+int rte_eal_vmbus_init(void);
+
+/**
  * Init tail queues for non-EAL library structures. This is to allow
  * the rings, mempools, etc. lists to be shared among multiple processes
  *
diff --git a/lib/librte_eal/common/include/rte_devargs.h b/lib/librte_eal/common/include/rte_devargs.h
index 88120a1c..c079d289 100644
--- a/lib/librte_eal/common/include/rte_devargs.h
+++ b/lib/librte_eal/common/include/rte_devargs.h
@@ -51,6 +51,9 @@ extern "C" {
 #include <stdio.h>
 #include <sys/queue.h>
 #include <rte_pci.h>
+#ifdef RTE_LIBRTE_HV_PMD
+#include <uuid/uuid.h>
+#endif
 
 /**
  * Type of generic device
@@ -59,6 +62,8 @@ enum rte_devtype {
 	RTE_DEVTYPE_WHITELISTED_PCI,
 	RTE_DEVTYPE_BLACKLISTED_PCI,
 	RTE_DEVTYPE_VIRTUAL,
+	RTE_DEVTYPE_WHITELISTED_VMBUS,
+	RTE_DEVTYPE_BLACKLISTED_VMBUS,
 };
 
 /**
@@ -88,6 +93,9 @@ struct rte_devargs {
 			/** Driver name. */
 			char drv_name[32];
 		} virt;
+#ifdef RTE_LIBRTE_HV_PMD
+		uuid_t uuid;
+#endif
 	};
 	/** Arguments string as given by user or "" for no argument. */
 	char *args;
diff --git a/lib/librte_eal/common/include/rte_vmbus.h b/lib/librte_eal/common/include/rte_vmbus.h
new file mode 100644
index 00000000..f96d753e
--- /dev/null
+++ b/lib/librte_eal/common/include/rte_vmbus.h
@@ -0,0 +1,249 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2013-2016 Brocade Communications Systems, Inc.
+ *   Copyright(c) 2016 Microsoft Corporation
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef _RTE_VMBUS_H_
+#define _RTE_VMBUS_H_
+
+/**
+ * @file
+ *
+ * RTE VMBUS Interface
+ */
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <errno.h>
+#include <uuid/uuid.h>
+#include <sys/queue.h>
+#include <stdint.h>
+#include <inttypes.h>
+
+#include <rte_debug.h>
+#include <rte_interrupts.h>
+#include <rte_dev.h>
+
+TAILQ_HEAD(vmbus_device_list, rte_vmbus_device);
+TAILQ_HEAD(vmbus_driver_list, rte_vmbus_driver);
+
+extern struct vmbus_driver_list vmbus_driver_list;
+extern struct vmbus_device_list vmbus_device_list;
+
+/** Pathname of VMBUS devices directory. */
+#define SYSFS_VMBUS_DEVICES "/sys/bus/vmbus/devices"
+
+#define UUID_BUF_SZ	(36 + 1)
+
+
+/** Maximum number of VMBUS resources. */
+#define VMBUS_MAX_RESOURCE 7
+
+/**
+ * A structure describing a VMBUS device.
+ */
+struct rte_vmbus_device {
+	TAILQ_ENTRY(rte_vmbus_device) next;     /**< Next probed VMBUS device. */
+	struct rte_device device;               /**< Inherit core device */
+	uuid_t device_id;			/**< VMBUS device id */
+	uuid_t class_id;			/**< VMBUS device type */
+	uint32_t relid;				/**< VMBUS id for notification */
+	uint8_t	monitor_id;
+	struct rte_intr_handle intr_handle;     /**< Interrupt handle */
+	const struct rte_vmbus_driver *driver;  /**< Associated driver */
+
+	struct rte_mem_resource mem_resource[VMBUS_MAX_RESOURCE];
+						/**< VMBUS Memory Resource */
+	char sysfs_name[];			/**< Name in sysfs bus directory */
+};
+
+struct rte_vmbus_driver;
+
+/**
+ * Initialisation function for the driver called during VMBUS probing.
+ */
+typedef int (vmbus_probe_t)(struct rte_vmbus_driver *,
+			    struct rte_vmbus_device *);
+
+/**
+ * Uninitialisation function for the driver called during hotplugging.
+ */
+typedef int (vmbus_remove_t)(struct rte_vmbus_device *);
+
+/**
+ * A structure describing a VMBUS driver.
+ */
+struct rte_vmbus_driver {
+	TAILQ_ENTRY(rte_vmbus_driver) next;     /**< Next in list. */
+	struct rte_driver driver;
+	vmbus_probe_t *probe;                   /**< Device Probe function. */
+	vmbus_remove_t *remove;                 /**< Device Remove function. */
+
+	const uuid_t *id_table;			/**< ID table. */
+};
+
+struct vmbus_map {
+	void *addr;
+	char *path;
+	uint64_t offset;
+	uint64_t size;
+	uint64_t phaddr;
+};
+
+/*
+ * For multi-process we need to reproduce all vmbus mappings in secondary
+ * processes, so save them in a tailq.
+ */
+struct mapped_vmbus_resource {
+	TAILQ_ENTRY(mapped_vmbus_resource) next;
+
+	uuid_t uuid;
+	char path[PATH_MAX];
+	int nb_maps;
+	struct vmbus_map maps[VMBUS_MAX_RESOURCE];
+};
+
+TAILQ_HEAD(mapped_vmbus_res_list, mapped_vmbus_resource);
+
+/**
+ * Scan the content of the VMBUS bus, and the devices in the devices list
+ *
+ * @return
+ *  0 on success, negative on error
+ */
+int rte_eal_vmbus_scan(void);
+
+/**
+ * Probe the VMBUS bus for registered drivers.
+ *
+ * Scan the content of the VMBUS bus, and call the probe() function for
+ * all registered drivers that have a matching entry in its id_table
+ * for discovered devices.
+ *
+ * @return
+ *   - 0 on success.
+ *   - Negative on error.
+ */
+int rte_eal_vmbus_probe(void);
+
+/**
+ * Map the VMBUS device resources in user space virtual memory address
+ *
+ * @param dev
+ *   A pointer to a rte_vmbus_device structure describing the device
+ *   to use
+ *
+ * @return
+ *   0 on success, negative on error and positive if no driver
+ *   is found for the device.
+ */
+int rte_eal_vmbus_map_device(struct rte_vmbus_device *dev);
+
+/**
+ * Unmap this device
+ *
+ * @param dev
+ *   A pointer to a rte_vmbus_device structure describing the device
+ *   to use
+ */
+void rte_eal_vmbus_unmap_device(struct rte_vmbus_device *dev);
+
+/**
+ * Probe the single VMBUS device.
+ *
+ * Scan the content of the VMBUS bus, and find the vmbus device
+ * specified by device uuid, then call the probe() function for
+ * registered driver that has a matching entry in its id_table for
+ * discovered device.
+ *
+ * @param id
+ *   The VMBUS device uuid.
+ * @return
+ *   - 0 on success.
+ *   - Negative on error.
+ */
+int rte_eal_vmbus_probe_one(uuid_t id);
+
+/**
+ * Close the single VMBUS device.
+ *
+ * Scan the content of the VMBUS bus, and find the vmbus device id,
+ * then call the remove() function for registered driver that has a
+ * matching entry in its id_table for discovered device.
+ *
+ * @param id
+ *   The VMBUS device uuid.
+ * @return
+ *   - 0 on success.
+ *   - Negative on error.
+ */
+int rte_eal_vmbus_detach(uuid_t id);
+
+/**
+ * Register a VMBUS driver.
+ *
+ * @param driver
+ *   A pointer to a rte_vmbus_driver structure describing the driver
+ *   to be registered.
+ */
+void rte_eal_vmbus_register(struct rte_vmbus_driver *driver);
+
+/** Helper for VMBUS device registration from driver nstance */
+#define RTE_PMD_REGISTER_VMBUS(nm, vmbus_drv) \
+RTE_INIT(vmbusinitfn_ ##nm); \
+static void vmbusinitfn_ ##nm(void) \
+{\
+	(vmbus_drv).driver.name = RTE_STR(nm);\
+	(vmbus_drv).driver.type = PMD_VMBUS; \
+	rte_eal_vmbus_register(&vmbus_drv); \
+} \
+RTE_PMD_EXPORT_NAME(nm, __COUNTER__)
+
+/**
+ * Unregister a VMBUS driver.
+ *
+ * @param driver
+ *   A pointer to a rte_vmbus_driver structure describing the driver
+ *   to be unregistered.
+ */
+void rte_eal_vmbus_unregister(struct rte_vmbus_driver *driver);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_VMBUS_H_ */
diff --git a/lib/librte_eal/linuxapp/eal/Makefile b/lib/librte_eal/linuxapp/eal/Makefile
index 4e206f09..f6ca3848 100644
--- a/lib/librte_eal/linuxapp/eal/Makefile
+++ b/lib/librte_eal/linuxapp/eal/Makefile
@@ -71,6 +71,11 @@ SRCS-$(CONFIG_RTE_EXEC_ENV_LINUXAPP) += eal_timer.c
 SRCS-$(CONFIG_RTE_EXEC_ENV_LINUXAPP) += eal_interrupts.c
 SRCS-$(CONFIG_RTE_EXEC_ENV_LINUXAPP) += eal_alarm.c
 
+ifeq ($(CONFIG_RTE_LIBRTE_HV_PMD),y)
+SRCS-$(CONFIG_RTE_EXEC_ENV_LINUXAPP) += eal_vmbus.c
+LDLIBS += -luuid
+endif
+
 # from common dir
 SRCS-$(CONFIG_RTE_EXEC_ENV_LINUXAPP) += eal_common_lcore.c
 SRCS-$(CONFIG_RTE_EXEC_ENV_LINUXAPP) += eal_common_timer.c
@@ -114,6 +119,7 @@ CFLAGS_eal_hugepage_info.o := -D_GNU_SOURCE
 CFLAGS_eal_pci.o := -D_GNU_SOURCE
 CFLAGS_eal_pci_uio.o := -D_GNU_SOURCE
 CFLAGS_eal_pci_vfio.o := -D_GNU_SOURCE
+CFLAGS_eal_vmbux.o := -D_GNU_SOURCE
 CFLAGS_eal_common_whitelist.o := -D_GNU_SOURCE
 CFLAGS_eal_common_options.o := -D_GNU_SOURCE
 CFLAGS_eal_common_thread.o := -D_GNU_SOURCE
diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
index 16dd5b9c..1eec963f 100644
--- a/lib/librte_eal/linuxapp/eal/eal.c
+++ b/lib/librte_eal/linuxapp/eal/eal.c
@@ -70,6 +70,7 @@
 #include <rte_cpuflags.h>
 #include <rte_interrupts.h>
 #include <rte_pci.h>
+#include <rte_vmbus.h>
 #include <rte_dev.h>
 #include <rte_devargs.h>
 #include <rte_common.h>
@@ -830,6 +831,11 @@ rte_eal_init(int argc, char **argv)
 
 	eal_check_mem_on_local_socket();
 
+#ifdef RTE_LIBRTE_HV_PMD
+	if (rte_eal_vmbus_init() < 0)
+		RTE_LOG(ERR, EAL, "Cannot init VMBUS\n");
+#endif
+
 	if (eal_plugins_init() < 0)
 		rte_panic("Cannot init plugins\n");
 
@@ -884,6 +890,11 @@ rte_eal_init(int argc, char **argv)
 	if (rte_eal_pci_probe())
 		rte_panic("Cannot probe PCI\n");
 
+#ifdef RTE_LIBRTE_HV_PMD
+	if (rte_eal_vmbus_probe() < 0)
+		rte_panic("Cannot probe VMBUS\n");
+#endif
+
 	if (rte_eal_dev_init() < 0)
 		rte_panic("Cannot init pmd devices\n");
 
diff --git a/lib/librte_eal/linuxapp/eal/eal_vmbus.c b/lib/librte_eal/linuxapp/eal/eal_vmbus.c
new file mode 100644
index 00000000..f298e04e
--- /dev/null
+++ b/lib/librte_eal/linuxapp/eal/eal_vmbus.c
@@ -0,0 +1,907 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2013-2016 Brocade Communications Systems, Inc.
+ *   Copyright(c) 2016 Microsoft Corporation
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *	 notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *	 notice, this list of conditions and the following disclaimer in
+ *	 the documentation and/or other materials provided with the
+ *	 distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *	 contributors may be used to endorse or promote products derived
+ *	 from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+
+#include <rte_eal.h>
+#include <rte_tailq.h>
+#include <rte_log.h>
+#include <rte_devargs.h>
+#include <rte_vmbus.h>
+#include <rte_malloc.h>
+
+#include "eal_private.h"
+#include "eal_pci_init.h"
+#include "eal_filesystem.h"
+
+struct vmbus_driver_list vmbus_driver_list =
+	TAILQ_HEAD_INITIALIZER(vmbus_driver_list);
+struct vmbus_device_list vmbus_device_list =
+	TAILQ_HEAD_INITIALIZER(vmbus_device_list);
+
+static void *vmbus_map_addr;
+
+static struct rte_tailq_elem rte_vmbus_uio_tailq = {
+	.name = "UIO_RESOURCE_LIST",
+};
+EAL_REGISTER_TAILQ(rte_vmbus_uio_tailq);
+
+/*
+ * parse a sysfs file containing one integer value
+ * different to the eal version, as it needs to work with 64-bit values
+ */
+static int
+vmbus_get_sysfs_uuid(const char *filename, uuid_t uu)
+{
+	char buf[BUFSIZ];
+	char *cp = NULL;
+	FILE *f;
+
+	f = fopen(filename, "r");
+	if (f == NULL) {
+		RTE_LOG(ERR, EAL, "%s(): cannot open sysfs value %s\n",
+				__func__, filename);
+		return -1;
+	}
+
+	if (fgets(buf, sizeof(buf), f) == NULL) {
+		RTE_LOG(ERR, EAL, "%s(): cannot read sysfs value %s\n",
+				__func__, filename);
+		fclose(f);
+		return -1;
+	}
+	fclose(f);
+
+	cp = strchr(cp, '\n');
+	if (cp)
+		*cp = '\0';
+
+	/* strip { } notation */
+	if (buf[0] == '{' && (cp = strchr(buf, '}')))
+		*cp = '\0';
+
+	if (uuid_parse(buf, uu) < 0) {
+		RTE_LOG(ERR, EAL, "%s %s not a valid UUID\n",
+			filename, buf);
+		return -1;
+	}
+
+	return 0;
+}
+
+/* map a particular resource from a file */
+static void *
+vmbus_map_resource(void *requested_addr, int fd, off_t offset, size_t size,
+		   int flags)
+{
+	void *mapaddr;
+
+	/* Map the memory resource of device */
+	mapaddr = mmap(requested_addr, size, PROT_READ | PROT_WRITE,
+		       MAP_SHARED | flags, fd, offset);
+	if (mapaddr == MAP_FAILED ||
+	    (requested_addr != NULL && mapaddr != requested_addr)) {
+		RTE_LOG(ERR, EAL,
+			"%s(): cannot mmap(%d, %p, 0x%lx, 0x%lx): %s)\n",
+			__func__, fd, requested_addr,
+			(unsigned long)size, (unsigned long)offset,
+			strerror(errno));
+	} else
+		RTE_LOG(DEBUG, EAL, "  VMBUS memory mapped at %p\n", mapaddr);
+
+	return mapaddr;
+}
+
+/* unmap a particular resource */
+static void
+vmbus_unmap_resource(void *requested_addr, size_t size)
+{
+	if (requested_addr == NULL)
+		return;
+
+	/* Unmap the VMBUS memory resource of device */
+	if (munmap(requested_addr, size)) {
+		RTE_LOG(ERR, EAL, "%s(): cannot munmap(%p, 0x%lx): %s\n",
+			__func__, requested_addr, (unsigned long)size,
+			strerror(errno));
+	} else
+		RTE_LOG(DEBUG, EAL, "  VMBUS memory unmapped at %p\n",
+				requested_addr);
+}
+
+/* Only supports current kernel version
+ * Unlike PCI there is no option (or need) to create UIO device.
+ */
+static int vmbus_get_uio_dev(const char *name,
+			     char *dstbuf, size_t buflen)
+{
+	char dirname[PATH_MAX];
+	unsigned int uio_num;
+	struct dirent *e;
+	DIR *dir;
+
+	snprintf(dirname, sizeof(dirname),
+		 "/sys/bus/vmbus/devices/%s/uio", name);
+
+	dir = opendir(dirname);
+	if (dir == NULL) {
+		RTE_LOG(ERR, EAL, "Cannot map uio resources for %s: %s\n",
+			name, strerror(errno));
+		return -1;
+	}
+
+	/* take the first file starting with "uio" */
+	while ((e = readdir(dir)) != NULL) {
+		if (sscanf(e->d_name, "uio%u", &uio_num) != 1)
+			continue;
+
+		snprintf(dstbuf, buflen, "%s/uio%u", dirname, uio_num);
+		break;
+	}
+	closedir(dir);
+
+	return e ? (int) uio_num : -1;
+}
+
+/*
+ * parse a sysfs file containing one integer value
+ * different to the eal version, as it needs to work with 64-bit values
+ */
+static int
+vmbus_parse_sysfs_value(const char *dir, const char *name,
+			uint64_t *val)
+{
+	char filename[PATH_MAX];
+	FILE *f;
+	char buf[BUFSIZ];
+	char *end = NULL;
+
+	snprintf(filename, sizeof(filename), "%s/%s", dir, name);
+	f = fopen(filename, "r");
+	if (f == NULL) {
+		RTE_LOG(ERR, EAL, "%s(): cannot open sysfs value %s\n",
+				__func__, filename);
+		return -1;
+	}
+
+	if (fgets(buf, sizeof(buf), f) == NULL) {
+		RTE_LOG(ERR, EAL, "%s(): cannot read sysfs value %s\n",
+				__func__, filename);
+		fclose(f);
+		return -1;
+	}
+	fclose(f);
+
+	*val = strtoull(buf, &end, 0);
+	if ((buf[0] == '\0') || (end == NULL) || (*end != '\n')) {
+		RTE_LOG(ERR, EAL, "%s(): cannot parse sysfs value %s\n",
+				__func__, filename);
+		return -1;
+	}
+	return 0;
+}
+
+/* Get mappings out of values provided by uio */
+static int
+vmbus_uio_get_mappings(const char *uioname,
+		       struct vmbus_map maps[])
+{
+	int i;
+
+	for (i = 0; i != VMBUS_MAX_RESOURCE; i++) {
+		struct vmbus_map *map = &maps[i];
+		char dirname[PATH_MAX];
+
+		/* check if map directory exists */
+		snprintf(dirname, sizeof(dirname),
+			 "%s/maps/map%d", uioname, i);
+
+		if (access(dirname, F_OK) != 0)
+			break;
+
+		/* get mapping offset */
+		if (vmbus_parse_sysfs_value(dirname, "offset",
+					    &map->offset) < 0)
+			return -1;
+
+		/* get mapping size */
+		if (vmbus_parse_sysfs_value(dirname, "size",
+					    &map->size) < 0)
+			return -1;
+
+		/* get mapping physical address */
+		if (vmbus_parse_sysfs_value(dirname, "addr",
+					    &maps->phaddr) < 0)
+			return -1;
+	}
+
+	return i;
+}
+
+static void
+vmbus_uio_free_resource(struct rte_vmbus_device *dev,
+		struct mapped_vmbus_resource *uio_res)
+{
+	rte_free(uio_res);
+
+	if (dev->intr_handle.fd) {
+		close(dev->intr_handle.fd);
+		dev->intr_handle.fd = -1;
+		dev->intr_handle.type = RTE_INTR_HANDLE_UNKNOWN;
+	}
+}
+
+static struct mapped_vmbus_resource *
+vmbus_uio_alloc_resource(struct rte_vmbus_device *dev)
+{
+	struct mapped_vmbus_resource *uio_res;
+	char dirname[PATH_MAX], devname[PATH_MAX];
+	int uio_num, nb_maps;
+
+	uio_num = vmbus_get_uio_dev(dev->sysfs_name, dirname, sizeof(dirname));
+	if (uio_num < 0) {
+		RTE_LOG(WARNING, EAL,
+			"  %s not managed by UIO driver, skipping\n",
+			dev->sysfs_name);
+		return NULL;
+	}
+
+	/* allocate the mapping details for secondary processes*/
+	uio_res = rte_zmalloc("UIO_RES", sizeof(*uio_res), 0);
+	if (uio_res == NULL) {
+		RTE_LOG(ERR, EAL,
+			"%s(): cannot store uio mmap details\n", __func__);
+		goto error;
+	}
+
+	snprintf(devname, sizeof(devname), "/dev/uio%u", uio_num);
+	dev->intr_handle.fd = open(devname, O_RDWR);
+	if (dev->intr_handle.fd < 0) {
+		RTE_LOG(ERR, EAL, "Cannot open %s: %s\n",
+			devname, strerror(errno));
+		goto error;
+	}
+
+	dev->intr_handle.type = RTE_INTR_HANDLE_UIO_INTX;
+
+	snprintf(uio_res->path, sizeof(uio_res->path), "%s", devname);
+	uuid_copy(uio_res->uuid, dev->device_id);
+
+	nb_maps = vmbus_uio_get_mappings(dirname, uio_res->maps);
+	if (nb_maps < 0)
+		goto error;
+
+	RTE_LOG(DEBUG, EAL, "Found %d memory maps for device %s\n",
+		nb_maps, dev->sysfs_name);
+
+	return uio_res;
+
+ error:
+	vmbus_uio_free_resource(dev, uio_res);
+	return NULL;
+}
+
+static int
+vmbus_uio_map_resource_by_index(struct rte_vmbus_device *dev,
+				unsigned int res_idx,
+				struct mapped_vmbus_resource *uio_res,
+				unsigned int map_idx)
+{
+	struct vmbus_map *maps = uio_res->maps;
+	char devname[PATH_MAX];
+	void *mapaddr;
+	int fd;
+
+	snprintf(devname, sizeof(devname),
+		 "/sys/bus/vmbus/%s/resource%u", dev->sysfs_name, res_idx);
+
+	fd = open(devname, O_RDWR);
+	if (fd < 0) {
+		RTE_LOG(ERR, EAL, "Cannot open %s: %s\n",
+				devname, strerror(errno));
+		return -1;
+	}
+
+	/* allocate memory to keep path */
+	maps[map_idx].path = rte_malloc(NULL, strlen(devname) + 1, 0);
+	if (maps[map_idx].path == NULL) {
+		RTE_LOG(ERR, EAL, "Cannot allocate memory for path: %s\n",
+				strerror(errno));
+		return -1;
+	}
+
+	/* try mapping somewhere close to the end of hugepages */
+	if (vmbus_map_addr == NULL)
+		vmbus_map_addr = pci_find_max_end_va();
+
+	mapaddr = vmbus_map_resource(vmbus_map_addr, fd, 0,
+				     dev->mem_resource[res_idx].len, 0);
+	close(fd);
+	if (mapaddr == MAP_FAILED) {
+		rte_free(maps[map_idx].path);
+		return -1;
+	}
+
+	vmbus_map_addr = RTE_PTR_ADD(mapaddr,
+				     dev->mem_resource[res_idx].len);
+
+	maps[map_idx].phaddr = dev->mem_resource[res_idx].phys_addr;
+	maps[map_idx].size = dev->mem_resource[res_idx].len;
+	maps[map_idx].addr = mapaddr;
+	maps[map_idx].offset = 0;
+	strcpy(maps[map_idx].path, devname);
+	dev->mem_resource[res_idx].addr = mapaddr;
+
+	return 0;
+}
+
+static void
+vmbus_uio_unmap(struct mapped_vmbus_resource *uio_res)
+{
+	int i;
+
+	if (uio_res == NULL)
+		return;
+
+	for (i = 0; i != uio_res->nb_maps; i++) {
+		vmbus_unmap_resource(uio_res->maps[i].addr,
+				     uio_res->maps[i].size);
+
+		if (rte_eal_process_type() == RTE_PROC_PRIMARY)
+			rte_free(uio_res->maps[i].path);
+	}
+}
+
+static struct mapped_vmbus_resource *
+vmbus_uio_find_resource(struct rte_vmbus_device *dev)
+{
+	struct mapped_vmbus_resource *uio_res;
+	struct mapped_vmbus_res_list *uio_res_list =
+			RTE_TAILQ_CAST(rte_vmbus_uio_tailq.head,
+				       mapped_vmbus_res_list);
+
+	if (dev == NULL)
+		return NULL;
+
+	TAILQ_FOREACH(uio_res, uio_res_list, next) {
+		if (uuid_compare(uio_res->uuid, dev->device_id) == 0)
+			return uio_res;
+	}
+	return NULL;
+}
+
+/* unmap the VMBUS resource of a VMBUS device in virtual memory */
+static void
+vmbus_uio_unmap_resource(struct rte_vmbus_device *dev)
+{
+	struct mapped_vmbus_resource *uio_res;
+	struct mapped_vmbus_res_list *uio_res_list =
+			RTE_TAILQ_CAST(rte_vmbus_uio_tailq.head,
+				       mapped_vmbus_res_list);
+
+	if (dev == NULL)
+		return;
+
+	/* find an entry for the device */
+	uio_res = vmbus_uio_find_resource(dev);
+	if (uio_res == NULL)
+		return;
+
+	/* secondary processes - just free maps */
+	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
+		return vmbus_uio_unmap(uio_res);
+
+	TAILQ_REMOVE(uio_res_list, uio_res, next);
+
+	/* unmap all resources */
+	vmbus_uio_unmap(uio_res);
+
+	/* free uio resource */
+	rte_free(uio_res);
+
+	/* close fd if in primary process */
+	close(dev->intr_handle.fd);
+	if (dev->intr_handle.uio_cfg_fd >= 0) {
+		close(dev->intr_handle.uio_cfg_fd);
+		dev->intr_handle.uio_cfg_fd = -1;
+	}
+
+	dev->intr_handle.fd = -1;
+	dev->intr_handle.type = RTE_INTR_HANDLE_UNKNOWN;
+}
+
+static int
+vmbus_uio_map_secondary(struct rte_vmbus_device *dev)
+{
+	struct mapped_vmbus_resource *uio_res;
+	struct mapped_vmbus_res_list *uio_res_list =
+			RTE_TAILQ_CAST(rte_vmbus_uio_tailq.head,
+				       mapped_vmbus_res_list);
+
+	TAILQ_FOREACH(uio_res, uio_res_list, next) {
+		int i;
+
+		/* skip this element if it doesn't match our id */
+		if (uuid_compare(uio_res->uuid, dev->device_id))
+			continue;
+
+		for (i = 0; i != uio_res->nb_maps; i++) {
+			void *mapaddr;
+			int fd;
+
+			fd = open(uio_res->maps[i].path, O_RDWR);
+			if (fd < 0) {
+				RTE_LOG(ERR, EAL, "Cannot open %s: %s\n",
+					uio_res->maps[i].path, strerror(errno));
+				return -1;
+			}
+
+			mapaddr = vmbus_map_resource(uio_res->maps[i].addr, fd,
+						     uio_res->maps[i].offset,
+						     uio_res->maps[i].size, 0);
+			/* fd is not needed in slave process, close it */
+			close(fd);
+
+			if (mapaddr == uio_res->maps[i].addr)
+				continue;
+
+			RTE_LOG(ERR, EAL,
+				"Cannot mmap device resource file %s to address: %p\n",
+				uio_res->maps[i].path,
+				uio_res->maps[i].addr);
+
+			/* unmap addrs correctly mapped */
+			while (i != 0) {
+				--i;
+				vmbus_unmap_resource(uio_res->maps[i].addr,
+						     uio_res->maps[i].size);
+			}
+			return -1;
+
+		}
+		return 0;
+	}
+
+	RTE_LOG(ERR, EAL, "Cannot find resource for device\n");
+	return 1;
+}
+
+/* map the resources of a vmbus device in virtual memory */
+int
+rte_eal_vmbus_map_device(struct rte_vmbus_device *dev)
+{
+	struct mapped_vmbus_resource *uio_res;
+	struct mapped_vmbus_res_list *uio_res_list =
+		RTE_TAILQ_CAST(rte_vmbus_uio_tailq.head, mapped_vmbus_res_list);
+	int i, ret, map_idx = 0;
+
+	dev->intr_handle.fd = -1;
+	dev->intr_handle.uio_cfg_fd = -1;
+	dev->intr_handle.type = RTE_INTR_HANDLE_UNKNOWN;
+
+	/* secondary processes - use already recorded details */
+	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
+		return vmbus_uio_map_secondary(dev);
+
+	/* allocate uio resource */
+	uio_res = vmbus_uio_alloc_resource(dev);
+	if (uio_res == NULL)
+		return -1;
+
+	/* Map all BARs */
+	for (i = 0; i != VMBUS_MAX_RESOURCE; i++) {
+		uint64_t phaddr;
+
+		/* skip empty BAR */
+		phaddr = dev->mem_resource[i].phys_addr;
+		if (phaddr == 0)
+			continue;
+
+		ret = vmbus_uio_map_resource_by_index(dev, i,
+						      uio_res, map_idx);
+		if (ret)
+			goto error;
+
+		map_idx++;
+	}
+
+	uio_res->nb_maps = map_idx;
+
+	TAILQ_INSERT_TAIL(uio_res_list, uio_res, next);
+
+	return 0;
+error:
+	for (i = 0; i < map_idx; i++) {
+		vmbus_unmap_resource(uio_res->maps[i].addr,
+				     uio_res->maps[i].size);
+		rte_free(uio_res->maps[i].path);
+	}
+	vmbus_uio_free_resource(dev, uio_res);
+	return -1;
+}
+
+/* Scan one vmbus sysfs entry, and fill the devices list from it. */
+static int
+vmbus_scan_one(const char *name)
+{
+	struct rte_vmbus_device *dev, *dev2;
+	char filename[PATH_MAX];
+	char dirname[PATH_MAX];
+	unsigned long tmp;
+
+	dev = malloc(sizeof(*dev) + strlen(name) + 1);
+	if (dev == NULL)
+		return -1;
+
+	memset(dev, 0, sizeof(*dev));
+	strcpy(dev->sysfs_name, name);
+	if (dev->sysfs_name == NULL)
+		goto error;
+
+	/* sysfs base directory
+	 *   /sys/bus/vmbus/devices/7a08391f-f5a0-4ac0-9802-d13fd964f8df
+	 * or on older kernel
+	 *   /sys/bus/vmbus/devices/vmbus_1
+	 */
+	snprintf(dirname, sizeof(dirname), "%s/%s",
+		 SYSFS_VMBUS_DEVICES, name);
+
+	/* get device id */
+	snprintf(filename, sizeof(filename), "%s/device_id", dirname);
+	if (vmbus_get_sysfs_uuid(filename, dev->device_id) < 0)
+		goto error;
+
+	/* get device class  */
+	snprintf(filename, sizeof(filename), "%s/class_id", dirname);
+	if (vmbus_get_sysfs_uuid(filename, dev->class_id) < 0)
+		goto error;
+
+	/* get relid */
+	snprintf(filename, sizeof(filename), "%s/id", dirname);
+	if (eal_parse_sysfs_value(filename, &tmp) < 0)
+		goto error;
+	dev->relid = tmp;
+
+	/* get monitor id */
+	snprintf(filename, sizeof(filename), "%s/monitor_id", dirname);
+	if (eal_parse_sysfs_value(filename, &tmp) < 0)
+		goto error;
+	dev->monitor_id = tmp;
+
+	/* get numa node */
+	snprintf(filename, sizeof(filename), "%s/numa_node",
+		 dirname);
+	if (eal_parse_sysfs_value(filename, &tmp) < 0)
+		/* if no NUMA support, set default to 0 */
+		dev->device.numa_node = 0;
+	else
+		dev->device.numa_node = tmp;
+
+	/* device is valid, add in list (sorted) */
+	RTE_LOG(DEBUG, EAL, "Adding vmbus device %s\n", name);
+
+	TAILQ_FOREACH(dev2, &vmbus_device_list, next) {
+		int ret;
+
+		ret = uuid_compare(dev->device_id, dev->device_id);
+		if (ret > 0)
+			continue;
+
+		if (ret < 0) {
+			TAILQ_INSERT_BEFORE(dev2, dev, next);
+			rte_eal_device_insert(&dev->device);
+		} else { /* already registered */
+			memmove(dev2->mem_resource, dev->mem_resource,
+				sizeof(dev->mem_resource));
+			free(dev);
+		}
+		return 0;
+	}
+
+	rte_eal_device_insert(&dev->device);
+	TAILQ_INSERT_TAIL(&vmbus_device_list, dev, next);
+
+	return 0;
+error:
+	free(dev);
+	return -1;
+}
+
+/*
+ * Scan the content of the vmbus, and the devices in the devices list
+ */
+static int
+vmbus_scan(void)
+{
+	struct dirent *e;
+	DIR *dir;
+
+	dir = opendir(SYSFS_VMBUS_DEVICES);
+	if (dir == NULL) {
+		if (errno == ENOENT)
+			return 0;
+
+		RTE_LOG(ERR, EAL, "%s(): opendir failed: %s\n",
+			__func__, strerror(errno));
+		return -1;
+	}
+
+	while ((e = readdir(dir)) != NULL) {
+		if (e->d_name[0] == '.')
+			continue;
+
+		if (vmbus_scan_one(e->d_name) < 0)
+			goto error;
+	}
+	closedir(dir);
+	return 0;
+
+error:
+	closedir(dir);
+	return -1;
+}
+
+/* Init the VMBUS EAL subsystem */
+int rte_eal_vmbus_init(void)
+{
+	/* VMBUS can be disabled */
+	if (internal_config.no_vmbus)
+		return 0;
+
+	if (vmbus_scan() < 0) {
+		RTE_LOG(ERR, EAL, "%s(): Cannot scan vmbus\n", __func__);
+		return -1;
+	}
+	return 0;
+}
+
+/* Below is PROBE part of eal_vmbus library */
+
+/*
+ * If device ID match, call the devinit() function of the driver.
+ */
+static int
+rte_eal_vmbus_probe_one_driver(struct rte_vmbus_driver *dr,
+			       struct rte_vmbus_device *dev)
+{
+	const uuid_t *id_table;
+
+	RTE_LOG(DEBUG, EAL, "  probe driver: %s\n", dr->driver.name);
+
+	for (id_table = dr->id_table; !uuid_is_null(*id_table); ++id_table) {
+		struct rte_devargs *args;
+		char guid[UUID_BUF_SZ];
+		int ret;
+
+		/* skip devices not assocaited with this device class */
+		if (uuid_compare(*id_table, dev->class_id) != 0)
+			continue;
+
+		uuid_unparse(dev->device_id, guid);
+		RTE_LOG(INFO, EAL, "VMBUS device %s on NUMA socket %i\n",
+			guid, dev->device.numa_node);
+
+		/* no initialization when blacklisted, return without error */
+		args = dev->device.devargs;
+		if (args && args->type == RTE_DEVTYPE_BLACKLISTED_VMBUS) {
+			RTE_LOG(INFO, EAL, "  Device is blacklisted, not initializing\n");
+			return 1;
+		}
+
+		RTE_LOG(INFO, EAL, "  probe driver: %s\n", dr->driver.name);
+
+		/* map resources for device */
+		ret = rte_eal_vmbus_map_device(dev);
+		if (ret != 0)
+			return ret;
+
+		/* reference driver structure */
+		dev->driver = dr;
+
+		/* call the driver probe() function */
+		ret = dr->probe(dr, dev);
+		if (ret)
+			dev->driver = NULL;
+
+		return ret;
+	}
+
+	/* return positive value if driver doesn't support this device */
+	return 1;
+}
+
+
+/*
+ * If vendor/device ID match, call the remove() function of the
+ * driver.
+ */
+static int
+vmbus_detach_dev(struct rte_vmbus_driver *dr,
+		 struct rte_vmbus_device *dev)
+{
+	const uuid_t *id_table;
+
+	for (id_table = dr->id_table; !uuid_is_null(*id_table); ++id_table) {
+		char guid[UUID_BUF_SZ];
+
+		/* skip devices not assocaited with this device class */
+		if (uuid_compare(*id_table, dev->class_id) != 0)
+			continue;
+
+		uuid_unparse(dev->device_id, guid);
+		RTE_LOG(INFO, EAL, "VMBUS device %s on NUMA socket %i\n",
+			guid, dev->device.numa_node);
+
+		RTE_LOG(DEBUG, EAL, "  remove driver: %s\n", dr->driver.name);
+
+		if (dr->remove && (dr->remove(dev) < 0))
+			return -1;	/* negative value is an error */
+
+		/* clear driver structure */
+		dev->driver = NULL;
+
+		vmbus_uio_unmap_resource(dev);
+		return 0;
+	}
+
+	/* return positive value if driver doesn't support this device */
+	return 1;
+}
+
+/*
+ * call the devinit() function of all
+ * registered drivers for the vmbus device. Return -1 if no driver is
+ * found for this class of vmbus device.
+ * The present assumption is that we have drivers only for vmbus network
+ * devices. That's why we don't check driver's id_table now.
+ */
+static int
+vmbus_probe_all_drivers(struct rte_vmbus_device *dev)
+{
+	struct rte_vmbus_driver *dr = NULL;
+	int ret;
+
+	TAILQ_FOREACH(dr, &vmbus_driver_list, next) {
+		ret = rte_eal_vmbus_probe_one_driver(dr, dev);
+		if (ret < 0) {
+			/* negative value is an error */
+			RTE_LOG(ERR, EAL, "Failed to probe driver %s\n",
+				dr->driver.name);
+			return -1;
+		}
+		/* positive value means driver doesn't support it */
+		if (ret > 0)
+			continue;
+
+		return 0;
+	}
+
+	return 1;
+}
+
+
+/*
+ * If device ID matches, call the remove() function of all
+ * registered driver for the given device. Return -1 if initialization
+ * failed, return 1 if no driver is found for this device.
+ */
+static int
+vmbus_detach_all_drivers(struct rte_vmbus_device *dev)
+{
+	struct rte_vmbus_driver *dr;
+	int rc = 0;
+
+	if (dev == NULL)
+		return -1;
+
+	TAILQ_FOREACH(dr, &vmbus_driver_list, next) {
+		rc = vmbus_detach_dev(dr, dev);
+		if (rc < 0)
+			/* negative value is an error */
+			return -1;
+		if (rc > 0)
+			/* positive value means driver doesn't support it */
+			continue;
+		return 0;
+	}
+	return 1;
+}
+
+/* Detach device specified by its VMBUS id */
+int
+rte_eal_vmbus_detach(uuid_t device_id)
+{
+	struct rte_vmbus_device *dev;
+	char ubuf[UUID_BUF_SZ];
+
+	TAILQ_FOREACH(dev, &vmbus_device_list, next) {
+		if (uuid_compare(dev->device_id, device_id) != 0)
+			continue;
+
+		if (vmbus_detach_all_drivers(dev) < 0)
+			goto err_return;
+
+		TAILQ_REMOVE(&vmbus_device_list, dev, next);
+		free(dev);
+		return 0;
+	}
+	return -1;
+
+err_return:
+	uuid_unparse(device_id, ubuf);
+	RTE_LOG(WARNING, EAL, "Requested device %s cannot be used\n",
+		ubuf);
+	return -1;
+}
+
+/*
+ * Scan the vmbus, and call the devinit() function for
+ * all registered drivers that have a matching entry in its id_table
+ * for discovered devices.
+ */
+int
+rte_eal_vmbus_probe(void)
+{
+	struct rte_vmbus_device *dev = NULL;
+
+	TAILQ_FOREACH(dev, &vmbus_device_list, next) {
+		char ubuf[UUID_BUF_SZ];
+
+		uuid_unparse(dev->device_id, ubuf);
+
+		RTE_LOG(DEBUG, EAL, "Probing driver for device %s ...\n",
+			ubuf);
+		vmbus_probe_all_drivers(dev);
+	}
+	return 0;
+}
+
+/* register vmbus driver */
+void
+rte_eal_vmbus_register(struct rte_vmbus_driver *driver)
+{
+	TAILQ_INSERT_TAIL(&vmbus_driver_list, driver, next);
+}
+
+/* unregister vmbus driver */
+void
+rte_eal_vmbus_unregister(struct rte_vmbus_driver *driver)
+{
+	TAILQ_REMOVE(&vmbus_driver_list, driver, next);
+}
diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index acc0e556..92f296a0 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -3331,3 +3331,93 @@ rte_eth_dev_l2_tunnel_offload_set(uint8_t port_id,
 				-ENOTSUP);
 	return (*dev->dev_ops->l2_tunnel_offload_set)(dev, l2_tunnel, mask, en);
 }
+
+
+#ifdef RTE_LIBRTE_HV_PMD
+int
+rte_eth_dev_vmbus_probe(struct rte_vmbus_driver *vmbus_drv,
+			struct rte_vmbus_device *vmbus_dev)
+{
+	struct eth_driver  *eth_drv = (struct eth_driver *)vmbus_drv;
+	struct rte_eth_dev *eth_dev;
+	char ustr[UUID_BUF_SZ];
+	int diag;
+
+	uuid_unparse(vmbus_dev->device_id, ustr);
+
+	eth_dev = rte_eth_dev_allocate(ustr);
+	if (eth_dev == NULL)
+		return -ENOMEM;
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		eth_dev->data->dev_private = rte_zmalloc("ethdev private structure",
+				  eth_drv->dev_private_size,
+				  RTE_CACHE_LINE_SIZE);
+		if (eth_dev->data->dev_private == NULL)
+			rte_panic("Cannot allocate memzone for private port data\n");
+	}
+
+	eth_dev->device = &vmbus_dev->device;
+	eth_dev->driver = eth_drv;
+	eth_dev->data->rx_mbuf_alloc_failed = 0;
+
+	/* init user callbacks */
+	TAILQ_INIT(&(eth_dev->link_intr_cbs));
+
+	/*
+	 * Set the default maximum frame size.
+	 */
+	eth_dev->data->mtu = ETHER_MTU;
+
+	/* Invoke PMD device initialization function */
+	diag = (*eth_drv->eth_dev_init)(eth_dev);
+	if (diag == 0)
+		return 0;
+
+	RTE_PMD_DEBUG_TRACE("driver %s: eth_dev_init(%s) failed\n",
+			    vmbus_drv->driver.name, ustr);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY)
+		rte_free(eth_dev->data->dev_private);
+
+	return diag;
+}
+
+int
+rte_eth_dev_vmbus_remove(struct rte_vmbus_device *vmbus_dev)
+{
+	const struct eth_driver *eth_drv;
+	struct rte_eth_dev *eth_dev;
+	char ustr[UUID_BUF_SZ];
+	int ret;
+
+	if (vmbus_dev == NULL)
+		return -EINVAL;
+
+	uuid_unparse(vmbus_dev->device_id, ustr);
+	eth_dev = rte_eth_dev_allocated(ustr);
+	if (eth_dev == NULL)
+		return -ENODEV;
+
+	eth_drv = (const struct eth_driver *)vmbus_dev->driver;
+
+	/* Invoke PMD device uninit function */
+	if (*eth_drv->eth_dev_uninit) {
+		ret = (*eth_drv->eth_dev_uninit)(eth_dev);
+		if (ret)
+			return ret;
+	}
+
+	/* free ether device */
+	rte_eth_dev_release_port(eth_dev);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY)
+		rte_free(eth_dev->data->dev_private);
+
+	eth_dev->device = NULL;
+	eth_dev->driver = NULL;
+	eth_dev->data = NULL;
+
+	return 0;
+}
+#endif
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index d310541a..d5f06529 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -180,6 +180,7 @@ extern "C" {
 #include <rte_log.h>
 #include <rte_interrupts.h>
 #include <rte_pci.h>
+#include <rte_vmbus.h>
 #include <rte_dev.h>
 #include <rte_devargs.h>
 #include "rte_ether.h"
@@ -1876,6 +1877,15 @@ struct rte_pci_eth_driver {
 };
 
 /**
+ * @internal
+ * The structure associated with a PMD VMBUS Ethernet driver.
+ */
+struct rte_vmbus_eth_driver {
+	struct rte_vmbus_driver vmbus_drv;	/**< Underlying VMBUS driver. */
+	struct eth_driver	eth_drv;	/**< Ethernet driver. */
+};
+
+/**
  * Convert a numerical speed in Mbps to a bitmap flag that can be used in
  * the bitmap link_speeds of the struct rte_eth_conf
  *
@@ -4399,6 +4409,21 @@ int rte_eth_dev_pci_probe(struct rte_pci_driver *pci_drv,
  */
 int rte_eth_dev_pci_remove(struct rte_pci_device *pci_dev);
 
+/**
+ * @internal
+ * Wrapper for use by vmbus drivers as a .probe function to attach to a ethdev
+ * interface.
+ */
+int rte_eth_dev_vmbus_probe(struct rte_vmbus_driver *vmbus_drv,
+			  struct rte_vmbus_device *vmbus_dev);
+
+/**
+ * @internal
+ * Wrapper for use by vmbus drivers as a .remove function to detach a ethdev
+ * interface.
+ */
+int rte_eth_dev_vmbus_remove(struct rte_vmbus_device *vmbus_dev);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index f75f0e24..6b304084 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -130,6 +130,7 @@ ifeq ($(CONFIG_RTE_LIBRTE_VHOST),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_VHOST)      += -lrte_pmd_vhost
 endif # $(CONFIG_RTE_LIBRTE_VHOST)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_VMXNET3_PMD)    += -lrte_pmd_vmxnet3_uio
+_LDLIBS-$(CONFIG_RTE_LIBRTE_HV_PMD)	    += -luuid
 
 ifeq ($(CONFIG_RTE_LIBRTE_CRYPTODEV),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_AESNI_MB)    += -lrte_pmd_aesni_mb
-- 
2.11.0

^ permalink raw reply related

* Re: [PATCH v2 16/18] net/ixgbe: create consistent filter
From: Xing, Beilei @ 2017-01-03  2:04 UTC (permalink / raw)
  To: Zhao1, Wei, dev@dpdk.org; +Cc: Zhao1, Wei, Lu, Wenzhuo
In-Reply-To: <1483084390-53159-17-git-send-email-wei.zhao1@intel.com>



> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Wei Zhao
> Sent: Friday, December 30, 2016 3:53 PM
> To: dev@dpdk.org
> Cc: Zhao1, Wei <wei.zhao1@intel.com>; Lu, Wenzhuo
> <wenzhuo.lu@intel.com>
> Subject: [dpdk-dev] [PATCH v2 16/18] net/ixgbe: create consistent filter
> 
> This patch adds a function to create the flow directory filter.
> 
> Signed-off-by: Wei Zhao <wei.zhao1@intel.com>
> Signed-off-by: Wenzhuo Lu <wenzhuo.lu@intel.com>
> 
> ---
> 
> v2:
> --add new error set function
> ---
>  drivers/net/ixgbe/ixgbe_ethdev.c | 240
> ++++++++++++++++++++++++++++++++++++++-
>  drivers/net/ixgbe/ixgbe_ethdev.h |   5 +
>  2 files changed, 244 insertions(+), 1 deletion(-)
> 
> diff --git a/drivers/net/ixgbe/ixgbe_ethdev.c
> b/drivers/net/ixgbe/ixgbe_ethdev.c
> index c98aa0d..1c857fc 100644
> --- a/drivers/net/ixgbe/ixgbe_ethdev.c
> +++ b/drivers/net/ixgbe/ixgbe_ethdev.c
> @@ -468,6 +468,11 @@ ixgbe_flow_validate(__rte_unused struct
> rte_eth_dev *dev,
>  		const struct rte_flow_item pattern[],
>  		const struct rte_flow_action actions[],
>  		struct rte_flow_error *error);
> +static struct rte_flow *ixgbe_flow_create(struct rte_eth_dev *dev,
> +		const struct rte_flow_attr *attr,
> +		const struct rte_flow_item pattern[],
> +		const struct rte_flow_action actions[],
> +		struct rte_flow_error *error);
>  static int ixgbe_flow_flush(struct rte_eth_dev *dev,
>  		struct rte_flow_error *error);
>  /*
> @@ -850,11 +855,55 @@ static const struct rte_ixgbe_xstats_name_off
> rte_ixgbevf_stats_strings[] = {
>  		sizeof(rte_ixgbevf_stats_strings[0]))
>  static const struct rte_flow_ops ixgbe_flow_ops = {
>  	ixgbe_flow_validate,
> -	NULL,
> +	ixgbe_flow_create,
>  	NULL,
>  	ixgbe_flow_flush,
>  	NULL,
>  };
> +/* ntuple filter list structure */
> +struct ixgbe_ntuple_filter_ele {
> +	TAILQ_ENTRY(ixgbe_ntuple_filter_ele) entries;
> +	struct rte_eth_ntuple_filter filter_info; };
> +/* ethertype filter list structure */
> +struct ixgbe_ethertype_filter_ele {
> +	TAILQ_ENTRY(ixgbe_ethertype_filter_ele) entries;
> +	struct rte_eth_ethertype_filter filter_info; };
> +/* syn filter list structure */
> +struct ixgbe_eth_syn_filter_ele {
> +	TAILQ_ENTRY(ixgbe_eth_syn_filter_ele) entries;
> +	struct rte_eth_syn_filter filter_info; };
> +/* fdir filter list structure */
> +struct ixgbe_fdir_rule_ele {
> +	TAILQ_ENTRY(ixgbe_fdir_rule_ele) entries;
> +	struct ixgbe_fdir_rule filter_info;
> +};
> +/* l2_tunnel filter list structure */
> +struct ixgbe_eth_l2_tunnel_conf_ele {
> +	TAILQ_ENTRY(ixgbe_eth_l2_tunnel_conf_ele) entries;
> +	struct rte_eth_l2_tunnel_conf filter_info; };
> +/* ixgbe_flow memory list structure */

Why need to define these structures? There has been structures to store the filter before, isn't it?

> +struct ixgbe_flow_mem {
> +	TAILQ_ENTRY(ixgbe_flow_mem) entries;
> +	struct ixgbe_flow *flow;
> +};
> +
> +TAILQ_HEAD(ixgbe_ntuple_filter_list, ixgbe_ntuple_filter_ele); struct
> +ixgbe_ntuple_filter_list filter_ntuple_list;
> +TAILQ_HEAD(ixgbe_ethertype_filter_list, ixgbe_ethertype_filter_ele);
> +struct ixgbe_ethertype_filter_list filter_ethertype_list;
> +TAILQ_HEAD(ixgbe_syn_filter_list, ixgbe_eth_syn_filter_ele); struct
> +ixgbe_syn_filter_list filter_syn_list;
> +TAILQ_HEAD(ixgbe_fdir_rule_filter_list, ixgbe_fdir_rule_ele); struct
> +ixgbe_fdir_rule_filter_list filter_fdir_list;
> +TAILQ_HEAD(ixgbe_l2_tunnel_filter_list, ixgbe_eth_l2_tunnel_conf_ele);
> +struct ixgbe_l2_tunnel_filter_list filter_l2_tunnel_list;
> +TAILQ_HEAD(ixgbe_flow_mem_list, ixgbe_flow_mem); struct
> +ixgbe_flow_mem_list ixgbe_flow_list;
> +
>  /**
>   * Atomically reads the link status information from global
>   * structure rte_eth_dev.
> @@ -1380,6 +1429,14 @@ eth_ixgbe_dev_init(struct rte_eth_dev *eth_dev)
> 
>  	/* initialize l2 tunnel filter list & hash */
>  	ixgbe_l2_tn_filter_init(eth_dev);
> +
> +	TAILQ_INIT(&filter_ntuple_list);
> +	TAILQ_INIT(&filter_ethertype_list);
> +	TAILQ_INIT(&filter_syn_list);
> +	TAILQ_INIT(&filter_fdir_list);
> +	TAILQ_INIT(&filter_l2_tunnel_list);
> +	TAILQ_INIT(&ixgbe_flow_list);
> +
>  	return 0;
>  }
> 

^ permalink raw reply

* [WARNING: A/V UNSCANNABLE][PATCH v3 0/6] distributor-performance-improvements
From: David Hunt @ 2017-01-02 10:22 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson
In-Reply-To: <1482381428-148094-2-git-send-email-david.hunt@intel.com>

This patch aims to improve the throughput of the distributor library.

It uses a similar handshake mechanism to the previous version of
the library, in that bits are used to indicate when packets are ready
to be sent to a worker and ready to be returned from a worker. One main
difference is that instead of sending one packet in a cache line, it makes
use of the 7 free spaces in the same cache line in order to send up to
8 packets at a time to/from a worker.

The flow matching algorithm has had significant re-work, and now keeps an
array of inflight flows and an array of backlog flows, and matches incoming
flows to the inflight/backlog flows of all workers so that flow pinning to
workers can be maintained.

The Flow Match algorithm has both scalar and a vector versions, and a
function pointer is used to select the post appropriate function at run time,
depending on the presence of the SSE2 cpu flag. On non-x86 platforms, the
the scalar match function is selected, which should still gives a good boost
in performance over the non-burst API.

v2 changes:
  * Created a common distributor_priv.h header file with common
    definitions and structures.
  * Added a scalar version so it can be built and used on machines without
    sse2 instruction set
  * Added unit autotests
  * Added perf autotest

v3 changes:
  * Addressed mailing list review comments
  * Test code removal
  * Split out SSE match into separate file to facilitate NEON addition
  * Cleaned up conditional compilation flags for SSE2
  * Addressed c99 style compilation errors
  * rebased on latest head (Jan 2 2017, Happy New Year to all)

Notes:
   Apps must now work in bursts, as up to 8 are given to a worker at a time
   For performance in matching, Flow ID's are 15-bits
   Original API (and code) is kept for backward compatibility

Performance Gains
   2.2GHz Intel(R) Xeon(R) CPU E5-2699 v4 @ 2.20GHz
   2 x XL710 40GbE NICS to 2 x 40Gbps traffic generator channels 64b packets
   separate cores for rx, tx, distributor
    1 worker  - 4.8x
    4 workers - 2.9x
    8 workers - 1.8x
   12 workers - 2.1x
   16 workers - 1.8x

[PATCH v3 1/6] lib: distributor performance enhancements
[PATCH v3 2/6] lib: add distributor vector flow matching
[PATCH v3 3/6] test: unit tests for new distributor burst api
[PATCH v3 4/6] test: add distributor_perf autotest
[PATCH v3 5/6] example: distributor app showing burst api
[PATCH v3 6/6] doc: distributor library changes for new burst api

^ permalink raw reply

* [WARNING: A/V UNSCANNABLE][PATCH v3 2/6] lib: add distributor vector flow matching
From: David Hunt @ 2017-01-02 10:22 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483352546-171068-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 lib/librte_distributor/Makefile                    |   4 +
 lib/librte_distributor/rte_distributor_burst.c     |  11 +-
 lib/librte_distributor/rte_distributor_match_sse.c | 113 +++++++++++++++++++++
 lib/librte_distributor/rte_distributor_priv.h      |   6 ++
 4 files changed, 133 insertions(+), 1 deletion(-)
 create mode 100644 lib/librte_distributor/rte_distributor_match_sse.c

diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
index 2acc54d..a725aaf 100644
--- a/lib/librte_distributor/Makefile
+++ b/lib/librte_distributor/Makefile
@@ -44,6 +44,10 @@ LIBABIVER := 1
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += rte_distributor_burst.c
+ifeq ($(CONFIG_RTE_ARCH_X86),y)
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += rte_distributor_match_sse.c
+endif
+
 
 # install this header file
 SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
diff --git a/lib/librte_distributor/rte_distributor_burst.c b/lib/librte_distributor/rte_distributor_burst.c
index ae7cf9d..35044c4 100644
--- a/lib/librte_distributor/rte_distributor_burst.c
+++ b/lib/librte_distributor/rte_distributor_burst.c
@@ -352,6 +352,9 @@ rte_distributor_process_burst(struct rte_distributor_burst *d,
 		}
 
 		switch (d->dist_match_fn) {
+		case RTE_DIST_MATCH_VECTOR:
+			find_match_vec(d, &flows[0], &matches[0]);
+			break;
 		default:
 			find_match_scalar(d, &flows[0], &matches[0]);
 		}
@@ -538,7 +541,13 @@ rte_distributor_create_burst(const char *name,
 	snprintf(d->name, sizeof(d->name), "%s", name);
 	d->num_workers = num_workers;
 
-	d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
+#if defined(RTE_ARCH_X86)
+	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_SSE2)) {
+		d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
+	} else {
+#endif
+		d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
+	}
 
 	/*
 	 * Set up the backog tags so they're pointing at the second cache
diff --git a/lib/librte_distributor/rte_distributor_match_sse.c b/lib/librte_distributor/rte_distributor_match_sse.c
new file mode 100644
index 0000000..78641f5
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_match_sse.c
@@ -0,0 +1,113 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <rte_mbuf.h>
+#include "rte_distributor_priv.h"
+#include "rte_distributor_burst.h"
+#include "smmintrin.h"
+
+
+void
+find_match_vec(struct rte_distributor_burst *d,
+			uint16_t *data_ptr,
+			uint16_t *output_ptr)
+{
+	/* Setup */
+	__m128i incoming_fids;
+	__m128i inflight_fids;
+	__m128i preflight_fids;
+	__m128i wkr;
+	__m128i mask1;
+	__m128i mask2;
+	__m128i output;
+	struct rte_distributor_backlog *bl;
+	uint16_t i;
+
+	/*
+	 * Function overview:
+	 * 2. Loop through all worker ID's
+	 *  2a. Load the current inflights for that worker into an xmm reg
+	 *  2b. Load the current backlog for that worker into an xmm reg
+	 *  2c. use cmpestrm to intersect flow_ids with backlog and inflights
+	 *  2d. Add any matches to the output
+	 * 3. Write the output xmm (matching worker ids).
+	 */
+
+
+	output = _mm_set1_epi16(0);
+	incoming_fids = _mm_load_si128((__m128i *)data_ptr);
+
+	for (i = 0; i < d->num_workers; i++) {
+		bl = &d->backlog[i];
+
+		inflight_fids =
+			_mm_load_si128((__m128i *)&(d->in_flight_tags[i]));
+		preflight_fids =
+			_mm_load_si128((__m128i *)(bl->tags));
+
+		/*
+		 * Any incoming_fid that exists anywhere in inflight_fids will
+		 * have 0xffff in same position of the mask as the incoming fid
+		 * Example (shortened to bytes for brevity):
+		 * incoming_fids   0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08
+		 * inflight_fids   0x03 0x05 0x07 0x00 0x00 0x00 0x00 0x00
+		 * mask            0x00 0x00 0xff 0x00 0xff 0x00 0xff 0x00
+		 */
+
+		mask1 = _mm_cmpestrm(inflight_fids, 8, incoming_fids, 8,
+			_SIDD_UWORD_OPS |
+			_SIDD_CMP_EQUAL_ANY |
+			_SIDD_UNIT_MASK);
+		mask2 = _mm_cmpestrm(preflight_fids, 8, incoming_fids, 8,
+			_SIDD_UWORD_OPS |
+			_SIDD_CMP_EQUAL_ANY |
+			_SIDD_UNIT_MASK);
+
+		mask1 = _mm_or_si128(mask1, mask2);
+		/*
+		 * Now mask contains 0xffff where there's a match.
+		 * Next we need to store the worker_id in the relevant position
+		 * in the output.
+		 */
+
+		wkr = _mm_set1_epi16(i+1);
+		mask1 = _mm_and_si128(mask1, wkr);
+		output = _mm_or_si128(mask1, output);
+	}
+
+	/*
+	 * At this stage, the output 128-bit contains 8 16-bit values, with
+	 * each non-zero value containing the worker ID on which the
+	 * corresponding flow is pinned to.
+	 */
+	_mm_store_si128((__m128i *)output_ptr, output);
+}
diff --git a/lib/librte_distributor/rte_distributor_priv.h b/lib/librte_distributor/rte_distributor_priv.h
index 833855f..cc2c478 100644
--- a/lib/librte_distributor/rte_distributor_priv.h
+++ b/lib/librte_distributor/rte_distributor_priv.h
@@ -155,6 +155,7 @@ struct rte_distributor {
 /* All different signature compare functions */
 enum rte_distributor_match_function {
 	RTE_DIST_MATCH_SCALAR = 0,
+	RTE_DIST_MATCH_VECTOR,
 	RTE_DIST_MATCH_NUM
 };
 
@@ -182,6 +183,11 @@ struct rte_distributor_burst {
 	enum rte_distributor_match_function dist_match_fn;
 };
 
+void
+find_match_vec(struct rte_distributor_burst *d,
+			uint16_t *data_ptr,
+			uint16_t *output_ptr);
+
 #ifdef __cplusplus
 }
 #endif
-- 
2.7.4

^ permalink raw reply related

* [WARNING: A/V UNSCANNABLE][PATCH v3 1/6] lib: distributor performance enhancements
From: David Hunt @ 2017-01-02 10:22 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483352546-171068-1-git-send-email-david.hunt@intel.com>

Now sends bursts of up to 8 mbufs to each worker, and tracks
the in-flight flow-ids (atomic scheduling)

New file with a new api, similar to the old API except with _burst
at the end of the function names

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 lib/librte_distributor/Makefile                |   2 +
 lib/librte_distributor/rte_distributor.c       |  72 +---
 lib/librte_distributor/rte_distributor_burst.c | 558 +++++++++++++++++++++++++
 lib/librte_distributor/rte_distributor_burst.h | 255 +++++++++++
 lib/librte_distributor/rte_distributor_priv.h  | 189 +++++++++
 5 files changed, 1005 insertions(+), 71 deletions(-)
 create mode 100644 lib/librte_distributor/rte_distributor_burst.c
 create mode 100644 lib/librte_distributor/rte_distributor_burst.h
 create mode 100644 lib/librte_distributor/rte_distributor_priv.h

diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
index 4c9af17..2acc54d 100644
--- a/lib/librte_distributor/Makefile
+++ b/lib/librte_distributor/Makefile
@@ -43,9 +43,11 @@ LIBABIVER := 1
 
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += rte_distributor_burst.c
 
 # install this header file
 SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include += rte_distributor_burst.h
 
 # this lib needs eal
 DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index f3f778c..c05f6e3 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -40,79 +40,9 @@
 #include <rte_errno.h>
 #include <rte_string_fns.h>
 #include <rte_eal_memconfig.h>
+#include "rte_distributor_priv.h"
 #include "rte_distributor.h"
 
-#define NO_FLAGS 0
-#define RTE_DISTRIB_PREFIX "DT_"
-
-/* we will use the bottom four bits of pointer for flags, shifting out
- * the top four bits to make room (since a 64-bit pointer actually only uses
- * 48 bits). An arithmetic-right-shift will then appropriately restore the
- * original pointer value with proper sign extension into the top bits. */
-#define RTE_DISTRIB_FLAG_BITS 4
-#define RTE_DISTRIB_FLAGS_MASK (0x0F)
-#define RTE_DISTRIB_NO_BUF 0       /**< empty flags: no buffer requested */
-#define RTE_DISTRIB_GET_BUF (1)    /**< worker requests a buffer, returns old */
-#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
-
-#define RTE_DISTRIB_BACKLOG_SIZE 8
-#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
-
-#define RTE_DISTRIB_MAX_RETURNS 128
-#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
-
-/**
- * Maximum number of workers allowed.
- * Be aware of increasing the limit, becaus it is limited by how we track
- * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
- */
-#define RTE_DISTRIB_MAX_WORKERS	64
-
-/**
- * Buffer structure used to pass the pointer data between cores. This is cache
- * line aligned, but to improve performance and prevent adjacent cache-line
- * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
- * the next cache line to worker 0, we pad this out to three cache lines.
- * Only 64-bits of the memory is actually used though.
- */
-union rte_distributor_buffer {
-	volatile int64_t bufptr64;
-	char pad[RTE_CACHE_LINE_SIZE*3];
-} __rte_cache_aligned;
-
-struct rte_distributor_backlog {
-	unsigned start;
-	unsigned count;
-	int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
-};
-
-struct rte_distributor_returned_pkts {
-	unsigned start;
-	unsigned count;
-	struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
-};
-
-struct rte_distributor {
-	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
-
-	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
-	unsigned num_workers;                 /**< Number of workers polling */
-
-	uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
-		/**< Tracks the tag being processed per core */
-	uint64_t in_flight_bitmask;
-		/**< on/off bits for in-flight tags.
-		 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
-		 * the bitmask has to expand.
-		 */
-
-	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
-
-	union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
-
-	struct rte_distributor_returned_pkts returns;
-};
-
 TAILQ_HEAD(rte_distributor_list, rte_distributor);
 
 static struct rte_tailq_elem rte_distributor_tailq = {
diff --git a/lib/librte_distributor/rte_distributor_burst.c b/lib/librte_distributor/rte_distributor_burst.c
new file mode 100644
index 0000000..ae7cf9d
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_burst.c
@@ -0,0 +1,558 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <string.h>
+#include <rte_mbuf.h>
+#include <rte_memory.h>
+#include <rte_cycles.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor_priv.h"
+#include "rte_distributor_burst.h"
+
+TAILQ_HEAD(rte_dist_burst_list, rte_distributor_burst);
+
+static struct rte_tailq_elem rte_dist_burst_tailq = {
+	.name = "RTE_DIST_BURST",
+};
+EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
+
+/**** APIs called by workers ****/
+
+/**** Burst Packet APIs called by workers ****/
+
+/* This function should really be called return_pkt_burst() */
+void
+rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt,
+		unsigned int count)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[worker_id]);
+	unsigned int i;
+
+	volatile int64_t *retptr64;
+
+
+	/* if we dont' have any packets to return, return. */
+	if (count == 0)
+		return;
+
+	retptr64 = &(buf->retptr64[0]);
+	/* Spin while handshake bits are set (scheduler clears it) */
+	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
+		rte_pause();
+		uint64_t t = rte_rdtsc()+100;
+
+		while (rte_rdtsc() < t)
+			rte_pause();
+	}
+
+	/*
+	 * OK, if we've got here, then the scheduler has just cleared the
+	 * handshake bits. Populate the retptrs with returning packets.
+	 */
+
+	for (i = count; i < RTE_DIST_BURST_SIZE; i++)
+		buf->retptr64[i] = 0;
+
+	/* Set Return bit for each packet returned */
+	for (i = count; i-- > 0; )
+		buf->retptr64[i] =
+			(((int64_t)(uintptr_t)(oldpkt[i])) <<
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+
+	/*
+	 * Finally, set the GET_BUF  to signal to distributor that cache
+	 * line is ready for processing
+	 */
+	*retptr64 |= RTE_DISTRIB_GET_BUF;
+}
+
+int
+rte_distributor_poll_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **pkts)
+{
+	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
+	uint64_t ret;
+	int count = 0;
+	unsigned int i;
+
+	/* If bit is set, return */
+	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
+		return 0;
+
+	/* since bufptr64 is signed, this should be an arithmetic shift */
+	for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+		if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
+			ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
+			pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
+		}
+	}
+
+	/*
+	 * so now we've got the contents of the cacheline into an  array of
+	 * mbuf pointers, so toggle the bit so scheduler can start working
+	 * on the next cacheline while we're working.
+	 */
+	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
+
+
+	return count;
+}
+
+int
+rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **pkts,
+		struct rte_mbuf **oldpkt, unsigned int return_count)
+{
+	unsigned int count;
+	uint64_t retries = 0;
+
+	rte_distributor_request_pkt_burst(d, worker_id, oldpkt, return_count);
+
+	count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
+	while (count == 0) {
+		rte_pause();
+		retries++;
+		if (retries > 1000)
+			return 0;
+
+		uint64_t t = rte_rdtsc()+100;
+
+		while (rte_rdtsc() < t)
+			rte_pause();
+
+		count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
+	}
+	return count;
+}
+
+int
+rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
+{
+	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
+	unsigned int i;
+
+	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
+		/* Switch off the return bit first */
+		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+
+	for (i = num; i-- > 0; )
+		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+
+	/* set the GET_BUF but even if we got no returns */
+	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
+
+	return 0;
+}
+
+/**** APIs called on distributor core ***/
+
+/* stores a packet returned from a worker inside the returns array */
+static inline void
+store_return(uintptr_t oldbuf, struct rte_distributor_burst *d,
+		unsigned int *ret_start, unsigned int *ret_count)
+{
+	if (!oldbuf)
+		return;
+	/* store returns in a circular buffer */
+	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
+			= (void *)oldbuf;
+	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
+	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
+}
+
+static inline void
+find_match_scalar(struct rte_distributor_burst *d,
+			uint16_t *data_ptr,
+			uint16_t *output_ptr)
+{
+	struct rte_distributor_backlog *bl;
+	uint16_t i, j, w;
+
+	/*
+	 * Function overview:
+	 * 1. Loop through all worker ID's
+	 * 2. Compare the current inflights to the incoming tags
+	 * 3. Compare the current backlog to the incoming tags
+	 * 4. Add any matches to the output
+	 */
+
+	for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
+		output_ptr[j] = 0;
+
+	for (i = 0; i < d->num_workers; i++) {
+		bl = &d->backlog[i];
+
+		for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
+			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
+				if (d->in_flight_tags[i][j] == data_ptr[w]) {
+					output_ptr[j] = i+1;
+					break;
+				}
+		for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
+			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
+				if (bl->tags[j] == data_ptr[w]) {
+					output_ptr[j] = i+1;
+					break;
+				}
+	}
+
+	/*
+	 * At this stage, the output contains 8 16-bit values, with
+	 * each non-zero value containing the worker ID on which the
+	 * corresponding flow is pinned to.
+	 */
+}
+
+
+
+static unsigned int
+handle_returns(struct rte_distributor_burst *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
+	uintptr_t oldbuf;
+	unsigned int ret_start = d->returns.start,
+			ret_count = d->returns.count;
+	unsigned int count = 0;
+	unsigned int i;
+	/*
+	 * wait for the GET_BUF bit to go high, otherwise we can't send
+	 * the packets to the worker
+	 */
+
+	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
+		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
+				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
+					RTE_DISTRIB_FLAG_BITS));
+				/* store returns in a circular buffer */
+				store_return(oldbuf, d, &ret_start, &ret_count);
+				count++;
+				buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+			}
+		}
+		d->returns.start = ret_start;
+		d->returns.count = ret_count;
+		/* Clear for the worker to populate with more returns */
+		buf->retptr64[0] = 0;
+	}
+	return count;
+}
+
+static unsigned int
+release(struct rte_distributor_burst *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
+	unsigned int i;
+
+	if (d->backlog[wkr].count == 0)
+		return 0;
+
+	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+		rte_pause();
+
+	handle_returns(d, wkr);
+
+	buf->count = 0;
+
+	for (i = 0; i < d->backlog[wkr].count; i++) {
+		d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
+				RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
+		d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
+	}
+	buf->count = i;
+	for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
+		buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
+		d->in_flight_tags[wkr][i] = 0;
+	}
+
+	d->backlog[wkr].count = 0;
+
+	/* Clear the GET bit */
+	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
+	return  buf->count;
+
+}
+
+
+/* process a set of packets to distribute them to workers */
+int
+rte_distributor_process_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int num_mbufs)
+{
+	unsigned int next_idx = 0;
+	static unsigned int wkr;
+	struct rte_mbuf *next_mb = NULL;
+	int64_t next_value = 0;
+	uint16_t new_tag = 0;
+	uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
+	unsigned int i, wid;
+	int j, w;
+
+	if (unlikely(num_mbufs == 0)) {
+		/* Flush out all non-full cache-lines to workers. */
+		for (wid = 0 ; wid < d->num_workers; wid++) {
+			if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
+				release(d, wid);
+				handle_returns(d, wid);
+			}
+		}
+		return 0;
+	}
+
+	while (next_idx < num_mbufs) {
+		uint16_t matches[RTE_DIST_BURST_SIZE];
+		int pkts;
+
+		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
+			d->bufs[wkr].count = 0;
+
+		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+			if (mbufs[next_idx + i]) {
+				/* flows have to be non-zero */
+				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
+			} else
+				flows[i] = 0;
+		}
+
+		switch (d->dist_match_fn) {
+		default:
+			find_match_scalar(d, &flows[0], &matches[0]);
+		}
+
+		/*
+		 * Matches array now contain the intended worker ID (+1) of
+		 * the incoming packets. Any zeroes need to be assigned
+		 * workers.
+		 */
+
+		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
+			pkts = num_mbufs - next_idx;
+		else
+			pkts = RTE_DIST_BURST_SIZE;
+
+		for (j = 0; j < pkts; j++) {
+
+			next_mb = mbufs[next_idx++];
+			next_value = (((int64_t)(uintptr_t)next_mb) <<
+					RTE_DISTRIB_FLAG_BITS);
+			/*
+			 * User is advocated to set tag vaue for each
+			 * mbuf before calling rte_distributor_process.
+			 * User defined tags are used to identify flows,
+			 * or sessions.
+			 */
+			/* flows MUST be non-zero */
+			new_tag = (uint16_t)(next_mb->hash.usr) | 1;
+
+			/*
+			 * Uncommenting the next line will cause the find_match
+			 * function to be optimised out, making this function
+			 * do parallel (non-atomic) distribution
+			 */
+			/* matches[j] = 0; */
+
+			if (matches[j]) {
+				struct rte_distributor_backlog *bl =
+						&d->backlog[matches[j]-1];
+				if (unlikely(bl->count ==
+						RTE_DIST_BURST_SIZE)) {
+					release(d, matches[j]-1);
+				}
+
+				/* Add to worker that already has flow */
+				unsigned int idx = bl->count++;
+
+				bl->tags[idx] = new_tag;
+				bl->pkts[idx] = next_value;
+
+			} else {
+				struct rte_distributor_backlog *bl =
+						&d->backlog[wkr];
+				if (unlikely(bl->count ==
+						RTE_DIST_BURST_SIZE)) {
+					release(d, wkr);
+				}
+
+				/* Add to current worker worker */
+				unsigned int idx = bl->count++;
+
+				bl->tags[idx] = new_tag;
+				bl->pkts[idx] = next_value;
+				/*
+				 * Now that we've just added an unpinned flow
+				 * to a worker, we need to ensure that all
+				 * other packets with that same flow will go
+				 * to the same worker in this burst.
+				 */
+				for (w = j; w < pkts; w++)
+					if (flows[w] == new_tag)
+						matches[w] = wkr+1;
+			}
+		}
+		wkr++;
+		if (wkr >= d->num_workers)
+			wkr = 0;
+	}
+
+	/* Flush out all non-full cache-lines to workers. */
+	for (wid = 0 ; wid < d->num_workers; wid++)
+		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+			release(d, wid);
+
+	return num_mbufs;
+}
+
+/* return to the caller, packets returned from workers */
+int
+rte_distributor_returned_pkts_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int max_mbufs)
+{
+	struct rte_distributor_returned_pkts *returns = &d->returns;
+	unsigned int retval = (max_mbufs < returns->count) ?
+			max_mbufs : returns->count;
+	unsigned int i;
+
+	for (i = 0; i < retval; i++) {
+		unsigned int idx = (returns->start + i) &
+				RTE_DISTRIB_RETURNS_MASK;
+
+		mbufs[i] = returns->mbufs[idx];
+	}
+	returns->start += i;
+	returns->count -= i;
+
+	return retval;
+}
+
+/*
+ * Return the number of packets in-flight in a distributor, i.e. packets
+ * being workered on or queued up in a backlog.
+ */
+static inline unsigned int
+total_outstanding(const struct rte_distributor_burst *d)
+{
+	unsigned int wkr, total_outstanding = 0;
+
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		total_outstanding += d->backlog[wkr].count;
+
+	return total_outstanding;
+}
+
+/*
+ * Flush the distributor, so that there are no outstanding packets in flight or
+ * queued up.
+ */
+int
+rte_distributor_flush_burst(struct rte_distributor_burst *d)
+{
+	const unsigned int flushed = total_outstanding(d);
+	unsigned int wkr;
+
+	while (total_outstanding(d) > 0)
+		rte_distributor_process_burst(d, NULL, 0);
+
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		handle_returns(d, wkr);
+
+	return flushed;
+}
+
+/* clears the internal returns array in the distributor */
+void
+rte_distributor_clear_returns_burst(struct rte_distributor_burst *d)
+{
+	unsigned int wkr;
+
+	/* throw away returns, so workers can exit */
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		d->bufs[wkr].retptr64[0] = 0;
+}
+
+/* creates a distributor instance */
+struct rte_distributor_burst *
+rte_distributor_create_burst(const char *name,
+		unsigned int socket_id,
+		unsigned int num_workers)
+{
+	struct rte_distributor_burst *d;
+	struct rte_dist_burst_list *dist_burst_list;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	const struct rte_memzone *mz;
+	unsigned int i;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
+
+	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+	if (mz == NULL) {
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	d = mz->addr;
+	snprintf(d->name, sizeof(d->name), "%s", name);
+	d->num_workers = num_workers;
+
+	d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
+
+	/*
+	 * Set up the backog tags so they're pointing at the second cache
+	 * line for performance during flow matching
+	 */
+	for (i = 0 ; i < num_workers ; i++)
+		d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
+
+	dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
+					  rte_dist_burst_list);
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+	TAILQ_INSERT_TAIL(dist_burst_list, d, next);
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	return d;
+}
diff --git a/lib/librte_distributor/rte_distributor_burst.h b/lib/librte_distributor/rte_distributor_burst.h
new file mode 100644
index 0000000..5096b13
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_burst.h
@@ -0,0 +1,255 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DIST_BURST_H_
+#define _RTE_DIST_BURST_H_
+
+/**
+ * @file
+ * RTE distributor
+ *
+ * The distributor is a component which is designed to pass packets
+ * one-at-a-time to workers, with dynamic load balancing.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rte_distributor_burst;
+struct rte_mbuf;
+
+/**
+ * Function to create a new distributor instance
+ *
+ * Reserves the memory needed for the distributor operation and
+ * initializes the distributor to work with the configured number of workers.
+ *
+ * @param name
+ *   The name to be given to the distributor instance.
+ * @param socket_id
+ *   The NUMA node on which the memory is to be allocated
+ * @param num_workers
+ *   The maximum number of workers that will request packets from this
+ *   distributor
+ * @return
+ *   The newly created distributor instance
+ */
+struct rte_distributor_burst *
+rte_distributor_create_burst(const char *name, unsigned int socket_id,
+		unsigned int num_workers);
+
+/*  *** APIS to be called on the distributor lcore ***  */
+/*
+ * The following APIs are the public APIs which are designed for use on a
+ * single lcore which acts as the distributor lcore for a given distributor
+ * instance. These functions cannot be called on multiple cores simultaneously
+ * without using locking to protect access to the internals of the distributor.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * Process a set of packets by distributing them among workers that request
+ * packets. The distributor will ensure that no two packets that have the
+ * same flow id, or tag, in the mbuf will be processed on different cores at
+ * the same time.
+ *
+ * The user is advocated to set tag for each mbuf before calling this function.
+ * If user doesn't set the tag, the tag value can be various values depending on
+ * driver implementation and configuration.
+ *
+ * This is not multi-thread safe and should only be called on a single lcore.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param mbufs
+ *   The mbufs to be distributed
+ * @param num_mbufs
+ *   The number of mbufs in the mbufs array
+ * @return
+ *   The number of mbufs processed.
+ */
+int
+rte_distributor_process_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int num_mbufs);
+
+/**
+ * Get a set of mbufs that have been returned to the distributor by workers
+ *
+ * This should only be called on the same lcore as rte_distributor_process()
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param mbufs
+ *   The mbufs pointer array to be filled in
+ * @param max_mbufs
+ *   The size of the mbufs array
+ * @return
+ *   The number of mbufs returned in the mbufs array.
+ */
+int
+rte_distributor_returned_pkts_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int max_mbufs);
+
+/**
+ * Flush the distributor component, so that there are no in-flight or
+ * backlogged packets awaiting processing
+ *
+ * This should only be called on the same lcore as rte_distributor_process()
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @return
+ *   The number of queued/in-flight packets that were completed by this call.
+ */
+int
+rte_distributor_flush_burst(struct rte_distributor_burst *d);
+
+/**
+ * Clears the array of returned packets used as the source for the
+ * rte_distributor_returned_pkts() API call.
+ *
+ * This should only be called on the same lcore as rte_distributor_process()
+ *
+ * @param d
+ *   The distributor instance to be used
+ */
+void
+rte_distributor_clear_returns_burst(struct rte_distributor_burst *d);
+
+/*  *** APIS to be called on the worker lcores ***  */
+/*
+ * The following APIs are the public APIs which are designed for use on
+ * multiple lcores which act as workers for a distributor. Each lcore should use
+ * a unique worker id when requesting packets.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * API called by a worker to get new packets to process. Any previous packets
+ * given to the worker is assumed to have completed processing, and may be
+ * optionally returned to the distributor via the oldpkt parameter.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less that num_workers passed
+ *   at distributor creation time.
+ * @param pkts
+ *   The mbufs pointer array to be filled in (up to 8 packets)
+ * @param oldpkt
+ *   The previous packet, if any, being processed by the worker
+ * @param retcount
+ *   The number of packets being returned
+ *
+ * @return
+ *   The number of packets in the pkts array
+ */
+int
+rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
+	unsigned int worker_id, struct rte_mbuf **pkts,
+	struct rte_mbuf **oldpkt, unsigned int retcount);
+
+/**
+ * API called by a worker to return a completed packet without requesting a
+ * new packet, for example, because a worker thread is shutting down
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less that num_workers passed
+ *   at distributor creation time.
+ * @param mbuf
+ *   The previous packet being processed by the worker
+ */
+int
+rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
+	unsigned int worker_id, struct rte_mbuf **oldpkt, int num);
+
+/**
+ * API called by a worker to request a new packet to process.
+ * Any previous packet given to the worker is assumed to have completed
+ * processing, and may be optionally returned to the distributor via
+ * the oldpkt parameter.
+ * Unlike rte_distributor_get_pkt_burst(), this function does not wait for a
+ * new packet to be provided by the distributor.
+ *
+ * NOTE: after calling this function, rte_distributor_poll_pkt_burst() should
+ * be used to poll for the packet requested. The rte_distributor_get_pkt_burst()
+ * API should *not* be used to try and retrieve the new packet.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less that num_workers passed
+ *   at distributor creation time.
+ * @param oldpkt
+ *   The returning packets, if any, processed by the worker
+ * @param count
+ *   The number of returning packets
+ */
+void
+rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt,
+		unsigned int count);
+
+/**
+ * API called by a worker to check for a new packet that was previously
+ * requested by a call to rte_distributor_request_pkt(). It does not wait
+ * for the new packet to be available, but returns NULL if the request has
+ * not yet been fulfilled by the distributor.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less that num_workers passed
+ *   at distributor creation time.
+ * @param mbufs
+ *   The array of mbufs being given to the worker
+ *
+ * @return
+ *   The number of packets being given to the worker thread, zero if no
+ *   packet is yet available.
+ */
+int
+rte_distributor_poll_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **mbufs);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/librte_distributor/rte_distributor_priv.h b/lib/librte_distributor/rte_distributor_priv.h
new file mode 100644
index 0000000..833855f
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_priv.h
@@ -0,0 +1,189 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DIST_PRIV_H_
+#define _RTE_DIST_PRIV_H_
+
+/**
+ * @file
+ * RTE distributor
+ *
+ * The distributor is a component which is designed to pass packets
+ * one-at-a-time to workers, with dynamic load balancing.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define NO_FLAGS 0
+#define RTE_DISTRIB_PREFIX "DT_"
+
+/*
+ * We will use the bottom four bits of pointer for flags, shifting out
+ * the top four bits to make room (since a 64-bit pointer actually only uses
+ * 48 bits). An arithmetic-right-shift will then appropriately restore the
+ * original pointer value with proper sign extension into the top bits.
+ */
+#define RTE_DISTRIB_FLAG_BITS 4
+#define RTE_DISTRIB_FLAGS_MASK (0x0F)
+#define RTE_DISTRIB_NO_BUF 0       /**< empty flags: no buffer requested */
+#define RTE_DISTRIB_GET_BUF (1)    /**< worker requests a buffer, returns old */
+#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
+#define RTE_DISTRIB_VALID_BUF (4)  /**< set if bufptr contains ptr */
+
+#define RTE_DISTRIB_BACKLOG_SIZE 8
+#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
+
+#define RTE_DISTRIB_MAX_RETURNS 128
+#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+
+/**
+ * Maximum number of workers allowed.
+ * Be aware of increasing the limit, becaus it is limited by how we track
+ * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
+ */
+#define RTE_DISTRIB_MAX_WORKERS 64
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to three cache lines.
+ * Only 64-bits of the memory is actually used though.
+ */
+union rte_distributor_buffer {
+	volatile int64_t bufptr64;
+	char pad[RTE_CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+/**
+ * Number of packets to deal with in bursts. Needs to be 8 so as to
+ * fit in one cache line.
+ */
+#define RTE_DIST_BURST_SIZE (sizeof(__m128i) / sizeof(uint16_t))
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to two cache lines.
+ * We can pass up to 8 mbufs at a time in one cacheline.
+ * There is a separate cacheline for returns in the burst API.
+ */
+struct rte_distributor_buffer_burst {
+	volatile int64_t bufptr64[RTE_DIST_BURST_SIZE]
+			__rte_cache_aligned; /* <= outgoing to worker */
+
+	int64_t pad1 __rte_cache_aligned;    /* <= one cache line  */
+
+	volatile int64_t retptr64[RTE_DIST_BURST_SIZE]
+			__rte_cache_aligned; /* <= incoming from worker */
+
+	int64_t pad2 __rte_cache_aligned;    /* <= one cache line  */
+
+	int count __rte_cache_aligned;       /* <= number of current mbufs */
+};
+
+
+struct rte_distributor_backlog {
+	unsigned int start;
+	unsigned int count;
+	int64_t pkts[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
+	uint16_t *tags; /* will point to second cacheline of inflights */
+} __rte_cache_aligned;
+
+
+struct rte_distributor_returned_pkts {
+	unsigned int start;
+	unsigned int count;
+	struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
+};
+
+struct rte_distributor {
+	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
+
+	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
+	unsigned int num_workers;             /**< Number of workers polling */
+
+	uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
+		/**< Tracks the tag being processed per core */
+	uint64_t in_flight_bitmask;
+		/**< on/off bits for in-flight tags.
+		 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
+		 * the bitmask has to expand.
+		 */
+
+	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
+
+	union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
+
+	struct rte_distributor_returned_pkts returns;
+};
+
+/* All different signature compare functions */
+enum rte_distributor_match_function {
+	RTE_DIST_MATCH_SCALAR = 0,
+	RTE_DIST_MATCH_NUM
+};
+
+struct rte_distributor_burst {
+	TAILQ_ENTRY(rte_distributor_burst) next;    /**< Next in list. */
+
+	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
+	unsigned int num_workers;             /**< Number of workers polling */
+
+	/**>
+	 * First cache line in the this array are the tags inflight
+	 * on the worker core. Second cache line are the backlog
+	 * that are going to go to the worker core.
+	 */
+	uint16_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS][RTE_DIST_BURST_SIZE*2]
+			__rte_cache_aligned;
+
+	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]
+			__rte_cache_aligned;
+
+	struct rte_distributor_buffer_burst bufs[RTE_DISTRIB_MAX_WORKERS];
+
+	struct rte_distributor_returned_pkts returns;
+
+	enum rte_distributor_match_function dist_match_fn;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
-- 
2.7.4

^ permalink raw reply related

* [WARNING: A/V UNSCANNABLE][PATCH v3 3/6] test: unit tests for new distributor burst api
From: David Hunt @ 2017-01-02 10:22 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483352546-171068-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 app/test/test_distributor.c | 501 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 392 insertions(+), 109 deletions(-)

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index 85cb8f3..3871f86 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -40,11 +40,24 @@
 #include <rte_mempool.h>
 #include <rte_mbuf.h>
 #include <rte_distributor.h>
+#include <rte_distributor_burst.h>
 
 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
 #define BURST 32
 #define BIG_BATCH 1024
 
+#define DIST_SINGLE 0
+#define DIST_BURST  1
+#define DIST_NUM_TYPES 2
+
+struct worker_params {
+	struct rte_distributor *d;
+	struct rte_distributor_burst *db;
+	int dist_type;
+};
+
+struct worker_params worker_params;
+
 /* statics - all zero-initialized by default */
 static volatile int quit;      /**< general quit variable for all threads */
 static volatile int zero_quit; /**< var for when we just want thr0 to quit*/
@@ -81,17 +94,36 @@ static int
 handle_work(void *arg)
 {
 	struct rte_mbuf *pkt = NULL;
-	struct rte_distributor *d = arg;
-	unsigned count = 0;
-	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
-
-	pkt = rte_distributor_get_pkt(d, id, NULL);
-	while (!quit) {
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
+	unsigned int count = 0, num = 0;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+	int i;
+
+	if (wp->dist_type == DIST_SINGLE) {
+		pkt = rte_distributor_get_pkt(d, id, NULL);
+		while (!quit) {
+			worker_stats[id].handled_packets++, count++;
+			pkt = rte_distributor_get_pkt(d, id, pkt);
+		}
 		worker_stats[id].handled_packets++, count++;
-		pkt = rte_distributor_get_pkt(d, id, pkt);
+		rte_distributor_return_pkt(d, id, pkt);
+	} else {
+		for (i = 0; i < 8; i++)
+			buf[i] = NULL;
+		num = rte_distributor_get_pkt_burst(db, id, buf, buf, num);
+		while (!quit) {
+			worker_stats[id].handled_packets += num;
+			count += num;
+			num = rte_distributor_get_pkt_burst(db, id,
+					buf, buf, num);
+		}
+		worker_stats[id].handled_packets += num;
+		count += num;
+		rte_distributor_return_pkt_burst(db, id, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt(d, id, pkt);
 	return 0;
 }
 
@@ -107,12 +139,21 @@ handle_work(void *arg)
  *   not necessarily in the same order (as different flows).
  */
 static int
-sanity_test(struct rte_distributor *d, struct rte_mempool *p)
+sanity_test(struct worker_params *wp, struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	struct rte_mbuf *bufs[BURST];
-	unsigned i;
+	struct rte_mbuf *returns[BURST*2];
+	unsigned int i;
+	unsigned int retries;
+	unsigned int count = 0;
+
+	if (wp->dist_type == DIST_SINGLE)
+		printf("=== Basic distributor sanity tests (single) ===\n");
+	else
+		printf("=== Basic distributor sanity tests (burst) ===\n");
 
-	printf("=== Basic distributor sanity tests ===\n");
 	clear_packet_count();
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
@@ -124,8 +165,21 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	for (i = 0; i < BURST; i++)
 		bufs[i]->hash.usr = 0;
 
-	rte_distributor_process(d, bufs, BURST);
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		count = 0;
+		do {
+
+			rte_distributor_flush_burst(db);
+			count += rte_distributor_returned_pkts_burst(db,
+					returns, BURST*2);
+		} while (count < BURST);
+	}
+
+
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -146,8 +200,18 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 		for (i = 0; i < BURST; i++)
 			bufs[i]->hash.usr = (i & 1) << 8;
 
-		rte_distributor_process(d, bufs, BURST);
-		rte_distributor_flush(d);
+		if (wp->dist_type == DIST_SINGLE) {
+			rte_distributor_process(d, bufs, BURST);
+			rte_distributor_flush(d);
+		} else {
+			rte_distributor_process_burst(db, bufs, BURST);
+			count = 0;
+			do {
+				rte_distributor_flush_burst(db);
+				count += rte_distributor_returned_pkts_burst(db,
+						returns, BURST*2);
+			} while (count < BURST);
+		}
 		if (total_packet_count() != BURST) {
 			printf("Line %d: Error, not all packets flushed. "
 					"Expected %u, got %u\n",
@@ -155,24 +219,32 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 			return -1;
 		}
 
+
 		for (i = 0; i < rte_lcore_count() - 1; i++)
 			printf("Worker %u handled %u packets\n", i,
 					worker_stats[i].handled_packets);
 		printf("Sanity test with two hash values done\n");
-
-		if (worker_stats[0].handled_packets != 16 ||
-				worker_stats[1].handled_packets != 16)
-			return -1;
 	}
 
 	/* give a different hash value to each packet,
 	 * so load gets distributed */
 	clear_packet_count();
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = i;
+		bufs[i]->hash.usr = i+1;
+
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		count = 0;
+		do {
+			rte_distributor_flush_burst(db);
+			count += rte_distributor_returned_pkts_burst(db,
+					returns, BURST*2);
+		} while (count < BURST);
+	}
 
-	rte_distributor_process(d, bufs, BURST);
-	rte_distributor_flush(d);
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -194,8 +266,15 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	unsigned num_returned = 0;
 
 	/* flush out any remaining packets */
-	rte_distributor_flush(d);
-	rte_distributor_clear_returns(d);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_flush(d);
+		rte_distributor_clear_returns(d);
+	} else {
+		rte_distributor_flush_burst(db);
+		rte_distributor_clear_returns_burst(db);
+	}
+
+
 	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
 		return -1;
@@ -203,28 +282,59 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	for (i = 0; i < BIG_BATCH; i++)
 		many_bufs[i]->hash.usr = i << 2;
 
-	for (i = 0; i < BIG_BATCH/BURST; i++) {
-		rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+	if (wp->dist_type == DIST_SINGLE) {
+		printf("===testing single big burst===\n");
+		for (i = 0; i < BIG_BATCH/BURST; i++) {
+			rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+			num_returned += rte_distributor_returned_pkts(d,
+					&return_bufs[num_returned],
+					BIG_BATCH - num_returned);
+		}
+		rte_distributor_flush(d);
 		num_returned += rte_distributor_returned_pkts(d,
 				&return_bufs[num_returned],
 				BIG_BATCH - num_returned);
+	} else {
+		printf("===testing burst big burst===\n");
+		for (i = 0; i < BIG_BATCH/BURST; i++) {
+			rte_distributor_process_burst(db,
+					&many_bufs[i*BURST], BURST);
+			count = rte_distributor_returned_pkts_burst(db,
+					&return_bufs[num_returned],
+					BIG_BATCH - num_returned);
+			num_returned += count;
+		}
+		rte_distributor_flush_burst(db);
+		count = rte_distributor_returned_pkts_burst(db,
+				&return_bufs[num_returned],
+				BIG_BATCH - num_returned);
+		num_returned += count;
 	}
-	rte_distributor_flush(d);
-	num_returned += rte_distributor_returned_pkts(d,
-			&return_bufs[num_returned], BIG_BATCH - num_returned);
+	retries = 0;
+	do {
+		rte_distributor_flush_burst(db);
+		count = rte_distributor_returned_pkts_burst(db,
+				&return_bufs[num_returned],
+				BIG_BATCH - num_returned);
+		num_returned += count;
+		retries++;
+	} while ((num_returned < BIG_BATCH) && (retries < 100));
+
 
 	if (num_returned != BIG_BATCH) {
-		printf("line %d: Number returned is not the same as "
-				"number sent\n", __LINE__);
+		printf("line %d: Missing packets, expected %d\n",
+				__LINE__, num_returned);
 		return -1;
 	}
+
 	/* big check -  make sure all packets made it back!! */
 	for (i = 0; i < BIG_BATCH; i++) {
 		unsigned j;
 		struct rte_mbuf *src = many_bufs[i];
-		for (j = 0; j < BIG_BATCH; j++)
+		for (j = 0; j < BIG_BATCH; j++) {
 			if (return_bufs[j] == src)
 				break;
+		}
 
 		if (j == BIG_BATCH) {
 			printf("Error: could not find source packet #%u\n", i);
@@ -234,7 +344,6 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	printf("Sanity test of returned packets done\n");
 
 	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
-
 	printf("\n");
 	return 0;
 }
@@ -249,18 +358,40 @@ static int
 handle_work_with_free_mbufs(void *arg)
 {
 	struct rte_mbuf *pkt = NULL;
-	struct rte_distributor *d = arg;
-	unsigned count = 0;
-	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
-
-	pkt = rte_distributor_get_pkt(d, id, NULL);
-	while (!quit) {
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
+	unsigned int count = 0;
+	unsigned int i;
+	unsigned int num = 0;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+
+	if (wp->dist_type == DIST_SINGLE) {
+		pkt = rte_distributor_get_pkt(d, id, NULL);
+		while (!quit) {
+			worker_stats[id].handled_packets++, count++;
+			rte_pktmbuf_free(pkt);
+			pkt = rte_distributor_get_pkt(d, id, pkt);
+		}
 		worker_stats[id].handled_packets++, count++;
-		rte_pktmbuf_free(pkt);
-		pkt = rte_distributor_get_pkt(d, id, pkt);
+		rte_distributor_return_pkt(d, id, pkt);
+	} else {
+		for (i = 0; i < 8; i++)
+			buf[i] = NULL;
+		num = rte_distributor_get_pkt_burst(db, id, buf, buf, num);
+		while (!quit) {
+			worker_stats[id].handled_packets += num;
+			count += num;
+			for (i = 0; i < num; i++)
+				rte_pktmbuf_free(buf[i]);
+			num = rte_distributor_get_pkt_burst(db,
+					id, buf, buf, num);
+		}
+		worker_stats[id].handled_packets += num;
+		count += num;
+		rte_distributor_return_pkt_burst(db, id, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt(d, id, pkt);
 	return 0;
 }
 
@@ -270,26 +401,45 @@ handle_work_with_free_mbufs(void *arg)
  * library.
  */
 static int
-sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
+sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	unsigned i;
 	struct rte_mbuf *bufs[BURST];
 
-	printf("=== Sanity test with mbuf alloc/free  ===\n");
+	if (wp->dist_type == DIST_SINGLE)
+		printf("=== Sanity test with mbuf alloc/free (single) ===\n");
+	else
+		printf("=== Sanity test with mbuf alloc/free (burst)  ===\n");
+
 	clear_packet_count();
 	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
 		unsigned j;
-		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
-			rte_distributor_process(d, NULL, 0);
+		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0) {
+			if (wp->dist_type == DIST_SINGLE)
+				rte_distributor_process(d, NULL, 0);
+			else
+				rte_distributor_process_burst(db, NULL, 0);
+		}
 		for (j = 0; j < BURST; j++) {
 			bufs[j]->hash.usr = (i+j) << 1;
 			rte_mbuf_refcnt_set(bufs[j], 1);
 		}
 
-		rte_distributor_process(d, bufs, BURST);
+		if (wp->dist_type == DIST_SINGLE)
+			rte_distributor_process(d, bufs, BURST);
+		else
+			rte_distributor_process_burst(db, bufs, BURST);
 	}
 
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_flush(d);
+	else
+		rte_distributor_flush_burst(db);
+
+	rte_delay_us(10000);
+
 	if (total_packet_count() < (1<<ITER_POWER)) {
 		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
 				__LINE__, total_packet_count(),
@@ -305,20 +455,48 @@ static int
 handle_work_for_shutdown_test(void *arg)
 {
 	struct rte_mbuf *pkt = NULL;
-	struct rte_distributor *d = arg;
-	unsigned count = 0;
-	const unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
+	unsigned int count = 0;
+	unsigned int num = 0;
+	unsigned int total = 0;
+	unsigned int i;
+	unsigned int returned = 0;
+	const unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+
+	if (wp->dist_type == DIST_SINGLE)
+		pkt = rte_distributor_get_pkt(d, id, NULL);
+	else
+		num = rte_distributor_get_pkt_burst(db, id, buf, buf, num);
 
-	pkt = rte_distributor_get_pkt(d, id, NULL);
 	/* wait for quit single globally, or for worker zero, wait
 	 * for zero_quit */
 	while (!quit && !(id == 0 && zero_quit)) {
-		worker_stats[id].handled_packets++, count++;
-		rte_pktmbuf_free(pkt);
-		pkt = rte_distributor_get_pkt(d, id, NULL);
+		if (wp->dist_type == DIST_SINGLE) {
+			worker_stats[id].handled_packets++, count++;
+			rte_pktmbuf_free(pkt);
+			pkt = rte_distributor_get_pkt(d, id, NULL);
+			num = 1;
+			total += num;
+		} else {
+			worker_stats[id].handled_packets += num;
+			count += num;
+			for (i = 0; i < num; i++)
+				rte_pktmbuf_free(buf[i]);
+			num = rte_distributor_get_pkt_burst(db,
+					id, buf, buf, num);
+			total += num;
+		}
+	}
+	worker_stats[id].handled_packets += num;
+	count += num;
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_return_pkt(d, id, pkt);
+	} else {
+		returned = rte_distributor_return_pkt_burst(db, id, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt(d, id, pkt);
 
 	if (id == 0) {
 		/* for worker zero, allow it to restart to pick up last packet
@@ -326,13 +504,29 @@ handle_work_for_shutdown_test(void *arg)
 		 */
 		while (zero_quit)
 			usleep(100);
-		pkt = rte_distributor_get_pkt(d, id, NULL);
+		if (wp->dist_type == DIST_SINGLE) {
+			pkt = rte_distributor_get_pkt(d, id, NULL);
+		} else {
+			num = rte_distributor_get_pkt_burst(db,
+					id, buf, buf, num);
+		}
 		while (!quit) {
 			worker_stats[id].handled_packets++, count++;
 			rte_pktmbuf_free(pkt);
-			pkt = rte_distributor_get_pkt(d, id, NULL);
+			if (wp->dist_type == DIST_SINGLE) {
+				pkt = rte_distributor_get_pkt(d, id, NULL);
+			} else {
+				num = rte_distributor_get_pkt_burst(db,
+						id, buf, buf, num);
+			}
+		}
+		if (wp->dist_type == DIST_SINGLE) {
+			rte_distributor_return_pkt(d, id, pkt);
+		} else {
+			returned = rte_distributor_return_pkt_burst(db,
+					id, buf, num);
+			printf("Num returned = %d\n", returned);
 		}
-		rte_distributor_return_pkt(d, id, pkt);
 	}
 	return 0;
 }
@@ -344,26 +538,37 @@ handle_work_for_shutdown_test(void *arg)
  * library.
  */
 static int
-sanity_test_with_worker_shutdown(struct rte_distributor *d,
+sanity_test_with_worker_shutdown(struct worker_params *wp,
 		struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	struct rte_mbuf *bufs[BURST];
 	unsigned i;
 
 	printf("=== Sanity test of worker shutdown ===\n");
 
 	clear_packet_count();
+
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
 		return -1;
 	}
 
-	/* now set all hash values in all buffers to zero, so all pkts go to the
-	 * one worker thread */
+	/*
+	 * Now set all hash values in all buffers to same value so all
+	 * pkts go to the one worker thread
+	 */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = 0;
+		bufs[i]->hash.usr = 1;
+
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		rte_distributor_flush_burst(db);
+	}
 
-	rte_distributor_process(d, bufs, BURST);
 	/* at this point, we will have processed some packets and have a full
 	 * backlog for the other ones at worker 0.
 	 */
@@ -374,14 +579,25 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = 0;
+		bufs[i]->hash.usr = 1;
 
 	/* get worker zero to quit */
 	zero_quit = 1;
-	rte_distributor_process(d, bufs, BURST);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+		/* flush the distributor */
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		/* flush the distributor */
+		rte_distributor_flush_burst(db);
+	}
+	rte_delay_us(10000);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
 
-	/* flush the distributor */
-	rte_distributor_flush(d);
 	if (total_packet_count() != BURST * 2) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -389,10 +605,6 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 
-	for (i = 0; i < rte_lcore_count() - 1; i++)
-		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
-
 	printf("Sanity test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -401,13 +613,18 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
  * one worker shuts down..
  */
 static int
-test_flush_with_worker_shutdown(struct rte_distributor *d,
+test_flush_with_worker_shutdown(struct worker_params *wp,
 		struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	struct rte_mbuf *bufs[BURST];
 	unsigned i;
 
-	printf("=== Test flush fn with worker shutdown ===\n");
+	if (wp->dist_type == DIST_SINGLE)
+		printf("=== Test flush fn with worker shutdown (single) ===\n");
+	else
+		printf("=== Test flush fn with worker shutdown (burst) ===\n");
 
 	clear_packet_count();
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
@@ -420,7 +637,11 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 	for (i = 0; i < BURST; i++)
 		bufs[i]->hash.usr = 0;
 
-	rte_distributor_process(d, bufs, BURST);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_process(d, bufs, BURST);
+	else
+		rte_distributor_process_burst(db, bufs, BURST);
+
 	/* at this point, we will have processed some packets and have a full
 	 * backlog for the other ones at worker 0.
 	 */
@@ -429,9 +650,18 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 	zero_quit = 1;
 
 	/* flush the distributor */
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_flush(d);
+	else
+		rte_distributor_flush_burst(db);
+
+	rte_delay_us(10000);
 
 	zero_quit = 0;
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -439,10 +669,6 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 
-	for (i = 0; i < rte_lcore_count() - 1; i++)
-		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
-
 	printf("Flush test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -451,6 +677,7 @@ static
 int test_error_distributor_create_name(void)
 {
 	struct rte_distributor *d = NULL;
+	struct rte_distributor_burst *db = NULL;
 	char *name = NULL;
 
 	d = rte_distributor_create(name, rte_socket_id(),
@@ -460,6 +687,13 @@ int test_error_distributor_create_name(void)
 		return -1;
 	}
 
+	db = rte_distributor_create_burst(name, rte_socket_id(),
+			rte_lcore_count() - 1);
+	if (db != NULL || rte_errno != EINVAL) {
+		printf("ERROR: No error on create_burst() with NULL param\n");
+		return -1;
+	}
+
 	return 0;
 }
 
@@ -468,20 +702,32 @@ static
 int test_error_distributor_create_numworkers(void)
 {
 	struct rte_distributor *d = NULL;
+	struct rte_distributor_burst *db = NULL;
+
 	d = rte_distributor_create("test_numworkers", rte_socket_id(),
 			RTE_MAX_LCORE + 10);
 	if (d != NULL || rte_errno != EINVAL) {
 		printf("ERROR: No error on create() with num_workers > MAX\n");
 		return -1;
 	}
+
+	db = rte_distributor_create_burst("test_numworkers", rte_socket_id(),
+			RTE_MAX_LCORE + 10);
+	if (db != NULL || rte_errno != EINVAL) {
+		printf("ERROR: No error on create_burst() num_workers > MAX\n");
+		return -1;
+	}
+
 	return 0;
 }
 
 
 /* Useful function which ensures that all worker functions terminate */
 static void
-quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+quit_workers(struct worker_params *wp, struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	const unsigned num_workers = rte_lcore_count() - 1;
 	unsigned i;
 	struct rte_mbuf *bufs[RTE_MAX_LCORE];
@@ -491,12 +737,20 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 	quit = 1;
 	for (i = 0; i < num_workers; i++)
 		bufs[i]->hash.usr = i << 1;
-	rte_distributor_process(d, bufs, num_workers);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_process(d, bufs, num_workers);
+	else
+		rte_distributor_process_burst(db, bufs, num_workers);
 
 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
 
-	rte_distributor_process(d, NULL, 0);
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, NULL, 0);
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, NULL, 0);
+		rte_distributor_flush_burst(db);
+	}
 	rte_eal_mp_wait_lcore();
 	quit = 0;
 	worker_idx = 0;
@@ -506,7 +760,9 @@ static int
 test_distributor(void)
 {
 	static struct rte_distributor *d;
+	static struct rte_distributor_burst *db;
 	static struct rte_mempool *p;
+	int i;
 
 	if (rte_lcore_count() < 2) {
 		printf("ERROR: not enough cores to test distributor\n");
@@ -525,6 +781,19 @@ test_distributor(void)
 		rte_distributor_clear_returns(d);
 	}
 
+	if (db == NULL) {
+		db = rte_distributor_create_burst("Test_dist_burst",
+				rte_socket_id(),
+				rte_lcore_count() - 1);
+		if (db == NULL) {
+			printf("Error creating burst distributor\n");
+			return -1;
+		}
+	} else {
+		rte_distributor_flush_burst(db);
+		rte_distributor_clear_returns_burst(db);
+	}
+
 	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
 			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
 	if (p == NULL) {
@@ -536,31 +805,45 @@ test_distributor(void)
 		}
 	}
 
-	rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
-	if (sanity_test(d, p) < 0)
-		goto err;
-	quit_workers(d, p);
+	worker_params.d = d;
+	worker_params.db = db;
 
-	rte_eal_mp_remote_launch(handle_work_with_free_mbufs, d, SKIP_MASTER);
-	if (sanity_test_with_mbuf_alloc(d, p) < 0)
-		goto err;
-	quit_workers(d, p);
+	for (i = 0; i < DIST_NUM_TYPES; i++) {
 
-	if (rte_lcore_count() > 2) {
-		rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
-				SKIP_MASTER);
-		if (sanity_test_with_worker_shutdown(d, p) < 0)
-			goto err;
-		quit_workers(d, p);
+		worker_params.dist_type = i;
 
-		rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
-				SKIP_MASTER);
-		if (test_flush_with_worker_shutdown(d, p) < 0)
+		rte_eal_mp_remote_launch(handle_work,
+				&worker_params, SKIP_MASTER);
+		if (sanity_test(&worker_params, p) < 0)
 			goto err;
-		quit_workers(d, p);
+		quit_workers(&worker_params, p);
 
-	} else {
-		printf("Not enough cores to run tests for worker shutdown\n");
+		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
+				&worker_params, SKIP_MASTER);
+		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
+			goto err;
+		quit_workers(&worker_params, p);
+
+		if (rte_lcore_count() > 2) {
+			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
+					&worker_params,
+					SKIP_MASTER);
+			if (sanity_test_with_worker_shutdown(&worker_params,
+					p) < 0)
+				goto err;
+			quit_workers(&worker_params, p);
+
+			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
+					&worker_params,
+					SKIP_MASTER);
+			if (test_flush_with_worker_shutdown(&worker_params,
+					p) < 0)
+				goto err;
+			quit_workers(&worker_params, p);
+
+		} else {
+			printf("Too few cores to run worker shutdown test\n");
+		}
 	}
 
 	if (test_error_distributor_create_numworkers() == -1 ||
@@ -572,7 +855,7 @@ test_distributor(void)
 	return 0;
 
 err:
-	quit_workers(d, p);
+	quit_workers(&worker_params, p);
 	return -1;
 }
 
-- 
2.7.4

^ permalink raw reply related

* Re: [PATCH] net/i40e: fix wrong return value when handling PF message
From: Lu, Wenzhuo @ 2017-01-03  2:58 UTC (permalink / raw)
  To: Yigit, Ferruh, dev@dpdk.org
In-Reply-To: <3f15d1e2-7fad-21c4-cc1e-122e4cadf2de@intel.com>

Hi Ferruh,


> -----Original Message-----
> From: Yigit, Ferruh
> Sent: Friday, December 23, 2016 12:59 AM
> To: Lu, Wenzhuo; dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH] net/i40e: fix wrong return value when handling
> PF message
> 
> On 12/21/2016 8:29 AM, Wenzhuo Lu wrote:
> > When VF receives a message from PF, it should check the return value.
> > But in i40evf_execute_vf_cmd the value is ignored and not returned to
> > the caller.
> >
> > Fixes: 95cd21f45d1b ("i40evf: allocate virtchnl commands buffer per
> > VF")
> >
> > Signed-off-by: Wenzhuo Lu <wenzhuo.lu@intel.com>
> > ---
> >  drivers/net/i40e/i40e_ethdev.h    | 2 +-
> >  drivers/net/i40e/i40e_ethdev_vf.c | 1 +
> >  2 files changed, 2 insertions(+), 1 deletion(-)
> >
> > diff --git a/drivers/net/i40e/i40e_ethdev.h
> > b/drivers/net/i40e/i40e_ethdev.h index 298cef4..28111a7 100644
> > --- a/drivers/net/i40e/i40e_ethdev.h
> > +++ b/drivers/net/i40e/i40e_ethdev.h
> > @@ -527,7 +527,7 @@ struct i40e_vf {
> >  	enum i40e_aq_link_speed link_speed;
> >  	bool vf_reset;
> >  	volatile uint32_t pend_cmd; /* pending command not finished yet */
> > -	uint32_t cmd_retval; /* return value of the cmd response from PF */
> > +	int32_t cmd_retval; /* return value of the cmd response from PF */
> >  	u16 pend_msg; /* flags indicates events from pf not handled yet */
> >  	uint8_t *aq_resp; /* buffer to store the adminq response from PF */
> >
> > diff --git a/drivers/net/i40e/i40e_ethdev_vf.c
> > b/drivers/net/i40e/i40e_ethdev_vf.c
> > index 12da0ec..5d25764 100644
> > --- a/drivers/net/i40e/i40e_ethdev_vf.c
> > +++ b/drivers/net/i40e/i40e_ethdev_vf.c
> > @@ -361,6 +361,7 @@ struct rte_i40evf_xstats_name_off {
> >  		err = -1;
> >  		do {
> >  			ret = i40evf_read_pfmsg(dev, &info);
> > +			vf->cmd_retval = info.result;
> 
> This is for op_version, and op_get_vf_resources! Which seems good.
> 
> Is something similar required for other commands (default case of the switch),
> but for them not sure how to get retval (event.desc.cookie_low to
> vf->cmd_retval) ?
There's no same issue for the other ops :)

> 
> >  			if (ret == I40EVF_MSG_CMD) {
> >  				err = 0;
> >  				break;
> >

^ permalink raw reply

* [WARNING: A/V UNSCANNABLE][PATCH v3 4/6] test: add distributor_perf autotest
From: David Hunt @ 2017-01-02 10:22 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483352546-171068-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 app/test/test_distributor_perf.c | 148 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 137 insertions(+), 11 deletions(-)

diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
index 7947fe9..b273bf9 100644
--- a/app/test/test_distributor_perf.c
+++ b/app/test/test_distributor_perf.c
@@ -40,9 +40,11 @@
 #include <rte_common.h>
 #include <rte_mbuf.h>
 #include <rte_distributor.h>
+#include <rte_distributor_burst.h>
 
-#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
-#define BURST 32
+#define ITER_POWER_CL 25 /* log 2 of how many iterations  for Cache Line test */
+#define ITER_POWER 21 /* log 2 of how many iterations we do when timing. */
+#define BURST 64
 #define BIG_BATCH 1024
 
 /* static vars - zero initialized by default */
@@ -54,7 +56,8 @@ struct worker_stats {
 } __rte_cache_aligned;
 struct worker_stats worker_stats[RTE_MAX_LCORE];
 
-/* worker thread used for testing the time to do a round-trip of a cache
+/*
+ * worker thread used for testing the time to do a round-trip of a cache
  * line between two cores and back again
  */
 static void
@@ -69,7 +72,8 @@ flip_bit(volatile uint64_t *arg)
 	}
 }
 
-/* test case to time the number of cycles to round-trip a cache line between
+/*
+ * test case to time the number of cycles to round-trip a cache line between
  * two cores and back again.
  */
 static void
@@ -86,7 +90,7 @@ time_cache_line_switch(void)
 		rte_pause();
 
 	const uint64_t start_time = rte_rdtsc();
-	for (i = 0; i < (1 << ITER_POWER); i++) {
+	for (i = 0; i < (1 << ITER_POWER_CL); i++) {
 		while (*pdata)
 			rte_pause();
 		*pdata = 1;
@@ -98,13 +102,14 @@ time_cache_line_switch(void)
 	*pdata = 2;
 	rte_eal_wait_lcore(slaveid);
 	printf("==== Cache line switch test ===\n");
-	printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
+	printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER_CL),
 			end_time-start_time);
 	printf("Ticks per iteration = %"PRIu64"\n\n",
-			(end_time-start_time) >> ITER_POWER);
+			(end_time-start_time) >> ITER_POWER_CL);
 }
 
-/* returns the total count of the number of packets handled by the worker
+/*
+ * returns the total count of the number of packets handled by the worker
  * functions given below.
  */
 static unsigned
@@ -123,7 +128,8 @@ clear_packet_count(void)
 	memset(&worker_stats, 0, sizeof(worker_stats));
 }
 
-/* this is the basic worker function for performance tests.
+/*
+ * this is the basic worker function for performance tests.
  * it does nothing but return packets and count them.
  */
 static int
@@ -144,7 +150,37 @@ handle_work(void *arg)
 	return 0;
 }
 
-/* this basic performance test just repeatedly sends in 32 packets at a time
+/*
+ * this is the basic worker function for performance tests.
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work_burst(void *arg)
+{
+	struct rte_distributor_burst *d = arg;
+	unsigned int count = 0;
+	unsigned int num = 0;
+	int i;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+
+	for (i = 0; i < 8; i++)
+		buf[i] = NULL;
+
+	num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+	while (!quit) {
+		worker_stats[id].handled_packets += num;
+		count += num;
+		num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+	}
+	worker_stats[id].handled_packets += num;
+	count += num;
+	rte_distributor_return_pkt_burst(d, id, buf, num);
+	return 0;
+}
+
+/*
+ * this basic performance test just repeatedly sends in 32 packets at a time
  * to the distributor and verifies at the end that we got them all in the worker
  * threads and finally how long per packet the processing took.
  */
@@ -174,6 +210,8 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
 		rte_distributor_process(d, NULL, 0);
 	} while (total_packet_count() < (BURST << ITER_POWER));
 
+	rte_distributor_clear_returns(d);
+
 	printf("=== Performance test of distributor ===\n");
 	printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
 	printf("Time per packet: %"PRIu64"\n\n",
@@ -190,6 +228,55 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
 	return 0;
 }
 
+/*
+ * this basic performance test just repeatedly sends in 32 packets at a time
+ * to the distributor and verifies at the end that we got them all in the worker
+ * threads and finally how long per packet the processing took.
+ */
+static inline int
+perf_test_burst(struct rte_distributor_burst *d, struct rte_mempool *p)
+{
+	unsigned int i;
+	uint64_t start, end;
+	struct rte_mbuf *bufs[BURST];
+
+	clear_packet_count();
+	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+		printf("Error getting mbufs from pool\n");
+		return -1;
+	}
+	/* ensure we have different hash value for each pkt */
+	for (i = 0; i < BURST; i++)
+		bufs[i]->hash.usr = i;
+
+	start = rte_rdtsc();
+	for (i = 0; i < (1<<ITER_POWER); i++)
+		rte_distributor_process_burst(d, bufs, BURST);
+	end = rte_rdtsc();
+
+	do {
+		usleep(100);
+		rte_distributor_process_burst(d, NULL, 0);
+	} while (total_packet_count() < (BURST << ITER_POWER));
+
+	rte_distributor_clear_returns_burst(d);
+
+	printf("=== Performance test of burst distributor ===\n");
+	printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
+	printf("Time per packet: %"PRIu64"\n\n",
+			((end - start) >> ITER_POWER)/BURST);
+	rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+	printf("Total packets: %u (%x)\n", total_packet_count(),
+			total_packet_count());
+	printf("=== Perf test done ===\n\n");
+
+	return 0;
+}
+
 /* Useful function which ensures that all worker functions terminate */
 static void
 quit_workers(struct rte_distributor *d, struct rte_mempool *p)
@@ -212,10 +299,34 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 	worker_idx = 0;
 }
 
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers_burst(struct rte_distributor_burst *d, struct rte_mempool *p)
+{
+	const unsigned int num_workers = rte_lcore_count() - 1;
+	unsigned int i;
+	struct rte_mbuf *bufs[RTE_MAX_LCORE];
+
+	rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+	quit = 1;
+	for (i = 0; i < num_workers; i++)
+		bufs[i]->hash.usr = i << 1;
+	rte_distributor_process_burst(d, bufs, num_workers);
+
+	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+	rte_distributor_process_burst(d, NULL, 0);
+	rte_eal_mp_wait_lcore();
+	quit = 0;
+	worker_idx = 0;
+}
+
 static int
 test_distributor_perf(void)
 {
 	static struct rte_distributor *d;
+	static struct rte_distributor_burst *db;
 	static struct rte_mempool *p;
 
 	if (rte_lcore_count() < 2) {
@@ -234,10 +345,20 @@ test_distributor_perf(void)
 			return -1;
 		}
 	} else {
-		rte_distributor_flush(d);
 		rte_distributor_clear_returns(d);
 	}
 
+	if (db == NULL) {
+		db = rte_distributor_create_burst("Test_burst", rte_socket_id(),
+				rte_lcore_count() - 1);
+		if (db == NULL) {
+			printf("Error creating burst distributor\n");
+			return -1;
+		}
+	} else {
+		rte_distributor_clear_returns_burst(db);
+	}
+
 	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
 			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
 	if (p == NULL) {
@@ -254,6 +375,11 @@ test_distributor_perf(void)
 		return -1;
 	quit_workers(d, p);
 
+	rte_eal_mp_remote_launch(handle_work_burst, db, SKIP_MASTER);
+	if (perf_test_burst(db, p) < 0)
+		return -1;
+	quit_workers_burst(db, p);
+
 	return 0;
 }
 
-- 
2.7.4

^ permalink raw reply related

* [WARNING: A/V UNSCANNABLE][PATCH v3 5/6] example: distributor app showing burst api
From: David Hunt @ 2017-01-02 10:22 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483352546-171068-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 examples/distributor/main.c | 508 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 390 insertions(+), 118 deletions(-)

diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index e7641d2..eebfb74 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -1,8 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
- *   All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
  *   modification, are permitted provided that the following conditions
@@ -31,6 +30,8 @@
  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#define BURST_API 1
+
 #include <stdint.h>
 #include <inttypes.h>
 #include <unistd.h>
@@ -43,39 +44,87 @@
 #include <rte_malloc.h>
 #include <rte_debug.h>
 #include <rte_prefetch.h>
+#if BURST_API
+#include <rte_distributor_burst.h>
+#else
 #include <rte_distributor.h>
+#endif
 
-#define RX_RING_SIZE 256
-#define TX_RING_SIZE 512
+#define RX_QUEUE_SIZE 512
+#define TX_QUEUE_SIZE 512
 #define NUM_MBUFS ((64*1024)-1)
-#define MBUF_CACHE_SIZE 250
+#define MBUF_CACHE_SIZE 128
+#if BURST_API
+#define BURST_SIZE 64
+#define SCHED_RX_RING_SZ 8192
+#define SCHED_TX_RING_SZ 65536
+#else
 #define BURST_SIZE 32
-#define RTE_RING_SZ 1024
+#define SCHED_RX_RING_SZ 1024
+#define SCHED_TX_RING_SZ 1024
+#endif
+#define BURST_SIZE_TX 32
 
 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
 
+#define ANSI_COLOR_RED     "\x1b[31m"
+#define ANSI_COLOR_RESET   "\x1b[0m"
+
 /* mask of enabled ports */
 static uint32_t enabled_port_mask;
 volatile uint8_t quit_signal;
 volatile uint8_t quit_signal_rx;
+volatile uint8_t quit_signal_dist;
+volatile uint8_t quit_signal_work;
 
 static volatile struct app_stats {
 	struct {
 		uint64_t rx_pkts;
 		uint64_t returned_pkts;
 		uint64_t enqueued_pkts;
+		uint64_t enqdrop_pkts;
 	} rx __rte_cache_aligned;
+	int pad1 __rte_cache_aligned;
+
+	struct {
+		uint64_t in_pkts;
+		uint64_t ret_pkts;
+		uint64_t sent_pkts;
+		uint64_t enqdrop_pkts;
+	} dist __rte_cache_aligned;
+	int pad2 __rte_cache_aligned;
 
 	struct {
 		uint64_t dequeue_pkts;
 		uint64_t tx_pkts;
+		uint64_t enqdrop_pkts;
 	} tx __rte_cache_aligned;
+	int pad3 __rte_cache_aligned;
+
+	uint64_t worker_pkts[64] __rte_cache_aligned;
+
+	int pad4 __rte_cache_aligned;
+
+	uint64_t worker_bursts[64][8] __rte_cache_aligned;
+
+	int pad5 __rte_cache_aligned;
+
+	uint64_t port_rx_pkts[64] __rte_cache_aligned;
+	uint64_t port_tx_pkts[64] __rte_cache_aligned;
 } app_stats;
 
+struct app_stats prev_app_stats;
+
 static const struct rte_eth_conf port_conf_default = {
 	.rxmode = {
 		.mq_mode = ETH_MQ_RX_RSS,
 		.max_rx_pkt_len = ETHER_MAX_LEN,
+		.split_hdr_size = 0,
+		.header_split   = 0, /**< Header Split disabled */
+		.hw_ip_checksum = 1, /**< IP checksum offload enabled */
+		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
+		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
+		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
 	},
 	.txmode = {
 		.mq_mode = ETH_MQ_TX_NONE,
@@ -93,6 +142,8 @@ struct output_buffer {
 	struct rte_mbuf *mbufs[BURST_SIZE];
 };
 
+static void print_stats(void);
+
 /*
  * Initialises a given port using global settings and with the rx buffers
  * coming from the mbuf_pool passed as parameter
@@ -101,9 +152,13 @@ static inline int
 port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 {
 	struct rte_eth_conf port_conf = port_conf_default;
-	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
-	int retval;
+	const uint16_t rxRings = 1;
+	uint16_t txRings = rte_lcore_count() - 1;
 	uint16_t q;
+	int retval;
+
+	if (txRings > RTE_MAX_ETHPORTS)
+		txRings = RTE_MAX_ETHPORTS;
 
 	if (port >= rte_eth_dev_count())
 		return -1;
@@ -113,7 +168,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 		return retval;
 
 	for (q = 0; q < rxRings; q++) {
-		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
+		retval = rte_eth_rx_queue_setup(port, q, RX_QUEUE_SIZE,
 						rte_eth_dev_socket_id(port),
 						NULL, mbuf_pool);
 		if (retval < 0)
@@ -121,7 +176,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	}
 
 	for (q = 0; q < txRings; q++) {
-		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
+		retval = rte_eth_tx_queue_setup(port, q, TX_QUEUE_SIZE,
 						rte_eth_dev_socket_id(port),
 						NULL);
 		if (retval < 0)
@@ -134,7 +189,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 
 	struct rte_eth_link link;
 	rte_eth_link_get_nowait(port, &link);
-	if (!link.link_status) {
+	while (!link.link_status) {
+		printf("Waiting for Link up on port %"PRIu8"\n", port);
 		sleep(1);
 		rte_eth_link_get_nowait(port, &link);
 	}
@@ -160,41 +216,52 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 
 struct lcore_params {
 	unsigned worker_id;
-	struct rte_distributor *d;
-	struct rte_ring *r;
+	struct rte_distributor_burst *d;
+	struct rte_ring *rx_dist_ring;
+	struct rte_ring *dist_tx_ring;
 	struct rte_mempool *mem_pool;
 };
 
-static int
-quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+static inline void
+flush_one_port(struct output_buffer *outbuf, uint8_t outp)
 {
-	const unsigned num_workers = rte_lcore_count() - 2;
-	unsigned i;
-	struct rte_mbuf *bufs[num_workers];
+	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
+			outbuf->mbufs, outbuf->count);
+	app_stats.tx.tx_pkts += outbuf->count;
 
-	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
-		printf("line %d: Error getting mbufs from pool\n", __LINE__);
-		return -1;
+	if (unlikely(nb_tx < outbuf->count)) {
+		app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
+		do {
+			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
+		} while (++nb_tx < outbuf->count);
 	}
+	outbuf->count = 0;
+}
+
+static inline void
+flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
+{
+	uint8_t outp;
 
-	for (i = 0; i < num_workers; i++)
-		bufs[i]->hash.rss = i << 1;
+	for (outp = 0; outp < nb_ports; outp++) {
+		/* skip ports that are not enabled */
+		if ((enabled_port_mask & (1 << outp)) == 0)
+			continue;
 
-	rte_distributor_process(d, bufs, num_workers);
-	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+		if (tx_buffers[outp].count == 0)
+			continue;
 
-	return 0;
+		flush_one_port(&tx_buffers[outp], outp);
+	}
 }
 
 static int
 lcore_rx(struct lcore_params *p)
 {
-	struct rte_distributor *d = p->d;
-	struct rte_mempool *mem_pool = p->mem_pool;
-	struct rte_ring *r = p->r;
 	const uint8_t nb_ports = rte_eth_dev_count();
 	const int socket_id = rte_socket_id();
 	uint8_t port;
+	struct rte_mbuf *bufs[BURST_SIZE*2];
 
 	for (port = 0; port < nb_ports; port++) {
 		/* skip ports that are not enabled */
@@ -210,6 +277,7 @@ lcore_rx(struct lcore_params *p)
 
 	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
 	port = 0;
+
 	while (!quit_signal_rx) {
 
 		/* skip ports that are not enabled */
@@ -218,7 +286,7 @@ lcore_rx(struct lcore_params *p)
 				port = 0;
 			continue;
 		}
-		struct rte_mbuf *bufs[BURST_SIZE*2];
+
 		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
 				BURST_SIZE);
 		if (unlikely(nb_rx == 0)) {
@@ -228,19 +296,46 @@ lcore_rx(struct lcore_params *p)
 		}
 		app_stats.rx.rx_pkts += nb_rx;
 
-		rte_distributor_process(d, bufs, nb_rx);
-		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
-				bufs, BURST_SIZE*2);
+/*
+ * You can run the distributor on the rx core with this code. Returned
+ * packets are then send straight to the tx core.
+ */
+#if 0
+
+#if BURST_API
+	rte_distributor_process_burst(d, bufs, nb_rx);
+	const uint16_t nb_ret = rte_distributor_returned_pkts_burst(d,
+			bufs, BURST_SIZE*2);
+#else
+	rte_distributor_process(d, bufs, nb_rx);
+	const uint16_t nb_ret = rte_distributor_returned_pkts(d,
+			bufs, BURST_SIZE*2);
+#endif
+
 		app_stats.rx.returned_pkts += nb_ret;
 		if (unlikely(nb_ret == 0)) {
 			if (++port == nb_ports)
 				port = 0;
 			continue;
 		}
-
-		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
+		struct rte_ring *tx_ring = p->dist_tx_ring;
+		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
+				(void *)bufs, nb_ret);
+#else
+		uint16_t nb_ret = nb_rx;
+		/*
+		* Swap the following two lines if you want the rx traffic
+		* to go directly to tx, no distribution.
+		*/
+		struct rte_ring *out_ring = p->rx_dist_ring;
+		//struct rte_ring *out_ring = p->dist_tx_ring;
+
+		uint16_t sent = rte_ring_enqueue_burst(out_ring,
+				(void *)bufs, nb_ret);
+#endif
 		app_stats.rx.enqueued_pkts += sent;
 		if (unlikely(sent < nb_ret)) {
+			app_stats.rx.enqdrop_pkts +=  nb_ret - sent;
 			RTE_LOG_DP(DEBUG, DISTRAPP,
 				"%s:Packet loss due to full ring\n", __func__);
 			while (sent < nb_ret)
@@ -249,56 +344,88 @@ lcore_rx(struct lcore_params *p)
 		if (++port == nb_ports)
 			port = 0;
 	}
-	rte_distributor_process(d, NULL, 0);
-	/* flush distributor to bring to known state */
-	rte_distributor_flush(d);
 	/* set worker & tx threads quit flag */
+	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
 	quit_signal = 1;
-	/*
-	 * worker threads may hang in get packet as
-	 * distributor process is not running, just make sure workers
-	 * get packets till quit_signal is actually been
-	 * received and they gracefully shutdown
-	 */
-	if (quit_workers(d, mem_pool) != 0)
-		return -1;
-	/* rx thread should quit at last */
 	return 0;
 }
 
-static inline void
-flush_one_port(struct output_buffer *outbuf, uint8_t outp)
-{
-	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
-			outbuf->count);
-	app_stats.tx.tx_pkts += nb_tx;
 
-	if (unlikely(nb_tx < outbuf->count)) {
-		RTE_LOG_DP(DEBUG, DISTRAPP,
-			"%s:Packet loss with tx_burst\n", __func__);
-		do {
-			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
-		} while (++nb_tx < outbuf->count);
-	}
-	outbuf->count = 0;
-}
 
-static inline void
-flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
+static int
+lcore_distributor(struct lcore_params *p)
 {
-	uint8_t outp;
-	for (outp = 0; outp < nb_ports; outp++) {
-		/* skip ports that are not enabled */
-		if ((enabled_port_mask & (1 << outp)) == 0)
-			continue;
-
-		if (tx_buffers[outp].count == 0)
-			continue;
-
-		flush_one_port(&tx_buffers[outp], outp);
+	struct rte_ring *in_r = p->rx_dist_ring;
+	struct rte_ring *out_r = p->dist_tx_ring;
+	struct rte_mbuf *bufs[BURST_SIZE * 4];
+	struct rte_distributor_burst *d = p->d;
+
+	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
+	while (!quit_signal_dist) {
+		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
+				(void *)bufs, BURST_SIZE*1);
+		if (nb_rx) {
+			app_stats.dist.in_pkts += nb_rx;
+/*
+ * This '#if' allows you to bypass the distributor. Incoming packets may be
+ * sent straight to the tx ring.
+ */
+#if 1
+
+#if BURST_API
+			/* Distribute the packets */
+			rte_distributor_process_burst(d, bufs, nb_rx);
+			/* Handle Returns */
+			const uint16_t nb_ret =
+				rte_distributor_returned_pkts_burst(d,
+					bufs, BURST_SIZE*2);
+#else
+			/* Distribute the packets */
+			rte_distributor_process(d, bufs, nb_rx);
+			/* Handle Returns */
+			const uint16_t nb_ret =
+				rte_distributor_returned_pkts(d,
+					bufs, BURST_SIZE*2);
+#endif
+
+#else
+			/* Bypass the distributor */
+			const unsigned int xor_val = (rte_eth_dev_count() > 1);
+			/* Touch the mbuf by xor'ing the port */
+			for (unsigned int i = 0; i < nb_rx; i++)
+				bufs[i]->port ^= xor_val;
+
+			const uint16_t nb_ret = nb_rx;
+#endif
+			if (unlikely(nb_ret == 0))
+				continue;
+			app_stats.dist.ret_pkts += nb_ret;
+
+			uint16_t sent = rte_ring_enqueue_burst(out_r,
+					(void *)bufs, nb_ret);
+			app_stats.dist.sent_pkts += sent;
+			if (unlikely(sent < nb_ret)) {
+				app_stats.dist.enqdrop_pkts += nb_ret - sent;
+				RTE_LOG(DEBUG, DISTRAPP,
+					"%s:Packet loss due to full out ring\n",
+					__func__);
+				while (sent < nb_ret)
+					rte_pktmbuf_free(bufs[sent++]);
+			}
+		}
 	}
+	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
+	quit_signal_work = 1;
+
+#if BURST_API
+	/* Unblock any returns so workers can exit */
+	rte_distributor_clear_returns_burst(d);
+#endif
+	quit_signal_rx = 1;
+	return 0;
 }
 
+
 static int
 lcore_tx(struct rte_ring *in_r)
 {
@@ -327,9 +454,9 @@ lcore_tx(struct rte_ring *in_r)
 			if ((enabled_port_mask & (1 << port)) == 0)
 				continue;
 
-			struct rte_mbuf *bufs[BURST_SIZE];
+			struct rte_mbuf *bufs[BURST_SIZE_TX];
 			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-					(void *)bufs, BURST_SIZE);
+					(void *)bufs, BURST_SIZE_TX);
 			app_stats.tx.dequeue_pkts += nb_rx;
 
 			/* if we get no traffic, flush anything we have */
@@ -358,11 +485,12 @@ lcore_tx(struct rte_ring *in_r)
 
 				outbuf = &tx_buffers[outp];
 				outbuf->mbufs[outbuf->count++] = bufs[i];
-				if (outbuf->count == BURST_SIZE)
+				if (outbuf->count == BURST_SIZE_TX)
 					flush_one_port(outbuf, outp);
 			}
 		}
 	}
+	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -371,52 +499,147 @@ int_handler(int sig_num)
 {
 	printf("Exiting on signal %d\n", sig_num);
 	/* set quit flag for rx thread to exit */
-	quit_signal_rx = 1;
+	quit_signal_dist = 1;
 }
 
 static void
 print_stats(void)
 {
 	struct rte_eth_stats eth_stats;
-	unsigned i;
-
-	printf("\nRX thread stats:\n");
-	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
-	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
-	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
-
-	printf("\nTX thread stats:\n");
-	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
-	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
+	unsigned int i, j;
+	const unsigned int num_workers = rte_lcore_count() - 4;
 
 	for (i = 0; i < rte_eth_dev_count(); i++) {
 		rte_eth_stats_get(i, &eth_stats);
-		printf("\nPort %u stats:\n", i);
-		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
-		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
-		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
-		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
-		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
+		app_stats.port_rx_pkts[i] = eth_stats.ipackets;
+		app_stats.port_tx_pkts[i] = eth_stats.opackets;
+	}
+
+	printf("\n\nRX Thread:\n");
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		printf("Port %u Pktsin : %5.2f\n", i,
+				(app_stats.port_rx_pkts[i] -
+				prev_app_stats.port_rx_pkts[i])/1000000.0);
+		prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
+	}
+	printf(" - Received:    %5.2f\n",
+			(app_stats.rx.rx_pkts -
+			prev_app_stats.rx.rx_pkts)/1000000.0);
+	printf(" - Returned:    %5.2f\n",
+			(app_stats.rx.returned_pkts -
+			prev_app_stats.rx.returned_pkts)/1000000.0);
+	printf(" - Enqueued:    %5.2f\n",
+			(app_stats.rx.enqueued_pkts -
+			prev_app_stats.rx.enqueued_pkts)/1000000.0);
+	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.rx.enqdrop_pkts -
+			prev_app_stats.rx.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	printf("Distributor thread:\n");
+	printf(" - In:          %5.2f\n",
+			(app_stats.dist.in_pkts -
+			prev_app_stats.dist.in_pkts)/1000000.0);
+	printf(" - Returned:    %5.2f\n",
+			(app_stats.dist.ret_pkts -
+			prev_app_stats.dist.ret_pkts)/1000000.0);
+	printf(" - Sent:        %5.2f\n",
+			(app_stats.dist.sent_pkts -
+			prev_app_stats.dist.sent_pkts)/1000000.0);
+	printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.dist.enqdrop_pkts -
+			prev_app_stats.dist.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	printf("TX thread:\n");
+	printf(" - Dequeued:    %5.2f\n",
+			(app_stats.tx.dequeue_pkts -
+			prev_app_stats.tx.dequeue_pkts)/1000000.0);
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		printf("Port %u Pktsout: %5.2f\n",
+				i, (app_stats.port_tx_pkts[i] -
+				prev_app_stats.port_tx_pkts[i])/1000000.0);
+		prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
+	}
+	printf(" - Transmitted: %5.2f\n",
+			(app_stats.tx.tx_pkts -
+			prev_app_stats.tx.tx_pkts)/1000000.0);
+	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.tx.enqdrop_pkts -
+			prev_app_stats.tx.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
+	prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
+	prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
+	prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
+	prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
+	prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
+	prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
+	prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
+	prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
+	prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
+	prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;
+
+	for (i = 0; i < num_workers; i++) {
+		printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
+				(app_stats.worker_pkts[i] -
+				prev_app_stats.worker_pkts[i])/1000000.0);
+		for (j = 0; j < 8; j++)
+			printf("%ld ", app_stats.worker_bursts[i][j]);
+		printf("\n");
+		prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
 	}
 }
 
 static int
 lcore_worker(struct lcore_params *p)
 {
-	struct rte_distributor *d = p->d;
+	struct rte_distributor_burst *d = p->d;
 	const unsigned id = p->worker_id;
+	unsigned int num = 0;
+	unsigned int i;
+
 	/*
 	 * for single port, xor_val will be zero so we won't modify the output
 	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
 	 */
 	const unsigned xor_val = (rte_eth_dev_count() > 1);
-	struct rte_mbuf *buf = NULL;
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+
+	for (i = 0; i < 8; i++)
+		buf[i] = NULL;
+
+	app_stats.worker_pkts[p->worker_id] = 1;
+
 
 	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
-	while (!quit_signal) {
-		buf = rte_distributor_get_pkt(d, id, buf);
-		buf->port ^= xor_val;
+	while (!quit_signal_work) {
+
+#if BURST_API
+		num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+		/* Do a little bit of work for each packet */
+		for (i = 0; i < num; i++) {
+			uint64_t t = __rdtsc()+100;
+
+			while (__rdtsc() < t)
+				rte_pause();
+			buf[i]->port ^= xor_val;
+		}
+#else
+		buf[0] = rte_distributor_get_pkt(d, id, buf[0]);
+		uint64_t t = __rdtsc() + 10;
+
+		while (__rdtsc() < t)
+			rte_pause();
+		buf[0]->port ^= xor_val;
+#endif
+
+		app_stats.worker_pkts[p->worker_id] += num;
+		if (num > 0)
+			app_stats.worker_bursts[p->worker_id][num-1]++;
 	}
+	printf("\nCore %u exiting worker task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -496,12 +719,14 @@ int
 main(int argc, char *argv[])
 {
 	struct rte_mempool *mbuf_pool;
-	struct rte_distributor *d;
-	struct rte_ring *output_ring;
+	struct rte_distributor_burst *d;
+	struct rte_ring *dist_tx_ring;
+	struct rte_ring *rx_dist_ring;
 	unsigned lcore_id, worker_id = 0;
 	unsigned nb_ports;
 	uint8_t portid;
 	uint8_t nb_ports_available;
+	uint64_t t, freq;
 
 	/* catch ctrl-c so we can print on exit */
 	signal(SIGINT, int_handler);
@@ -518,10 +743,12 @@ main(int argc, char *argv[])
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
-	if (rte_lcore_count() < 3)
+	if (rte_lcore_count() < 5)
 		rte_exit(EXIT_FAILURE, "Error, This application needs at "
-				"least 3 logical cores to run:\n"
-				"1 lcore for packet RX and distribution\n"
+				"least 5 logical cores to run:\n"
+				"1 lcore for stats (can be core 0)\n"
+				"1 lcore for packet RX\n"
+				"1 lcore for distribution\n"
 				"1 lcore for packet TX\n"
 				"and at least 1 lcore for worker threads\n");
 
@@ -560,41 +787,86 @@ main(int argc, char *argv[])
 				"All available ports are disabled. Please set portmask.\n");
 	}
 
+#if BURST_API
+	d = rte_distributor_create_burst("PKT_DIST", rte_socket_id(),
+			rte_lcore_count() - 4);
+#else
 	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
-			rte_lcore_count() - 2);
+			rte_lcore_count() - 4);
+#endif
 	if (d == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
 
 	/*
-	 * scheduler ring is read only by the transmitter core, but written to
-	 * by multiple threads
+	 * scheduler ring is read by the transmitter core, and written to
+	 * by scheduler core
 	 */
-	output_ring = rte_ring_create("Output_ring", RTE_RING_SZ,
-			rte_socket_id(), RING_F_SC_DEQ);
-	if (output_ring == NULL)
+	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (dist_tx_ring == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
+
+	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (rx_dist_ring == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
 
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
-		if (worker_id == rte_lcore_count() - 2)
+		if (worker_id == rte_lcore_count() - 3) {
+			printf("Starting distributor on lcore_id %d\n",
+					lcore_id);
+			/* distributor core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d,
+					rx_dist_ring, dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch(
+					(lcore_function_t *)lcore_distributor,
+					p, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 4) {
+			printf("Starting tx  on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* tx core */
 			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
-					output_ring, lcore_id);
-		else {
+					dist_tx_ring, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 2) {
+			printf("Starting rx on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* rx core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+					p, lcore_id);
+		} else {
+			printf("Starting worker on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
 			struct lcore_params *p =
 					rte_malloc(NULL, sizeof(*p), 0);
 			if (!p)
 				rte_panic("malloc failure\n");
-			*p = (struct lcore_params){worker_id, d, output_ring, mbuf_pool};
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
 
 			rte_eal_remote_launch((lcore_function_t *)lcore_worker,
 					p, lcore_id);
 		}
 		worker_id++;
 	}
-	/* call lcore_main on master core only */
-	struct lcore_params p = { 0, d, output_ring, mbuf_pool};
 
-	if (lcore_rx(&p) != 0)
-		return -1;
+	freq = rte_get_timer_hz();
+	t = __rdtsc() + freq;
+	while (!quit_signal_dist) {
+		if (t < __rdtsc()) {
+			print_stats();
+			t = _rdtsc() + freq;
+		}
+	}
 
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 		if (rte_eal_wait_lcore(lcore_id) < 0)
-- 
2.7.4

^ permalink raw reply related

* Re: [PATCH v2 15/18] net/ixgbe: parse flow director filter
From: Zhao1, Wei @ 2017-01-03  3:05 UTC (permalink / raw)
  To: Xing, Beilei, dev@dpdk.org; +Cc: Lu, Wenzhuo
In-Reply-To: <94479800C636CB44BD422CB454846E013158CF29@SHSMSX101.ccr.corp.intel.com>

Hi, beilei



> -----Original Message-----

> From: Xing, Beilei

> Sent: Monday, January 2, 2017 11:24 PM

> To: Zhao1, Wei <wei.zhao1@intel.com>; dev@dpdk.org

> Cc: Zhao1, Wei <wei.zhao1@intel.com>; Lu, Wenzhuo

> <wenzhuo.lu@intel.com>

> Subject: RE: [dpdk-dev] [PATCH v2 15/18] net/ixgbe: parse flow director filter

>

>

> > -----Original Message-----

> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Wei Zhao

> > Sent: Friday, December 30, 2016 3:53 PM

> > To: dev@dpdk.org<mailto:dev@dpdk.org>

> > Cc: Zhao1, Wei <wei.zhao1@intel.com<mailto:wei.zhao1@intel.com>>; Lu, Wenzhuo

> > <wenzhuo.lu@intel.com<mailto:wenzhuo.lu@intel.com>>

> > Subject: [dpdk-dev] [PATCH v2 15/18] net/ixgbe: parse flow director

> > filter

> >

> > check if the rule is a flow director rule, and get the flow director info.

> >

> > Signed-off-by: Wei Zhao <wei.zhao1@intel.com<mailto:wei.zhao1@intel.com>>

> > Signed-off-by: Wenzhuo Lu <wenzhuo.lu@intel.com<mailto:wenzhuo.lu@intel.com>>

> >

> > ---

> >

> > v2:add new error set function

> > ---

> >  drivers/net/ixgbe/ixgbe_ethdev.c | 1467

> > +++++++++++++++++++++++++++++++++-----

> >  drivers/net/ixgbe/ixgbe_ethdev.h |   16 +

> >  drivers/net/ixgbe/ixgbe_fdir.c   |  247 ++++---

> >  lib/librte_ether/rte_flow.h      |   23 +

> >  4 files changed, 1495 insertions(+), 258 deletions(-)

> >

> > diff --git a/drivers/net/ixgbe/ixgbe_ethdev.c

> > b/drivers/net/ixgbe/ixgbe_ethdev.c

> > index 0a5ac4f..c98aa0d 100644

> > --- a/drivers/net/ixgbe/ixgbe_ethdev.c

> > +++ b/drivers/net/ixgbe/ixgbe_ethdev.c

> > @@ -438,6 +438,31 @@ ixgbe_validate_l2_tn_filter(struct rte_eth_dev

> *dev,

> >                                          struct rte_eth_l2_tunnel_conf *rule,

> >                                          struct rte_flow_error *error);

> >  static int

> > +ixgbe_validate_fdir_filter(struct rte_eth_dev *dev,

> > +                                      const struct rte_flow_attr *attr,

> > +                                      const struct rte_flow_item pattern[],

> > +                                      const struct rte_flow_action actions[],

> > +                                      struct ixgbe_fdir_rule *rule,

> > +                                      struct rte_flow_error *error);

> > +static int

> > +ixgbe_parse_fdir_filter_normal(const struct rte_flow_attr *attr,

> > +                      const struct rte_flow_item pattern[],

> > +                      const struct rte_flow_action actions[],

> > +                      struct ixgbe_fdir_rule *rule,

> > +                      struct rte_flow_error *error);

> > +static int

> > +ixgbe_parse_fdir_filter_tunnel(const struct rte_flow_attr *attr,

> > +                      const struct rte_flow_item pattern[],

> > +                      const struct rte_flow_action actions[],

> > +                      struct ixgbe_fdir_rule *rule,

> > +                      struct rte_flow_error *error);

> > +static int

> > +ixgbe_parse_fdir_filter(const struct rte_flow_attr *attr,

> > +                      const struct rte_flow_item pattern[],

> > +                      const struct rte_flow_action actions[],

> > +                      struct ixgbe_fdir_rule *rule,

> > +                      struct rte_flow_error *error);

> > +static int

> >  ixgbe_flow_validate(__rte_unused struct rte_eth_dev *dev,

> >                          const struct rte_flow_attr *attr,

> >                          const struct rte_flow_item pattern[], @@ -1475,6 +1500,8

> @@ static

> > int ixgbe_fdir_filter_init(struct rte_eth_dev

> > *eth_dev)

> >                                               "Failed to allocate memory for fdir hash map!");

> >                          return -ENOMEM;

> >          }

> > +      fdir_info->mask_added = FALSE;

> > +

> >          return 0;

> >  }

> >

> > @@ -8951,117 +8978,22 @@ ixgbe_parse_syn_filter(const struct

> > rte_flow_attr *attr,

> >          return 0;

> >  }

> >

> > -/**

> > - * Parse the rule to see if it is a L2 tunnel rule.

> > - * And get the L2 tunnel filter info BTW.

> > - * Only support E-tag now.

> > - */

> > +/* Parse to get the attr and action info of flow director rule. */

> >  static int

> > -cons_parse_l2_tn_filter(const struct rte_flow_attr *attr,

> > -                                       const struct rte_flow_item pattern[],

> > -                                       const struct rte_flow_action actions[],

> > -                                       struct rte_eth_l2_tunnel_conf *filter,

> > -                                       struct rte_flow_error *error)

>

> Why do u remove functions added in patch 14/18? If the functions don't be

> needed, how about changing patch 14/18?



This patch is generate in a very complicated way.

If you go on reading  this patch, you will find the following statement, path add the functions back itself latter

So the git seem to generate a patch in a  peculiar way.



+static int

+cons_parse_l2_tn_filter(const struct rte_flow_attr *attr,

+                                             const struct rte_flow_item pattern[],

+                                             const struct rte_flow_action actions[],

+                                             struct rte_eth_l2_tunnel_conf *filter,

+                                             struct rte_flow_error *error)

^ permalink raw reply

* [WARNING: A/V UNSCANNABLE][PATCH v3 6/6] doc: distributor library changes for new burst api
From: David Hunt @ 2017-01-02 10:22 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483352546-171068-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 doc/guides/prog_guide/packet_distrib_lib.rst | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/doc/guides/prog_guide/packet_distrib_lib.rst b/doc/guides/prog_guide/packet_distrib_lib.rst
index b5bdabb..dffd4ad 100644
--- a/doc/guides/prog_guide/packet_distrib_lib.rst
+++ b/doc/guides/prog_guide/packet_distrib_lib.rst
@@ -42,6 +42,10 @@ The model of operation is shown in the diagram below.
 
    Packet Distributor mode of operation
 
+There are two versions of the API in the distributor Library, one which sends one packet at a time to workers,
+and another which sends bursts of up to 8 packets at a time to workers. The functions names of the second API
+are identified by "_burst", and must not be intermixed with the single packet API. The operations described below
+apply to both API's, select which API you wish to use by including the relevant header file.
 
 Distributor Core Operation
 --------------------------
-- 
2.7.4

^ permalink raw reply related

* Re: [PATCH v2 16/18] net/ixgbe: create consistent filter
From: Zhao1, Wei @ 2017-01-03  3:09 UTC (permalink / raw)
  To: Xing, Beilei; +Cc: dev@dpdk.org
In-Reply-To: <94479800C636CB44BD422CB454846E013158D036@SHSMSX101.ccr.corp.intel.com>


Hi ,beilei
> -----Original Message-----
> From: Xing, Beilei
> Sent: Tuesday, January 3, 2017 10:34 AM
> To: Zhao1, Wei <wei.zhao1@intel.com>
> Subject: RE: [dpdk-dev] [PATCH v2 16/18] net/ixgbe: create consistent filter
> 
> > -----Original Message-----
> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Wei Zhao
> > Sent: Friday, December 30, 2016 3:53 PM
> > To: dev@dpdk.org
> > Cc: Zhao1, Wei <wei.zhao1@intel.com>; Lu, Wenzhuo
> > <wenzhuo.lu@intel.com>
> > Subject: [dpdk-dev] [PATCH v2 16/18] net/ixgbe: create consistent
> > filter
> >
> > This patch adds a function to create the flow directory filter.
> >
> > Signed-off-by: Wei Zhao <wei.zhao1@intel.com>
> > Signed-off-by: Wenzhuo Lu <wenzhuo.lu@intel.com>
> >
> > ---
> >
> > v2:
> > --add new error set function
> > ---
> >  drivers/net/ixgbe/ixgbe_ethdev.c | 240
> > ++++++++++++++++++++++++++++++++++++++-
> >  drivers/net/ixgbe/ixgbe_ethdev.h |   5 +
> >  2 files changed, 244 insertions(+), 1 deletion(-)
> >
> > @@ -1380,6 +1429,14 @@ eth_ixgbe_dev_init(struct rte_eth_dev
> *eth_dev)
> >
> >  	/* initialize l2 tunnel filter list & hash */
> >  	ixgbe_l2_tn_filter_init(eth_dev);
> > +
> > +	TAILQ_INIT(&filter_ntuple_list);
> > +	TAILQ_INIT(&filter_ethertype_list);
> > +	TAILQ_INIT(&filter_syn_list);
> > +	TAILQ_INIT(&filter_fdir_list);
> > +	TAILQ_INIT(&filter_l2_tunnel_list);
> > +	TAILQ_INIT(&ixgbe_flow_list);
> 
> You need to destroy these lists in dev_unint function.
> 
> > +
> >  	return 0;
> >  }
> >

Yes, I have add a function ixgbe_filterlist_flush()  in 18/18 in dev_unint patch to flush these list.

^ permalink raw reply

* Re: [PATCH v2 16/18] net/ixgbe: create consistent filter
From: Zhao1, Wei @ 2017-01-03  3:11 UTC (permalink / raw)
  To: Xing, Beilei, dev@dpdk.org; +Cc: Lu, Wenzhuo
In-Reply-To: <94479800C636CB44BD422CB454846E013158D005@SHSMSX101.ccr.corp.intel.com>

Hi,beilei

> -----Original Message-----
> From: Xing, Beilei
> Sent: Tuesday, January 3, 2017 10:04 AM
> To: Zhao1, Wei <wei.zhao1@intel.com>; dev@dpdk.org
> Cc: Zhao1, Wei <wei.zhao1@intel.com>; Lu, Wenzhuo
> <wenzhuo.lu@intel.com>
> Subject: RE: [dpdk-dev] [PATCH v2 16/18] net/ixgbe: create consistent filter
> 
> 
> 
> > -----Original Message-----
> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Wei Zhao
> > Sent: Friday, December 30, 2016 3:53 PM
> > To: dev@dpdk.org
> > Cc: Zhao1, Wei <wei.zhao1@intel.com>; Lu, Wenzhuo
> > <wenzhuo.lu@intel.com>
> > Subject: [dpdk-dev] [PATCH v2 16/18] net/ixgbe: create consistent
> > filter
> >
> > This patch adds a function to create the flow directory filter.
> >
> > Signed-off-by: Wei Zhao <wei.zhao1@intel.com>
> > Signed-off-by: Wenzhuo Lu <wenzhuo.lu@intel.com>
> >
> > ---
> >
> > v2:
> > --add new error set function
> > ---
> >  drivers/net/ixgbe/ixgbe_ethdev.c | 240
> > ++++++++++++++++++++++++++++++++++++++-
> >  drivers/net/ixgbe/ixgbe_ethdev.h |   5 +
> >  2 files changed, 244 insertions(+), 1 deletion(-)
> >
> > diff --git a/drivers/net/ixgbe/ixgbe_ethdev.c
> > b/drivers/net/ixgbe/ixgbe_ethdev.c
> > index c98aa0d..1c857fc 100644
> > --- a/drivers/net/ixgbe/ixgbe_ethdev.c
> > +++ b/drivers/net/ixgbe/ixgbe_ethdev.c
> > @@ -468,6 +468,11 @@ ixgbe_flow_validate(__rte_unused struct
> > rte_eth_dev *dev,
> >  		const struct rte_flow_item pattern[],
> >  		const struct rte_flow_action actions[],
> >  		struct rte_flow_error *error);
> > +static struct rte_flow *ixgbe_flow_create(struct rte_eth_dev *dev,
> > +		const struct rte_flow_attr *attr,
> > +		const struct rte_flow_item pattern[],
> > +		const struct rte_flow_action actions[],
> > +		struct rte_flow_error *error);
> >  static int ixgbe_flow_flush(struct rte_eth_dev *dev,
> >  		struct rte_flow_error *error);
> >  /*
> > @@ -850,11 +855,55 @@ static const struct rte_ixgbe_xstats_name_off
> > rte_ixgbevf_stats_strings[] = {
> >  		sizeof(rte_ixgbevf_stats_strings[0]))
> >  static const struct rte_flow_ops ixgbe_flow_ops = {
> >  	ixgbe_flow_validate,
> > -	NULL,
> > +	ixgbe_flow_create,
> >  	NULL,
> >  	ixgbe_flow_flush,
> >  	NULL,
> >  };
> > +/* ntuple filter list structure */
> > +struct ixgbe_ntuple_filter_ele {
> > +	TAILQ_ENTRY(ixgbe_ntuple_filter_ele) entries;
> > +	struct rte_eth_ntuple_filter filter_info; };
> > +/* ethertype filter list structure */ struct
> > +ixgbe_ethertype_filter_ele {
> > +	TAILQ_ENTRY(ixgbe_ethertype_filter_ele) entries;
> > +	struct rte_eth_ethertype_filter filter_info; };
> > +/* syn filter list structure */
> > +struct ixgbe_eth_syn_filter_ele {
> > +	TAILQ_ENTRY(ixgbe_eth_syn_filter_ele) entries;
> > +	struct rte_eth_syn_filter filter_info; };
> > +/* fdir filter list structure */
> > +struct ixgbe_fdir_rule_ele {
> > +	TAILQ_ENTRY(ixgbe_fdir_rule_ele) entries;
> > +	struct ixgbe_fdir_rule filter_info;
> > +};
> > +/* l2_tunnel filter list structure */ struct
> > +ixgbe_eth_l2_tunnel_conf_ele {
> > +	TAILQ_ENTRY(ixgbe_eth_l2_tunnel_conf_ele) entries;
> > +	struct rte_eth_l2_tunnel_conf filter_info; };
> > +/* ixgbe_flow memory list structure */
> 
> Why need to define these structures? There has been structures to store the
> filter before, isn't it?
> 

These list are created to store flow parameters, which is required by rte layer.
 
> > +struct ixgbe_flow_mem {
> > +	TAILQ_ENTRY(ixgbe_flow_mem) entries;
> > +	struct ixgbe_flow *flow;
> > +};
> > +
> > +TAILQ_HEAD(ixgbe_ntuple_filter_list, ixgbe_ntuple_filter_ele); struct
> > +ixgbe_ntuple_filter_list filter_ntuple_list;
> > +TAILQ_HEAD(ixgbe_ethertype_filter_list, ixgbe_ethertype_filter_ele);
> > +struct ixgbe_ethertype_filter_list filter_ethertype_list;
> > +TAILQ_HEAD(ixgbe_syn_filter_list, ixgbe_eth_syn_filter_ele); struct
> > +ixgbe_syn_filter_list filter_syn_list;
> > +TAILQ_HEAD(ixgbe_fdir_rule_filter_list, ixgbe_fdir_rule_ele); struct
> > +ixgbe_fdir_rule_filter_list filter_fdir_list;
> > +TAILQ_HEAD(ixgbe_l2_tunnel_filter_list,
> > +ixgbe_eth_l2_tunnel_conf_ele); struct ixgbe_l2_tunnel_filter_list
> > +filter_l2_tunnel_list; TAILQ_HEAD(ixgbe_flow_mem_list,
> > +ixgbe_flow_mem); struct ixgbe_flow_mem_list ixgbe_flow_list;
> > +
> >  /**
> >   * Atomically reads the link status information from global
> >   * structure rte_eth_dev.
> > @@ -1380,6 +1429,14 @@ eth_ixgbe_dev_init(struct rte_eth_dev
> *eth_dev)
> >
> >  	/* initialize l2 tunnel filter list & hash */
> >  	ixgbe_l2_tn_filter_init(eth_dev);
> > +
> > +	TAILQ_INIT(&filter_ntuple_list);
> > +	TAILQ_INIT(&filter_ethertype_list);
> > +	TAILQ_INIT(&filter_syn_list);
> > +	TAILQ_INIT(&filter_fdir_list);
> > +	TAILQ_INIT(&filter_l2_tunnel_list);
> > +	TAILQ_INIT(&ixgbe_flow_list);
> > +
> >  	return 0;
> >  }
> >

^ permalink raw reply

* Re: [PATCH v2 02/18] net/ixgbe: store flow director filter
From: Zhao1, Wei @ 2017-01-03  3:14 UTC (permalink / raw)
  To: Xing, Beilei, dev@dpdk.org; +Cc: Lu, Wenzhuo
In-Reply-To: <94479800C636CB44BD422CB454846E013158CE0E@SHSMSX101.ccr.corp.intel.com>

Hi, beilei

> -----Original Message-----
> From: Xing, Beilei
> Sent: Monday, January 2, 2017 5:59 PM
> To: Zhao1, Wei <wei.zhao1@intel.com>; dev@dpdk.org
> Cc: Lu, Wenzhuo <wenzhuo.lu@intel.com>; Zhao1, Wei
> <wei.zhao1@intel.com>
> Subject: RE: [dpdk-dev] [PATCH v2 02/18] net/ixgbe: store flow director filter
> 
> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Wei Zhao
> > Sent: Friday, December 30, 2016 3:53 PM
> > To: dev@dpdk.org
> > Cc: Lu, Wenzhuo <wenzhuo.lu@intel.com>; Zhao1, Wei
> > <wei.zhao1@intel.com>
> > Subject: [dpdk-dev] [PATCH v2 02/18] net/ixgbe: store flow director
> > filter
> >
> > Add support for storing flow director filter in SW.
> >
> > Signed-off-by: Wenzhuo Lu <wenzhuo.lu@intel.com>
> > Signed-off-by: Wei Zhao <wei.zhao1@intel.com>
> > ---
> >
> > v2:
> > --add a fdir initialization function in device start process
> > ---
> 
> >
> > +static inline struct ixgbe_fdir_filter *
> > +ixgbe_fdir_filter_lookup(struct ixgbe_hw_fdir_info *fdir_info,
> > +			 union ixgbe_atr_input *key)
> > +{
> > +	int ret = 0;
> 
> Needless initialization here since ret will be set below.
> 

Ok, I will make some change in v3

> > +	ret = rte_hash_lookup(fdir_info->hash_handle, (const void *)key);
> > +	if (ret < 0)
> > +		return NULL;
> > +
> > +	return fdir_info->hash_map[ret];
> > +}
> > +
> > +static inline int
> > +ixgbe_insert_fdir_filter(struct ixgbe_hw_fdir_info *fdir_info,
> > +			 struct ixgbe_fdir_filter *fdir_filter) {
> > +	int ret = 0;
> 
> Same above.
> 
> > +
> > +	ret = rte_hash_add_key(fdir_info->hash_handle,
> > +			       &fdir_filter->ixgbe_fdir);
> > +
> > +	if (ret < 0) {
> > +		PMD_DRV_LOG(ERR,
> > +			    "Failed to insert fdir filter to hash table %d!",
> > +			    ret);
> > +		return ret;
> > +	}
> > +
> > +	fdir_info->hash_map[ret] = fdir_filter;
> > +
> > +	TAILQ_INSERT_TAIL(&fdir_info->fdir_list, fdir_filter, entries);
> > +
> > +	return 0;
> > +}
> > +
> > +static inline int
> > +ixgbe_remove_fdir_filter(struct ixgbe_hw_fdir_info *fdir_info,
> > +			 union ixgbe_atr_input *key)
> > +{
> > +	int ret = 0;
> 
> Same above.
> 

Ok, I will make some change in v3

> > +	struct ixgbe_fdir_filter *fdir_filter;
> > +
> > +	ret = rte_hash_del_key(fdir_info->hash_handle, key);
> > +
> > +	if (ret < 0) {
> > +		PMD_DRV_LOG(ERR, "No such fdir filter to delete %d!", ret);
> > +		return ret;
> > +	}
> > +
> > +	fdir_filter = fdir_info->hash_map[ret];
> > +	fdir_info->hash_map[ret] = NULL;
> > +
> > +	TAILQ_REMOVE(&fdir_info->fdir_list, fdir_filter, entries);
> > +	rte_free(fdir_filter);
> > +
> > +	return 0;
> > +}
> > +
> >  /*
> >   * ixgbe_add_del_fdir_filter - add or remove a flow diretor filter.
> >   * @dev: pointer to the structure rte_eth_dev @@ -1098,6 +1158,8 @@
> > ixgbe_add_del_fdir_filter(struct rte_eth_dev *dev,
> >  	struct ixgbe_hw_fdir_info *info =
> >  		IXGBE_DEV_PRIVATE_TO_FDIR_INFO(dev->data-
> >dev_private);
> >  	enum rte_fdir_mode fdir_mode = dev->data-
> >dev_conf.fdir_conf.mode;
> > +	struct ixgbe_fdir_filter *node;
> > +	bool add_node = FALSE;
> >
> >  	if (fdir_mode == RTE_FDIR_MODE_NONE)
> >  		return -ENOTSUP;
> > @@ -1148,6 +1210,10 @@ ixgbe_add_del_fdir_filter(struct rte_eth_dev
> *dev,
> >  						      dev->data-
> >dev_conf.fdir_conf.pballoc);
> >
> >  	if (del) {
> > +		err = ixgbe_remove_fdir_filter(info, &input);
> > +		if (err < 0)
> > +			return err;
> > +
> >  		err = fdir_erase_filter_82599(hw, fdirhash);
> >  		if (err < 0)
> >  			PMD_DRV_LOG(ERR, "Fail to delete FDIR filter!");
> @@ -1172,6
> > +1238,37 @@ ixgbe_add_del_fdir_filter(struct rte_eth_dev *dev,
> >  	else
> >  		return -EINVAL;
> >
> > +	node = ixgbe_fdir_filter_lookup(info, &input);
> > +	if (node) {
> > +		if (update) {
> > +			node->fdirflags = fdircmd_flags;
> > +			node->fdirhash = fdirhash;
> > +			node->queue = queue;
> > +		} else {
> > +			PMD_DRV_LOG(ERR, "Conflict with existing fdir
> filter!");
> > +			return -EINVAL;
> > +		}
> > +	} else {
> > +		add_node = TRUE;
> > +		node = rte_zmalloc("ixgbe_fdir",
> > +				   sizeof(struct ixgbe_fdir_filter),
> > +				   0);
> > +		if (!node)
> > +			return -ENOMEM;
> > +		(void)rte_memcpy(&node->ixgbe_fdir,
> > +				 &input,
> > +				 sizeof(union ixgbe_atr_input));
> > +		node->fdirflags = fdircmd_flags;
> > +		node->fdirhash = fdirhash;
> > +		node->queue = queue;
> > +
> > +		err = ixgbe_insert_fdir_filter(info, node);
> > +		if (err < 0) {
> > +			rte_free(node);
> > +			return err;
> > +		}
> > +	}
> > +
> >  	if (is_perfect) {
> >  		err = fdir_write_perfect_filter_82599(hw, &input, queue,
> >  						      fdircmd_flags, fdirhash,
> > @@ -1180,10 +1277,14 @@ ixgbe_add_del_fdir_filter(struct rte_eth_dev
> > *dev,
> >  		err = fdir_add_signature_filter_82599(hw, &input, queue,
> >  						      fdircmd_flags, fdirhash);
> >  	}
> > -	if (err < 0)
> > +	if (err < 0) {
> >  		PMD_DRV_LOG(ERR, "Fail to add FDIR filter!");
> > -	else
> > +
> > +		if (add_node)
> > +			(void)ixgbe_remove_fdir_filter(info, &input);
> > +	} else {
> >  		PMD_DRV_LOG(DEBUG, "Success to add FDIR filter");
> > +	}
> >
> >  	return err;
> >  }
> > --
> > 2.5.5


^ permalink raw reply

* Re: [PATCH v2 15/18] net/ixgbe: parse flow director filter
From: Zhao1, Wei @ 2017-01-03  3:19 UTC (permalink / raw)
  To: Xing, Beilei, dev@dpdk.org; +Cc: Lu, Wenzhuo
In-Reply-To: <94479800C636CB44BD422CB454846E013158CF29@SHSMSX101.ccr.corp.intel.com>

Hi, beilei

> -----Original Message-----
> From: Xing, Beilei
> Sent: Monday, January 2, 2017 11:24 PM
> To: Zhao1, Wei <wei.zhao1@intel.com>; dev@dpdk.org
> Cc: Zhao1, Wei <wei.zhao1@intel.com>; Lu, Wenzhuo
> <wenzhuo.lu@intel.com>
> Subject: RE: [dpdk-dev] [PATCH v2 15/18] net/ixgbe: parse flow director filter
> 
> 
> > -----Original Message-----
> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Wei Zhao
> > Sent: Friday, December 30, 2016 3:53 PM
> > To: dev@dpdk.org
> > Cc: Zhao1, Wei <wei.zhao1@intel.com>; Lu, Wenzhuo
> > <wenzhuo.lu@intel.com>
> > Subject: [dpdk-dev] [PATCH v2 15/18] net/ixgbe: parse flow director
> > filter
> >
> > check if the rule is a flow director rule, and get the flow director info.
> >
> > Signed-off-by: Wei Zhao <wei.zhao1@intel.com>
> > Signed-off-by: Wenzhuo Lu <wenzhuo.lu@intel.com>
> >
> > ---
> >
> > v2:add new error set function
> > ---
> >  drivers/net/ixgbe/ixgbe_ethdev.c | 1467
> > +++++++++++++++++++++++++++++++++-----
> >  drivers/net/ixgbe/ixgbe_ethdev.h |   16 +
> >  drivers/net/ixgbe/ixgbe_fdir.c   |  247 ++++---
> >  lib/librte_ether/rte_flow.h      |   23 +
> >  4 files changed, 1495 insertions(+), 258 deletions(-)
> >
> > diff --git a/drivers/net/ixgbe/ixgbe_ethdev.c
> > b/drivers/net/ixgbe/ixgbe_ethdev.c
> > index 0a5ac4f..c98aa0d 100644
> > --- a/drivers/net/ixgbe/ixgbe_ethdev.c
> > +++ b/drivers/net/ixgbe/ixgbe_ethdev.c
> > @@ -438,6 +438,31 @@ ixgbe_validate_l2_tn_filter(struct rte_eth_dev
> *dev,
> >  			struct rte_eth_l2_tunnel_conf *rule,
> >  			struct rte_flow_error *error);
> >  static int
> > +ixgbe_validate_fdir_filter(struct rte_eth_dev *dev,
> > +			const struct rte_flow_attr *attr,
> > +			const struct rte_flow_item pattern[],
> > +			const struct rte_flow_action actions[],
> > +			struct ixgbe_fdir_rule *rule,
> > +			struct rte_flow_error *error);
> > +static int
> > +ixgbe_parse_fdir_filter_normal(const struct rte_flow_attr *attr,
> > +		const struct rte_flow_item pattern[],
> > +		const struct rte_flow_action actions[],
> > +		struct ixgbe_fdir_rule *rule,
> > +		struct rte_flow_error *error);
> > +static int
> > +ixgbe_parse_fdir_filter_tunnel(const struct rte_flow_attr *attr,
> > +		const struct rte_flow_item pattern[],
> > +		const struct rte_flow_action actions[],
> > +		struct ixgbe_fdir_rule *rule,
> > +		struct rte_flow_error *error);
> > +static int
> > +ixgbe_parse_fdir_filter(const struct rte_flow_attr *attr,
> > +		const struct rte_flow_item pattern[],
> > +		const struct rte_flow_action actions[],
> > +		struct ixgbe_fdir_rule *rule,
> > +		struct rte_flow_error *error);
> > +static int
> >  ixgbe_flow_validate(__rte_unused struct rte_eth_dev *dev,
> >  		const struct rte_flow_attr *attr,
> >  		const struct rte_flow_item pattern[], @@ -1475,6 +1500,8
> @@ static
> > int ixgbe_fdir_filter_init(struct rte_eth_dev
> > *eth_dev)
> >  			     "Failed to allocate memory for fdir hash map!");
> >  		return -ENOMEM;
> >  	}
> > +	fdir_info->mask_added = FALSE;
> > +
> >  	return 0;
> >  }
> >
> > @@ -8951,117 +8978,22 @@ ixgbe_parse_syn_filter(const struct
> > rte_flow_attr *attr,
> >  	return 0;
> >  }
> >
> > -/**
> > - * Parse the rule to see if it is a L2 tunnel rule.
> > - * And get the L2 tunnel filter info BTW.
> > - * Only support E-tag now.
> > - */
> > +/* Parse to get the attr and action info of flow director rule. */
> >  static int
> > -cons_parse_l2_tn_filter(const struct rte_flow_attr *attr,
> > -			const struct rte_flow_item pattern[],
> > -			const struct rte_flow_action actions[],
> > -			struct rte_eth_l2_tunnel_conf *filter,
> > -			struct rte_flow_error *error)
> 
> Why do u remove functions added in patch 14/18? If the functions don't be
> needed, how about changing patch 14/18?

This patch is generate in a very complicated way. 
If you go on reading  this patch, you will find the following statement, path add the functions back itself latter
So the git seem to generate a patch in a  peculiar way.

+static int
+cons_parse_l2_tn_filter(const struct rte_flow_attr *attr,
+                                             const struct rte_flow_item pattern[],
+                                             const struct rte_flow_action actions[],
+                                             struct rte_eth_l2_tunnel_conf *filter,
+                                             struct rte_flow_error *error)

^ permalink raw reply

* Re: [PATCH v4 17/17] net/i40e: flush tunnel filters
From: Guo, Jia @ 2017-01-03  3:25 UTC (permalink / raw)
  To: Beilei Xing, jingjing.wu, helin.zhang; +Cc: dev
In-Reply-To: <1483068352-32272-18-git-send-email-beilei.xing@intel.com>



On 12/30/2016 11:25 AM, Beilei Xing wrote:
> This patch adds i40e_tunnel_filter_flush function
> to flush all tunnel filters, including filters in
> SW and HW.
>
> Signed-off-by: Beilei Xing <beilei.xing@intel.com>
> ---
>   drivers/net/i40e/i40e_flow.c | 37 +++++++++++++++++++++++++++++++++++++
>   1 file changed, 37 insertions(+)
>
> diff --git a/drivers/net/i40e/i40e_flow.c b/drivers/net/i40e/i40e_flow.c
> index 5c16fb8..fd0f0ec 100644
> --- a/drivers/net/i40e/i40e_flow.c
> +++ b/drivers/net/i40e/i40e_flow.c
> @@ -105,6 +105,7 @@ static int i40e_dev_destroy_tunnel_filter(struct i40e_pf *pf,
>   					  struct i40e_tunnel_filter *filter);
>   static int i40e_fdir_filter_flush(struct i40e_pf *pf);
>   static int i40e_ethertype_filter_flush(struct i40e_pf *pf);
> +static int i40e_tunnel_filter_flush(struct i40e_pf *pf);
>   
>   const struct rte_flow_ops i40e_flow_ops = {
>   	.validate = i40e_flow_validate,
> @@ -1671,6 +1672,14 @@ i40e_flow_flush(struct rte_eth_dev *dev, struct rte_flow_error *error)
>   		return -rte_errno;
>   	}
>   
> +	ret = i40e_tunnel_filter_flush(pf);
> +	if (ret) {
> +		rte_flow_error_set(error, -ret,
> +				   RTE_FLOW_ERROR_TYPE_HANDLE, NULL,
> +				   "Failed to flush tunnel flows.");
> +		return -rte_errno;
> +	}
> +
>   	return ret;
>   }
>   
> @@ -1733,3 +1742,31 @@ i40e_ethertype_filter_flush(struct i40e_pf *pf)
>   
>   	return ret;
>   }
> +
> +/* Flush all tunnel filters */
> +static int
> +i40e_tunnel_filter_flush(struct i40e_pf *pf)
> +{
> +	struct i40e_tunnel_filter_list
> +		*tunnel_list = &pf->tunnel.tunnel_list;
> +	struct i40e_tunnel_filter *f;
Just a mini comment, could you replace the variables name from "f" to 
any other obvious name , to let code to be more readable.
>   
> +	struct i40e_flow *flow;
> +	void *temp;
> +	int ret = 0;
> +
> +	while ((f = TAILQ_FIRST(tunnel_list))) {
> +		ret = i40e_dev_destroy_tunnel_filter(pf, f);
> +		if (ret)
> +			return ret;
> +	}
> +
> +	/* Delete tunnel flows in flow list. */
> +	TAILQ_FOREACH_SAFE(flow, &pf->flow_list, node, temp) {
> +		if (flow->filter_type == RTE_ETH_FILTER_TUNNEL) {
> +			TAILQ_REMOVE(&pf->flow_list, flow, node);
> +			rte_free(flow);
> +		}
> +	}
> +
> +	return ret;
> +}

^ permalink raw reply

* Re: [PATCH v4 17/17] net/i40e: flush tunnel filters
From: Xing, Beilei @ 2017-01-03  4:49 UTC (permalink / raw)
  To: Guo, Jia, Wu, Jingjing, Zhang, Helin; +Cc: dev@dpdk.org
In-Reply-To: <3f58d1aa-a688-0281-6ffe-db37124c3680@intel.com>

Hi Jeff,

> -----Original Message-----
> From: Guo, Jia
> Sent: Tuesday, January 3, 2017 11:25 AM
> To: Xing, Beilei <beilei.xing@intel.com>; Wu, Jingjing
> <jingjing.wu@intel.com>; Zhang, Helin <helin.zhang@intel.com>
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v4 17/17] net/i40e: flush tunnel filters
> 
> 
> 
> On 12/30/2016 11:25 AM, Beilei Xing wrote:
> > This patch adds i40e_tunnel_filter_flush function to flush all tunnel
> > filters, including filters in SW and HW.
> >
> > Signed-off-by: Beilei Xing <beilei.xing@intel.com>
> > ---
> >   drivers/net/i40e/i40e_flow.c | 37
> +++++++++++++++++++++++++++++++++++++
> >   1 file changed, 37 insertions(+)
> >
> > diff --git a/drivers/net/i40e/i40e_flow.c
> > b/drivers/net/i40e/i40e_flow.c index 5c16fb8..fd0f0ec 100644
> > --- a/drivers/net/i40e/i40e_flow.c
> > +++ b/drivers/net/i40e/i40e_flow.c
> > @@ -105,6 +105,7 @@ static int i40e_dev_destroy_tunnel_filter(struct
> i40e_pf *pf,
> >   					  struct i40e_tunnel_filter *filter);
> >   static int i40e_fdir_filter_flush(struct i40e_pf *pf);
> >   static int i40e_ethertype_filter_flush(struct i40e_pf *pf);
> > +static int i40e_tunnel_filter_flush(struct i40e_pf *pf);
> >
> >   const struct rte_flow_ops i40e_flow_ops = {
> >   	.validate = i40e_flow_validate,
> > @@ -1671,6 +1672,14 @@ i40e_flow_flush(struct rte_eth_dev *dev, struct
> rte_flow_error *error)
> >   		return -rte_errno;
> >   	}
> >
> > +	ret = i40e_tunnel_filter_flush(pf);
> > +	if (ret) {
> > +		rte_flow_error_set(error, -ret,
> > +				   RTE_FLOW_ERROR_TYPE_HANDLE, NULL,
> > +				   "Failed to flush tunnel flows.");
> > +		return -rte_errno;
> > +	}
> > +
> >   	return ret;
> >   }
> >
> > @@ -1733,3 +1742,31 @@ i40e_ethertype_filter_flush(struct i40e_pf *pf)
> >
> >   	return ret;
> >   }
> > +
> > +/* Flush all tunnel filters */
> > +static int
> > +i40e_tunnel_filter_flush(struct i40e_pf *pf) {
> > +	struct i40e_tunnel_filter_list
> > +		*tunnel_list = &pf->tunnel.tunnel_list;
> > +	struct i40e_tunnel_filter *f;
> Just a mini comment, could you replace the variables name from "f" to any
> other obvious name , to let code to be more readable.

Thanks for comments, and will update in next version.

> >
> > +	struct i40e_flow *flow;
> > +	void *temp;
> > +	int ret = 0;
> > +
> > +	while ((f = TAILQ_FIRST(tunnel_list))) {
> > +		ret = i40e_dev_destroy_tunnel_filter(pf, f);
> > +		if (ret)
> > +			return ret;
> > +	}
> > +
> > +	/* Delete tunnel flows in flow list. */
> > +	TAILQ_FOREACH_SAFE(flow, &pf->flow_list, node, temp) {
> > +		if (flow->filter_type == RTE_ETH_FILTER_TUNNEL) {
> > +			TAILQ_REMOVE(&pf->flow_list, flow, node);
> > +			rte_free(flow);
> > +		}
> > +	}
> > +
> > +	return ret;
> > +}

^ permalink raw reply


This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox