[dpdk-dev,v5,07/14] ring: make bulk and burst fn return vals consistent

Message ID 20170329130941.31190-8-bruce.richardson@intel.com (mailing list archive)
State Accepted, archived
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Bruce Richardson March 29, 2017, 1:09 p.m. UTC
  The bulk fns for rings returns 0 for all elements enqueued and negative
for no space. Change that to make them consistent with the burst functions
in returning the number of elements enqueued/dequeued, i.e. 0 or N.
This change also allows the return value from enq/deq to be used directly
without a branch for error checking.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Reviewed-by: Yuanhan Liu <yuanhan.liu@linux.intel.com>
Acked-by: Olivier Matz <olivier.matz@6wind.com>
---
 doc/guides/rel_notes/release_17_05.rst             |  11 +++
 doc/guides/sample_app_ug/server_node_efd.rst       |   2 +-
 examples/load_balancer/runtime.c                   |  16 ++-
 .../client_server_mp/mp_client/client.c            |   8 +-
 .../client_server_mp/mp_server/main.c              |   2 +-
 examples/qos_sched/app_thread.c                    |   8 +-
 examples/server_node_efd/node/node.c               |   2 +-
 examples/server_node_efd/server/main.c             |   2 +-
 lib/librte_mempool/rte_mempool_ring.c              |  12 ++-
 lib/librte_ring/rte_ring.h                         | 109 +++++++--------------
 test/test-pipeline/pipeline_hash.c                 |   2 +-
 test/test-pipeline/runtime.c                       |   8 +-
 test/test/test_ring.c                              |  46 +++++----
 test/test/test_ring_perf.c                         |   8 +-
 14 files changed, 106 insertions(+), 130 deletions(-)
  

Comments

Zhihong Wang April 13, 2017, 6:42 a.m. UTC | #1
Hi Bruce,

This patch changes the behavior and causes some existing code to
malfunction, e.g. bond_ethdev_stop() will get stuck here:

while (rte_ring_dequeue(port->rx_ring, &pkt) != -ENOENT)
		rte_pktmbuf_free(pkt);

Another example in test/test/virtual_pmd.c: virtual_ethdev_stop().


Thanks
Zhihong

> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Bruce Richardson
> Sent: Wednesday, March 29, 2017 9:10 PM
> To: olivier.matz@6wind.com
> Cc: dev@dpdk.org; Richardson, Bruce <bruce.richardson@intel.com>
> Subject: [dpdk-dev] [PATCH v5 07/14] ring: make bulk and burst fn return
> vals consistent
> 
> The bulk fns for rings returns 0 for all elements enqueued and negative
> for no space. Change that to make them consistent with the burst functions
> in returning the number of elements enqueued/dequeued, i.e. 0 or N.
> This change also allows the return value from enq/deq to be used directly
> without a branch for error checking.
> 
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Reviewed-by: Yuanhan Liu <yuanhan.liu@linux.intel.com>
> Acked-by: Olivier Matz <olivier.matz@6wind.com>
> ---
>  doc/guides/rel_notes/release_17_05.rst             |  11 +++
>  doc/guides/sample_app_ug/server_node_efd.rst       |   2 +-
>  examples/load_balancer/runtime.c                   |  16 ++-
>  .../client_server_mp/mp_client/client.c            |   8 +-
>  .../client_server_mp/mp_server/main.c              |   2 +-
>  examples/qos_sched/app_thread.c                    |   8 +-
>  examples/server_node_efd/node/node.c               |   2 +-
>  examples/server_node_efd/server/main.c             |   2 +-
>  lib/librte_mempool/rte_mempool_ring.c              |  12 ++-
>  lib/librte_ring/rte_ring.h                         | 109 +++++++--------------
>  test/test-pipeline/pipeline_hash.c                 |   2 +-
>  test/test-pipeline/runtime.c                       |   8 +-
>  test/test/test_ring.c                              |  46 +++++----
>  test/test/test_ring_perf.c                         |   8 +-
>  14 files changed, 106 insertions(+), 130 deletions(-)
> 
> diff --git a/doc/guides/rel_notes/release_17_05.rst
> b/doc/guides/rel_notes/release_17_05.rst
> index 084b359..6da2612 100644
> --- a/doc/guides/rel_notes/release_17_05.rst
> +++ b/doc/guides/rel_notes/release_17_05.rst
> @@ -137,6 +137,17 @@ API Changes
>    * removed the build-time setting
> ``CONFIG_RTE_RING_PAUSE_REP_COUNT``
>    * removed the function ``rte_ring_set_water_mark`` as part of a general
>      removal of watermarks support in the library.
> +  * changed the return value of the enqueue and dequeue bulk functions to
> +    match that of the burst equivalents. In all cases, ring functions which
> +    operate on multiple packets now return the number of elements
> enqueued
> +    or dequeued, as appropriate. The updated functions are:
> +
> +    - ``rte_ring_mp_enqueue_bulk``
> +    - ``rte_ring_sp_enqueue_bulk``
> +    - ``rte_ring_enqueue_bulk``
> +    - ``rte_ring_mc_dequeue_bulk``
> +    - ``rte_ring_sc_dequeue_bulk``
> +    - ``rte_ring_dequeue_bulk``
> 
>  ABI Changes
>  -----------
> diff --git a/doc/guides/sample_app_ug/server_node_efd.rst
> b/doc/guides/sample_app_ug/server_node_efd.rst
> index 9b69cfe..e3a63c8 100644
> --- a/doc/guides/sample_app_ug/server_node_efd.rst
> +++ b/doc/guides/sample_app_ug/server_node_efd.rst
> @@ -286,7 +286,7 @@ repeated infinitely.
> 
>          cl = &nodes[node];
>          if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
> -                cl_rx_buf[node].count) != 0){
> +                cl_rx_buf[node].count) != cl_rx_buf[node].count){
>              for (j = 0; j < cl_rx_buf[node].count; j++)
>                  rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
>              cl->stats.rx_drop += cl_rx_buf[node].count;
> diff --git a/examples/load_balancer/runtime.c
> b/examples/load_balancer/runtime.c
> index 6944325..82b10bc 100644
> --- a/examples/load_balancer/runtime.c
> +++ b/examples/load_balancer/runtime.c
> @@ -146,7 +146,7 @@ app_lcore_io_rx_buffer_to_send (
>  		(void **) lp->rx.mbuf_out[worker].array,
>  		bsz);
> 
> -	if (unlikely(ret == -ENOBUFS)) {
> +	if (unlikely(ret == 0)) {
>  		uint32_t k;
>  		for (k = 0; k < bsz; k ++) {
>  			struct rte_mbuf *m = lp-
> >rx.mbuf_out[worker].array[k];
> @@ -312,7 +312,7 @@ app_lcore_io_rx_flush(struct app_lcore_params_io
> *lp, uint32_t n_workers)
>  			(void **) lp->rx.mbuf_out[worker].array,
>  			lp->rx.mbuf_out[worker].n_mbufs);
> 
> -		if (unlikely(ret < 0)) {
> +		if (unlikely(ret == 0)) {
>  			uint32_t k;
>  			for (k = 0; k < lp->rx.mbuf_out[worker].n_mbufs; k
> ++) {
>  				struct rte_mbuf *pkt_to_free = lp-
> >rx.mbuf_out[worker].array[k];
> @@ -349,9 +349,8 @@ app_lcore_io_tx(
>  				(void **) &lp-
> >tx.mbuf_out[port].array[n_mbufs],
>  				bsz_rd);
> 
> -			if (unlikely(ret == -ENOENT)) {
> +			if (unlikely(ret == 0))
>  				continue;
> -			}
> 
>  			n_mbufs += bsz_rd;
> 
> @@ -505,9 +504,8 @@ app_lcore_worker(
>  			(void **) lp->mbuf_in.array,
>  			bsz_rd);
> 
> -		if (unlikely(ret == -ENOENT)) {
> +		if (unlikely(ret == 0))
>  			continue;
> -		}
> 
>  #if APP_WORKER_DROP_ALL_PACKETS
>  		for (j = 0; j < bsz_rd; j ++) {
> @@ -559,7 +557,7 @@ app_lcore_worker(
> 
>  #if APP_STATS
>  			lp->rings_out_iters[port] ++;
> -			if (ret == 0) {
> +			if (ret > 0) {
>  				lp->rings_out_count[port] += 1;
>  			}
>  			if (lp->rings_out_iters[port] == APP_STATS){
> @@ -572,7 +570,7 @@ app_lcore_worker(
>  			}
>  #endif
> 
> -			if (unlikely(ret == -ENOBUFS)) {
> +			if (unlikely(ret == 0)) {
>  				uint32_t k;
>  				for (k = 0; k < bsz_wr; k ++) {
>  					struct rte_mbuf *pkt_to_free = lp-
> >mbuf_out[port].array[k];
> @@ -609,7 +607,7 @@ app_lcore_worker_flush(struct
> app_lcore_params_worker *lp)
>  			(void **) lp->mbuf_out[port].array,
>  			lp->mbuf_out[port].n_mbufs);
> 
> -		if (unlikely(ret < 0)) {
> +		if (unlikely(ret == 0)) {
>  			uint32_t k;
>  			for (k = 0; k < lp->mbuf_out[port].n_mbufs; k ++) {
>  				struct rte_mbuf *pkt_to_free = lp-
> >mbuf_out[port].array[k];
> diff --git a/examples/multi_process/client_server_mp/mp_client/client.c
> b/examples/multi_process/client_server_mp/mp_client/client.c
> index d4f9ca3..dca9eb9 100644
> --- a/examples/multi_process/client_server_mp/mp_client/client.c
> +++ b/examples/multi_process/client_server_mp/mp_client/client.c
> @@ -276,14 +276,10 @@ main(int argc, char *argv[])
>  	printf("[Press Ctrl-C to quit ...]\n");
> 
>  	for (;;) {
> -		uint16_t i, rx_pkts = PKT_READ_SIZE;
> +		uint16_t i, rx_pkts;
>  		uint8_t port;
> 
> -		/* try dequeuing max possible packets first, if that fails, get
> the
> -		 * most we can. Loop body should only execute once,
> maximum */
> -		while (rx_pkts > 0 &&
> -				unlikely(rte_ring_dequeue_bulk(rx_ring,
> pkts, rx_pkts) != 0))
> -			rx_pkts =
> (uint16_t)RTE_MIN(rte_ring_count(rx_ring), PKT_READ_SIZE);
> +		rx_pkts = rte_ring_dequeue_burst(rx_ring, pkts,
> PKT_READ_SIZE);
> 
>  		if (unlikely(rx_pkts == 0)){
>  			if (need_flush)
> diff --git a/examples/multi_process/client_server_mp/mp_server/main.c
> b/examples/multi_process/client_server_mp/mp_server/main.c
> index a6dc12d..19c95b2 100644
> --- a/examples/multi_process/client_server_mp/mp_server/main.c
> +++ b/examples/multi_process/client_server_mp/mp_server/main.c
> @@ -227,7 +227,7 @@ flush_rx_queue(uint16_t client)
> 
>  	cl = &clients[client];
>  	if (rte_ring_enqueue_bulk(cl->rx_q, (void
> **)cl_rx_buf[client].buffer,
> -			cl_rx_buf[client].count) != 0){
> +			cl_rx_buf[client].count) == 0){
>  		for (j = 0; j < cl_rx_buf[client].count; j++)
>  			rte_pktmbuf_free(cl_rx_buf[client].buffer[j]);
>  		cl->stats.rx_drop += cl_rx_buf[client].count;
> diff --git a/examples/qos_sched/app_thread.c
> b/examples/qos_sched/app_thread.c
> index 70fdcdb..dab4594 100644
> --- a/examples/qos_sched/app_thread.c
> +++ b/examples/qos_sched/app_thread.c
> @@ -107,7 +107,7 @@ app_rx_thread(struct thread_conf **confs)
>  			}
> 
>  			if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring,
> -								(void
> **)rx_mbufs, nb_rx) != 0)) {
> +					(void **)rx_mbufs, nb_rx) == 0)) {
>  				for(i = 0; i < nb_rx; i++) {
>  					rte_pktmbuf_free(rx_mbufs[i]);
> 
> @@ -180,7 +180,7 @@ app_tx_thread(struct thread_conf **confs)
>  	while ((conf = confs[conf_idx])) {
>  		retval = rte_ring_sc_dequeue_bulk(conf->tx_ring, (void
> **)mbufs,
>  					burst_conf.qos_dequeue);
> -		if (likely(retval == 0)) {
> +		if (likely(retval != 0)) {
>  			app_send_packets(conf, mbufs,
> burst_conf.qos_dequeue);
> 
>  			conf->counter = 0; /* reset empty read loop counter
> */
> @@ -230,7 +230,9 @@ app_worker_thread(struct thread_conf **confs)
>  		nb_pkt = rte_sched_port_dequeue(conf->sched_port,
> mbufs,
>  					burst_conf.qos_dequeue);
>  		if (likely(nb_pkt > 0))
> -			while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
> (void **)mbufs, nb_pkt) != 0);
> +			while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
> +					(void **)mbufs, nb_pkt) == 0)
> +				; /* empty body */
> 
>  		conf_idx++;
>  		if (confs[conf_idx] == NULL)
> diff --git a/examples/server_node_efd/node/node.c
> b/examples/server_node_efd/node/node.c
> index a6c0c70..9ec6a05 100644
> --- a/examples/server_node_efd/node/node.c
> +++ b/examples/server_node_efd/node/node.c
> @@ -392,7 +392,7 @@ main(int argc, char *argv[])
>  		 */
>  		while (rx_pkts > 0 &&
>  				unlikely(rte_ring_dequeue_bulk(rx_ring,
> pkts,
> -					rx_pkts) != 0))
> +					rx_pkts) == 0))
>  			rx_pkts =
> (uint16_t)RTE_MIN(rte_ring_count(rx_ring),
>  					PKT_READ_SIZE);
> 
> diff --git a/examples/server_node_efd/server/main.c
> b/examples/server_node_efd/server/main.c
> index 1a54d1b..3eb7fac 100644
> --- a/examples/server_node_efd/server/main.c
> +++ b/examples/server_node_efd/server/main.c
> @@ -247,7 +247,7 @@ flush_rx_queue(uint16_t node)
> 
>  	cl = &nodes[node];
>  	if (rte_ring_enqueue_bulk(cl->rx_q, (void
> **)cl_rx_buf[node].buffer,
> -			cl_rx_buf[node].count) != 0){
> +			cl_rx_buf[node].count) != cl_rx_buf[node].count){
>  		for (j = 0; j < cl_rx_buf[node].count; j++)
>  			rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
>  		cl->stats.rx_drop += cl_rx_buf[node].count;
> diff --git a/lib/librte_mempool/rte_mempool_ring.c
> b/lib/librte_mempool/rte_mempool_ring.c
> index b9aa64d..409b860 100644
> --- a/lib/librte_mempool/rte_mempool_ring.c
> +++ b/lib/librte_mempool/rte_mempool_ring.c
> @@ -42,26 +42,30 @@ static int
>  common_ring_mp_enqueue(struct rte_mempool *mp, void * const
> *obj_table,
>  		unsigned n)
>  {
> -	return rte_ring_mp_enqueue_bulk(mp->pool_data, obj_table, n);
> +	return rte_ring_mp_enqueue_bulk(mp->pool_data,
> +			obj_table, n) == 0 ? -ENOBUFS : 0;
>  }
> 
>  static int
>  common_ring_sp_enqueue(struct rte_mempool *mp, void * const
> *obj_table,
>  		unsigned n)
>  {
> -	return rte_ring_sp_enqueue_bulk(mp->pool_data, obj_table, n);
> +	return rte_ring_sp_enqueue_bulk(mp->pool_data,
> +			obj_table, n) == 0 ? -ENOBUFS : 0;
>  }
> 
>  static int
>  common_ring_mc_dequeue(struct rte_mempool *mp, void **obj_table,
> unsigned n)
>  {
> -	return rte_ring_mc_dequeue_bulk(mp->pool_data, obj_table, n);
> +	return rte_ring_mc_dequeue_bulk(mp->pool_data,
> +			obj_table, n) == 0 ? -ENOBUFS : 0;
>  }
> 
>  static int
>  common_ring_sc_dequeue(struct rte_mempool *mp, void **obj_table,
> unsigned n)
>  {
> -	return rte_ring_sc_dequeue_bulk(mp->pool_data, obj_table, n);
> +	return rte_ring_sc_dequeue_bulk(mp->pool_data,
> +			obj_table, n) == 0 ? -ENOBUFS : 0;
>  }
> 
>  static unsigned
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index 906e8ae..34b438c 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -349,14 +349,10 @@ void rte_ring_dump(FILE *f, const struct rte_ring
> *r);
>   *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
>   *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from
> ring
>   * @return
> - *   Depend on the behavior value
> - *   if behavior = RTE_RING_QUEUE_FIXED
> - *   - 0: Success; objects enqueue.
> - *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is
> enqueued.
> - *   if behavior = RTE_RING_QUEUE_VARIABLE
> - *   - n: Actual number of objects enqueued.
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>   */
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
>  			 unsigned n, enum rte_ring_queue_behavior
> behavior)
>  {
> @@ -388,7 +384,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void
> * const *obj_table,
>  		/* check that we have enough room in ring */
>  		if (unlikely(n > free_entries)) {
>  			if (behavior == RTE_RING_QUEUE_FIXED)
> -				return -ENOBUFS;
> +				return 0;
>  			else {
>  				/* No free entry available */
>  				if (unlikely(free_entries == 0))
> @@ -414,7 +410,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void
> * const *obj_table,
>  		rte_pause();
> 
>  	r->prod.tail = prod_next;
> -	return (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
> +	return n;
>  }
> 
>  /**
> @@ -430,14 +426,10 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r,
> void * const *obj_table,
>   *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
>   *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from
> ring
>   * @return
> - *   Depend on the behavior value
> - *   if behavior = RTE_RING_QUEUE_FIXED
> - *   - 0: Success; objects enqueue.
> - *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is
> enqueued.
> - *   if behavior = RTE_RING_QUEUE_VARIABLE
> - *   - n: Actual number of objects enqueued.
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>   */
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
>  			 unsigned n, enum rte_ring_queue_behavior
> behavior)
>  {
> @@ -457,7 +449,7 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void *
> const *obj_table,
>  	/* check that we have enough room in ring */
>  	if (unlikely(n > free_entries)) {
>  		if (behavior == RTE_RING_QUEUE_FIXED)
> -			return -ENOBUFS;
> +			return 0;
>  		else {
>  			/* No free entry available */
>  			if (unlikely(free_entries == 0))
> @@ -474,7 +466,7 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void *
> const *obj_table,
>  	rte_smp_wmb();
> 
>  	r->prod.tail = prod_next;
> -	return (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
> +	return n;
>  }
> 
>  /**
> @@ -495,16 +487,11 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r,
> void * const *obj_table,
>   *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a
> ring
>   *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from
> ring
>   * @return
> - *   Depend on the behavior value
> - *   if behavior = RTE_RING_QUEUE_FIXED
> - *   - 0: Success; objects dequeued.
> - *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
> - *     dequeued.
> - *   if behavior = RTE_RING_QUEUE_VARIABLE
> - *   - n: Actual number of objects dequeued.
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>   */
> 
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
>  		 unsigned n, enum rte_ring_queue_behavior behavior)
>  {
> @@ -536,7 +523,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void
> **obj_table,
>  		/* Set the actual entries for dequeue */
>  		if (n > entries) {
>  			if (behavior == RTE_RING_QUEUE_FIXED)
> -				return -ENOENT;
> +				return 0;
>  			else {
>  				if (unlikely(entries == 0))
>  					return 0;
> @@ -562,7 +549,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void
> **obj_table,
> 
>  	r->cons.tail = cons_next;
> 
> -	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
> +	return n;
>  }
> 
>  /**
> @@ -580,15 +567,10 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r,
> void **obj_table,
>   *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a
> ring
>   *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from
> ring
>   * @return
> - *   Depend on the behavior value
> - *   if behavior = RTE_RING_QUEUE_FIXED
> - *   - 0: Success; objects dequeued.
> - *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
> - *     dequeued.
> - *   if behavior = RTE_RING_QUEUE_VARIABLE
> - *   - n: Actual number of objects dequeued.
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>   */
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
>  		 unsigned n, enum rte_ring_queue_behavior behavior)
>  {
> @@ -607,7 +589,7 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void
> **obj_table,
> 
>  	if (n > entries) {
>  		if (behavior == RTE_RING_QUEUE_FIXED)
> -			return -ENOENT;
> +			return 0;
>  		else {
>  			if (unlikely(entries == 0))
>  				return 0;
> @@ -623,7 +605,7 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void
> **obj_table,
>  	rte_smp_rmb();
> 
>  	r->cons.tail = cons_next;
> -	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
> +	return n;
>  }
> 
>  /**
> @@ -639,10 +621,9 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void
> **obj_table,
>   * @param n
>   *   The number of objects to add in the ring from the obj_table.
>   * @return
> - *   - 0: Success; objects enqueue.
> - *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is
> enqueued.
> + *   The number of objects enqueued, either 0 or n
>   */
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  			 unsigned n)
>  {
> @@ -659,10 +640,9 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void
> * const *obj_table,
>   * @param n
>   *   The number of objects to add in the ring from the obj_table.
>   * @return
> - *   - 0: Success; objects enqueued.
> - *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is
> enqueued.
> + *   The number of objects enqueued, either 0 or n
>   */
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  			 unsigned n)
>  {
> @@ -683,10 +663,9 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void *
> const *obj_table,
>   * @param n
>   *   The number of objects to add in the ring from the obj_table.
>   * @return
> - *   - 0: Success; objects enqueued.
> - *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is
> enqueued.
> + *   The number of objects enqueued, either 0 or n
>   */
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  		      unsigned n)
>  {
> @@ -713,7 +692,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void *
> const *obj_table,
>  static inline int __attribute__((always_inline))
>  rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
>  {
> -	return rte_ring_mp_enqueue_bulk(r, &obj, 1);
> +	return rte_ring_mp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
>  }
> 
>  /**
> @@ -730,7 +709,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
>  static inline int __attribute__((always_inline))
>  rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
>  {
> -	return rte_ring_sp_enqueue_bulk(r, &obj, 1);
> +	return rte_ring_sp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
>  }
> 
>  /**
> @@ -751,10 +730,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
>  static inline int __attribute__((always_inline))
>  rte_ring_enqueue(struct rte_ring *r, void *obj)
>  {
> -	if (r->prod.single)
> -		return rte_ring_sp_enqueue(r, obj);
> -	else
> -		return rte_ring_mp_enqueue(r, obj);
> +	return rte_ring_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
>  }
> 
>  /**
> @@ -770,11 +746,9 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
>   * @param n
>   *   The number of objects to dequeue from the ring to the obj_table.
>   * @return
> - *   - 0: Success; objects dequeued.
> - *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
> - *     dequeued.
> + *   The number of objects dequeued, either 0 or n
>   */
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
>  {
>  	return __rte_ring_mc_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED);
> @@ -791,11 +765,9 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void
> **obj_table, unsigned n)
>   *   The number of objects to dequeue from the ring to the obj_table,
>   *   must be strictly positive.
>   * @return
> - *   - 0: Success; objects dequeued.
> - *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
> - *     dequeued.
> + *   The number of objects dequeued, either 0 or n
>   */
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
>  {
>  	return __rte_ring_sc_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED);
> @@ -815,11 +787,9 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void
> **obj_table, unsigned n)
>   * @param n
>   *   The number of objects to dequeue from the ring to the obj_table.
>   * @return
> - *   - 0: Success; objects dequeued.
> - *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
> - *     dequeued.
> + *   The number of objects dequeued, either 0 or n
>   */
> -static inline int __attribute__((always_inline))
> +static inline unsigned int __attribute__((always_inline))
>  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
>  {
>  	if (r->cons.single)
> @@ -846,7 +816,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void
> **obj_table, unsigned n)
>  static inline int __attribute__((always_inline))
>  rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
>  {
> -	return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
> +	return rte_ring_mc_dequeue_bulk(r, obj_p, 1)  ? 0 : -ENOBUFS;
>  }
> 
>  /**
> @@ -864,7 +834,7 @@ rte_ring_mc_dequeue(struct rte_ring *r, void
> **obj_p)
>  static inline int __attribute__((always_inline))
>  rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
>  {
> -	return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
> +	return rte_ring_sc_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
>  }
> 
>  /**
> @@ -886,10 +856,7 @@ rte_ring_sc_dequeue(struct rte_ring *r, void
> **obj_p)
>  static inline int __attribute__((always_inline))
>  rte_ring_dequeue(struct rte_ring *r, void **obj_p)
>  {
> -	if (r->cons.single)
> -		return rte_ring_sc_dequeue(r, obj_p);
> -	else
> -		return rte_ring_mc_dequeue(r, obj_p);
> +	return rte_ring_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
>  }
> 
>  /**
> diff --git a/test/test-pipeline/pipeline_hash.c b/test/test-
> pipeline/pipeline_hash.c
> index 10d2869..1ac0aa8 100644
> --- a/test/test-pipeline/pipeline_hash.c
> +++ b/test/test-pipeline/pipeline_hash.c
> @@ -547,6 +547,6 @@ app_main_loop_rx_metadata(void) {
>  				app.rings_rx[i],
>  				(void **) app.mbuf_rx.array,
>  				n_mbufs);
> -		} while (ret < 0);
> +		} while (ret == 0);
>  	}
>  }
> diff --git a/test/test-pipeline/runtime.c b/test/test-pipeline/runtime.c
> index 42a6142..4e20669 100644
> --- a/test/test-pipeline/runtime.c
> +++ b/test/test-pipeline/runtime.c
> @@ -98,7 +98,7 @@ app_main_loop_rx(void) {
>  				app.rings_rx[i],
>  				(void **) app.mbuf_rx.array,
>  				n_mbufs);
> -		} while (ret < 0);
> +		} while (ret == 0);
>  	}
>  }
> 
> @@ -123,7 +123,7 @@ app_main_loop_worker(void) {
>  			(void **) worker_mbuf->array,
>  			app.burst_size_worker_read);
> 
> -		if (ret == -ENOENT)
> +		if (ret == 0)
>  			continue;
> 
>  		do {
> @@ -131,7 +131,7 @@ app_main_loop_worker(void) {
>  				app.rings_tx[i ^ 1],
>  				(void **) worker_mbuf->array,
>  				app.burst_size_worker_write);
> -		} while (ret < 0);
> +		} while (ret == 0);
>  	}
>  }
> 
> @@ -152,7 +152,7 @@ app_main_loop_tx(void) {
>  			(void **) &app.mbuf_tx[i].array[n_mbufs],
>  			app.burst_size_tx_read);
> 
> -		if (ret == -ENOENT)
> +		if (ret == 0)
>  			continue;
> 
>  		n_mbufs += app.burst_size_tx_read;
> diff --git a/test/test/test_ring.c b/test/test/test_ring.c
> index 666a451..112433b 100644
> --- a/test/test/test_ring.c
> +++ b/test/test/test_ring.c
> @@ -117,20 +117,18 @@ test_ring_basic_full_empty(void * const src[], void
> *dst[])
>  		rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
>  		printf("%s: iteration %u, random shift: %u;\n",
>  		    __func__, i, rand);
> -		TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r,
> src,
> -		    rand));
> -		TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst,
> rand));
> +		TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand) !=
> 0);
> +		TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rand) ==
> rand);
> 
>  		/* fill the ring */
> -		TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r,
> src,
> -		    rsz));
> +		TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz) != 0);
>  		TEST_RING_VERIFY(0 == rte_ring_free_count(r));
>  		TEST_RING_VERIFY(rsz == rte_ring_count(r));
>  		TEST_RING_VERIFY(rte_ring_full(r));
>  		TEST_RING_VERIFY(0 == rte_ring_empty(r));
> 
>  		/* empty the ring */
> -		TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rsz));
> +		TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rsz) ==
> rsz);
>  		TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
>  		TEST_RING_VERIFY(0 == rte_ring_count(r));
>  		TEST_RING_VERIFY(0 == rte_ring_full(r));
> @@ -171,37 +169,37 @@ test_ring_basic(void)
>  	printf("enqueue 1 obj\n");
>  	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
>  	cur_src += 1;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("enqueue 2 objs\n");
>  	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
>  	cur_src += 2;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("enqueue MAX_BULK objs\n");
>  	ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
>  	cur_src += MAX_BULK;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("dequeue 1 obj\n");
>  	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
>  	cur_dst += 1;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("dequeue 2 objs\n");
>  	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
>  	cur_dst += 2;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("dequeue MAX_BULK objs\n");
>  	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
>  	cur_dst += MAX_BULK;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	/* check data */
> @@ -217,37 +215,37 @@ test_ring_basic(void)
>  	printf("enqueue 1 obj\n");
>  	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
>  	cur_src += 1;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("enqueue 2 objs\n");
>  	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
>  	cur_src += 2;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("enqueue MAX_BULK objs\n");
>  	ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
>  	cur_src += MAX_BULK;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("dequeue 1 obj\n");
>  	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
>  	cur_dst += 1;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("dequeue 2 objs\n");
>  	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
>  	cur_dst += 2;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	printf("dequeue MAX_BULK objs\n");
>  	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
>  	cur_dst += MAX_BULK;
> -	if (ret != 0)
> +	if (ret == 0)
>  		goto fail;
> 
>  	/* check data */
> @@ -264,11 +262,11 @@ test_ring_basic(void)
>  	for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
>  		ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
>  		cur_src += MAX_BULK;
> -		if (ret != 0)
> +		if (ret == 0)
>  			goto fail;
>  		ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
>  		cur_dst += MAX_BULK;
> -		if (ret != 0)
> +		if (ret == 0)
>  			goto fail;
>  	}
> 
> @@ -294,25 +292,25 @@ test_ring_basic(void)
> 
>  	ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
>  	cur_src += num_elems;
> -	if (ret != 0) {
> +	if (ret == 0) {
>  		printf("Cannot enqueue\n");
>  		goto fail;
>  	}
>  	ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
>  	cur_src += num_elems;
> -	if (ret != 0) {
> +	if (ret == 0) {
>  		printf("Cannot enqueue\n");
>  		goto fail;
>  	}
>  	ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
>  	cur_dst += num_elems;
> -	if (ret != 0) {
> +	if (ret == 0) {
>  		printf("Cannot dequeue\n");
>  		goto fail;
>  	}
>  	ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
>  	cur_dst += num_elems;
> -	if (ret != 0) {
> +	if (ret == 0) {
>  		printf("Cannot dequeue2\n");
>  		goto fail;
>  	}
> diff --git a/test/test/test_ring_perf.c b/test/test/test_ring_perf.c
> index 320c20c..8ccbdef 100644
> --- a/test/test/test_ring_perf.c
> +++ b/test/test/test_ring_perf.c
> @@ -195,13 +195,13 @@ enqueue_bulk(void *p)
> 
>  	const uint64_t sp_start = rte_rdtsc();
>  	for (i = 0; i < iterations; i++)
> -		while (rte_ring_sp_enqueue_bulk(r, burst, size) != 0)
> +		while (rte_ring_sp_enqueue_bulk(r, burst, size) == 0)
>  			rte_pause();
>  	const uint64_t sp_end = rte_rdtsc();
> 
>  	const uint64_t mp_start = rte_rdtsc();
>  	for (i = 0; i < iterations; i++)
> -		while (rte_ring_mp_enqueue_bulk(r, burst, size) != 0)
> +		while (rte_ring_mp_enqueue_bulk(r, burst, size) == 0)
>  			rte_pause();
>  	const uint64_t mp_end = rte_rdtsc();
> 
> @@ -230,13 +230,13 @@ dequeue_bulk(void *p)
> 
>  	const uint64_t sc_start = rte_rdtsc();
>  	for (i = 0; i < iterations; i++)
> -		while (rte_ring_sc_dequeue_bulk(r, burst, size) != 0)
> +		while (rte_ring_sc_dequeue_bulk(r, burst, size) == 0)
>  			rte_pause();
>  	const uint64_t sc_end = rte_rdtsc();
> 
>  	const uint64_t mc_start = rte_rdtsc();
>  	for (i = 0; i < iterations; i++)
> -		while (rte_ring_mc_dequeue_bulk(r, burst, size) != 0)
> +		while (rte_ring_mc_dequeue_bulk(r, burst, size) == 0)
>  			rte_pause();
>  	const uint64_t mc_end = rte_rdtsc();
> 
> --
> 2.9.3
  
Bruce Richardson April 13, 2017, 8:33 a.m. UTC | #2
On Thu, Apr 13, 2017 at 07:42:39AM +0100, Wang, Zhihong wrote:
> Hi Bruce,
> 
> This patch changes the behavior and causes some existing code to
> malfunction, e.g. bond_ethdev_stop() will get stuck here:
> 
> while (rte_ring_dequeue(port->rx_ring, &pkt) != -ENOENT)
> 		rte_pktmbuf_free(pkt);
> 
> Another example in test/test/virtual_pmd.c: virtual_ethdev_stop().
> 
> 
> Thanks
> Zhihong
> 
Actually, the behaviour of dequeue function should not be changing, so
this error initially confused me. However, it looks like a typo in my
patchset, compare function API comment, vs implementation:

 * @return
 *   - 0: Success, objects dequeued.
 *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
 *     dequeued.

rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOBUFS;

So it looks like I put in ENOBUFS instead of ENOENT when modifying the
line. I'll do up a patch to fix that.

/Bruce
  

Patch

diff --git a/doc/guides/rel_notes/release_17_05.rst b/doc/guides/rel_notes/release_17_05.rst
index 084b359..6da2612 100644
--- a/doc/guides/rel_notes/release_17_05.rst
+++ b/doc/guides/rel_notes/release_17_05.rst
@@ -137,6 +137,17 @@  API Changes
   * removed the build-time setting ``CONFIG_RTE_RING_PAUSE_REP_COUNT``
   * removed the function ``rte_ring_set_water_mark`` as part of a general
     removal of watermarks support in the library.
+  * changed the return value of the enqueue and dequeue bulk functions to
+    match that of the burst equivalents. In all cases, ring functions which
+    operate on multiple packets now return the number of elements enqueued
+    or dequeued, as appropriate. The updated functions are:
+
+    - ``rte_ring_mp_enqueue_bulk``
+    - ``rte_ring_sp_enqueue_bulk``
+    - ``rte_ring_enqueue_bulk``
+    - ``rte_ring_mc_dequeue_bulk``
+    - ``rte_ring_sc_dequeue_bulk``
+    - ``rte_ring_dequeue_bulk``
 
 ABI Changes
 -----------
diff --git a/doc/guides/sample_app_ug/server_node_efd.rst b/doc/guides/sample_app_ug/server_node_efd.rst
index 9b69cfe..e3a63c8 100644
--- a/doc/guides/sample_app_ug/server_node_efd.rst
+++ b/doc/guides/sample_app_ug/server_node_efd.rst
@@ -286,7 +286,7 @@  repeated infinitely.
 
         cl = &nodes[node];
         if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
-                cl_rx_buf[node].count) != 0){
+                cl_rx_buf[node].count) != cl_rx_buf[node].count){
             for (j = 0; j < cl_rx_buf[node].count; j++)
                 rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
             cl->stats.rx_drop += cl_rx_buf[node].count;
diff --git a/examples/load_balancer/runtime.c b/examples/load_balancer/runtime.c
index 6944325..82b10bc 100644
--- a/examples/load_balancer/runtime.c
+++ b/examples/load_balancer/runtime.c
@@ -146,7 +146,7 @@  app_lcore_io_rx_buffer_to_send (
 		(void **) lp->rx.mbuf_out[worker].array,
 		bsz);
 
-	if (unlikely(ret == -ENOBUFS)) {
+	if (unlikely(ret == 0)) {
 		uint32_t k;
 		for (k = 0; k < bsz; k ++) {
 			struct rte_mbuf *m = lp->rx.mbuf_out[worker].array[k];
@@ -312,7 +312,7 @@  app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers)
 			(void **) lp->rx.mbuf_out[worker].array,
 			lp->rx.mbuf_out[worker].n_mbufs);
 
-		if (unlikely(ret < 0)) {
+		if (unlikely(ret == 0)) {
 			uint32_t k;
 			for (k = 0; k < lp->rx.mbuf_out[worker].n_mbufs; k ++) {
 				struct rte_mbuf *pkt_to_free = lp->rx.mbuf_out[worker].array[k];
@@ -349,9 +349,8 @@  app_lcore_io_tx(
 				(void **) &lp->tx.mbuf_out[port].array[n_mbufs],
 				bsz_rd);
 
-			if (unlikely(ret == -ENOENT)) {
+			if (unlikely(ret == 0))
 				continue;
-			}
 
 			n_mbufs += bsz_rd;
 
@@ -505,9 +504,8 @@  app_lcore_worker(
 			(void **) lp->mbuf_in.array,
 			bsz_rd);
 
-		if (unlikely(ret == -ENOENT)) {
+		if (unlikely(ret == 0))
 			continue;
-		}
 
 #if APP_WORKER_DROP_ALL_PACKETS
 		for (j = 0; j < bsz_rd; j ++) {
@@ -559,7 +557,7 @@  app_lcore_worker(
 
 #if APP_STATS
 			lp->rings_out_iters[port] ++;
-			if (ret == 0) {
+			if (ret > 0) {
 				lp->rings_out_count[port] += 1;
 			}
 			if (lp->rings_out_iters[port] == APP_STATS){
@@ -572,7 +570,7 @@  app_lcore_worker(
 			}
 #endif
 
-			if (unlikely(ret == -ENOBUFS)) {
+			if (unlikely(ret == 0)) {
 				uint32_t k;
 				for (k = 0; k < bsz_wr; k ++) {
 					struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
@@ -609,7 +607,7 @@  app_lcore_worker_flush(struct app_lcore_params_worker *lp)
 			(void **) lp->mbuf_out[port].array,
 			lp->mbuf_out[port].n_mbufs);
 
-		if (unlikely(ret < 0)) {
+		if (unlikely(ret == 0)) {
 			uint32_t k;
 			for (k = 0; k < lp->mbuf_out[port].n_mbufs; k ++) {
 				struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
diff --git a/examples/multi_process/client_server_mp/mp_client/client.c b/examples/multi_process/client_server_mp/mp_client/client.c
index d4f9ca3..dca9eb9 100644
--- a/examples/multi_process/client_server_mp/mp_client/client.c
+++ b/examples/multi_process/client_server_mp/mp_client/client.c
@@ -276,14 +276,10 @@  main(int argc, char *argv[])
 	printf("[Press Ctrl-C to quit ...]\n");
 
 	for (;;) {
-		uint16_t i, rx_pkts = PKT_READ_SIZE;
+		uint16_t i, rx_pkts;
 		uint8_t port;
 
-		/* try dequeuing max possible packets first, if that fails, get the
-		 * most we can. Loop body should only execute once, maximum */
-		while (rx_pkts > 0 &&
-				unlikely(rte_ring_dequeue_bulk(rx_ring, pkts, rx_pkts) != 0))
-			rx_pkts = (uint16_t)RTE_MIN(rte_ring_count(rx_ring), PKT_READ_SIZE);
+		rx_pkts = rte_ring_dequeue_burst(rx_ring, pkts, PKT_READ_SIZE);
 
 		if (unlikely(rx_pkts == 0)){
 			if (need_flush)
diff --git a/examples/multi_process/client_server_mp/mp_server/main.c b/examples/multi_process/client_server_mp/mp_server/main.c
index a6dc12d..19c95b2 100644
--- a/examples/multi_process/client_server_mp/mp_server/main.c
+++ b/examples/multi_process/client_server_mp/mp_server/main.c
@@ -227,7 +227,7 @@  flush_rx_queue(uint16_t client)
 
 	cl = &clients[client];
 	if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[client].buffer,
-			cl_rx_buf[client].count) != 0){
+			cl_rx_buf[client].count) == 0){
 		for (j = 0; j < cl_rx_buf[client].count; j++)
 			rte_pktmbuf_free(cl_rx_buf[client].buffer[j]);
 		cl->stats.rx_drop += cl_rx_buf[client].count;
diff --git a/examples/qos_sched/app_thread.c b/examples/qos_sched/app_thread.c
index 70fdcdb..dab4594 100644
--- a/examples/qos_sched/app_thread.c
+++ b/examples/qos_sched/app_thread.c
@@ -107,7 +107,7 @@  app_rx_thread(struct thread_conf **confs)
 			}
 
 			if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring,
-								(void **)rx_mbufs, nb_rx) != 0)) {
+					(void **)rx_mbufs, nb_rx) == 0)) {
 				for(i = 0; i < nb_rx; i++) {
 					rte_pktmbuf_free(rx_mbufs[i]);
 
@@ -180,7 +180,7 @@  app_tx_thread(struct thread_conf **confs)
 	while ((conf = confs[conf_idx])) {
 		retval = rte_ring_sc_dequeue_bulk(conf->tx_ring, (void **)mbufs,
 					burst_conf.qos_dequeue);
-		if (likely(retval == 0)) {
+		if (likely(retval != 0)) {
 			app_send_packets(conf, mbufs, burst_conf.qos_dequeue);
 
 			conf->counter = 0; /* reset empty read loop counter */
@@ -230,7 +230,9 @@  app_worker_thread(struct thread_conf **confs)
 		nb_pkt = rte_sched_port_dequeue(conf->sched_port, mbufs,
 					burst_conf.qos_dequeue);
 		if (likely(nb_pkt > 0))
-			while (rte_ring_sp_enqueue_bulk(conf->tx_ring, (void **)mbufs, nb_pkt) != 0);
+			while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
+					(void **)mbufs, nb_pkt) == 0)
+				; /* empty body */
 
 		conf_idx++;
 		if (confs[conf_idx] == NULL)
diff --git a/examples/server_node_efd/node/node.c b/examples/server_node_efd/node/node.c
index a6c0c70..9ec6a05 100644
--- a/examples/server_node_efd/node/node.c
+++ b/examples/server_node_efd/node/node.c
@@ -392,7 +392,7 @@  main(int argc, char *argv[])
 		 */
 		while (rx_pkts > 0 &&
 				unlikely(rte_ring_dequeue_bulk(rx_ring, pkts,
-					rx_pkts) != 0))
+					rx_pkts) == 0))
 			rx_pkts = (uint16_t)RTE_MIN(rte_ring_count(rx_ring),
 					PKT_READ_SIZE);
 
diff --git a/examples/server_node_efd/server/main.c b/examples/server_node_efd/server/main.c
index 1a54d1b..3eb7fac 100644
--- a/examples/server_node_efd/server/main.c
+++ b/examples/server_node_efd/server/main.c
@@ -247,7 +247,7 @@  flush_rx_queue(uint16_t node)
 
 	cl = &nodes[node];
 	if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
-			cl_rx_buf[node].count) != 0){
+			cl_rx_buf[node].count) != cl_rx_buf[node].count){
 		for (j = 0; j < cl_rx_buf[node].count; j++)
 			rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
 		cl->stats.rx_drop += cl_rx_buf[node].count;
diff --git a/lib/librte_mempool/rte_mempool_ring.c b/lib/librte_mempool/rte_mempool_ring.c
index b9aa64d..409b860 100644
--- a/lib/librte_mempool/rte_mempool_ring.c
+++ b/lib/librte_mempool/rte_mempool_ring.c
@@ -42,26 +42,30 @@  static int
 common_ring_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
 		unsigned n)
 {
-	return rte_ring_mp_enqueue_bulk(mp->pool_data, obj_table, n);
+	return rte_ring_mp_enqueue_bulk(mp->pool_data,
+			obj_table, n) == 0 ? -ENOBUFS : 0;
 }
 
 static int
 common_ring_sp_enqueue(struct rte_mempool *mp, void * const *obj_table,
 		unsigned n)
 {
-	return rte_ring_sp_enqueue_bulk(mp->pool_data, obj_table, n);
+	return rte_ring_sp_enqueue_bulk(mp->pool_data,
+			obj_table, n) == 0 ? -ENOBUFS : 0;
 }
 
 static int
 common_ring_mc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
 {
-	return rte_ring_mc_dequeue_bulk(mp->pool_data, obj_table, n);
+	return rte_ring_mc_dequeue_bulk(mp->pool_data,
+			obj_table, n) == 0 ? -ENOBUFS : 0;
 }
 
 static int
 common_ring_sc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
 {
-	return rte_ring_sc_dequeue_bulk(mp->pool_data, obj_table, n);
+	return rte_ring_sc_dequeue_bulk(mp->pool_data,
+			obj_table, n) == 0 ? -ENOBUFS : 0;
 }
 
 static unsigned
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 906e8ae..34b438c 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -349,14 +349,10 @@  void rte_ring_dump(FILE *f, const struct rte_ring *r);
  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
  * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 			 unsigned n, enum rte_ring_queue_behavior behavior)
 {
@@ -388,7 +384,7 @@  __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		/* check that we have enough room in ring */
 		if (unlikely(n > free_entries)) {
 			if (behavior == RTE_RING_QUEUE_FIXED)
-				return -ENOBUFS;
+				return 0;
 			else {
 				/* No free entry available */
 				if (unlikely(free_entries == 0))
@@ -414,7 +410,7 @@  __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		rte_pause();
 
 	r->prod.tail = prod_next;
-	return (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
+	return n;
 }
 
 /**
@@ -430,14 +426,10 @@  __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
  * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 			 unsigned n, enum rte_ring_queue_behavior behavior)
 {
@@ -457,7 +449,7 @@  __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	/* check that we have enough room in ring */
 	if (unlikely(n > free_entries)) {
 		if (behavior == RTE_RING_QUEUE_FIXED)
-			return -ENOBUFS;
+			return 0;
 		else {
 			/* No free entry available */
 			if (unlikely(free_entries == 0))
@@ -474,7 +466,7 @@  __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	rte_smp_wmb();
 
 	r->prod.tail = prod_next;
-	return (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
+	return n;
 }
 
 /**
@@ -495,16 +487,11 @@  __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
  * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
  */
 
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 		 unsigned n, enum rte_ring_queue_behavior behavior)
 {
@@ -536,7 +523,7 @@  __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 		/* Set the actual entries for dequeue */
 		if (n > entries) {
 			if (behavior == RTE_RING_QUEUE_FIXED)
-				return -ENOENT;
+				return 0;
 			else {
 				if (unlikely(entries == 0))
 					return 0;
@@ -562,7 +549,7 @@  __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 
 	r->cons.tail = cons_next;
 
-	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+	return n;
 }
 
 /**
@@ -580,15 +567,10 @@  __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
  * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
 		 unsigned n, enum rte_ring_queue_behavior behavior)
 {
@@ -607,7 +589,7 @@  __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
 
 	if (n > entries) {
 		if (behavior == RTE_RING_QUEUE_FIXED)
-			return -ENOENT;
+			return 0;
 		else {
 			if (unlikely(entries == 0))
 				return 0;
@@ -623,7 +605,7 @@  __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
 	rte_smp_rmb();
 
 	r->cons.tail = cons_next;
-	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+	return n;
 }
 
 /**
@@ -639,10 +621,9 @@  __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @return
- *   - 0: Success; objects enqueue.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
+ *   The number of objects enqueued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
@@ -659,10 +640,9 @@  rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @return
- *   - 0: Success; objects enqueued.
- *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ *   The number of objects enqueued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
@@ -683,10 +663,9 @@  rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  * @param n
  *   The number of objects to add in the ring from the obj_table.
  * @return
- *   - 0: Success; objects enqueued.
- *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ *   The number of objects enqueued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 		      unsigned n)
 {
@@ -713,7 +692,7 @@  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 static inline int __attribute__((always_inline))
 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
 {
-	return rte_ring_mp_enqueue_bulk(r, &obj, 1);
+	return rte_ring_mp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -730,7 +709,7 @@  rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
 {
-	return rte_ring_sp_enqueue_bulk(r, &obj, 1);
+	return rte_ring_sp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -751,10 +730,7 @@  rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_enqueue(struct rte_ring *r, void *obj)
 {
-	if (r->prod.single)
-		return rte_ring_sp_enqueue(r, obj);
-	else
-		return rte_ring_mp_enqueue(r, obj);
+	return rte_ring_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -770,11 +746,9 @@  rte_ring_enqueue(struct rte_ring *r, void *obj)
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @return
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
+ *   The number of objects dequeued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
 	return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
@@ -791,11 +765,9 @@  rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
  *   The number of objects to dequeue from the ring to the obj_table,
  *   must be strictly positive.
  * @return
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
+ *   The number of objects dequeued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
 	return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
@@ -815,11 +787,9 @@  rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @return
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
- *     dequeued.
+ *   The number of objects dequeued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
 	if (r->cons.single)
@@ -846,7 +816,7 @@  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 static inline int __attribute__((always_inline))
 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
 {
-	return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
+	return rte_ring_mc_dequeue_bulk(r, obj_p, 1)  ? 0 : -ENOBUFS;
 }
 
 /**
@@ -864,7 +834,7 @@  rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
 static inline int __attribute__((always_inline))
 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
 {
-	return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
+	return rte_ring_sc_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -886,10 +856,7 @@  rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
 static inline int __attribute__((always_inline))
 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 {
-	if (r->cons.single)
-		return rte_ring_sc_dequeue(r, obj_p);
-	else
-		return rte_ring_mc_dequeue(r, obj_p);
+	return rte_ring_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
 }
 
 /**
diff --git a/test/test-pipeline/pipeline_hash.c b/test/test-pipeline/pipeline_hash.c
index 10d2869..1ac0aa8 100644
--- a/test/test-pipeline/pipeline_hash.c
+++ b/test/test-pipeline/pipeline_hash.c
@@ -547,6 +547,6 @@  app_main_loop_rx_metadata(void) {
 				app.rings_rx[i],
 				(void **) app.mbuf_rx.array,
 				n_mbufs);
-		} while (ret < 0);
+		} while (ret == 0);
 	}
 }
diff --git a/test/test-pipeline/runtime.c b/test/test-pipeline/runtime.c
index 42a6142..4e20669 100644
--- a/test/test-pipeline/runtime.c
+++ b/test/test-pipeline/runtime.c
@@ -98,7 +98,7 @@  app_main_loop_rx(void) {
 				app.rings_rx[i],
 				(void **) app.mbuf_rx.array,
 				n_mbufs);
-		} while (ret < 0);
+		} while (ret == 0);
 	}
 }
 
@@ -123,7 +123,7 @@  app_main_loop_worker(void) {
 			(void **) worker_mbuf->array,
 			app.burst_size_worker_read);
 
-		if (ret == -ENOENT)
+		if (ret == 0)
 			continue;
 
 		do {
@@ -131,7 +131,7 @@  app_main_loop_worker(void) {
 				app.rings_tx[i ^ 1],
 				(void **) worker_mbuf->array,
 				app.burst_size_worker_write);
-		} while (ret < 0);
+		} while (ret == 0);
 	}
 }
 
@@ -152,7 +152,7 @@  app_main_loop_tx(void) {
 			(void **) &app.mbuf_tx[i].array[n_mbufs],
 			app.burst_size_tx_read);
 
-		if (ret == -ENOENT)
+		if (ret == 0)
 			continue;
 
 		n_mbufs += app.burst_size_tx_read;
diff --git a/test/test/test_ring.c b/test/test/test_ring.c
index 666a451..112433b 100644
--- a/test/test/test_ring.c
+++ b/test/test/test_ring.c
@@ -117,20 +117,18 @@  test_ring_basic_full_empty(void * const src[], void *dst[])
 		rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
 		printf("%s: iteration %u, random shift: %u;\n",
 		    __func__, i, rand);
-		TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
-		    rand));
-		TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rand));
+		TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand) != 0);
+		TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rand) == rand);
 
 		/* fill the ring */
-		TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
-		    rsz));
+		TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz) != 0);
 		TEST_RING_VERIFY(0 == rte_ring_free_count(r));
 		TEST_RING_VERIFY(rsz == rte_ring_count(r));
 		TEST_RING_VERIFY(rte_ring_full(r));
 		TEST_RING_VERIFY(0 == rte_ring_empty(r));
 
 		/* empty the ring */
-		TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rsz));
+		TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rsz) == rsz);
 		TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
 		TEST_RING_VERIFY(0 == rte_ring_count(r));
 		TEST_RING_VERIFY(0 == rte_ring_full(r));
@@ -171,37 +169,37 @@  test_ring_basic(void)
 	printf("enqueue 1 obj\n");
 	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
 	cur_src += 1;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("enqueue 2 objs\n");
 	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
 	cur_src += 2;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("enqueue MAX_BULK objs\n");
 	ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
 	cur_src += MAX_BULK;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("dequeue 1 obj\n");
 	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
 	cur_dst += 1;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("dequeue 2 objs\n");
 	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
 	cur_dst += 2;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("dequeue MAX_BULK objs\n");
 	ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
 	cur_dst += MAX_BULK;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	/* check data */
@@ -217,37 +215,37 @@  test_ring_basic(void)
 	printf("enqueue 1 obj\n");
 	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
 	cur_src += 1;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("enqueue 2 objs\n");
 	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
 	cur_src += 2;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("enqueue MAX_BULK objs\n");
 	ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
 	cur_src += MAX_BULK;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("dequeue 1 obj\n");
 	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
 	cur_dst += 1;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("dequeue 2 objs\n");
 	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
 	cur_dst += 2;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	printf("dequeue MAX_BULK objs\n");
 	ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
 	cur_dst += MAX_BULK;
-	if (ret != 0)
+	if (ret == 0)
 		goto fail;
 
 	/* check data */
@@ -264,11 +262,11 @@  test_ring_basic(void)
 	for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
 		ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
 		cur_src += MAX_BULK;
-		if (ret != 0)
+		if (ret == 0)
 			goto fail;
 		ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
 		cur_dst += MAX_BULK;
-		if (ret != 0)
+		if (ret == 0)
 			goto fail;
 	}
 
@@ -294,25 +292,25 @@  test_ring_basic(void)
 
 	ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
 	cur_src += num_elems;
-	if (ret != 0) {
+	if (ret == 0) {
 		printf("Cannot enqueue\n");
 		goto fail;
 	}
 	ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
 	cur_src += num_elems;
-	if (ret != 0) {
+	if (ret == 0) {
 		printf("Cannot enqueue\n");
 		goto fail;
 	}
 	ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
 	cur_dst += num_elems;
-	if (ret != 0) {
+	if (ret == 0) {
 		printf("Cannot dequeue\n");
 		goto fail;
 	}
 	ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
 	cur_dst += num_elems;
-	if (ret != 0) {
+	if (ret == 0) {
 		printf("Cannot dequeue2\n");
 		goto fail;
 	}
diff --git a/test/test/test_ring_perf.c b/test/test/test_ring_perf.c
index 320c20c..8ccbdef 100644
--- a/test/test/test_ring_perf.c
+++ b/test/test/test_ring_perf.c
@@ -195,13 +195,13 @@  enqueue_bulk(void *p)
 
 	const uint64_t sp_start = rte_rdtsc();
 	for (i = 0; i < iterations; i++)
-		while (rte_ring_sp_enqueue_bulk(r, burst, size) != 0)
+		while (rte_ring_sp_enqueue_bulk(r, burst, size) == 0)
 			rte_pause();
 	const uint64_t sp_end = rte_rdtsc();
 
 	const uint64_t mp_start = rte_rdtsc();
 	for (i = 0; i < iterations; i++)
-		while (rte_ring_mp_enqueue_bulk(r, burst, size) != 0)
+		while (rte_ring_mp_enqueue_bulk(r, burst, size) == 0)
 			rte_pause();
 	const uint64_t mp_end = rte_rdtsc();
 
@@ -230,13 +230,13 @@  dequeue_bulk(void *p)
 
 	const uint64_t sc_start = rte_rdtsc();
 	for (i = 0; i < iterations; i++)
-		while (rte_ring_sc_dequeue_bulk(r, burst, size) != 0)
+		while (rte_ring_sc_dequeue_bulk(r, burst, size) == 0)
 			rte_pause();
 	const uint64_t sc_end = rte_rdtsc();
 
 	const uint64_t mc_start = rte_rdtsc();
 	for (i = 0; i < iterations; i++)
-		while (rte_ring_mc_dequeue_bulk(r, burst, size) != 0)
+		while (rte_ring_mc_dequeue_bulk(r, burst, size) == 0)
 			rte_pause();
 	const uint64_t mc_end = rte_rdtsc();