[dpdk-dev] [RFC PATCH 1/3] ethdev: introduce eth_queue_local

Konstantin Ananyev konstantin.ananyev at intel.com
Fri Dec 1 15:48:00 CET 2017


Introduce a separate data structure (rte_eth_queue_local)
to store per-process (i.e. non-shareable) information
for each configured rx/tx queue.
Memory for that structure is allocated/freed dynamically during
rte_eth_dev_configure().
Add placeholder pointers for the queue-specific (rx|tx)_pkt_burst()
and tx_pkt_prepare() functions inside that structure.
Move the rx/tx callback related information into that structure.
This introduces a change in the current behavior: all callbacks for
unconfigured queues will be removed automatically.
For example, rte_eth_dev_configure(port, 0, 0, ...)
would wipe out all installed callbacks for that port.
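
To illustrate the behavior change, a minimal sketch (not part of this
patch; my_rx_cb, dev_conf and mbuf_pool are hypothetical
application-side names) of an application that reconfigures a port and
now has to re-install its callbacks:

	void *cb;

	/* callback installed on rx queue 0 of a configured port */
	cb = rte_eth_add_rx_callback(port, 0, my_rx_cb, NULL);

	/* reconfiguring with zero queues now also removes that callback */
	rte_eth_dev_configure(port, 0, 0, &dev_conf);

	/* once queues are configured again, callbacks must be re-added */
	rte_eth_dev_configure(port, 1, 1, &dev_conf);
	rte_eth_rx_queue_setup(port, 0, 512, rte_socket_id(),
			NULL, mbuf_pool);
	cb = rte_eth_add_rx_callback(port, 0, my_rx_cb, NULL);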


Signed-off-by: Konstantin Ananyev <konstantin.ananyev at intel.com>
---
 lib/librte_ether/rte_ethdev.c | 228 +++++++++++++++++++++++++++++-------------
 lib/librte_ether/rte_ethdev.h |  85 +++++++++++-----
 2 files changed, 216 insertions(+), 97 deletions(-)

diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index 318af2869..ff8571d60 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -241,6 +241,14 @@ rte_eth_dev_allocate(const char *name)
 	return eth_dev;
 }
 
+static struct rte_eth_queue_local *
+alloc_queue_local(uint16_t nb_queues)
+{
+	struct rte_eth_queue_local *ql;
+	ql = rte_zmalloc(NULL, sizeof(ql[0]) * nb_queues, RTE_CACHE_LINE_SIZE);
+	return ql;
+}
+
 /*
  * Attach to a port already registered by the primary process, which
  * makes sure that the same device would have the same port id both
@@ -269,6 +277,16 @@ rte_eth_dev_attach_secondary(const char *name)
 	eth_dev = eth_dev_get(i);
 	RTE_ASSERT(eth_dev->data->port_id == i);
 
+	/*
+	 * Obviously this is quite error prone:
+	 * if the primary process decides to (re)configure the device later,
+	 * there is no way for the secondary one to notice that and update
+	 * its local data (queue_local, rx|tx_pkt_burst, etc.).
+	 * We probably need an eth_dev_configure_secondary() or so.
+	 */
+	eth_dev->rx_ql = alloc_queue_local(eth_dev->data->nb_rx_queues);
+	eth_dev->tx_ql = alloc_queue_local(eth_dev->data->nb_tx_queues);
+
 	return eth_dev;
 }
 
@@ -444,52 +462,92 @@ rte_eth_dev_detach(uint16_t port_id, char *name)
 	return ret;
 }
 
+/*
+ * Helper routine - removes all registered RX/TX queue callbacks
+ * from the FIFO list.
+ * It is the caller's responsibility to make sure no actual RX/TX
+ * happens simultaneously on that queue.
+ */
+static void
+free_rxtx_cbs(struct rte_eth_rxtx_callback *cbs)
+{
+	struct rte_eth_rxtx_callback *cb, *next;
+
+	for (cb = cbs; cb != NULL; cb = next) {
+		next = cb->next;
+		rte_free(cb);
+	}
+}
+
+static void
+reset_queue_local(struct rte_eth_queue_local *ql)
+{
+	free_rxtx_cbs(ql->cbs);
+	memset(ql, 0, sizeof(*ql));
+}
+
 static int
 rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 {
-	uint16_t old_nb_queues = dev->data->nb_rx_queues;
+	uint32_t diff, i, old_nb_queues;
+	struct rte_eth_queue_local *rlq;
 	void **rxq;
-	unsigned i;
 
-	if (dev->data->rx_queues == NULL && nb_queues != 0) { /* first time configuration */
-		dev->data->rx_queues = rte_zmalloc("ethdev->rx_queues",
-				sizeof(dev->data->rx_queues[0]) * nb_queues,
-				RTE_CACHE_LINE_SIZE);
-		if (dev->data->rx_queues == NULL) {
-			dev->data->nb_rx_queues = 0;
-			return -(ENOMEM);
-		}
-	} else if (dev->data->rx_queues != NULL && nb_queues != 0) { /* re-configure */
-		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, -ENOTSUP);
+	old_nb_queues = dev->data->nb_rx_queues;
+
+	/* same # of queues, nothing needs to be done */
+	if (nb_queues == old_nb_queues)
+		return 0;
+
+	rxq = dev->data->rx_queues;
+	rlq = dev->rx_ql;
 
-		rxq = dev->data->rx_queues;
+	/* reduce # of queues */
+	if (old_nb_queues > nb_queues) {
 
-		for (i = nb_queues; i < old_nb_queues; i++)
+		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release,
+			-ENOTSUP);
+
+		dev->data->nb_rx_queues = nb_queues;
+
+		/* free resources used by the queues we are going to remove */
+		for (i = nb_queues; i < old_nb_queues; i++) {
+			reset_queue_local(rlq + i);
 			(*dev->dev_ops->rx_queue_release)(rxq[i]);
-		rxq = rte_realloc(rxq, sizeof(rxq[0]) * nb_queues,
-				RTE_CACHE_LINE_SIZE);
-		if (rxq == NULL)
-			return -(ENOMEM);
-		if (nb_queues > old_nb_queues) {
-			uint16_t new_qs = nb_queues - old_nb_queues;
-
-			memset(rxq + old_nb_queues, 0,
-				sizeof(rxq[0]) * new_qs);
+			rxq[i] = NULL;
 		}
 
-		dev->data->rx_queues = rxq;
+		if (nb_queues == 0) {
+			dev->data->rx_queues = NULL;
+			dev->rx_ql = NULL;
+			rte_free(rxq);
+			rte_free(rlq);
+		}
+
+		return 0;
+	}
 
-	} else if (dev->data->rx_queues != NULL && nb_queues == 0) {
-		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, -ENOTSUP);
+	/* increase # of queues */
 
-		rxq = dev->data->rx_queues;
+	diff = nb_queues - old_nb_queues;
 
-		for (i = nb_queues; i < old_nb_queues; i++)
-			(*dev->dev_ops->rx_queue_release)(rxq[i]);
+	rxq = rte_realloc(rxq, sizeof(rxq[0]) * nb_queues,
+			RTE_CACHE_LINE_SIZE);
+	rlq = rte_realloc(rlq, sizeof(rlq[0]) * nb_queues,
+			RTE_CACHE_LINE_SIZE);
 
-		rte_free(dev->data->rx_queues);
-		dev->data->rx_queues = NULL;
+	if (rxq != NULL) {
+		memset(rxq + old_nb_queues, 0, sizeof(rxq[0]) * diff);
+		dev->data->rx_queues = rxq;
+	}
+	if (rlq != NULL) {
+		memset(rlq + old_nb_queues, 0, sizeof(rlq[0]) * diff);
+		dev->rx_ql = rlq;
 	}
+
+	if (rxq == NULL || rlq == NULL)
+		return -ENOMEM;
+
 	dev->data->nb_rx_queues = nb_queues;
 	return 0;
 }
@@ -601,49 +659,65 @@ rte_eth_dev_tx_queue_stop(uint16_t port_id, uint16_t tx_queue_id)
 static int
 rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 {
-	uint16_t old_nb_queues = dev->data->nb_tx_queues;
+	uint32_t diff, i, old_nb_queues;
+	struct rte_eth_queue_local *tlq;
 	void **txq;
-	unsigned i;
 
-	if (dev->data->tx_queues == NULL && nb_queues != 0) { /* first time configuration */
-		dev->data->tx_queues = rte_zmalloc("ethdev->tx_queues",
-						   sizeof(dev->data->tx_queues[0]) * nb_queues,
-						   RTE_CACHE_LINE_SIZE);
-		if (dev->data->tx_queues == NULL) {
-			dev->data->nb_tx_queues = 0;
-			return -(ENOMEM);
-		}
-	} else if (dev->data->tx_queues != NULL && nb_queues != 0) { /* re-configure */
-		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
+	old_nb_queues = dev->data->nb_tx_queues;
+
+	/* same # of queues, nothing needs to be done */
+	if (nb_queues == old_nb_queues)
+		return 0;
+
+	txq = dev->data->tx_queues;
+	tlq = dev->tx_ql;
+
+	/* reduce # of queues */
+	if (old_nb_queues > nb_queues) {
+
+		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release,
+			-ENOTSUP);
 
-		txq = dev->data->tx_queues;
+		dev->data->nb_tx_queues = nb_queues;
 
-		for (i = nb_queues; i < old_nb_queues; i++)
+		/* free resources used by the queues we are going to remove */
+		for (i = nb_queues; i < old_nb_queues; i++) {
+			reset_queue_local(tlq + i);
 			(*dev->dev_ops->tx_queue_release)(txq[i]);
-		txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
-				  RTE_CACHE_LINE_SIZE);
-		if (txq == NULL)
-			return -ENOMEM;
-		if (nb_queues > old_nb_queues) {
-			uint16_t new_qs = nb_queues - old_nb_queues;
-
-			memset(txq + old_nb_queues, 0,
-			       sizeof(txq[0]) * new_qs);
+			txq[i] = NULL;
 		}
 
-		dev->data->tx_queues = txq;
+		if (nb_queues == 0) {
+			dev->data->tx_queues = NULL;
+			dev->tx_ql = NULL;
+			rte_free(txq);
+			rte_free(tlq);
+		}
+
+		return 0;
+	}
 
-	} else if (dev->data->tx_queues != NULL && nb_queues == 0) {
-		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
+	/* increase # of queues */
 
-		txq = dev->data->tx_queues;
+	diff = nb_queues - old_nb_queues;
 
-		for (i = nb_queues; i < old_nb_queues; i++)
-			(*dev->dev_ops->tx_queue_release)(txq[i]);
+	txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
+			RTE_CACHE_LINE_SIZE);
+	tlq = rte_realloc(tlq, sizeof(tlq[0]) * nb_queues,
+			RTE_CACHE_LINE_SIZE);
 
-		rte_free(dev->data->tx_queues);
-		dev->data->tx_queues = NULL;
+	if (txq != NULL) {
+		memset(txq + old_nb_queues, 0, sizeof(txq[0]) * diff);
+		dev->data->tx_queues = txq;
+	}
+	if (tlq != NULL) {
+		memset(tlq + old_nb_queues, 0, sizeof(tlq[0]) * diff);
+		dev->tx_ql = tlq;
 	}
+
+	if (txq == NULL || tlq == NULL)
+		return -ENOMEM;
+
 	dev->data->nb_tx_queues = nb_queues;
 	return 0;
 }
@@ -1084,6 +1158,7 @@ void
 rte_eth_dev_close(uint16_t port_id)
 {
 	struct rte_eth_dev *dev;
+	uint32_t i;
 
 	RTE_ETH_VALID_PORTID_OR_RET(port_id);
 	dev = &rte_eth_devices[port_id];
@@ -1092,12 +1167,25 @@ rte_eth_dev_close(uint16_t port_id)
 	dev->data->dev_started = 0;
 	(*dev->dev_ops->dev_close)(dev);
 
+	/* free local resources for RX/TX queues */
+
+	for (i = 0; i != dev->data->nb_rx_queues; i++)
+		reset_queue_local(dev->rx_ql + i);
+
+	for (i = 0; i != dev->data->nb_tx_queues; i++)
+		reset_queue_local(dev->tx_ql + i);
+
 	dev->data->nb_rx_queues = 0;
 	rte_free(dev->data->rx_queues);
 	dev->data->rx_queues = NULL;
+	rte_free(dev->rx_ql);
+	dev->rx_ql = NULL;
+
 	dev->data->nb_tx_queues = 0;
 	rte_free(dev->data->tx_queues);
 	dev->data->tx_queues = NULL;
+	rte_free(dev->tx_ql);
+	dev->tx_ql = NULL;
 }
 
 int
@@ -3111,10 +3199,10 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
 	rte_spinlock_lock(&rte_eth_rx_cb_lock);
 	/* Add the callbacks in fifo order. */
 	struct rte_eth_rxtx_callback *tail =
-		rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+		rte_eth_devices[port_id].rx_ql[queue_id].cbs;
 
 	if (!tail) {
-		rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+		rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
 
 	} else {
 		while (tail->next)
@@ -3153,9 +3241,9 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
 
 	rte_spinlock_lock(&rte_eth_rx_cb_lock);
 	/* Add the callbacks at fisrt position*/
-	cb->next = rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+	cb->next = rte_eth_devices[port_id].rx_ql[queue_id].cbs;
 	rte_smp_wmb();
-	rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+	rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
 	rte_spinlock_unlock(&rte_eth_rx_cb_lock);
 
 	return cb;
@@ -3189,10 +3277,10 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
 	rte_spinlock_lock(&rte_eth_tx_cb_lock);
 	/* Add the callbacks in fifo order. */
 	struct rte_eth_rxtx_callback *tail =
-		rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id];
+		rte_eth_devices[port_id].tx_ql[queue_id].cbs;
 
 	if (!tail) {
-		rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id] = cb;
+		rte_eth_devices[port_id].tx_ql[queue_id].cbs = cb;
 
 	} else {
 		while (tail->next)
@@ -3223,7 +3311,7 @@ rte_eth_remove_rx_callback(uint16_t port_id, uint16_t queue_id,
 	int ret = -EINVAL;
 
 	rte_spinlock_lock(&rte_eth_rx_cb_lock);
-	prev_cb = &dev->post_rx_burst_cbs[queue_id];
+	prev_cb = &dev->rx_ql[queue_id].cbs;
 	for (; *prev_cb != NULL; prev_cb = &cb->next) {
 		cb = *prev_cb;
 		if (cb == user_cb) {
@@ -3257,7 +3345,7 @@ rte_eth_remove_tx_callback(uint16_t port_id, uint16_t queue_id,
 	struct rte_eth_rxtx_callback **prev_cb;
 
 	rte_spinlock_lock(&rte_eth_tx_cb_lock);
-	prev_cb = &dev->pre_tx_burst_cbs[queue_id];
+	prev_cb = &dev->tx_ql[queue_id].cbs;
 	for (; *prev_cb != NULL; prev_cb = &cb->next) {
 		cb = *prev_cb;
 		if (cb == user_cb) {
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 341c2d624..d62e1bcc3 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1694,6 +1694,10 @@ typedef uint16_t (*rte_tx_callback_fn)(uint16_t port, uint16_t queue,
  * @internal
  * Structure used to hold information about the callbacks to be called for a
  * queue on RX and TX.
+ * On RX: user-supplied functions called from rte_eth_rx_burst to post-process
+ * received packets before passing them to the user.
+ * On TX: user-supplied functions called from rte_eth_tx_burst to pre-process
+ * packets before passing them to the driver for transmission.
  */
 struct rte_eth_rxtx_callback {
 	struct rte_eth_rxtx_callback *next;
@@ -1715,6 +1719,24 @@ enum rte_eth_dev_state {
 
 /**
  * @internal
+ * Structure to hold per-process (non-shareable) information
+ * about an RX/TX queue of the ethernet device.
+ */
+struct rte_eth_queue_local {
+	/**
+	 * Placeholders for the queue-specific rx/tx burst and
+	 * tx_prepare function pointers.
+	 */
+	eth_rx_burst_t rx_pkt_burst; /**< receive function pointer. */
+	eth_tx_burst_t tx_pkt_burst; /**< transmit function pointer. */
+	eth_tx_prep_t tx_pkt_prepare; /**< transmit prepare function pointer. */
+
+	struct rte_eth_rxtx_callback *cbs;
+	/**< list of user-supplied callbacks */
+} __rte_cache_aligned;
+
+/**
+ * @internal
  * The generic data structure associated with each ethernet device.
  *
  * Pointers to burst-oriented packet receive and transmit functions are
@@ -1730,19 +1752,13 @@ struct rte_eth_dev {
 	struct rte_eth_dev_data *data;  /**< Pointer to device data */
 	const struct eth_dev_ops *dev_ops; /**< Functions exported by PMD */
 	struct rte_device *device; /**< Backing device */
+
+	struct rte_eth_queue_local *rx_ql;  /**< RX queues local data */
+	struct rte_eth_queue_local *tx_ql;  /**< TX queues local data */
+
 	struct rte_intr_handle *intr_handle; /**< Device interrupt handle */
 	/** User application callbacks for NIC interrupts */
 	struct rte_eth_dev_cb_list link_intr_cbs;
-	/**
-	 * User-supplied functions called from rx_burst to post-process
-	 * received packets before passing them to the user
-	 */
-	struct rte_eth_rxtx_callback *post_rx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
-	/**
-	 * User-supplied functions called from tx_burst to pre-process
-	 * received packets before passing them to the driver for transmission.
-	 */
-	struct rte_eth_rxtx_callback *pre_tx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
 	enum rte_eth_dev_state state; /**< Flag indicating the port state */
 	void *security_ctx; /**< Context for security ops */
 } __rte_cache_aligned;
@@ -2883,29 +2899,37 @@ static inline uint16_t
 rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
 		 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
 {
+	uint16_t nb_rx;
 	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
 
 #ifdef RTE_LIBRTE_ETHDEV_DEBUG
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
 	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);
 
-	if (queue_id >= dev->data->nb_rx_queues) {
+	if (queue_id >= dev->data->nb_rx_queues || dev->rx_ql == NULL) {
 		RTE_PMD_DEBUG_TRACE("Invalid RX queue_id=%d\n", queue_id);
 		return 0;
 	}
 #endif
-	int16_t nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
+	nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
 			rx_pkts, nb_pkts);
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-	struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
+	{
+		struct rte_eth_queue_local *ql;
+		struct rte_eth_rxtx_callback *cb;
 
-	if (unlikely(cb != NULL)) {
-		do {
-			nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
+		ql = dev->rx_ql + queue_id;
+		cb = ql->cbs;
+
+		if (unlikely(cb != NULL)) {
+			do {
+				nb_rx = cb->fn.rx(port_id, queue_id,
+						rx_pkts, nb_rx,
 						nb_pkts, cb->param);
-			cb = cb->next;
-		} while (cb != NULL);
+				cb = cb->next;
+			} while (cb != NULL);
+		}
 	}
 #endif
 
@@ -3151,21 +3175,28 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
 	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);
 
-	if (queue_id >= dev->data->nb_tx_queues) {
+	if (queue_id >= dev->data->nb_tx_queues || dev->tx_ql == NULL) {
 		RTE_PMD_DEBUG_TRACE("Invalid TX queue_id=%d\n", queue_id);
 		return 0;
 	}
 #endif
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-	struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
-
-	if (unlikely(cb != NULL)) {
-		do {
-			nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
-					cb->param);
-			cb = cb->next;
-		} while (cb != NULL);
+	{
+		struct rte_eth_queue_local *ql;
+		struct rte_eth_rxtx_callback *cb;
+
+		ql = dev->tx_ql + queue_id;
+		cb = ql->cbs;
+
+		if (unlikely(cb != NULL)) {
+			do {
+				nb_pkts = cb->fn.tx(port_id, queue_id,
+						tx_pkts, nb_pkts,
+						cb->param);
+				cb = cb->next;
+			} while (cb != NULL);
+		}
 	}
 #endif
 
-- 
2.13.5


