@@ -482,7 +482,7 @@ free_rxtx_cbs(struct rte_eth_rxtx_callback *cbs)
static void
reset_queue_local(struct rte_eth_queue_local *ql)
{
-	free_rxtx_cbs(ql->cbs);
+	free_rxtx_cbs(ql->cbs.head);	/* cbs is a struct now; the list starts at .head */
	memset(ql, 0, sizeof(*ql));
}
@@ -3172,6 +3172,79 @@ rte_eth_dev_filter_ctrl(uint16_t port_id, enum rte_filter_type filter_type,
return (*dev->dev_ops->filter_ctrl)(dev, filter_type, filter_op, arg);
}
+#ifdef RTE_ETHDEV_RXTX_CALLBACKS
+
+/*
+ * Helper routine - contains common code to add RX/TX queue callbacks
+ * to the list.
+ */
+static struct rte_eth_rxtx_callback *
+add_rxtx_callback(struct rte_eth_rxtx_cbs *cbs, int32_t first,
+	struct rte_eth_rxtx_callback *cb, rte_spinlock_t *lock)
+{
+	struct rte_eth_rxtx_callback *tail, **pt;
+
+	rte_spinlock_lock(lock);
+
+	/* Add callback to the head of the list. */
+	if (first != 0) {
+		cb->next = cbs->head;
+		rte_smp_wmb(); /* publish fully initialized cb before linking */
+		cbs->head = cb;
+
+	/* Add callback to the tail of the list. */
+	} else {
+		for (pt = &cbs->head; *pt != NULL; pt = &tail->next)
+			tail = *pt;
+		rte_smp_wmb(); /* order cb field stores before making it reachable */
+		*pt = cb;
+	}
+
+	rte_spinlock_unlock(lock);
+	return cb;
+}
+
+/*
+ * Helper routine - contains common code to delete RX/TX queue callbacks
+ * from the FIFO list.
+ */
+static int
+del_rxtx_callback(struct rte_eth_rxtx_cbs *cbs,
+	struct rte_eth_rxtx_callback *user_cb, rte_spinlock_t *lock)
+{
+	int32_t ret;
+	struct rte_eth_rxtx_callback *cb, **prev_cb;
+
+	ret = -EINVAL;	/* returned when user_cb is not on the list */
+	rte_spinlock_lock(lock);	/* serialize against concurrent add/del */
+
+	for (prev_cb = &cbs->head; *prev_cb != NULL; prev_cb = &cb->next) {
+
+		cb = *prev_cb;
+		if (cb == user_cb) {
+			/* Remove the user cb from the callback list. */
+			*prev_cb = cb->next;
+			ret = 0;
+			break;
+		}
+	}
+
+	rte_spinlock_unlock(lock);
+
+	/*
+	 * first make sure datapath doesn't use removed callback anymore,
+	 * then free the callback structure.
+	 */
+	if (ret == 0) {
+		__rte_eth_rxtx_cbs_wait(cbs);	/* wait out any in-flight burst */
+		rte_free(cb);
+	}
+
+	return ret;
+}
+
+#endif /* RTE_ETHDEV_RXTX_CALLBACKS */
+
void *
rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
rte_rx_callback_fn fn, void *user_param)
@@ -3180,14 +3253,16 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
rte_errno = ENOTSUP;
return NULL;
#endif
+ struct rte_eth_rxtx_callback *cb;
+
/* check input parameters */
if (!rte_eth_dev_is_valid_port(port_id) || fn == NULL ||
queue_id >= rte_eth_devices[port_id].data->nb_rx_queues) {
rte_errno = EINVAL;
return NULL;
}
- struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
+ cb = rte_zmalloc(NULL, sizeof(*cb), 0);
if (cb == NULL) {
rte_errno = ENOMEM;
return NULL;
@@ -3196,22 +3271,8 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
cb->fn.rx = fn;
cb->param = user_param;
- rte_spinlock_lock(&rte_eth_rx_cb_lock);
- /* Add the callbacks in fifo order. */
- struct rte_eth_rxtx_callback *tail =
- rte_eth_devices[port_id].rx_ql[queue_id].cbs;
-
- if (!tail) {
- rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
-
- } else {
- while (tail->next)
- tail = tail->next;
- tail->next = cb;
- }
- rte_spinlock_unlock(&rte_eth_rx_cb_lock);
-
- return cb;
+ return add_rxtx_callback(&rte_eth_devices[port_id].rx_ql[queue_id].cbs,
+ 0, cb, &rte_eth_rx_cb_lock);
}
void *
@@ -3222,6 +3283,8 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
rte_errno = ENOTSUP;
return NULL;
#endif
+ struct rte_eth_rxtx_callback *cb;
+
/* check input parameters */
if (!rte_eth_dev_is_valid_port(port_id) || fn == NULL ||
queue_id >= rte_eth_devices[port_id].data->nb_rx_queues) {
@@ -3229,7 +3292,7 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
return NULL;
}
- struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
+ cb = rte_zmalloc(NULL, sizeof(*cb), 0);
if (cb == NULL) {
rte_errno = ENOMEM;
@@ -3239,14 +3302,8 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
cb->fn.rx = fn;
cb->param = user_param;
- rte_spinlock_lock(&rte_eth_rx_cb_lock);
- /* Add the callbacks at fisrt position*/
- cb->next = rte_eth_devices[port_id].rx_ql[queue_id].cbs;
- rte_smp_wmb();
- rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
- rte_spinlock_unlock(&rte_eth_rx_cb_lock);
-
- return cb;
+ return add_rxtx_callback(&rte_eth_devices[port_id].rx_ql[queue_id].cbs,
+ 1, cb, &rte_eth_rx_cb_lock);
}
void *
@@ -3257,6 +3314,8 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
rte_errno = ENOTSUP;
return NULL;
#endif
+ struct rte_eth_rxtx_callback *cb;
+
/* check input parameters */
if (!rte_eth_dev_is_valid_port(port_id) || fn == NULL ||
queue_id >= rte_eth_devices[port_id].data->nb_tx_queues) {
@@ -3264,8 +3323,7 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
return NULL;
}
- struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
-
+ cb = rte_zmalloc(NULL, sizeof(*cb), 0);
if (cb == NULL) {
rte_errno = ENOMEM;
return NULL;
@@ -3274,22 +3332,8 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
cb->fn.tx = fn;
cb->param = user_param;
- rte_spinlock_lock(&rte_eth_tx_cb_lock);
- /* Add the callbacks in fifo order. */
- struct rte_eth_rxtx_callback *tail =
- rte_eth_devices[port_id].tx_ql[queue_id].cbs;
-
- if (!tail) {
- rte_eth_devices[port_id].tx_ql[queue_id].cbs = cb;
-
- } else {
- while (tail->next)
- tail = tail->next;
- tail->next = cb;
- }
- rte_spinlock_unlock(&rte_eth_tx_cb_lock);
-
- return cb;
+ return add_rxtx_callback(&rte_eth_devices[port_id].tx_ql[queue_id].cbs,
+ 0, cb, &rte_eth_tx_cb_lock);
}
int
@@ -3299,31 +3343,16 @@ rte_eth_remove_rx_callback(uint16_t port_id, uint16_t queue_id,
#ifndef RTE_ETHDEV_RXTX_CALLBACKS
return -ENOTSUP;
#endif
+ struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+
/* Check input parameters. */
RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
if (user_cb == NULL ||
queue_id >= rte_eth_devices[port_id].data->nb_rx_queues)
return -EINVAL;
- struct rte_eth_dev *dev = &rte_eth_devices[port_id];
- struct rte_eth_rxtx_callback *cb;
- struct rte_eth_rxtx_callback **prev_cb;
- int ret = -EINVAL;
-
- rte_spinlock_lock(&rte_eth_rx_cb_lock);
- prev_cb = &dev->rx_ql[queue_id].cbs;
- for (; *prev_cb != NULL; prev_cb = &cb->next) {
- cb = *prev_cb;
- if (cb == user_cb) {
- /* Remove the user cb from the callback list. */
- *prev_cb = cb->next;
- ret = 0;
- break;
- }
- }
- rte_spinlock_unlock(&rte_eth_rx_cb_lock);
-
- return ret;
+ return del_rxtx_callback(&dev->rx_ql[queue_id].cbs, user_cb,
+ &rte_eth_rx_cb_lock);
}
int
@@ -3333,31 +3362,16 @@ rte_eth_remove_tx_callback(uint16_t port_id, uint16_t queue_id,
#ifndef RTE_ETHDEV_RXTX_CALLBACKS
return -ENOTSUP;
#endif
+ struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+
/* Check input parameters. */
RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
if (user_cb == NULL ||
queue_id >= rte_eth_devices[port_id].data->nb_tx_queues)
return -EINVAL;
- struct rte_eth_dev *dev = &rte_eth_devices[port_id];
- int ret = -EINVAL;
- struct rte_eth_rxtx_callback *cb;
- struct rte_eth_rxtx_callback **prev_cb;
-
- rte_spinlock_lock(&rte_eth_tx_cb_lock);
- prev_cb = &dev->tx_ql[queue_id].cbs;
- for (; *prev_cb != NULL; prev_cb = &cb->next) {
- cb = *prev_cb;
- if (cb == user_cb) {
- /* Remove the user cb from the callback list. */
- *prev_cb = cb->next;
- ret = 0;
- break;
- }
- }
- rte_spinlock_unlock(&rte_eth_tx_cb_lock);
-
- return ret;
+ return del_rxtx_callback(&dev->tx_ql[queue_id].cbs, user_cb,
+ &rte_eth_tx_cb_lock);
}
int
@@ -1709,6 +1709,23 @@ struct rte_eth_rxtx_callback {
};
/**
+ * @internal
+ * Structure used to hold list of RX/TX callbacks, plus usage counter.
+ * Usage counter is incremented each time (rx|tx)_burst starts/stops
+ * using callback list.
+ */
+struct rte_eth_rxtx_cbs {
+	struct rte_eth_rxtx_callback *head; /**< head of callbacks list */
+	uint32_t use; /**< usage counter; odd while datapath walks the list */
+};
+
+/*
+ * Odd number means that callback list is used by datapath (RX/TX)
+ * Even number means that callback list is not used by datapath (RX/TX)
+ */
+#define RTE_ETH_RXTX_CBS_INUSE 1
+
+/**
* A set of values to describe the possible states of an eth device.
*/
enum rte_eth_dev_state {
@@ -1731,7 +1748,7 @@ struct rte_eth_queue_local {
eth_tx_burst_t tx_pkt_burst; /**< transmit function pointer. */
eth_tx_prep_t tx_pkt_prepare; /**< transmit prepare function pointer. */
- struct rte_eth_rxtx_callback *cbs;
+ struct rte_eth_rxtx_cbs cbs;
/**< list of user supplied callbacks */
} __rte_cache_aligned;
@@ -2814,6 +2831,60 @@ int rte_eth_dev_get_vlan_offload(uint16_t port_id);
int rte_eth_dev_set_vlan_pvid(uint16_t port_id, uint16_t pvid, int on);
/**
+ * @internal
+ * Marks given callback list as used by datapath (RX/TX).
+ * @param cbs
+ * Pointer to the callback list structure.
+ */
+static __rte_always_inline void
+__rte_eth_rxtx_cbs_inuse(struct rte_eth_rxtx_cbs *cbs)
+{
+	cbs->use++;	/* counter becomes odd: list is in use */
+	/* full barrier: make the in-use mark visible before any list access */
+	rte_smp_mb();
+}
+
+/**
+ * @internal
+ * Marks given callback list as not used by datapath (RX/TX).
+ * @param cbs
+ * Pointer to the callback list structure.
+ */
+static __rte_always_inline void
+__rte_eth_rxtx_cbs_unuse(struct rte_eth_rxtx_cbs *cbs)
+{
+	/* complete all prior loads; NOTE(review): rmb orders loads only — confirm callbacks' stores need no full barrier here */
+	rte_smp_rmb();
+	cbs->use++;	/* counter becomes even: list no longer in use */
+}
+
+/**
+ * @internal
+ * Waits till datapath (RX/TX) finished using given callback list.
+ * @param cbs
+ * Pointer to the callback list structure.
+ */
+static inline void
+__rte_eth_rxtx_cbs_wait(const struct rte_eth_rxtx_cbs *cbs)
+{
+	uint32_t nuse, puse;
+
+	/* make sure all previous loads and stores are completed */
+	rte_smp_mb();
+
+	puse = cbs->use;	/* snapshot the usage counter */
+
+	/* in use, busy wait till current RX/TX iteration is finished */
+	if ((puse & RTE_ETH_RXTX_CBS_INUSE) != 0) {
+		do {
+			rte_pause();
+			rte_compiler_barrier();	/* force re-read of cbs->use */
+			nuse = cbs->use;
+		} while (nuse == puse);	/* any change means the iteration ended */
+	}
+}
+
+/**
*
* Retrieve a burst of input packets from a receive queue of an Ethernet
* device. The retrieved packets are stored in *rte_mbuf* structures whose
@@ -2911,6 +2982,7 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
return 0;
}
#endif
+
nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
rx_pkts, nb_pkts);
@@ -2920,15 +2992,15 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
struct rte_eth_rxtx_callback *cb;
ql = dev->rx_ql + queue_id;
- cb = ql->cbs;
-
- if (unlikely(cb != NULL)) {
- do {
- nb_rx = cb->fn.rx(port_id, queue_id,
- rx_pkts, nb_rx,
- nb_pkts, cb->param);
- cb = cb->next;
- } while (cb != NULL);
+ if (unlikely(ql->cbs.head != NULL)) {
+
+ __rte_eth_rxtx_cbs_inuse(&ql->cbs);
+
+ for (cb = ql->cbs.head; cb != NULL; cb = cb->next)
+ nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts,
+ nb_rx, nb_pkts, cb->param);
+
+ __rte_eth_rxtx_cbs_unuse(&ql->cbs);
}
}
#endif
@@ -3187,20 +3259,21 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
struct rte_eth_rxtx_callback *cb;
ql = dev->tx_ql + queue_id;
- cb = ql->cbs;
-
- if (unlikely(cb != NULL)) {
- do {
- nb_pkts = cb->fn.tx(port_id, queue_id,
- tx_pkts, nb_pkts,
- cb->param);
- cb = cb->next;
- } while (cb != NULL);
+ if (unlikely(ql->cbs.head != NULL)) {
+
+ __rte_eth_rxtx_cbs_inuse(&ql->cbs);
+
+ for (cb = ql->cbs.head; cb != NULL; cb = cb->next)
+ nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts,
+ nb_pkts, cb->param);
+
+ __rte_eth_rxtx_cbs_unuse(&ql->cbs);
}
}
#endif
- return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
+ return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts,
+ nb_pkts);
}
/**
@@ -4199,16 +4272,10 @@ void *rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
* This function is used to removed callbacks that were added to a NIC port
* queue using rte_eth_add_rx_callback().
*
- * Note: the callback is removed from the callback list but it isn't freed
- * since the it may still be in use. The memory for the callback can be
- * subsequently freed back by the application by calling rte_free():
- *
- * - Immediately - if the port is stopped, or the user knows that no
- * callbacks are in flight e.g. if called from the thread doing RX/TX
- * on that queue.
- *
- * - After a short delay - where the delay is sufficient to allow any
- * in-flight callbacks to complete.
+ * Note: after the callback is removed from the callback list, the memory
+ * associated with it is freed, and the user must not refer to it any more.
+ * After successful completion of this function the user can safely release
+ * any resources associated with that callback.
*
* @param port_id
* The port identifier of the Ethernet device.