[dpdk-dev] [PATCH] ring: assert on zero objects dequeue/enqueue
Lazaros Koromilas
l at nofutznetworks.com
Tue Mar 15 17:58:45 CET 2016
Issuing a zero-object dequeue with a single consumer has no effect.
Doing so with multiple consumers can let more than one thread succeed at
the compare-and-set operation and then suffer starvation or even deadlock in
the while loop that checks for preceding dequeues. The problematic piece
of code when n = 0:
cons_next = cons_head + n;
success = rte_atomic32_cmpset(&r->cons.head, cons_head, cons_next);
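With n = 0, cons_next equals cons_head, so the compare-and-set can succeed
without advancing the head. The same is possible on the enqueue path.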
Signed-off-by: Lazaros Koromilas <l at nofutznetworks.com>
---
lib/librte_ring/rte_ring.h | 26 ++++++++++++++++++++++++++
1 file changed, 26 insertions(+)
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 943c97c..2bf9ce3 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -100,6 +100,7 @@ extern "C" {
#include <rte_lcore.h>
#include <rte_atomic.h>
#include <rte_branch_prediction.h>
+#include <rte_debug.h>
#define RTE_TAILQ_RING_NAME "RTE_RING"
@@ -211,6 +212,19 @@ struct rte_ring {
#endif
/**
+ * @internal Assert macro: calls rte_panic(), reporting __LINE__ and the expression text, when exp is false.
+ * @param exp
+ * The expression to evaluate; nothing happens when it is non-zero.
+ */
+#define RTE_RING_ASSERT(exp) do { \
+ if (!(exp)) { \
+ rte_panic("line%d\t" \
+ "assert \"" #exp "\" failed\n", \
+ __LINE__); \
+ } \
+ } while (0)
+
+/**
* Calculate the memory size needed for a ring
*
* This function returns the number of bytes needed for a ring, given
@@ -406,6 +420,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
+ * Must be greater than zero.
* @param behavior
* RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
@@ -431,6 +446,8 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
uint32_t mask = r->prod.mask;
int ret;
+ RTE_RING_ASSERT(n > 0);
+
/* move prod.head atomically */
do {
/* Reset n to the initial burst count */
@@ -510,6 +527,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
+ * Must be greater than zero.
* @param behavior
* RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
@@ -533,6 +551,8 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
uint32_t mask = r->prod.mask;
int ret;
+ RTE_RING_ASSERT(n > 0);
+
prod_head = r->prod.head;
cons_tail = r->cons.tail;
/* The subtraction is done between two unsigned 32bits value
@@ -594,6 +614,7 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
* A pointer to a table of void * pointers (objects) that will be filled.
* @param n
* The number of objects to dequeue from the ring to the obj_table.
+ * Must be greater than zero.
* @param behavior
* RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
@@ -618,6 +639,8 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned i, rep = 0;
uint32_t mask = r->prod.mask;
+ RTE_RING_ASSERT(n > 0);
+
/* move cons.head atomically */
do {
/* Restore n as it may change every loop */
@@ -689,6 +712,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
* A pointer to a table of void * pointers (objects) that will be filled.
* @param n
* The number of objects to dequeue from the ring to the obj_table.
+ * Must be greater than zero.
* @param behavior
* RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
@@ -710,6 +734,8 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned i;
uint32_t mask = r->prod.mask;
+ RTE_RING_ASSERT(n > 0);
+
cons_head = r->cons.head;
prod_tail = r->prod.tail;
/* The subtraction is done between two unsigned 32bits value
--
1.9.1
More information about the dev
mailing list