[dpdk-dev] [RFC] lib/librte_ether: consistent PMD batching behavior

Zhiyong Yang zhiyong.yang at intel.com
Fri Jan 20 10:51:16 CET 2017


DPDK applications invoke the rte_eth_tx_burst() function, declared in
rte_ethdev.h, to transmit packets on an output queue. Its prototype is as
follows.

static inline uint16_t
rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
                 struct rte_mbuf **tx_pkts, uint16_t nb_pkts);

Note: the fourth parameter, nb_pkts, is the number of packets to transmit.
The rte_eth_tx_burst() function returns the number of packets it actually
sent. A return value equal to *nb_pkts* means that all packets have been
sent, which likely signifies that further output packets could be
transmitted immediately. Applications that implement a "send as many
packets to transmit as possible" policy can check for this specific case
and keep invoking the rte_eth_tx_burst() function until a value less than
*nb_pkts* is returned.

A single call to rte_eth_tx_burst() can behave differently depending on the
PMD. Some PMD TX functions send at most 32 packets per call, some send at
most 64, and others send as many packets as possible. As a result, every
DPDK user has to implement the policy above at the application level,
whether or not the PMD in use actually needs it. To do so correctly, users
must learn the TX limit of each specific PMD and interpret the return value
(the number of packets transmitted successfully) differently for different
PMDs. This complicates usage, confuses users, and easily leads to incorrect
use of the API.
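
To make the divergence concrete, here is a minimal sketch built on a mock TX
function. The names mock_txq, mock_tx_burst and single_call_result are
hypothetical stand-ins, not DPDK symbols, and the model assumes a ring with
plenty of free descriptors, so any short count comes from the per-call cap
alone.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model of a PMD TX queue; not a DPDK structure. */
struct mock_txq {
	uint16_t free_slots;   /* free descriptors in the ring */
	uint16_t max_per_call; /* per-call cap; 0 means "no cap" */
};

/* Mock TX burst: sends min(nb_pkts, per-call cap, free_slots) packets. */
static uint16_t
mock_tx_burst(struct mock_txq *q, void **pkts, uint16_t nb_pkts)
{
	uint16_t n = nb_pkts;

	assert(q != NULL);
	(void)pkts; /* packet contents are irrelevant to the model */
	if (q->max_per_call != 0 && n > q->max_per_call)
		n = q->max_per_call;
	if (n > q->free_slots)
		n = q->free_slots;
	q->free_slots = (uint16_t)(q->free_slots - n);
	return n;
}

/* One call with 100 packets against a ring that has 1000 free slots. */
static uint16_t
single_call_result(uint16_t max_per_call)
{
	void *pkts[100] = { 0 };
	struct mock_txq q = { .free_slots = 1000,
			      .max_per_call = max_per_call };

	return mock_tx_burst(&q, pkts, 100);
}
```

With caps of 32, 64 and "no cap", the same call reports 32, 64 and 100
packets sent. An application that treats a return value smaller than nb_pkts
as "ring full" would stop early on the first two, even though the ring still
has room.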

This patch proposes to implement the above policy in the DPDK library in
order to simplify application code and avoid incorrect use. DPDK users then
no longer need to consider the implementation policy or write duplicated
code at the application level when sending packets. In addition, users do
not need to know how the TX functions of specific PMDs differ: they can pass
an arbitrary number of packets to the TX API rte_eth_tx_burst() and then
check the return value to get the number of packets actually sent.

How should the policy be implemented in the DPDK library? Two solutions are
proposed below.

Solution 1:
Implement wrapper functions in each specific PMD to remove its limit, as
i40e_xmit_pkts_simple and ixgbe_xmit_pkts_simple already do.

Solution 2:
Implement the policy in the function rte_eth_tx_burst() at the ethdev layer
in a more consistent batching way. Make a best effort to send *nb_pkts*
packets in bursts of no more than 32 by default, since many DPDK TX PMDs
use this max TX burst size (32). In addition, a data member that defines
the max TX burst size, such as "uint16_t max_tx_burst_pkts;", will be added
to rte_eth_dev_data, which drivers can override if they work with bursts of
64 or another size (thanks to Bruce <bruce.richardson at intel.com> for the
suggestion). This keeps the performance impact to a minimum.
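
The batching described in Solution 2 can be sketched in isolation as follows.
This is an illustrative model, not the patch itself: mock_dev, mock_ring,
mock_pmd_tx and burst_tx are hypothetical names, and the fallback to 32 when
max_tx_burst_pkts is left at zero is an assumption added here to keep the
loop from spinning on zero-sized bursts.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint16_t (*mock_tx_fn)(void *txq, void **pkts, uint16_t nb_pkts);

/* Hypothetical, trimmed-down device: only what the sketch needs. */
struct mock_dev {
	mock_tx_fn tx_pkt_burst;
	void *txq;
	uint16_t max_tx_burst_pkts; /* per-driver max burst, as in the RFC */
};

/* Hypothetical PMD queue that refuses bursts larger than its limit. */
struct mock_ring {
	uint16_t free_slots;
	uint16_t limit;
	uint16_t calls;
};

static uint16_t
mock_pmd_tx(void *txq, void **pkts, uint16_t nb_pkts)
{
	struct mock_ring *r = txq;
	uint16_t n = nb_pkts < r->free_slots ? nb_pkts : r->free_slots;

	(void)pkts;
	assert(nb_pkts <= r->limit); /* wrapper must respect the limit */
	r->calls++;
	r->free_slots = (uint16_t)(r->free_slots - n);
	return n;
}

/* Best-effort send of nb_pkts packets in bursts of at most max pkts. */
static uint16_t
burst_tx(struct mock_dev *dev, void **tx_pkts, uint16_t nb_pkts)
{
	uint16_t max = dev->max_tx_burst_pkts;
	uint16_t nb_tx = 0;

	if (max == 0)
		max = 32; /* assumption: default when the driver sets none */
	if (nb_pkts <= max) /* common case: a single driver call */
		return dev->tx_pkt_burst(dev->txq, tx_pkts, nb_pkts);

	while (nb_tx < nb_pkts) {
		uint16_t left = (uint16_t)(nb_pkts - nb_tx);
		uint16_t num_burst = left < max ? left : max;
		uint16_t pkts = dev->tx_pkt_burst(dev->txq, &tx_pkts[nb_tx],
						  num_burst);

		nb_tx = (uint16_t)(nb_tx + pkts);
		if (pkts < num_burst) /* ring full: stop early */
			break;
	}
	return nb_tx;
}
```

Each burst is sized from the packets still left to send, so the last call
never asks the driver for more packets than the caller supplied.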

I prefer the second solution because it makes the DPDK code more consistent
and simpler and avoids duplicating the same logic across the DPDK source
code. I also expect solution 2 to cause little or no performance drop. It
does, however, introduce an ABI change.

In fact, the current rte_eth_rx_burst() function uses a similar mechanism
and faces the same problem as rte_eth_tx_burst().

static inline uint16_t
rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
                 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts);

Applications are responsible for implementing the policy "retrieve as many
received packets as possible": they check for this specific case and keep
invoking the rte_eth_rx_burst() function until a value less than *nb_pkts*
is returned.
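
A minimal sketch of that receive policy, under the same caveats: mock_rxq,
mock_rx_burst and burst_rx are hypothetical names rather than DPDK symbols,
and the per-call limit is modelled explicitly. The point it illustrates is
that each burst has to be bounded by the room left in the caller's array,
not by the original nb_pkts.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical RX queue: 'pending' packets wait in the ring. */
struct mock_rxq {
	uint16_t pending;
	uint16_t limit; /* max packets returned per call */
};

static int dummy_pkt; /* stands in for a received mbuf */

/* Mock RX burst: stores up to min(nb_pkts, limit, pending) pointers. */
static uint16_t
mock_rx_burst(struct mock_rxq *q, void **rx_pkts, uint16_t nb_pkts)
{
	uint16_t n = nb_pkts;
	uint16_t i;

	if (n > q->limit)
		n = q->limit;
	if (n > q->pending)
		n = q->pending;
	for (i = 0; i < n; i++)
		rx_pkts[i] = &dummy_pkt;
	q->pending = (uint16_t)(q->pending - n);
	return n;
}

/* Retrieve up to nb_pkts packets in bursts of at most max_burst. */
static uint16_t
burst_rx(struct mock_rxq *q, void **rx_pkts, uint16_t nb_pkts,
	 uint16_t max_burst)
{
	uint16_t nb_rx = 0;

	assert(max_burst > 0);
	while (nb_rx < nb_pkts) {
		/* Bound each burst by the room left in rx_pkts. */
		uint16_t room = (uint16_t)(nb_pkts - nb_rx);
		uint16_t num_burst = room < max_burst ? room : max_burst;
		uint16_t n = mock_rx_burst(q, &rx_pkts[nb_rx], num_burst);

		nb_rx = (uint16_t)(nb_rx + n);
		if (n < num_burst) /* ring drained: nothing more to fetch */
			break;
	}
	return nb_rx;
}
```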

The patch proposes to apply the above method to rte_eth_rx_burst() as well.

In summary, the purpose of this RFC is to make the job easier and simpler
for driver writers and to avoid writing duplicated code at the application
level.

Signed-off-by: Zhiyong Yang <zhiyong.yang at intel.com>
---
 lib/librte_ether/rte_ethdev.h | 41 +++++++++++++++++++++++++++++++++++++++--
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 1c356c1..6fa83cf 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1712,6 +1712,9 @@ struct rte_eth_dev_data {
 	uint32_t min_rx_buf_size;
 	/**< Common rx buffer size handled by all queues */
 
+	uint16_t max_rx_burst_pkts;
+	uint16_t max_tx_burst_pkts;
+
 	uint64_t rx_mbuf_alloc_failed; /**< RX ring mbuf allocation failures. */
 	struct ether_addr* mac_addrs;/**< Device Ethernet Link address. */
 	uint64_t mac_pool_sel[ETH_NUM_RECEIVE_MAC_ADDR];
@@ -2695,11 +2698,15 @@ int rte_eth_dev_set_vlan_pvid(uint8_t port_id, uint16_t pvid, int on);
  *   of pointers to *rte_mbuf* structures effectively supplied to the
  *   *rx_pkts* array.
  */
+
 static inline uint16_t
 rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
 		 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
 {
 	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	int16_t nb_rx = 0;
+	uint16_t pkts = 0;
+	uint16_t rx_nb_pkts = nb_pkts;
 
 #ifdef RTE_LIBRTE_ETHDEV_DEBUG
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
@@ -2710,8 +2717,20 @@ rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
 		return 0;
 	}
 #endif
-	int16_t nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
+	if (likely(nb_pkts <= dev->data->max_rx_burst_pkts))
+		return (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
 			rx_pkts, nb_pkts);
+	while (rx_nb_pkts) {
+		uint16_t num_burst = RTE_MIN(rx_nb_pkts,
+					      dev->data->max_rx_burst_pkts);
+
+		pkts = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
+						&rx_pkts[nb_rx], num_burst);
+		nb_rx += pkts;
+		rx_nb_pkts -= pkts;
+		if (pkts < num_burst)
+			break;
+	}
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
 	struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
@@ -2833,11 +2852,13 @@ rte_eth_rx_descriptor_done(uint8_t port_id, uint16_t queue_id, uint16_t offset)
  *   the transmit ring. The return value can be less than the value of the
  *   *tx_pkts* parameter when the transmit ring is full or has been filled up.
  */
+
 static inline uint16_t
 rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
 		 struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 {
 	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	uint16_t nb_tx = 0;
 
 #ifdef RTE_LIBRTE_ETHDEV_DEBUG
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
@@ -2860,8 +2881,24 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
 		} while (cb != NULL);
 	}
 #endif
+	if (likely(nb_pkts <= dev->data->max_tx_burst_pkts))
+		return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id],
+						tx_pkts, nb_pkts);
+
+	while (nb_pkts) {
+		uint16_t num_burst = RTE_MIN(nb_pkts,
+					     dev->data->max_tx_burst_pkts);
+		uint16_t pkts;
+
+		pkts = (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id],
+						&tx_pkts[nb_tx], num_burst);
+		nb_tx += pkts;
+		nb_pkts -= pkts;
+		if (pkts < num_burst)
+			break;
+	}
 
-	return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
+	return nb_tx;
 }
 
 /**
-- 
2.7.4


