[dpdk-dev] [PATCH v3 3/4] net/bond: dedicated hw queues for LACP control traffic

Declan Doherty declan.doherty at intel.com
Tue Jul 4 18:46:26 CEST 2017


From: Tomasz Kulasek <tomaszx.kulasek at intel.com>

Add support for hardware flow classification of LACP control plane
traffic so that it is redirected to a dedicated receive queue on each
slave which is not visible to the application. Also enable a dedicated
transmit queue for LACP traffic, which allows complete decoupling of
the control and data paths.

This only applies to bonding devices running in mode 4
(link-aggregation-802.3ad).

Introduce two new APIs to enable/disable the dedicated queues:

- rte_eth_bond_8023ad_dedicated_queues_enable
- rte_eth_bond_8023ad_dedicated_queues_disable

rte_eth_bond_8023ad_dedicated_queues_enable must be called before the
bonding port is configured or started in order to reserve and configure
the dedicated queues.

When this option is enabled, all slaves must support flow filtering
by Ethernet type and must each provide one additional tx and one
additional rx queue.
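
For illustration, a minimal usage sketch (not part of this patch) could
look as follows; bond_port_id, nb_rxq, nb_txq and port_conf are
placeholders assumed to be provided by the application:

#include <rte_ethdev.h>
#include <rte_eth_bond_8023ad.h>

/* Sketch only: request the dedicated 802.3ad control queues while the
 * bonded port is still stopped, then configure and start it. */
static int
bond_setup_with_dedicated_queues(uint8_t bond_port_id,
		uint16_t nb_rxq, uint16_t nb_txq,
		const struct rte_eth_conf *port_conf)
{
	/* Must be called before the bonded port is configured/started. */
	if (rte_eth_bond_8023ad_dedicated_queues_enable(bond_port_id) != 0)
		return -1;

	if (rte_eth_dev_configure(bond_port_id, nb_rxq, nb_txq,
			port_conf) != 0)
		return -1;

	/* Application rx/tx queue setup omitted here for brevity. */

	return rte_eth_dev_start(bond_port_id);
}

The disable path follows the same rule: the bonded port must be stopped
before rte_eth_bond_8023ad_dedicated_queues_disable() is called.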

Signed-off-by: Tomasz Kulasek <tomaszx.kulasek at intel.com>
Signed-off-by: Declan Doherty <declan.doherty at intel.com>
---
 drivers/net/bonding/rte_eth_bond_8023ad.c         | 167 +++++++--
 drivers/net/bonding/rte_eth_bond_8023ad.h         |  42 +++
 drivers/net/bonding/rte_eth_bond_8023ad_private.h |  27 ++
 drivers/net/bonding/rte_eth_bond_pmd.c            | 419 ++++++++++++++++++++--
 drivers/net/bonding/rte_eth_bond_version.map      |   9 +
 5 files changed, 612 insertions(+), 52 deletions(-)

diff --git a/drivers/net/bonding/rte_eth_bond_8023ad.c b/drivers/net/bonding/rte_eth_bond_8023ad.c
index 65dc75b..a2313b3 100644
--- a/drivers/net/bonding/rte_eth_bond_8023ad.c
+++ b/drivers/net/bonding/rte_eth_bond_8023ad.c
@@ -632,16 +632,29 @@ tx_machine(struct bond_dev_private *internals, uint8_t slave_id)
 	lacpdu->tlv_type_terminator = TLV_TYPE_TERMINATOR_INFORMATION;
 	lacpdu->terminator_length = 0;
 
-	if (rte_ring_enqueue(port->tx_ring, lacp_pkt) == -ENOBUFS) {
-		/* If TX ring full, drop packet and free message. Retransmission
-		 * will happen in next function call. */
-		rte_pktmbuf_free(lacp_pkt);
-		set_warning_flags(port, WRN_TX_QUEUE_FULL);
-		return;
+	MODE4_DEBUG("Sending LACP frame\n");
+	BOND_PRINT_LACP(lacpdu);
+
+	if (internals->mode4.dedicated_queues.enabled == 0) {
+		int retval = rte_ring_enqueue(port->tx_ring, lacp_pkt);
+		if (retval != 0) {
+			/* If TX ring full, drop packet and free message.
+			   Retransmission will happen in next function call. */
+			rte_pktmbuf_free(lacp_pkt);
+			set_warning_flags(port, WRN_TX_QUEUE_FULL);
+			return;
+		}
+	} else {
+		uint16_t pkts_sent = rte_eth_tx_burst(slave_id,
+				internals->mode4.dedicated_queues.tx_qid,
+				&lacp_pkt, 1);
+		if (pkts_sent != 1) {
+			rte_pktmbuf_free(lacp_pkt);
+			set_warning_flags(port, WRN_TX_QUEUE_FULL);
+			return;
+		}
 	}
 
-	MODE4_DEBUG("sending LACP frame\n");
-	BOND_PRINT_LACP(lacpdu);
 
 	timer_set(&port->tx_machine_timer, internals->mode4.tx_period_timeout);
 	SM_FLAG_CLR(port, NTT);
@@ -741,6 +754,22 @@ link_speed_key(uint16_t speed) {
 }
 
 static void
+rx_machine_update(struct bond_dev_private *internals, uint8_t slave_id,
+		struct rte_mbuf *lacp_pkt) {
+	struct lacpdu_header *lacp;
+
+	if (lacp_pkt != NULL) {
+		lacp = rte_pktmbuf_mtod(lacp_pkt, struct lacpdu_header *);
+		RTE_ASSERT(lacp->lacpdu.subtype == SLOW_SUBTYPE_LACP);
+
+		/* This is LACP frame so pass it to rx_machine */
+		rx_machine(internals, slave_id, &lacp->lacpdu);
+		rte_pktmbuf_free(lacp_pkt);
+	} else
+		rx_machine(internals, slave_id, NULL);
+}
+
+static void
 bond_mode_8023ad_periodic_cb(void *arg)
 {
 	struct rte_eth_dev *bond_dev = arg;
@@ -748,8 +777,8 @@ bond_mode_8023ad_periodic_cb(void *arg)
 	struct port *port;
 	struct rte_eth_link link_info;
 	struct ether_addr slave_addr;
+	struct rte_mbuf *lacp_pkt = NULL;
 
-	void *pkt = NULL;
 	uint8_t i, slave_id;
 
 
@@ -809,20 +838,28 @@ bond_mode_8023ad_periodic_cb(void *arg)
 
 		SM_FLAG_SET(port, LACP_ENABLED);
 
-		/* Find LACP packet to this port. Do not check subtype, it is done in
-		 * function that queued packet */
-		if (rte_ring_dequeue(port->rx_ring, &pkt) == 0) {
-			struct rte_mbuf *lacp_pkt = pkt;
-			struct lacpdu_header *lacp;
+		if (internals->mode4.dedicated_queues.enabled == 0) {
+			/* Find LACP packet to this port. Do not check subtype,
+			 * it is done in function that queued packet
+			 */
+			int retval = rte_ring_dequeue(port->rx_ring,
+					(void **)&lacp_pkt);
 
-			lacp = rte_pktmbuf_mtod(lacp_pkt, struct lacpdu_header *);
-			RTE_ASSERT(lacp->lacpdu.subtype == SLOW_SUBTYPE_LACP);
+			if (retval != 0)
+				lacp_pkt = NULL;
 
-			/* This is LACP frame so pass it to rx_machine */
-			rx_machine(internals, slave_id, &lacp->lacpdu);
-			rte_pktmbuf_free(lacp_pkt);
-		} else
-			rx_machine(internals, slave_id, NULL);
+			rx_machine_update(internals, slave_id, lacp_pkt);
+		} else {
+			uint16_t rx_count = rte_eth_rx_burst(slave_id,
+					internals->mode4.dedicated_queues.rx_qid,
+					&lacp_pkt, 1);
+
+			if (rx_count == 1)
+				bond_mode_8023ad_handle_slow_pkt(internals,
+						slave_id, lacp_pkt);
+			else
+				rx_machine_update(internals, slave_id, NULL);
+		}
 
 		periodic_machine(internals, slave_id);
 		mux_machine(internals, slave_id);
@@ -1067,6 +1104,10 @@ bond_mode_8023ad_conf_assign(struct mode8023ad_private *mode4,
 	mode4->tx_period_timeout = conf->tx_period_ms * ms_ticks;
 	mode4->rx_marker_timeout = conf->rx_marker_period_ms * ms_ticks;
 	mode4->update_timeout_us = conf->update_timeout_ms * 1000;
+
+	mode4->dedicated_queues.enabled = 0;
+	mode4->dedicated_queues.rx_qid = UINT16_MAX;
+	mode4->dedicated_queues.tx_qid = UINT16_MAX;
 }
 
 static void
@@ -1191,18 +1232,36 @@ bond_mode_8023ad_handle_slow_pkt(struct bond_dev_private *internals,
 		m_hdr->marker.tlv_type_marker = MARKER_TLV_TYPE_RESP;
 		rte_eth_macaddr_get(slave_id, &m_hdr->eth_hdr.s_addr);
 
-		if (unlikely(rte_ring_enqueue(port->tx_ring, pkt) == -ENOBUFS)) {
-			/* reset timer */
-			port->rx_marker_timer = 0;
-			wrn = WRN_TX_QUEUE_FULL;
-			goto free_out;
+		if (internals->mode4.dedicated_queues.enabled == 0) {
+			int retval = rte_ring_enqueue(port->tx_ring, pkt);
+			if (retval != 0) {
+				/* reset timer */
+				port->rx_marker_timer = 0;
+				wrn = WRN_TX_QUEUE_FULL;
+				goto free_out;
+			}
+		} else {
+			/* Send packet directly to the slow queue */
+			uint16_t tx_count = rte_eth_tx_burst(slave_id,
+					internals->mode4.dedicated_queues.tx_qid,
+					&pkt, 1);
+			if (tx_count != 1) {
+				/* reset timer */
+				port->rx_marker_timer = 0;
+				wrn = WRN_TX_QUEUE_FULL;
+				goto free_out;
+			}
 		}
 	} else if (likely(subtype == SLOW_SUBTYPE_LACP)) {
-		if (unlikely(rte_ring_enqueue(port->rx_ring, pkt) == -ENOBUFS)) {
-			/* If RX fing full free lacpdu message and drop packet */
-			wrn = WRN_RX_QUEUE_FULL;
-			goto free_out;
-		}
+		if (internals->mode4.dedicated_queues.enabled == 0) {
+			int retval = rte_ring_enqueue(port->rx_ring, pkt);
+			if (retval != 0) {
+				/* If RX ring full, free lacpdu message and drop packet */
+				wrn = WRN_RX_QUEUE_FULL;
+				goto free_out;
+			}
+		} else
+			rx_machine_update(internals, slave_id, pkt);
 	} else {
 		wrn = WRN_UNKNOWN_SLOW_TYPE;
 		goto free_out;
@@ -1507,3 +1566,49 @@ bond_mode_8023ad_ext_periodic_cb(void *arg)
 	rte_eal_alarm_set(internals->mode4.update_timeout_us,
 			bond_mode_8023ad_ext_periodic_cb, arg);
 }
+
+int
+rte_eth_bond_8023ad_dedicated_queues_enable(uint8_t port)
+{
+	int retval = 0;
+	struct rte_eth_dev *dev = &rte_eth_devices[port];
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+		dev->data->dev_private;
+
+	if (check_for_bonded_ethdev(dev) != 0)
+		return -1;
+
+	if (bond_8023ad_slow_pkt_hw_filter_supported(port) != 0)
+		return -1;
+
+	/* Device must be stopped to set up slow queue */
+	if (dev->data->dev_started)
+		return -1;
+
+	internals->mode4.dedicated_queues.enabled = 1;
+
+	bond_ethdev_mode_set(dev, internals->mode);
+	return retval;
+}
+
+int
+rte_eth_bond_8023ad_dedicated_queues_disable(uint8_t port)
+{
+	int retval = 0;
+	struct rte_eth_dev *dev = &rte_eth_devices[port];
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+		dev->data->dev_private;
+
+	if (check_for_bonded_ethdev(dev) != 0)
+		return -1;
+
+	/* Device must be stopped to set up slow queue */
+	if (dev->data->dev_started)
+		return -1;
+
+	internals->mode4.dedicated_queues.enabled = 0;
+
+	bond_ethdev_mode_set(dev, internals->mode);
+
+	return retval;
+}
diff --git a/drivers/net/bonding/rte_eth_bond_8023ad.h b/drivers/net/bonding/rte_eth_bond_8023ad.h
index 6b8ff57..5c61e66 100644
--- a/drivers/net/bonding/rte_eth_bond_8023ad.h
+++ b/drivers/net/bonding/rte_eth_bond_8023ad.h
@@ -302,4 +302,46 @@ int
 rte_eth_bond_8023ad_ext_slowtx(uint8_t port_id, uint8_t slave_id,
 		struct rte_mbuf *lacp_pkt);
 
+/**
+ * Enable dedicated hw queues for 802.3ad control plane traffic on slaves
+ *
+ * This function creates an additional tx and rx queue on each slave for
+ * dedicated 802.3ad control plane traffic. A flow filtering rule is
+ * programmed on each slave to redirect all LACP slow packets to that rx
+ * queue for processing in the LACP state machine; this removes the need to
+ * filter these packets in the bonded device's data path. The additional tx
+ * queue is used to enable the LACP state machine to enqueue LACP packets
+ * directly to slave hw independently of the bonded device's data path.
+ *
+ * To use this feature all slaves must support the programming of the flow
+ * filter rule required for rx and have enough queues so that one rx and one
+ * tx queue can be reserved for the LACP state machine's control packets.
+ *
+ * Bonding port must be stopped to change this configuration.
+ *
+ * @param port_id      Bonding device id
+ *
+ * @return
+ *   0 on success, negative value otherwise.
+ */
+int
+rte_eth_bond_8023ad_dedicated_queues_enable(uint8_t port_id);
+
+/**
+ * Disable dedicated hw queues for 802.3ad control plane traffic on slaves
+ *
+ * This function disables the hardware slow packet filter.
+ *
+ * Bonding port must be stopped to change this configuration.
+ *
+ * @see rte_eth_bond_8023ad_dedicated_queues_enable
+ *
+ * @param port_id      Bonding device id
+ * @return
+ *   0 on success, negative value otherwise.
+ *
+ */
+int
+rte_eth_bond_8023ad_dedicated_queues_disable(uint8_t port_id);
+
 #endif /* RTE_ETH_BOND_8023AD_H_ */
diff --git a/drivers/net/bonding/rte_eth_bond_8023ad_private.h b/drivers/net/bonding/rte_eth_bond_8023ad_private.h
index ca8858b..c16dba8 100644
--- a/drivers/net/bonding/rte_eth_bond_8023ad_private.h
+++ b/drivers/net/bonding/rte_eth_bond_8023ad_private.h
@@ -39,6 +39,7 @@
 #include <rte_ether.h>
 #include <rte_byteorder.h>
 #include <rte_atomic.h>
+#include <rte_flow.h>
 
 #include "rte_eth_bond_8023ad.h"
 
@@ -162,6 +163,9 @@ struct port {
 
 	uint64_t warning_timer;
 	volatile uint16_t warnings_to_show;
+
+	/** Memory pool used to allocate mbufs for the dedicated slow rx queue */
+	struct rte_mempool *slow_pool;
 };
 
 struct mode8023ad_private {
@@ -175,6 +179,19 @@ struct mode8023ad_private {
 	uint64_t update_timeout_us;
 	rte_eth_bond_8023ad_ext_slowrx_fn slowrx_cb;
 	uint8_t external_sm;
+
+	/**
+	 * Configuration of dedicated hardware queues for control plane
+	 * traffic
+	 */
+	struct {
+		uint8_t enabled;
+
+		struct rte_flow *flow[RTE_MAX_ETHPORTS];
+
+		uint16_t rx_qid;
+		uint16_t tx_qid;
+	} dedicated_queues;
 };
 
 /**
@@ -295,4 +312,14 @@ bond_mode_8023ad_deactivate_slave(struct rte_eth_dev *dev, uint8_t slave_pos);
 void
 bond_mode_8023ad_mac_address_update(struct rte_eth_dev *bond_dev);
 
+int
+bond_ethdev_8023ad_flow_verify(struct rte_eth_dev *bond_dev,
+		uint8_t slave_port);
+
+int
+bond_ethdev_8023ad_flow_set(struct rte_eth_dev *bond_dev, uint8_t slave_port);
+
+int
+bond_8023ad_slow_pkt_hw_filter_supported(uint8_t port_id);
+
 #endif /* RTE_ETH_BOND_8023AD_H_ */
diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c b/drivers/net/bonding/rte_eth_bond_pmd.c
index 9730ae0..4d1b262 100644
--- a/drivers/net/bonding/rte_eth_bond_pmd.c
+++ b/drivers/net/bonding/rte_eth_bond_pmd.c
@@ -133,6 +133,254 @@ is_lacp_packets(uint16_t ethertype, uint8_t subtype, uint16_t vlan_tci)
 		(subtype == SLOW_SUBTYPE_MARKER || subtype == SLOW_SUBTYPE_LACP));
 }
 
+/*****************************************************************************
+ * Flow director's setup for mode 4 optimization
+ */
+
+static struct rte_flow_item_eth flow_item_eth_type_8023ad = {
+	.dst.addr_bytes = { 0 },
+	.src.addr_bytes = { 0 },
+	.type = RTE_BE16(ETHER_TYPE_SLOW),
+};
+
+static struct rte_flow_item_eth flow_item_eth_mask_type_8023ad = {
+	.dst.addr_bytes = { 0 },
+	.src.addr_bytes = { 0 },
+	.type = 0xFFFF,
+};
+
+static struct rte_flow_item flow_item_8023ad[] = {
+	{
+		.type = RTE_FLOW_ITEM_TYPE_ETH,
+		.spec = &flow_item_eth_type_8023ad,
+		.last = NULL,
+		.mask = &flow_item_eth_mask_type_8023ad,
+	},
+	{
+		.type = RTE_FLOW_ITEM_TYPE_END,
+		.spec = NULL,
+		.last = NULL,
+		.mask = NULL,
+	}
+};
+
+const struct rte_flow_attr flow_attr_8023ad = {
+	.group = 0,
+	.priority = 0,
+	.ingress = 1,
+	.egress = 0,
+	.reserved = 0,
+};
+
+int
+bond_ethdev_8023ad_flow_verify(struct rte_eth_dev *bond_dev,
+		uint8_t slave_port) {
+	struct rte_flow_error error;
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+			(bond_dev->data->dev_private);
+
+	struct rte_flow_action_queue lacp_queue_conf = {
+		.index = internals->mode4.dedicated_queues.rx_qid,
+	};
+
+	const struct rte_flow_action actions[] = {
+		{
+			.type = RTE_FLOW_ACTION_TYPE_QUEUE,
+			.conf = &lacp_queue_conf
+		},
+		{
+			.type = RTE_FLOW_ACTION_TYPE_END,
+		}
+	};
+
+	int ret = rte_flow_validate(slave_port, &flow_attr_8023ad,
+			flow_item_8023ad, actions, &error);
+	if (ret < 0)
+		return -1;
+
+	return 0;
+}
+
+int
+bond_8023ad_slow_pkt_hw_filter_supported(uint8_t port_id) {
+	struct rte_eth_dev *bond_dev = &rte_eth_devices[port_id];
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+			(bond_dev->data->dev_private);
+	struct rte_eth_dev_info bond_info, slave_info;
+	uint8_t idx;
+
+	/* Verify that all slaves support the required rte_flow filtering */
+	if (internals->slave_count > 0) {
+		rte_eth_dev_info_get(bond_dev->data->port_id, &bond_info);
+
+		internals->mode4.dedicated_queues.rx_qid = bond_info.nb_rx_queues;
+		internals->mode4.dedicated_queues.tx_qid = bond_info.nb_tx_queues;
+
+		for (idx = 0; idx < internals->slave_count; idx++) {
+			rte_eth_dev_info_get(internals->slaves[idx].port_id,
+					&slave_info);
+
+			if (bond_ethdev_8023ad_flow_verify(bond_dev,
+					internals->slaves[idx].port_id) != 0)
+				return -1;
+		}
+	}
+
+	return 0;
+}
+
+int
+bond_ethdev_8023ad_flow_set(struct rte_eth_dev *bond_dev, uint8_t slave_port) {
+
+	struct rte_flow_error error;
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+			(bond_dev->data->dev_private);
+
+	struct rte_flow_action_queue lacp_queue_conf = {
+		.index = internals->mode4.dedicated_queues.rx_qid,
+	};
+
+	const struct rte_flow_action actions[] = {
+		{
+			.type = RTE_FLOW_ACTION_TYPE_QUEUE,
+			.conf = &lacp_queue_conf
+		},
+		{
+			.type = RTE_FLOW_ACTION_TYPE_END,
+		}
+	};
+
+	internals->mode4.dedicated_queues.flow[slave_port] = rte_flow_create(slave_port,
+			&flow_attr_8023ad, flow_item_8023ad, actions, &error);
+	if (internals->mode4.dedicated_queues.flow[slave_port] == NULL) {
+		RTE_BOND_LOG(ERR, "bond_ethdev_8023ad_flow_set: %s "
+				"(slave_port=%d queue_id=%d)",
+				error.message, slave_port,
+				internals->mode4.dedicated_queues.rx_qid);
+		return -1;
+	}
+
+	return 0;
+}
+
+static uint16_t
+bond_ethdev_rx_burst_8023ad_fast_queue(void *queue, struct rte_mbuf **bufs,
+		uint16_t nb_pkts)
+{
+	struct bond_rx_queue *bd_rx_q = (struct bond_rx_queue *)queue;
+	struct bond_dev_private *internals = bd_rx_q->dev_private;
+	uint16_t num_rx_total = 0;	/* Total number of received packets */
+	uint8_t slaves[RTE_MAX_ETHPORTS];
+	uint8_t slave_count;
+
+	uint8_t i, idx;
+
+	/* Copy slave list to protect against slave up/down changes during rx
+	 * bursting */
+	slave_count = internals->active_slave_count;
+	memcpy(slaves, internals->active_slaves,
+			sizeof(internals->active_slaves[0]) * slave_count);
+
+	for (i = 0, idx = internals->active_slave;
+			i < slave_count && num_rx_total < nb_pkts; i++, idx++) {
+		idx = idx % slave_count;
+
+		/* Read packets from this slave */
+		num_rx_total += rte_eth_rx_burst(slaves[idx], bd_rx_q->queue_id,
+				&bufs[num_rx_total], nb_pkts - num_rx_total);
+	}
+
+	internals->active_slave = idx;
+
+	return num_rx_total;
+}
+
+static uint16_t
+bond_ethdev_tx_burst_8023ad_fast_queue(void *queue, struct rte_mbuf **bufs,
+		uint16_t nb_pkts)
+{
+	struct bond_dev_private *internals;
+	struct bond_tx_queue *bd_tx_q;
+
+	uint8_t num_of_slaves;
+	uint8_t slaves[RTE_MAX_ETHPORTS];
+	 /* positions in slaves, not ID */
+	uint8_t distributing_offsets[RTE_MAX_ETHPORTS];
+	uint8_t distributing_count;
+
+	uint16_t num_tx_slave, num_tx_total = 0, num_tx_fail_total = 0;
+	uint16_t i, op_slave_idx;
+
+	struct rte_mbuf *slave_bufs[RTE_MAX_ETHPORTS][nb_pkts];
+
+	/* Total amount of packets in slave_bufs */
+	uint16_t slave_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };
+	/* Slow packets placed in each slave */
+
+	if (unlikely(nb_pkts == 0))
+		return 0;
+
+	bd_tx_q = (struct bond_tx_queue *)queue;
+	internals = bd_tx_q->dev_private;
+
+	/* Copy slave list to protect against slave up/down changes during tx
+	 * bursting */
+	num_of_slaves = internals->active_slave_count;
+	if (num_of_slaves < 1)
+		return num_tx_total;
+
+	memcpy(slaves, internals->active_slaves, sizeof(slaves[0]) *
+			num_of_slaves);
+
+	distributing_count = 0;
+	for (i = 0; i < num_of_slaves; i++) {
+		struct port *port = &mode_8023ad_ports[slaves[i]];
+		if (ACTOR_STATE(port, DISTRIBUTING))
+			distributing_offsets[distributing_count++] = i;
+	}
+
+	if (likely(distributing_count > 0)) {
+		/* Populate slave mbuf arrays with the packets to be sent */
+		for (i = 0; i < nb_pkts; i++) {
+			/* Select output slave using hash based on xmit policy */
+			op_slave_idx = internals->xmit_hash(bufs[i],
+					distributing_count);
+
+			/* Populate slave mbuf arrays with mbufs for that slave.
+			 * Use only slaves that are currently distributing.
+			 */
+			uint8_t slave_offset =
+					distributing_offsets[op_slave_idx];
+			slave_bufs[slave_offset][slave_nb_pkts[slave_offset]] =
+					bufs[i];
+			slave_nb_pkts[slave_offset]++;
+		}
+	}
+
+	/* Send packet burst on each slave device */
+	for (i = 0; i < num_of_slaves; i++) {
+		if (slave_nb_pkts[i] == 0)
+			continue;
+
+		num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+				slave_bufs[i], slave_nb_pkts[i]);
+
+		num_tx_total += num_tx_slave;
+		num_tx_fail_total += slave_nb_pkts[i] - num_tx_slave;
+
+		/* If tx burst fails move packets to end of bufs */
+		if (unlikely(num_tx_slave < slave_nb_pkts[i])) {
+			uint16_t j = nb_pkts - num_tx_fail_total;
+			for ( ; num_tx_slave < slave_nb_pkts[i]; j++,
+					num_tx_slave++)
+				bufs[j] = slave_bufs[i][num_tx_slave];
+		}
+	}
+
+	return num_tx_total;
+}
+
+
 static uint16_t
 bond_ethdev_rx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 		uint16_t nb_pkts)
@@ -1302,11 +1550,19 @@ bond_ethdev_mode_set(struct rte_eth_dev *eth_dev, int mode)
 		if (bond_mode_8023ad_enable(eth_dev) != 0)
 			return -1;
 
-		eth_dev->rx_pkt_burst = bond_ethdev_rx_burst_8023ad;
-		eth_dev->tx_pkt_burst = bond_ethdev_tx_burst_8023ad;
-		RTE_LOG(WARNING, PMD,
-				"Using mode 4, it is necessary to do TX burst and RX burst "
-				"at least every 100ms.\n");
+		if (internals->mode4.dedicated_queues.enabled == 0) {
+			eth_dev->rx_pkt_burst = bond_ethdev_rx_burst_8023ad;
+			eth_dev->tx_pkt_burst = bond_ethdev_tx_burst_8023ad;
+			RTE_LOG(WARNING, PMD,
+				"Using mode 4, it is necessary to do TX burst "
+				"and RX burst at least every 100ms.\n");
+		} else {
+			/* Use flow director's optimization */
+			eth_dev->rx_pkt_burst =
+					bond_ethdev_rx_burst_8023ad_fast_queue;
+			eth_dev->tx_pkt_burst =
+					bond_ethdev_tx_burst_8023ad_fast_queue;
+		}
 		break;
 	case BONDING_MODE_TLB:
 		eth_dev->tx_pkt_burst = bond_ethdev_tx_burst_tlb;
@@ -1328,15 +1584,81 @@ bond_ethdev_mode_set(struct rte_eth_dev *eth_dev, int mode)
 	return 0;
 }
 
+
+static int
+slave_configure_slow_queue(struct rte_eth_dev *bonded_eth_dev,
+		struct rte_eth_dev *slave_eth_dev)
+{
+	int errval = 0;
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+		bonded_eth_dev->data->dev_private;
+	struct port *port = &mode_8023ad_ports[slave_eth_dev->data->port_id];
+
+	if (port->slow_pool == NULL) {
+		char mem_name[256];
+		int slave_id = slave_eth_dev->data->port_id;
+
+		snprintf(mem_name, RTE_DIM(mem_name), "slave_port%u_slow_pool",
+				slave_id);
+		port->slow_pool = rte_pktmbuf_pool_create(mem_name, 8191,
+			250, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
+			slave_eth_dev->data->numa_node);
+
+		/* Any memory allocation failure in initialization is critical because
+		 * resources can't be freed, so reinitialization is impossible. */
+		if (port->slow_pool == NULL) {
+			rte_panic("Slave %u: Failed to create memory pool '%s': %s\n",
+				slave_id, mem_name, rte_strerror(rte_errno));
+		}
+	}
+
+	if (internals->mode4.dedicated_queues.enabled == 1) {
+		/* Configure slow Rx queue */
+
+		errval = rte_eth_rx_queue_setup(slave_eth_dev->data->port_id,
+				internals->mode4.dedicated_queues.rx_qid, 128,
+				rte_eth_dev_socket_id(slave_eth_dev->data->port_id),
+				NULL, port->slow_pool);
+		if (errval != 0) {
+			RTE_BOND_LOG(ERR,
+					"rte_eth_rx_queue_setup: port=%d queue_id %d, err (%d)",
+					slave_eth_dev->data->port_id,
+					internals->mode4.dedicated_queues.rx_qid,
+					errval);
+			return errval;
+		}
+
+		errval = rte_eth_tx_queue_setup(slave_eth_dev->data->port_id,
+				internals->mode4.dedicated_queues.tx_qid, 512,
+				rte_eth_dev_socket_id(slave_eth_dev->data->port_id),
+				NULL);
+		if (errval != 0) {
+			RTE_BOND_LOG(ERR,
+				"rte_eth_tx_queue_setup: port=%d queue_id %d, err (%d)",
+				slave_eth_dev->data->port_id,
+				internals->mode4.dedicated_queues.tx_qid,
+				errval);
+			return errval;
+		}
+	}
+	return 0;
+}
+
 int
 slave_configure(struct rte_eth_dev *bonded_eth_dev,
 		struct rte_eth_dev *slave_eth_dev)
 {
 	struct bond_rx_queue *bd_rx_q;
 	struct bond_tx_queue *bd_tx_q;
+	uint16_t nb_rx_queues;
+	uint16_t nb_tx_queues;
 
 	int errval;
 	uint16_t q_id;
+	struct rte_flow_error flow_error;
+
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+		bonded_eth_dev->data->dev_private;
 
 	/* Stop slave */
 	rte_eth_dev_stop(slave_eth_dev->data->port_id);
@@ -1366,10 +1688,19 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
 	slave_eth_dev->data->dev_conf.rxmode.hw_vlan_filter =
 			bonded_eth_dev->data->dev_conf.rxmode.hw_vlan_filter;
 
+	nb_rx_queues = bonded_eth_dev->data->nb_rx_queues;
+	nb_tx_queues = bonded_eth_dev->data->nb_tx_queues;
+
+	if (internals->mode == BONDING_MODE_8023AD) {
+		if (internals->mode4.dedicated_queues.enabled == 1) {
+			nb_rx_queues++;
+			nb_tx_queues++;
+		}
+	}
+
 	/* Configure device */
 	errval = rte_eth_dev_configure(slave_eth_dev->data->port_id,
-			bonded_eth_dev->data->nb_rx_queues,
-			bonded_eth_dev->data->nb_tx_queues,
+			nb_rx_queues, nb_tx_queues,
 			&(slave_eth_dev->data->dev_conf));
 	if (errval != 0) {
 		RTE_BOND_LOG(ERR, "Cannot configure slave device: port %u , err (%d)",
@@ -1403,12 +1734,35 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
 				&bd_tx_q->tx_conf);
 		if (errval != 0) {
 			RTE_BOND_LOG(ERR,
-					"rte_eth_tx_queue_setup: port=%d queue_id %d, err (%d)",
-					slave_eth_dev->data->port_id, q_id, errval);
+				"rte_eth_tx_queue_setup: port=%d queue_id %d, err (%d)",
+				slave_eth_dev->data->port_id, q_id, errval);
 			return errval;
 		}
 	}
 
+	if (internals->mode == BONDING_MODE_8023AD &&
+			internals->mode4.dedicated_queues.enabled == 1) {
+		errval = slave_configure_slow_queue(bonded_eth_dev, slave_eth_dev);
+		if (errval != 0)
+			return errval;
+
+		if (bond_ethdev_8023ad_flow_verify(bonded_eth_dev,
+				slave_eth_dev->data->port_id) != 0) {
+			RTE_BOND_LOG(ERR,
+				"bond_ethdev_8023ad_flow_verify: port=%d failed",
+				slave_eth_dev->data->port_id);
+			return -1;
+		}
+
+		if (internals->mode4.dedicated_queues.flow[slave_eth_dev->data->port_id] != NULL)
+			rte_flow_destroy(slave_eth_dev->data->port_id,
+					internals->mode4.dedicated_queues.flow[slave_eth_dev->data->port_id],
+					&flow_error);
+
+		bond_ethdev_8023ad_flow_set(bonded_eth_dev,
+				slave_eth_dev->data->port_id);
+	}
+
 	/* Start device */
 	errval = rte_eth_dev_start(slave_eth_dev->data->port_id);
 	if (errval != 0) {
@@ -1567,13 +1921,26 @@ bond_ethdev_start(struct rte_eth_dev *eth_dev)
 	if (internals->promiscuous_en)
 		bond_ethdev_promiscuous_enable(eth_dev);
 
+	if (internals->mode == BONDING_MODE_8023AD) {
+		if (internals->mode4.dedicated_queues.enabled == 1) {
+			internals->mode4.dedicated_queues.rx_qid =
+					eth_dev->data->nb_rx_queues;
+			internals->mode4.dedicated_queues.tx_qid =
+					eth_dev->data->nb_tx_queues;
+		}
+	}
+
+
 	/* Reconfigure each slave device if starting bonded device */
 	for (i = 0; i < internals->slave_count; i++) {
-		if (slave_configure(eth_dev,
-				&(rte_eth_devices[internals->slaves[i].port_id])) != 0) {
+		struct rte_eth_dev *slave_ethdev =
+				&(rte_eth_devices[internals->slaves[i].port_id]);
+		if (slave_configure(eth_dev, slave_ethdev) != 0) {
 			RTE_BOND_LOG(ERR,
-					"bonded port (%d) failed to reconfigure slave device (%d)",
-					eth_dev->data->port_id, internals->slaves[i].port_id);
+					"bonded port (%d) failed to reconfigure"
+					"slave device (%d)",
+					eth_dev->data->port_id,
+					internals->slaves[i].port_id);
 			return -1;
 		}
 		/* We will need to poll for link status if any slave doesn't
@@ -1698,21 +2065,21 @@ static void
 bond_ethdev_info(struct rte_eth_dev *dev, struct rte_eth_dev_info *dev_info)
 {
 	struct bond_dev_private *internals = dev->data->dev_private;
+
 	uint16_t max_nb_rx_queues = UINT16_MAX;
 	uint16_t max_nb_tx_queues = UINT16_MAX;
 
 	dev_info->max_mac_addrs = 1;
 
-	dev_info->max_rx_pktlen = internals->candidate_max_rx_pktlen
-				  ? internals->candidate_max_rx_pktlen
-				  : ETHER_MAX_JUMBO_FRAME_LEN;
+	dev_info->max_rx_pktlen = internals->candidate_max_rx_pktlen ?
+			internals->candidate_max_rx_pktlen :
+			ETHER_MAX_JUMBO_FRAME_LEN;
 
+	/* Max number of tx/rx queues that the bonded device can support is the
+	 * minimum of the values reported by the bonded slaves, as all slaves
+	 * must be capable of supporting the same number of tx/rx queues.
+	 */
 	if (internals->slave_count > 0) {
-		/* Max number of tx/rx queues that the bonded device can
-		 * support is the minimum values of the bonded slaves, as
-		 * all slaves must be capable of supporting the same number
-		 * of tx/rx queues.
-		 */
 		struct rte_eth_dev_info slave_info;
 		uint8_t idx;
 
@@ -1731,6 +2098,16 @@ bond_ethdev_info(struct rte_eth_dev *dev, struct rte_eth_dev_info *dev_info)
 	dev_info->max_rx_queues = max_nb_rx_queues;
 	dev_info->max_tx_queues = max_nb_tx_queues;
 
+	/*
+	 * If dedicated hw queues are enabled for the bonded device in LACP
+	 * mode, reduce the maximum number of data path queues by one.
+	 */
+	if (internals->mode == BONDING_MODE_8023AD &&
+		internals->mode4.dedicated_queues.enabled == 1) {
+		dev_info->max_rx_queues--;
+		dev_info->max_tx_queues--;
+	}
+
 	dev_info->min_rx_bufsize = 0;
 
 	dev_info->rx_offload_capa = internals->rx_offload_capa;
diff --git a/drivers/net/bonding/rte_eth_bond_version.map b/drivers/net/bonding/rte_eth_bond_version.map
index 2de0a7d..9c15864 100644
--- a/drivers/net/bonding/rte_eth_bond_version.map
+++ b/drivers/net/bonding/rte_eth_bond_version.map
@@ -43,3 +43,12 @@ DPDK_16.07 {
 	rte_eth_bond_8023ad_setup;
 
 } DPDK_16.04;
+
+DPDK_17.08 {
+	global:
+
+	rte_eth_bond_8023ad_dedicated_queues_enable;
+	rte_eth_bond_8023ad_dedicated_queues_disable;
+
+	local: *;
+} DPDK_17.05;
-- 
2.9.4
