[dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api

Tomasz Kulasek tomaszx.kulasek at intel.com
Fri Jan 15 15:26:26 CET 2016


Date: Fri, 15 Jan 2016 15:25:31 +0100
Message-Id: <1452867932-5548-2-git-send-email-tomaszx.kulasek at intel.com>
X-Mailer: git-send-email 2.1.4
In-Reply-To: <1452867932-5548-1-git-send-email-tomaszx.kulasek at intel.com>
References: <1452867932-5548-1-git-send-email-tomaszx.kulasek at intel.com>

Many sample apps include internal buffering for single-packet-at-a-time

operation. Since this is such a common paradigm, this functionality is

better suited to being inside the core ethdev API.

The new APIs in the ethdev library are:

* rte_eth_tx_buffer - buffer up a single packet for future transmission

* rte_eth_tx_buffer_flush - flush any unsent buffered packets

* rte_eth_tx_buffer_set_err_callback - set up a callback to be called in

  case transmitting a buffered burst fails. By default, we just free the

  unsent packets.



As well as these, an additional reference callback is provided, which

frees the packets (as the default callback does), as well as updating a

user-provided counter, so that the number of dropped packets can be

tracked.



Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>

Signed-off-by: Tomasz Kulasek <tomaszx.kulasek at intel.com>

---

 config/common_bsdapp                   |    1 +

 config/common_linuxapp                 |    1 +

 lib/librte_ether/rte_ethdev.c          |   63 +++++++++-

 lib/librte_ether/rte_ethdev.h          |  211 +++++++++++++++++++++++++++++++-

 lib/librte_ether/rte_ether_version.map |    8 ++

 5 files changed, 279 insertions(+), 5 deletions(-)



diff --git a/config/common_bsdapp b/config/common_bsdapp

index ed7c31c..8a2e4c5 100644

--- a/config/common_bsdapp

+++ b/config/common_bsdapp

@@ -148,6 +148,7 @@ CONFIG_RTE_MAX_QUEUES_PER_PORT=1024

 CONFIG_RTE_LIBRTE_IEEE1588=n

 CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16

 CONFIG_RTE_ETHDEV_RXTX_CALLBACKS=y

+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32

 

 #

 # Support NIC bypass logic

diff --git a/config/common_linuxapp b/config/common_linuxapp

index 74bc515..6229cab 100644

--- a/config/common_linuxapp

+++ b/config/common_linuxapp

@@ -146,6 +146,7 @@ CONFIG_RTE_MAX_QUEUES_PER_PORT=1024

 CONFIG_RTE_LIBRTE_IEEE1588=n

 CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16

 CONFIG_RTE_ETHDEV_RXTX_CALLBACKS=y

+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32

 

 #

 # Support NIC bypass logic

diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c

index ed971b4..27dac1b 100644

--- a/lib/librte_ether/rte_ethdev.c

+++ b/lib/librte_ether/rte_ethdev.c

@@ -1,7 +1,7 @@

 /*-

  *   BSD LICENSE

  *

- *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.

+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.

  *   All rights reserved.

  *

  *   Redistribution and use in source and binary forms, with or without

@@ -826,11 +826,42 @@ rte_eth_dev_tx_queue_stop(uint8_t port_id, uint16_t tx_queue_id)

 

 }

 

+void
+rte_eth_count_unsent_packet_callback(struct rte_mbuf **pkts, uint16_t unsent,
+		void *userdata)
+{
+	/* Per the API contract, userdata points to an unsigned long counter
+	 * of total dropped packets, supplied at callback-registration time. */
+	unsigned long *count = userdata;
+	unsigned i;
+
+	/* Return every unsent mbuf to its owning mempool */
+	for (i = 0; i < unsent; i++)
+		rte_pktmbuf_free(pkts[i]);
+
+	/* Accumulate the drop count for the caller to inspect */
+	*count += unsent;
+}

+

+int
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+		buffer_tx_error_fn cbfn, void *userdata)
+{
+	struct rte_eth_dev *dev;
+
+	/* Validate port_id BEFORE touching rte_eth_devices[port_id]:
+	 * the original code dereferenced dev->data->nb_tx_queues first,
+	 * reading through an out-of-range/uninitialized device entry when
+	 * port_id is invalid. */
+	if (!rte_eth_dev_is_valid_port(port_id)) {
+		rte_errno = EINVAL;
+		return -1;
+	}
+
+	dev = &rte_eth_devices[port_id];
+	if (queue_id >= dev->data->nb_tx_queues) {
+		rte_errno = EINVAL;
+		return -1;
+	}
+
+	/* Store userdata first so the callback never observes a
+	 * new flush_cb paired with stale userdata. */
+	dev->tx_buf_err_cb[queue_id].userdata = userdata;
+	dev->tx_buf_err_cb[queue_id].flush_cb = cbfn;
+	return 0;
+}

+

 static int

 rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)

 {

 	uint16_t old_nb_queues = dev->data->nb_tx_queues;

 	void **txq;

+	struct rte_eth_dev_tx_buffer *new_bufs;

 	unsigned i;

 

 	if (dev->data->tx_queues == NULL) { /* first time configuration */

@@ -841,17 +872,40 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)

 			dev->data->nb_tx_queues = 0;

 			return -(ENOMEM);

 		}

+

+		dev->data->txq_bufs = rte_zmalloc("ethdev->txq_bufs",

+				sizeof(*dev->data->txq_bufs) * nb_queues, 0);

+		if (dev->data->txq_bufs == NULL) {

+			dev->data->nb_tx_queues = 0;

+			rte_free(dev->data->tx_queues);

+			return -(ENOMEM);

+		}

+

 	} else { /* re-configure */

+

+		/* flush the packets queued for all queues*/

+		for (i = 0; i < old_nb_queues; i++)

+			rte_eth_tx_buffer_flush(dev->data->port_id, i);

+

 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);

 

+		/* get new buffer space first, but keep old space around */

+		new_bufs = rte_zmalloc("ethdev->txq_bufs",

+				sizeof(*dev->data->txq_bufs) * nb_queues, 0);

+		if (new_bufs == NULL)

+			return -(ENOMEM);

+

 		txq = dev->data->tx_queues;

 

 		for (i = nb_queues; i < old_nb_queues; i++)

 			(*dev->dev_ops->tx_queue_release)(txq[i]);

 		txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,

 				  RTE_CACHE_LINE_SIZE);

-		if (txq == NULL)

-			return -ENOMEM;

+		if (txq == NULL) {

+			rte_free(new_bufs);

+			return -(ENOMEM);

+		}

+

 		if (nb_queues > old_nb_queues) {

 			uint16_t new_qs = nb_queues - old_nb_queues;

 

@@ -861,6 +915,9 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)

 

 		dev->data->tx_queues = txq;

 

+		/* now replace old buffers with new */

+		rte_free(dev->data->txq_bufs);

+		dev->data->txq_bufs = new_bufs;

 	}

 	dev->data->nb_tx_queues = nb_queues;

 	return 0;

diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h

index bada8ad..23faa6a 100644

--- a/lib/librte_ether/rte_ethdev.h

+++ b/lib/librte_ether/rte_ethdev.h

@@ -1,7 +1,7 @@

 /*-

  *   BSD LICENSE

  *

- *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.

+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.

  *   All rights reserved.

  *

  *   Redistribution and use in source and binary forms, with or without

@@ -182,6 +182,7 @@ extern "C" {

 #include <rte_pci.h>

 #include <rte_dev.h>

 #include <rte_devargs.h>

+#include <rte_branch_prediction.h>

 #include "rte_ether.h"

 #include "rte_eth_ctrl.h"

 #include "rte_dev_info.h"

@@ -1519,6 +1520,34 @@ enum rte_eth_dev_type {

 	RTE_ETH_DEV_MAX		/**< max value of this enum */

 };

 

+typedef void (*buffer_tx_error_fn)(struct rte_mbuf **unsent, uint16_t count,

+		void *userdata);

+

+/**
+ * @internal
+ * Structure used to buffer packets for future TX
+ * Used by APIs rte_eth_tx_buffer and rte_eth_tx_buffer_flush
+ */
+struct rte_eth_dev_tx_buffer {
+	struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
+	/**< Mbufs queued for later transmission on this port/queue. */
+	unsigned nb_pkts;
+	/**< Number of valid entries currently held in pkts[]. */
+	uint64_t errors;
+	/**< Total number of buffered packets that could not be sent and
+	 *   were dropped (only updated by the default error handling;
+	 *   a user-registered callback does not update this field). */
+};

+

+/**
+ * @internal
+ * Structure to hold a callback to be used on error when a tx_buffer_flush
+ * call fails to send all packets.
+ * This needs to be a separate structure, as it must go in the ethdev structure
+ * rather than ethdev_data, due to the use of a function pointer, which is not
+ * multi-process safe.
+ */
+struct rte_eth_dev_tx_buffer_err_cb {
+	buffer_tx_error_fn flush_cb; /**< Called when tx_burst sends fewer
+	                              *   packets than requested; NULL selects
+	                              *   the default free-and-count behaviour. */
+	void *userdata;              /**< Opaque pointer passed to flush_cb. */
+};

+

 /**

  * @internal

  * The generic data structure associated with each ethernet device.

@@ -1550,6 +1579,9 @@ struct rte_eth_dev {

 	struct rte_eth_rxtx_callback *pre_tx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];

 	uint8_t attached; /**< Flag indicating the port is attached */

 	enum rte_eth_dev_type dev_type; /**< Flag indicating the device type */

+

+	/** Callbacks to be used on a tx_buffer_flush error */

+	struct rte_eth_dev_tx_buffer_err_cb tx_buf_err_cb[RTE_MAX_QUEUES_PER_PORT];

 };

 

 struct rte_eth_dev_sriov {

@@ -1610,6 +1642,8 @@ struct rte_eth_dev_data {

 	enum rte_kernel_driver kdrv;    /**< Kernel driver passthrough */

 	int numa_node;  /**< NUMA node connection */

 	const char *drv_name;   /**< Driver name */

+	struct rte_eth_dev_tx_buffer *txq_bufs;

+	/**< space to allow buffered transmits */

 };

 

 /** Device supports hotplug detach */

@@ -2661,8 +2695,181 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,

 }

 

 /**

- * The eth device event type for interrupt, and maybe others in the future.

+ * Buffer a single packet for future transmission on a port and queue

+ *

+ * This function takes a single mbuf/packet and buffers it for later

+ * transmission on the particular port and queue specified. Once the buffer is

+ * full of packets, an attempt will be made to transmit all the buffered

+ * packets. In case of error, where not all packets can be transmitted, a

+ * callback is called with the unsent packets as a parameter. If no callback

+ * is explicitly set up, the unsent packets are just freed back to the owning

+ * mempool. The function returns the number of packets actually sent i.e.

+ * 0 if no buffer flush occurred, otherwise the number of packets successfully

+ * flushed

+ *

+ * @param port_id

+ *   The port identifier of the Ethernet device.

+ * @param queue_id

+ *   The index of the transmit queue through which output packets must be

+ *   sent.

+ *   The value must be in the range [0, nb_tx_queue - 1] previously supplied

+ *   to rte_eth_dev_configure().

+ * @param tx_pkt

+ *   Pointer to the packet mbuf to be sent.

+ * @return

+ *   0 = packet has been buffered for later transmission

+ *   N > 0 = packet has been buffered, and the buffer was subsequently flushed,

+ *     causing N packets to be sent, and the error callback to be called for

+ *     the rest.

+ */

+static inline uint16_t __attribute__((always_inline))
+rte_eth_tx_buffer(uint8_t port_id, uint16_t queue_id, struct rte_mbuf *tx_pkt)
+{
+	/* NOTE(review): no validation of port_id/queue_id here - fast-path
+	 * API, caller is trusted as with rte_eth_tx_burst(). */
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+	uint16_t i;
+
+	/* Enqueue the packet; transmit only once the buffer is full */
+	qbuf->pkts[qbuf->nb_pkts++] = tx_pkt;
+	if (qbuf->nb_pkts < RTE_ETHDEV_TX_BUFSIZE)
+		return 0;
+
+	const uint16_t sent = rte_eth_tx_burst(port_id, queue_id, qbuf->pkts,
+			RTE_ETHDEV_TX_BUFSIZE);
+
+	/* Buffer is logically emptied even on partial send; the unsent
+	 * tail of pkts[] is handed to the error handling below. */
+	qbuf->nb_pkts = 0;
+
+	/* All packets sent, or to be dealt with by callback below */
+	if (unlikely(sent != RTE_ETHDEV_TX_BUFSIZE)) {
+		if (dev->tx_buf_err_cb[queue_id].flush_cb)
+			dev->tx_buf_err_cb[queue_id].flush_cb(&qbuf->pkts[sent],
+					RTE_ETHDEV_TX_BUFSIZE - sent,
+					dev->tx_buf_err_cb[queue_id].userdata);
+		else {
+			/* Default behaviour: count the drops and free the
+			 * unsent mbufs back to their mempool. */
+			qbuf->errors += RTE_ETHDEV_TX_BUFSIZE - sent;
+			for (i = sent; i < RTE_ETHDEV_TX_BUFSIZE; i++)
+				rte_pktmbuf_free(qbuf->pkts[i]);
+		}
+	}
+
+	return sent;
+}

+

+/**

+ * Send any packets queued up for transmission on a port and HW queue

+ *

+ * This causes an explicit flush of packets previously buffered via the

+ * rte_eth_tx_buffer() function. It returns the number of packets successfully

+ * sent to the NIC, and calls the error callback for any unsent packets. Unless

+ * explicitly set up otherwise, the default callback simply frees the unsent

+ * packets back to the owning mempool.

+ *

+ * @param port_id

+ *   The port identifier of the Ethernet device.

+ * @param queue_id

+ *   The index of the transmit queue through which output packets must be

+ *   sent.

+ *   The value must be in the range [0, nb_tx_queue - 1] previously supplied

+ *   to rte_eth_dev_configure().

+ * @return

+ *   The number of packets successfully sent to the Ethernet device. The error

+ *   callback is called for any packets which could not be sent.

+ */

+static inline uint16_t
+rte_eth_tx_buffer_flush(uint8_t port_id, uint16_t queue_id)
+{
+	uint16_t i;
+	/* NOTE(review): no validation of port_id/queue_id - fast-path API,
+	 * caller is trusted as with rte_eth_tx_burst(). */
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+	/* Nothing buffered: avoid a zero-length tx_burst call */
+	if (qbuf->nb_pkts == 0)
+		return 0;
+
+	const uint16_t to_send = qbuf->nb_pkts;
+
+	const uint16_t sent = rte_eth_tx_burst(port_id, queue_id, qbuf->pkts,
+			to_send);
+
+	/* Buffer is logically emptied even on partial send; the unsent
+	 * tail of pkts[] is handed to the error handling below. */
+	qbuf->nb_pkts = 0;
+
+	/* All packets sent, or to be dealt with by callback below */
+	if (unlikely(sent != to_send)) {
+		if (dev->tx_buf_err_cb[queue_id].flush_cb)
+			dev->tx_buf_err_cb[queue_id].flush_cb(&qbuf->pkts[sent],
+					to_send - sent,
+					dev->tx_buf_err_cb[queue_id].userdata);
+		else {
+			/* Default behaviour: count the drops and free the
+			 * unsent mbufs back to their mempool. */
+			qbuf->errors += to_send - sent;
+			for (i = sent; i < to_send; i++)
+				rte_pktmbuf_free(qbuf->pkts[i]);
+		}
+	}
+
+	return sent;
+}

+

+/**

+ * Configure a callback for buffered packets which cannot be sent

+ *

+ * Register a specific callback to be called when an attempt is made to send

+ * all packets buffered on an ethernet port, but not all packets can

+ * successfully be sent. The callback registered here will be called only

+ * from calls to rte_eth_tx_buffer() and rte_eth_tx_buffer_flush() APIs.

+ * The default callback configured for each queue by default just frees the

+ * packets back to the calling mempool. If additional behaviour is required,

+ * for example, to count dropped packets, or to retry transmission of packets

+ * which cannot be sent, this function should be used to register a suitable

+ * callback function to implement the desired behaviour.

+ * The example callback "rte_eth_count_unsent_packet_callback()" is also

+ * provided as reference.

+ *

+ * @param port_id

+ *   The port identifier of the Ethernet device.

+ * @param queue_id

+ *   The index of the transmit queue through which output packets must be

+ *   sent.

+ *   The value must be in the range [0, nb_tx_queue - 1] previously supplied

+ *   to rte_eth_dev_configure().

+ * @param cbfn

+ *   The function to be used as the callback.

+ * @param userdata

+ *   Arbitrary parameter to be passed to the callback function

+ * @return

+ *   0 on success, or -1 on error with rte_errno set appropriately

  */

+int

+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,

+		buffer_tx_error_fn cbfn, void *userdata);

+

+/**

+ * Callback function for tracking unsent buffered packets.

+ *

+ * This function can be passed to rte_eth_tx_buffer_set_err_callback() to

+ * adjust the default behaviour when buffered packets cannot be sent. This

+ * function drops any unsent packets, but also updates a user-supplied counter

+ * to track the overall number of packets dropped. The counter should be an

+ * unsigned long variable.

+ *

+ * NOTE: this function should not be called directly, instead it should be used

+ *       as a callback for packet buffering.

+ *

+ * NOTE: when configuring this function as a callback with

+ *       rte_eth_tx_buffer_set_err_callback(), the final, userdata parameter

+ *       should point to an unsigned long value.

+ *

+ * @param pkts

+ *   The previously buffered packets which could not be sent

+ * @param unsent

+ *   The number of unsent packets in the pkts array

+ * @param userdata

+ *   Pointer to an unsigned long value, which will be incremented by unsent

+ */

+void

+rte_eth_count_unsent_packet_callback(struct rte_mbuf **pkts, uint16_t unsent,

+		void *userdata);

+

+/**

+* The eth device event type for interrupt, and maybe others in the future.

+*/

 enum rte_eth_event_type {

 	RTE_ETH_EVENT_UNKNOWN,  /**< unknown event type */

 	RTE_ETH_EVENT_INTR_LSC, /**< lsc interrupt event */

diff --git a/lib/librte_ether/rte_ether_version.map b/lib/librte_ether/rte_ether_version.map

index d8db24d..c2019d6 100644

--- a/lib/librte_ether/rte_ether_version.map

+++ b/lib/librte_ether/rte_ether_version.map

@@ -117,3 +117,11 @@ DPDK_2.2 {

 

 	local: *;

 };

+

+DPDK_2.3 {

+	global:

+

+	rte_eth_count_unsent_packet_callback;

+	rte_eth_tx_buffer_set_err_callback;

+

+} DPDK_2.2;

-- 

1.7.9.5





More information about the dev mailing list