[dpdk-dev,RFC,11/11] ring: reuse typed ring enqueue and dequeue functions

Message ID 1484147125-5948-12-git-send-email-bruce.richardson@intel.com (mailing list archive)
State RFC, archived
Delegated to: Ferruh Yigit
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel compilation fail Compilation issues

Commit Message

Bruce Richardson Jan. 11, 2017, 3:05 p.m. UTC
  reduce code duplicated between rte_ring.h and rte_typed_ring.h by having
the enqueue and dequeue code reused by rte_ring from rte_typed_ring.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_ring.h | 380 +--------------------------------------------
 1 file changed, 8 insertions(+), 372 deletions(-)
  

Patch

diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 4e74efd..25c4849 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -231,366 +231,10 @@  int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
  */
 void rte_ring_dump(FILE *f, const struct rte_ring *r);
 
-/**
- * @internal Enqueue several objects on the ring (multi-producers safe).
- *
- * This function uses a "compare and set" instruction to move the
- * producer index atomically.
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects).
- * @param n
- *   The number of objects to add in the ring from the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t prod_head, prod_next;
-	uint32_t cons_tail, free_entries;
-	const unsigned max = n;
-	int success;
-	unsigned i, rep = 0;
-	uint32_t mask = r->prod.mask;
-	int ret;
-
-	/* Avoid the unnecessary cmpset operation below, which is also
-	 * potentially harmful when n equals 0. */
-	if (n == 0)
-		return 0;
-
-	/* move prod.head atomically */
-	do {
-		/* Reset n to the initial burst count */
-		n = max;
-
-		prod_head = r->prod.head;
-		cons_tail = r->cons.tail;
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * prod_head > cons_tail). So 'free_entries' is always between 0
-		 * and size(ring)-1. */
-		free_entries = (mask + cons_tail - prod_head);
-
-		/* check that we have enough room in ring */
-		if (unlikely(n > free_entries)) {
-			if (behavior == RTE_RING_QUEUE_FIXED) {
-				__RING_STAT_ADD(r, enq_fail, n);
-				return -ENOBUFS;
-			}
-			else {
-				/* No free entry available */
-				if (unlikely(free_entries == 0)) {
-					__RING_STAT_ADD(r, enq_fail, n);
-					return 0;
-				}
-
-				n = free_entries;
-			}
-		}
-
-		prod_next = prod_head + n;
-		success = rte_atomic32_cmpset(&r->prod.head, prod_head,
-					      prod_next);
-	} while (unlikely(success == 0));
-
-	/* write entries in ring */
-	ENQUEUE_PTRS();
-	rte_smp_wmb();
-
-	/* if we exceed the watermark */
-	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-				(int)(n | RTE_RING_QUOT_EXCEED);
-		__RING_STAT_ADD(r, enq_quota, n);
-	}
-	else {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-		__RING_STAT_ADD(r, enq_success, n);
-	}
-
-	/*
-	 * If there are other enqueues in progress that preceded us,
-	 * we need to wait for them to complete
-	 */
-	while (unlikely(r->prod.tail != prod_head)) {
-		rte_pause();
-
-		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-		 * for other thread finish. It gives pre-empted thread a chance
-		 * to proceed and finish with ring dequeue operation. */
-		if (RTE_RING_PAUSE_REP_COUNT &&
-		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
-			rep = 0;
-			sched_yield();
-		}
-	}
-	r->prod.tail = prod_next;
-	return ret;
-}
-
-/**
- * @internal Enqueue several objects on a ring (NOT multi-producers safe).
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects).
- * @param n
- *   The number of objects to add in the ring from the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t prod_head, cons_tail;
-	uint32_t prod_next, free_entries;
-	unsigned i;
-	uint32_t mask = r->prod.mask;
-	int ret;
-
-	prod_head = r->prod.head;
-	cons_tail = r->cons.tail;
-	/* The subtraction is done between two unsigned 32bits value
-	 * (the result is always modulo 32 bits even if we have
-	 * prod_head > cons_tail). So 'free_entries' is always between 0
-	 * and size(ring)-1. */
-	free_entries = mask + cons_tail - prod_head;
-
-	/* check that we have enough room in ring */
-	if (unlikely(n > free_entries)) {
-		if (behavior == RTE_RING_QUEUE_FIXED) {
-			__RING_STAT_ADD(r, enq_fail, n);
-			return -ENOBUFS;
-		}
-		else {
-			/* No free entry available */
-			if (unlikely(free_entries == 0)) {
-				__RING_STAT_ADD(r, enq_fail, n);
-				return 0;
-			}
-
-			n = free_entries;
-		}
-	}
-
-	prod_next = prod_head + n;
-	r->prod.head = prod_next;
-
-	/* write entries in ring */
-	ENQUEUE_PTRS();
-	rte_smp_wmb();
-
-	/* if we exceed the watermark */
-	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-			(int)(n | RTE_RING_QUOT_EXCEED);
-		__RING_STAT_ADD(r, enq_quota, n);
-	}
-	else {
-		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-		__RING_STAT_ADD(r, enq_success, n);
-	}
-
-	r->prod.tail = prod_next;
-	return ret;
-}
-
-/**
- * @internal Dequeue several objects from a ring (multi-consumers safe). When
- * the request objects are more than the available objects, only dequeue the
- * actual number of objects
- *
- * This function uses a "compare and set" instruction to move the
- * consumer index atomically.
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
- * @param n
- *   The number of objects to dequeue from the ring to the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
- */
-
-static inline int __attribute__((always_inline))
-__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t cons_head, prod_tail;
-	uint32_t cons_next, entries;
-	const unsigned max = n;
-	int success;
-	unsigned i, rep = 0;
-	uint32_t mask = r->prod.mask;
-
-	/* Avoid the unnecessary cmpset operation below, which is also
-	 * potentially harmful when n equals 0. */
-	if (n == 0)
-		return 0;
-
-	/* move cons.head atomically */
-	do {
-		/* Restore n as it may change every loop */
-		n = max;
-
-		cons_head = r->cons.head;
-		prod_tail = r->prod.tail;
-		/* The subtraction is done between two unsigned 32bits value
-		 * (the result is always modulo 32 bits even if we have
-		 * cons_head > prod_tail). So 'entries' is always between 0
-		 * and size(ring)-1. */
-		entries = (prod_tail - cons_head);
-
-		/* Set the actual entries for dequeue */
-		if (n > entries) {
-			if (behavior == RTE_RING_QUEUE_FIXED) {
-				__RING_STAT_ADD(r, deq_fail, n);
-				return -ENOENT;
-			}
-			else {
-				if (unlikely(entries == 0)){
-					__RING_STAT_ADD(r, deq_fail, n);
-					return 0;
-				}
-
-				n = entries;
-			}
-		}
-
-		cons_next = cons_head + n;
-		success = rte_atomic32_cmpset(&r->cons.head, cons_head,
-					      cons_next);
-	} while (unlikely(success == 0));
-
-	/* copy in table */
-	DEQUEUE_PTRS();
-	rte_smp_rmb();
-
-	/*
-	 * If there are other dequeues in progress that preceded us,
-	 * we need to wait for them to complete
-	 */
-	while (unlikely(r->cons.tail != cons_head)) {
-		rte_pause();
-
-		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-		 * for other thread finish. It gives pre-empted thread a chance
-		 * to proceed and finish with ring dequeue operation. */
-		if (RTE_RING_PAUSE_REP_COUNT &&
-		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
-			rep = 0;
-			sched_yield();
-		}
-	}
-	__RING_STAT_ADD(r, deq_success, n);
-	r->cons.tail = cons_next;
-
-	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
-}
-
-/**
- * @internal Dequeue several objects from a ring (NOT multi-consumers safe).
- * When the request objects are more than the available objects, only dequeue
- * the actual number of objects
- *
- * @param r
- *   A pointer to the ring structure.
- * @param obj_table
- *   A pointer to a table of void * pointers (objects) that will be filled.
- * @param n
- *   The number of objects to dequeue from the ring to the obj_table.
- * @param behavior
- *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
- *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
- * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
- */
-static inline int __attribute__((always_inline))
-__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
-{
-	uint32_t cons_head, prod_tail;
-	uint32_t cons_next, entries;
-	unsigned i;
-	uint32_t mask = r->prod.mask;
-
-	cons_head = r->cons.head;
-	prod_tail = r->prod.tail;
-	/* The subtraction is done between two unsigned 32bits value
-	 * (the result is always modulo 32 bits even if we have
-	 * cons_head > prod_tail). So 'entries' is always between 0
-	 * and size(ring)-1. */
-	entries = prod_tail - cons_head;
-
-	if (n > entries) {
-		if (behavior == RTE_RING_QUEUE_FIXED) {
-			__RING_STAT_ADD(r, deq_fail, n);
-			return -ENOENT;
-		}
-		else {
-			if (unlikely(entries == 0)){
-				__RING_STAT_ADD(r, deq_fail, n);
-				return 0;
-			}
-
-			n = entries;
-		}
-	}
-
-	cons_next = cons_head + n;
-	r->cons.head = cons_next;
-
-	/* copy in table */
-	DEQUEUE_PTRS();
-	rte_smp_rmb();
-
-	__RING_STAT_ADD(r, deq_success, n);
-	r->cons.tail = cons_next;
-	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
-}
+#define __rte_ring_mp_do_enqueue rte_void___ring_mp_do_enqueue
+#define __rte_ring_sp_do_enqueue rte_void___ring_sp_do_enqueue
+#define __rte_ring_mc_do_dequeue rte_void___ring_mc_do_dequeue
+#define __rte_ring_sc_do_dequeue rte_void___ring_sc_do_dequeue
 
 /**
  * Enqueue several objects on the ring (multi-producers safe).
@@ -882,9 +526,7 @@  rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 static inline int
 rte_ring_full(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
+	return rte_void_ring_full(r);
 }
 
 /**
@@ -899,9 +541,7 @@  rte_ring_full(const struct rte_ring *r)
 static inline int
 rte_ring_empty(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return !!(cons_tail == prod_tail);
+	return rte_void_ring_empty(r);
 }
 
 /**
@@ -915,9 +555,7 @@  rte_ring_empty(const struct rte_ring *r)
 static inline unsigned
 rte_ring_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return (prod_tail - cons_tail) & r->prod.mask;
+	return rte_void_ring_count(r);
 }
 
 /**
@@ -931,9 +569,7 @@  rte_ring_count(const struct rte_ring *r)
 static inline unsigned
 rte_ring_free_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return (cons_tail - prod_tail - 1) & r->prod.mask;
+	return rte_void_ring_free_count(r);
 }
 
 /**