[dpdk-dev] [PATCHv2 1/5] librte_ether: protect add/remove of rxtx callbacks with spinlocks

Reshma Pattan reshma.pattan at intel.com
Tue May 10 11:39:58 CEST 2016


* added spinlocks around the add/remove logic of the rx/tx callbacks to
  avoid corruption of the callback lists in a multithreaded context.

* added new public API rte_eth_add_first_rx_callback to add the given
  callback at the head of the list.

* converted rte_eth_dev_get_port_by_name to public API.

* added new fields to the rte_eth_dev_info struct.
  New fields nb_rx_queues and nb_tx_queues are added to the
  rte_eth_dev_info structure, and rte_eth_dev_info_get() is
  changed to fill them in from the device's configured
  rx/tx queue counts.

Signed-off-by: Reshma Pattan <reshma.pattan at intel.com>
---
 lib/librte_ether/rte_ethdev.c          | 121 +++++++++++++++++++++------------
 lib/librte_ether/rte_ethdev.h          |  45 ++++++++++++
 lib/librte_ether/rte_ether_version.map |   9 +++
 3 files changed, 132 insertions(+), 43 deletions(-)

diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index a31018e..cd2bc17 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -77,6 +77,12 @@ static uint8_t nb_ports;
 /* spinlock for eth device callbacks */
 static rte_spinlock_t rte_eth_dev_cb_lock = RTE_SPINLOCK_INITIALIZER;
 
+/* spinlock for add/remove rx callbacks */
+static rte_spinlock_t rte_eth_rx_cb_lock = RTE_SPINLOCK_INITIALIZER;
+
+/* spinlock for add/remove tx callbacks */
+static rte_spinlock_t rte_eth_tx_cb_lock = RTE_SPINLOCK_INITIALIZER;
+
 /* store statistics names and its offset in stats structure  */
 struct rte_eth_xstats_name_off {
 	char name[RTE_ETH_XSTATS_NAME_SIZE];
@@ -421,7 +427,7 @@ rte_eth_dev_get_name_by_port(uint8_t port_id, char *name)
 	return 0;
 }
 
-static int
+int
 rte_eth_dev_get_port_by_name(const char *name, uint8_t *port_id)
 {
 	int i;
@@ -1639,7 +1645,6 @@ rte_eth_dev_set_rx_queue_stats_mapping(uint8_t port_id, uint16_t rx_queue_id,
 			STAT_QMAP_RX);
 }
 
-
 void
 rte_eth_dev_info_get(uint8_t port_id, struct rte_eth_dev_info *dev_info)
 {
@@ -1661,6 +1666,8 @@ rte_eth_dev_info_get(uint8_t port_id, struct rte_eth_dev_info *dev_info)
 	(*dev->dev_ops->dev_infos_get)(dev, dev_info);
 	dev_info->pci_dev = dev->pci_dev;
 	dev_info->driver_name = dev->data->drv_name;
+	dev_info->nb_rx_queues = dev->data->nb_rx_queues;
+	dev_info->nb_tx_queues = dev->data->nb_tx_queues;
 }
 
 int
@@ -2925,7 +2932,6 @@ rte_eth_add_rx_callback(uint8_t port_id, uint16_t queue_id,
 		rte_errno = EINVAL;
 		return NULL;
 	}
-
 	struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
 
 	if (cb == NULL) {
@@ -2936,6 +2942,7 @@ rte_eth_add_rx_callback(uint8_t port_id, uint16_t queue_id,
 	cb->fn.rx = fn;
 	cb->param = user_param;
 
+	rte_spinlock_lock(&rte_eth_rx_cb_lock);
 	/* Add the callbacks in fifo order. */
 	struct rte_eth_rxtx_callback *tail =
 		rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
@@ -2948,6 +2955,42 @@ rte_eth_add_rx_callback(uint8_t port_id, uint16_t queue_id,
 			tail = tail->next;
 		tail->next = cb;
 	}
+	rte_spinlock_unlock(&rte_eth_rx_cb_lock);
+
+	return cb;
+}
+
+void *
+rte_eth_add_first_rx_callback(uint8_t port_id, uint16_t queue_id,
+		rte_rx_callback_fn fn, void *user_param)
+{
+#ifndef RTE_ETHDEV_RXTX_CALLBACKS
+	rte_errno = ENOTSUP;
+	return NULL;
+#endif
+	/* check input parameters */
+	if (!rte_eth_dev_is_valid_port(port_id) || fn == NULL ||
+		    queue_id >= rte_eth_devices[port_id].data->nb_rx_queues) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
+
+	if (cb == NULL) {
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	cb->fn.rx = fn;
+	cb->param = user_param;
+
+	rte_spinlock_lock(&rte_eth_rx_cb_lock);
+	/* Add the callback at the first position. */
+	cb->next = rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+	rte_smp_wmb();
+	rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+	rte_spinlock_unlock(&rte_eth_rx_cb_lock);
 
 	return cb;
 }
@@ -2977,6 +3020,7 @@ rte_eth_add_tx_callback(uint8_t port_id, uint16_t queue_id,
 	cb->fn.tx = fn;
 	cb->param = user_param;
 
+	rte_spinlock_lock(&rte_eth_tx_cb_lock);
 	/* Add the callbacks in fifo order. */
 	struct rte_eth_rxtx_callback *tail =
 		rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id];
@@ -2989,6 +3033,7 @@ rte_eth_add_tx_callback(uint8_t port_id, uint16_t queue_id,
 			tail = tail->next;
 		tail->next = cb;
 	}
+	rte_spinlock_unlock(&rte_eth_tx_cb_lock);
 
 	return cb;
 }
@@ -3007,29 +3052,24 @@ rte_eth_remove_rx_callback(uint8_t port_id, uint16_t queue_id,
 	}
 
 	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
-	struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
-	struct rte_eth_rxtx_callback *prev_cb;
-
-	/* Reset head pointer and remove user cb if first in the list. */
-	if (cb == user_cb) {
-		dev->post_rx_burst_cbs[queue_id] = user_cb->next;
-		return 0;
-	}
-
-	/* Remove the user cb from the callback list. */
-	do {
-		prev_cb = cb;
-		cb = cb->next;
-
+	struct rte_eth_rxtx_callback *cb;
+	struct rte_eth_rxtx_callback **prev_cb;
+	int ret = -EINVAL;
+
+	rte_spinlock_lock(&rte_eth_rx_cb_lock);
+	prev_cb = &dev->post_rx_burst_cbs[queue_id];
+	for (; *prev_cb != NULL; prev_cb = &cb->next) {
+		cb = *prev_cb;
 		if (cb == user_cb) {
-			prev_cb->next = user_cb->next;
-			return 0;
+			/* Remove the user cb from the callback list. */
+			*prev_cb = cb->next;
+			ret = 0;
+			break;
 		}
+	}
+	rte_spinlock_unlock(&rte_eth_rx_cb_lock);
 
-	} while (cb != NULL);
-
-	/* Callback wasn't found. */
-	return -EINVAL;
+	return ret;
 }
 
 int
@@ -3046,29 +3086,24 @@ rte_eth_remove_tx_callback(uint8_t port_id, uint16_t queue_id,
 	}
 
 	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
-	struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
-	struct rte_eth_rxtx_callback *prev_cb;
-
-	/* Reset head pointer and remove user cb if first in the list. */
-	if (cb == user_cb) {
-		dev->pre_tx_burst_cbs[queue_id] = user_cb->next;
-		return 0;
-	}
-
-	/* Remove the user cb from the callback list. */
-	do {
-		prev_cb = cb;
-		cb = cb->next;
-
+	int ret = -EINVAL;
+	struct rte_eth_rxtx_callback *cb;
+	struct rte_eth_rxtx_callback **prev_cb;
+
+	rte_spinlock_lock(&rte_eth_tx_cb_lock);
+	prev_cb = &dev->pre_tx_burst_cbs[queue_id];
+	for (; *prev_cb != NULL; prev_cb = &cb->next) {
+		cb = *prev_cb;
 		if (cb == user_cb) {
-			prev_cb->next = user_cb->next;
-			return 0;
+			/* Remove the user cb from the callback list. */
+			*prev_cb = cb->next;
+			ret = 0;
+			break;
 		}
+	}
+	rte_spinlock_unlock(&rte_eth_tx_cb_lock);
 
-	} while (cb != NULL);
-
-	/* Callback wasn't found. */
-	return -EINVAL;
+	return ret;
 }
 
 int
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 2757510..499e694 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -882,6 +882,9 @@ struct rte_eth_dev_info {
 	struct rte_eth_desc_lim rx_desc_lim;  /**< RX descriptors limits */
 	struct rte_eth_desc_lim tx_desc_lim;  /**< TX descriptors limits */
 	uint32_t speed_capa;  /**< Supported speeds bitmap (ETH_LINK_SPEED_). */
+	/** Configured number of rx/tx queues */
+	uint16_t nb_rx_queues; /**< Number of RX queues. */
+	uint16_t nb_tx_queues; /**< Number of TX queues. */
 };
 
 /**
@@ -3826,6 +3829,33 @@ void *rte_eth_add_rx_callback(uint8_t port_id, uint16_t queue_id,
 		rte_rx_callback_fn fn, void *user_param);
 
 /**
+ * Add a callback that must be called first on packet RX on a given port and queue.
+ *
+ * This API configures a first function to be called for each burst of
+ * packets received on a given NIC port queue. The return value is a pointer
+ * that can be used to later remove the callback using
+ * rte_eth_remove_rx_callback().
+ *
+ * The callback is placed at the head of the list, before any already added.
+ *
+ * @param port_id
+ *   The port identifier of the Ethernet device.
+ * @param queue_id
+ *   The queue on the Ethernet device on which the callback is to be added.
+ * @param fn
+ *   The callback function
+ * @param user_param
+ *   A generic pointer parameter which will be passed to each invocation of the
+ *   callback function on this port and queue.
+ *
+ * @return
+ *   NULL on error.
+ *   On success, a pointer value which can later be used to remove the callback.
+ */
+void *rte_eth_add_first_rx_callback(uint8_t port_id, uint16_t queue_id,
+		rte_rx_callback_fn fn, void *user_param);
+
+/**
  * Add a callback to be called on packet TX on a given port and queue.
  *
  * This API configures a function to be called for each burst of
@@ -4253,6 +4283,21 @@ rte_eth_dev_l2_tunnel_offload_set(uint8_t port_id,
 				  uint32_t mask,
 				  uint8_t en);
 
+/**
+ * Get the port id from PCI address or device name
+ * Ex: 0000:2:00.0 or vdev name eth_pcap0
+ *
+ * @param name
+ *  pci address or name of the device
+ * @param port_id
+ *   pointer to port identifier of the device
+ * @return
+ *   - (0) if successful.
+ *   - (-ENODEV) on failure.
+ */
+int
+rte_eth_dev_get_port_by_name(const char *name, uint8_t *port_id);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_ether/rte_ether_version.map b/lib/librte_ether/rte_ether_version.map
index 214ecc7..4de1491 100644
--- a/lib/librte_ether/rte_ether_version.map
+++ b/lib/librte_ether/rte_ether_version.map
@@ -132,3 +132,12 @@ DPDK_16.04 {
 	rte_eth_tx_buffer_set_err_callback;
 
 } DPDK_2.2;
+
+DPDK_16.07 {
+	global:
+
+	rte_eth_add_first_rx_callback;
+	rte_eth_dev_get_port_by_name;
+	rte_eth_dev_info_get;
+
+} DPDK_16.04;
-- 
2.5.0



More information about the dev mailing list