[dpdk-dev] [PATCH RFCv3 11/19] ring: allow enq fns to return free space value

Bruce Richardson bruce.richardson at intel.com
Tue Feb 7 15:12:49 CET 2017


Add an extra parameter to the ring enqueue burst/bulk functions so that
those functions can optionally return the amount of free space remaining
in the ring after the enqueue. Callers that do not need this value pass
NULL. This information can be used by applications in a number of ways.
For instance, with single-producer queues, it provides a maximum
enqueue size which is guaranteed to work. It can also be used to
implement watermark functionality in apps, replacing the older
functionality with a more flexible version which enables apps to
implement multiple watermark thresholds, rather than just one.

Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
---
 app/test-pipeline/pipeline_hash.c                  |  3 +-
 app/test-pipeline/runtime.c                        |  5 +-
 app/test/test_link_bonding_mode4.c                 |  3 +-
 app/test/test_pmd_ring_perf.c                      |  5 +-
 app/test/test_ring.c                               | 55 +++++++------
 app/test/test_ring_perf.c                          | 16 ++--
 app/test/test_table_ports.c                        |  4 +-
 app/test/virtual_pmd.c                             |  4 +-
 drivers/net/ring/rte_eth_ring.c                    |  2 +-
 examples/distributor/main.c                        |  3 +-
 examples/load_balancer/runtime.c                   | 12 ++-
 .../client_server_mp/mp_server/main.c              |  2 +-
 examples/packet_ordering/main.c                    |  7 +-
 examples/qos_sched/app_thread.c                    |  4 +-
 examples/server_node_efd/server/main.c             |  2 +-
 lib/librte_hash/rte_cuckoo_hash.c                  |  2 +-
 lib/librte_mempool/rte_mempool_ring.c              |  4 +-
 lib/librte_pdump/rte_pdump.c                       |  2 +-
 lib/librte_port/rte_port_ras.c                     |  2 +-
 lib/librte_port/rte_port_ring.c                    | 28 ++++---
 lib/librte_ring/rte_ring.h                         | 89 +++++++++++-----------
 21 files changed, 135 insertions(+), 119 deletions(-)

diff --git a/app/test-pipeline/pipeline_hash.c b/app/test-pipeline/pipeline_hash.c
index 1ac0aa8..0c6e04f 100644
--- a/app/test-pipeline/pipeline_hash.c
+++ b/app/test-pipeline/pipeline_hash.c
@@ -546,7 +546,8 @@ app_main_loop_rx_metadata(void) {
 			ret = rte_ring_sp_enqueue_bulk(
 				app.rings_rx[i],
 				(void **) app.mbuf_rx.array,
-				n_mbufs);
+				n_mbufs,
+				NULL);
 		} while (ret == 0);
 	}
 }
diff --git a/app/test-pipeline/runtime.c b/app/test-pipeline/runtime.c
index 4e20669..c06ff54 100644
--- a/app/test-pipeline/runtime.c
+++ b/app/test-pipeline/runtime.c
@@ -97,7 +97,7 @@ app_main_loop_rx(void) {
 			ret = rte_ring_sp_enqueue_bulk(
 				app.rings_rx[i],
 				(void **) app.mbuf_rx.array,
-				n_mbufs);
+				n_mbufs, NULL);
 		} while (ret == 0);
 	}
 }
@@ -130,7 +130,8 @@ app_main_loop_worker(void) {
 			ret = rte_ring_sp_enqueue_bulk(
 				app.rings_tx[i ^ 1],
 				(void **) worker_mbuf->array,
-				app.burst_size_worker_write);
+				app.burst_size_worker_write,
+				NULL);
 		} while (ret == 0);
 	}
 }
diff --git a/app/test/test_link_bonding_mode4.c b/app/test/test_link_bonding_mode4.c
index 53caa3e..8df28b4 100644
--- a/app/test/test_link_bonding_mode4.c
+++ b/app/test/test_link_bonding_mode4.c
@@ -206,7 +206,8 @@ slave_get_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
 static int
 slave_put_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
 {
-	return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf, size);
+	return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf,
+			size, NULL);
 }
 
 static uint16_t
diff --git a/app/test/test_pmd_ring_perf.c b/app/test/test_pmd_ring_perf.c
index af011f7..045a7f2 100644
--- a/app/test/test_pmd_ring_perf.c
+++ b/app/test/test_pmd_ring_perf.c
@@ -98,7 +98,7 @@ test_single_enqueue_dequeue(void)
 	const uint64_t sc_start = rte_rdtsc_precise();
 	rte_compiler_barrier();
 	for (i = 0; i < iterations; i++) {
-		rte_ring_enqueue_bulk(r, &burst, 1);
+		rte_ring_enqueue_bulk(r, &burst, 1, NULL);
 		rte_ring_dequeue_bulk(r, &burst, 1);
 	}
 	const uint64_t sc_end = rte_rdtsc_precise();
@@ -131,7 +131,8 @@ test_bulk_enqueue_dequeue(void)
 	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
 		const uint64_t sc_start = rte_rdtsc();
 		for (i = 0; i < iterations; i++) {
-			rte_ring_sp_enqueue_bulk(r, (void *)burst, bulk_sizes[sz]);
+			rte_ring_sp_enqueue_bulk(r, (void *)burst,
+					bulk_sizes[sz], NULL);
 			rte_ring_sc_dequeue_bulk(r, (void *)burst, bulk_sizes[sz]);
 		}
 		const uint64_t sc_end = rte_rdtsc();
diff --git a/app/test/test_ring.c b/app/test/test_ring.c
index 4378fd0..aa2a711 100644
--- a/app/test/test_ring.c
+++ b/app/test/test_ring.c
@@ -118,12 +118,11 @@ test_ring_basic_full_empty(void * const src[], void *dst[])
 		printf("%s: iteration %u, random shift: %u;\n",
 		    __func__, i, rand);
 		TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src,
-		    rand));
+				rand, NULL));
 		TEST_RING_VERIFY(rand == rte_ring_dequeue_bulk(r, dst, rand));
 
 		/* fill the ring */
-		TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src,
-		    rsz));
+		TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src, rsz, NULL));
 		TEST_RING_VERIFY(0 == rte_ring_free_count(r));
 		TEST_RING_VERIFY(rsz == rte_ring_count(r));
 		TEST_RING_VERIFY(rte_ring_full(r));
@@ -169,19 +168,19 @@ test_ring_basic(void)
 	cur_dst = dst;
 
 	printf("enqueue 1 obj\n");
-	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
+	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1, NULL);
 	cur_src += 1;
 	if (ret == 0)
 		goto fail;
 
 	printf("enqueue 2 objs\n");
-	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
+	ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2, NULL);
 	cur_src += 2;
 	if (ret == 0)
 		goto fail;
 
 	printf("enqueue MAX_BULK objs\n");
-	ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
+	ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
 	cur_src += MAX_BULK;
 	if (ret == 0)
 		goto fail;
@@ -215,19 +214,19 @@ test_ring_basic(void)
 	cur_dst = dst;
 
 	printf("enqueue 1 obj\n");
-	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
+	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1, NULL);
 	cur_src += 1;
 	if (ret == 0)
 		goto fail;
 
 	printf("enqueue 2 objs\n");
-	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
+	ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2, NULL);
 	cur_src += 2;
 	if (ret == 0)
 		goto fail;
 
 	printf("enqueue MAX_BULK objs\n");
-	ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
+	ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
 	cur_src += MAX_BULK;
 	if (ret == 0)
 		goto fail;
@@ -262,7 +261,7 @@ test_ring_basic(void)
 
 	printf("fill and empty the ring\n");
 	for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
-		ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
+		ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
 		cur_src += MAX_BULK;
 		if (ret == 0)
 			goto fail;
@@ -292,13 +291,13 @@ test_ring_basic(void)
 	cur_src = src;
 	cur_dst = dst;
 
-	ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+	ret = rte_ring_enqueue_bulk(r, cur_src, num_elems, NULL);
 	cur_src += num_elems;
 	if (ret == 0) {
 		printf("Cannot enqueue\n");
 		goto fail;
 	}
-	ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+	ret = rte_ring_enqueue_bulk(r, cur_src, num_elems, NULL);
 	cur_src += num_elems;
 	if (ret == 0) {
 		printf("Cannot enqueue\n");
@@ -373,19 +372,19 @@ test_ring_burst_basic(void)
 
 	printf("Test SP & SC basic functions \n");
 	printf("enqueue 1 obj\n");
-	ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
+	ret = rte_ring_sp_enqueue_burst(r, cur_src, 1, NULL);
 	cur_src += 1;
 	if ((ret & RTE_RING_SZ_MASK) != 1)
 		goto fail;
 
 	printf("enqueue 2 objs\n");
-	ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+	ret = rte_ring_sp_enqueue_burst(r, cur_src, 2, NULL);
 	cur_src += 2;
 	if ((ret & RTE_RING_SZ_MASK) != 2)
 		goto fail;
 
 	printf("enqueue MAX_BULK objs\n");
-	ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK) ;
+	ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
 	cur_src += MAX_BULK;
 	if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
 		goto fail;
@@ -421,7 +420,7 @@ test_ring_burst_basic(void)
 
 	printf("Test enqueue without enough memory space \n");
 	for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
-		ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+		ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
 		cur_src += MAX_BULK;
 		if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
 			goto fail;
@@ -429,14 +428,14 @@ test_ring_burst_basic(void)
 	}
 
 	printf("Enqueue 2 objects, free entries = MAX_BULK - 2  \n");
-	ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+	ret = rte_ring_sp_enqueue_burst(r, cur_src, 2, NULL);
 	cur_src += 2;
 	if ((ret & RTE_RING_SZ_MASK) != 2)
 		goto fail;
 
 	printf("Enqueue the remaining entries = MAX_BULK - 2  \n");
 	/* Always one free entry left */
-	ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+	ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
 	cur_src += MAX_BULK - 3;
 	if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
 		goto fail;
@@ -446,7 +445,7 @@ test_ring_burst_basic(void)
 		goto fail;
 
 	printf("Test enqueue for a full entry  \n");
-	ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+	ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
 	if ((ret & RTE_RING_SZ_MASK) != 0)
 		goto fail;
 
@@ -488,19 +487,19 @@ test_ring_burst_basic(void)
 	printf("Test MP & MC basic functions \n");
 
 	printf("enqueue 1 obj\n");
-	ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
+	ret = rte_ring_mp_enqueue_burst(r, cur_src, 1, NULL);
 	cur_src += 1;
 	if ((ret & RTE_RING_SZ_MASK) != 1)
 		goto fail;
 
 	printf("enqueue 2 objs\n");
-	ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+	ret = rte_ring_mp_enqueue_burst(r, cur_src, 2, NULL);
 	cur_src += 2;
 	if ((ret & RTE_RING_SZ_MASK) != 2)
 		goto fail;
 
 	printf("enqueue MAX_BULK objs\n");
-	ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+	ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
 	cur_src += MAX_BULK;
 	if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
 		goto fail;
@@ -536,7 +535,7 @@ test_ring_burst_basic(void)
 
 	printf("fill and empty the ring\n");
 	for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
-		ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+		ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
 		cur_src += MAX_BULK;
 		if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
 			goto fail;
@@ -559,19 +558,19 @@ test_ring_burst_basic(void)
 
 	printf("Test enqueue without enough memory space \n");
 	for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
-		ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+		ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
 		cur_src += MAX_BULK;
 		if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
 			goto fail;
 	}
 
 	/* Available memory space for the exact MAX_BULK objects */
-	ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+	ret = rte_ring_mp_enqueue_burst(r, cur_src, 2, NULL);
 	cur_src += 2;
 	if ((ret & RTE_RING_SZ_MASK) != 2)
 		goto fail;
 
-	ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+	ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
 	cur_src += MAX_BULK - 3;
 	if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
 		goto fail;
@@ -609,7 +608,7 @@ test_ring_burst_basic(void)
 
 	printf("Covering rte_ring_enqueue_burst functions \n");
 
-	ret = rte_ring_enqueue_burst(r, cur_src, 2);
+	ret = rte_ring_enqueue_burst(r, cur_src, 2, NULL);
 	cur_src += 2;
 	if ((ret & RTE_RING_SZ_MASK) != 2)
 		goto fail;
@@ -748,7 +747,7 @@ test_ring_basic_ex(void)
 	}
 
 	/* Covering the ring burst operation */
-	ret = rte_ring_enqueue_burst(rp, obj, 2);
+	ret = rte_ring_enqueue_burst(rp, obj, 2, NULL);
 	if ((ret & RTE_RING_SZ_MASK) != 2) {
 		printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
 		goto fail_test;
diff --git a/app/test/test_ring_perf.c b/app/test/test_ring_perf.c
index 8ccbdef..f95a8e9 100644
--- a/app/test/test_ring_perf.c
+++ b/app/test/test_ring_perf.c
@@ -195,13 +195,13 @@ enqueue_bulk(void *p)
 
 	const uint64_t sp_start = rte_rdtsc();
 	for (i = 0; i < iterations; i++)
-		while (rte_ring_sp_enqueue_bulk(r, burst, size) == 0)
+		while (rte_ring_sp_enqueue_bulk(r, burst, size, NULL) == 0)
 			rte_pause();
 	const uint64_t sp_end = rte_rdtsc();
 
 	const uint64_t mp_start = rte_rdtsc();
 	for (i = 0; i < iterations; i++)
-		while (rte_ring_mp_enqueue_bulk(r, burst, size) == 0)
+		while (rte_ring_mp_enqueue_bulk(r, burst, size, NULL) == 0)
 			rte_pause();
 	const uint64_t mp_end = rte_rdtsc();
 
@@ -323,14 +323,16 @@ test_burst_enqueue_dequeue(void)
 	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
 		const uint64_t sc_start = rte_rdtsc();
 		for (i = 0; i < iterations; i++) {
-			rte_ring_sp_enqueue_burst(r, burst, bulk_sizes[sz]);
+			rte_ring_sp_enqueue_burst(r, burst,
+					bulk_sizes[sz], NULL);
 			rte_ring_sc_dequeue_burst(r, burst, bulk_sizes[sz]);
 		}
 		const uint64_t sc_end = rte_rdtsc();
 
 		const uint64_t mc_start = rte_rdtsc();
 		for (i = 0; i < iterations; i++) {
-			rte_ring_mp_enqueue_burst(r, burst, bulk_sizes[sz]);
+			rte_ring_mp_enqueue_burst(r, burst,
+					bulk_sizes[sz], NULL);
 			rte_ring_mc_dequeue_burst(r, burst, bulk_sizes[sz]);
 		}
 		const uint64_t mc_end = rte_rdtsc();
@@ -357,14 +359,16 @@ test_bulk_enqueue_dequeue(void)
 	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
 		const uint64_t sc_start = rte_rdtsc();
 		for (i = 0; i < iterations; i++) {
-			rte_ring_sp_enqueue_bulk(r, burst, bulk_sizes[sz]);
+			rte_ring_sp_enqueue_bulk(r, burst,
+					bulk_sizes[sz], NULL);
 			rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[sz]);
 		}
 		const uint64_t sc_end = rte_rdtsc();
 
 		const uint64_t mc_start = rte_rdtsc();
 		for (i = 0; i < iterations; i++) {
-			rte_ring_mp_enqueue_bulk(r, burst, bulk_sizes[sz]);
+			rte_ring_mp_enqueue_bulk(r, burst,
+					bulk_sizes[sz], NULL);
 			rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[sz]);
 		}
 		const uint64_t mc_end = rte_rdtsc();
diff --git a/app/test/test_table_ports.c b/app/test/test_table_ports.c
index 2532367..395f4f3 100644
--- a/app/test/test_table_ports.c
+++ b/app/test/test_table_ports.c
@@ -80,7 +80,7 @@ test_port_ring_reader(void)
 	mbuf[0] = (void *)rte_pktmbuf_alloc(pool);
 
 	expected_pkts = rte_ring_sp_enqueue_burst(port_ring_reader_params.ring,
-		mbuf, 1);
+		mbuf, 1, NULL);
 	received_pkts = rte_port_ring_reader_ops.f_rx(port, res_mbuf, 1);
 
 	if (received_pkts < expected_pkts)
@@ -93,7 +93,7 @@ test_port_ring_reader(void)
 		mbuf[i] = rte_pktmbuf_alloc(pool);
 
 	expected_pkts = rte_ring_sp_enqueue_burst(port_ring_reader_params.ring,
-		(void * const *) mbuf, RTE_PORT_IN_BURST_SIZE_MAX);
+		(void * const *) mbuf, RTE_PORT_IN_BURST_SIZE_MAX, NULL);
 	received_pkts = rte_port_ring_reader_ops.f_rx(port, res_mbuf,
 		RTE_PORT_IN_BURST_SIZE_MAX);
 
diff --git a/app/test/virtual_pmd.c b/app/test/virtual_pmd.c
index 6e4dcd8..39e070c 100644
--- a/app/test/virtual_pmd.c
+++ b/app/test/virtual_pmd.c
@@ -380,7 +380,7 @@ virtual_ethdev_tx_burst_success(void *queue, struct rte_mbuf **bufs,
 		nb_pkts = 0;
 	else
 		nb_pkts = rte_ring_enqueue_burst(dev_private->tx_queue, (void **)bufs,
-				nb_pkts);
+				nb_pkts, NULL);
 
 	/* increment opacket count */
 	dev_private->eth_stats.opackets += nb_pkts;
@@ -496,7 +496,7 @@ virtual_ethdev_add_mbufs_to_rx_queue(uint8_t port_id,
 			vrtl_eth_dev->data->dev_private;
 
 	return rte_ring_enqueue_burst(dev_private->rx_queue, (void **)pkt_burst,
-			burst_length);
+			burst_length, NULL);
 }
 
 int
diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c
index 6f9cc1a..adbf478 100644
--- a/drivers/net/ring/rte_eth_ring.c
+++ b/drivers/net/ring/rte_eth_ring.c
@@ -102,7 +102,7 @@ eth_ring_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 	void **ptrs = (void *)&bufs[0];
 	struct ring_queue *r = q;
 	const uint16_t nb_tx = (uint16_t)rte_ring_enqueue_burst(r->rng,
-			ptrs, nb_bufs);
+			ptrs, nb_bufs, NULL);
 	if (r->rng->flags & RING_F_SP_ENQ) {
 		r->tx_pkts.cnt += nb_tx;
 		r->err_pkts.cnt += nb_bufs - nb_tx;
diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index e7641d2..cfd360b 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -238,7 +238,8 @@ lcore_rx(struct lcore_params *p)
 			continue;
 		}
 
-		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
+		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs,
+				nb_ret, NULL);
 		app_stats.rx.enqueued_pkts += sent;
 		if (unlikely(sent < nb_ret)) {
 			RTE_LOG_DP(DEBUG, DISTRAPP,
diff --git a/examples/load_balancer/runtime.c b/examples/load_balancer/runtime.c
index 82b10bc..1645994 100644
--- a/examples/load_balancer/runtime.c
+++ b/examples/load_balancer/runtime.c
@@ -144,7 +144,8 @@ app_lcore_io_rx_buffer_to_send (
 	ret = rte_ring_sp_enqueue_bulk(
 		lp->rx.rings[worker],
 		(void **) lp->rx.mbuf_out[worker].array,
-		bsz);
+		bsz,
+		NULL);
 
 	if (unlikely(ret == 0)) {
 		uint32_t k;
@@ -310,7 +311,8 @@ app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers)
 		ret = rte_ring_sp_enqueue_bulk(
 			lp->rx.rings[worker],
 			(void **) lp->rx.mbuf_out[worker].array,
-			lp->rx.mbuf_out[worker].n_mbufs);
+			lp->rx.mbuf_out[worker].n_mbufs,
+			NULL);
 
 		if (unlikely(ret == 0)) {
 			uint32_t k;
@@ -553,7 +555,8 @@ app_lcore_worker(
 			ret = rte_ring_sp_enqueue_bulk(
 				lp->rings_out[port],
 				(void **) lp->mbuf_out[port].array,
-				bsz_wr);
+				bsz_wr,
+				NULL);
 
 #if APP_STATS
 			lp->rings_out_iters[port] ++;
@@ -605,7 +608,8 @@ app_lcore_worker_flush(struct app_lcore_params_worker *lp)
 		ret = rte_ring_sp_enqueue_bulk(
 			lp->rings_out[port],
 			(void **) lp->mbuf_out[port].array,
-			lp->mbuf_out[port].n_mbufs);
+			lp->mbuf_out[port].n_mbufs,
+			NULL);
 
 		if (unlikely(ret == 0)) {
 			uint32_t k;
diff --git a/examples/multi_process/client_server_mp/mp_server/main.c b/examples/multi_process/client_server_mp/mp_server/main.c
index 19c95b2..c2b0261 100644
--- a/examples/multi_process/client_server_mp/mp_server/main.c
+++ b/examples/multi_process/client_server_mp/mp_server/main.c
@@ -227,7 +227,7 @@ flush_rx_queue(uint16_t client)
 
 	cl = &clients[client];
 	if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[client].buffer,
-			cl_rx_buf[client].count) == 0){
+			cl_rx_buf[client].count, NULL) == 0){
 		for (j = 0; j < cl_rx_buf[client].count; j++)
 			rte_pktmbuf_free(cl_rx_buf[client].buffer[j]);
 		cl->stats.rx_drop += cl_rx_buf[client].count;
diff --git a/examples/packet_ordering/main.c b/examples/packet_ordering/main.c
index d4dc789..d268350 100644
--- a/examples/packet_ordering/main.c
+++ b/examples/packet_ordering/main.c
@@ -421,8 +421,8 @@ rx_thread(struct rte_ring *ring_out)
 					pkts[i++]->seqn = seqn++;
 
 				/* enqueue to rx_to_workers ring */
-				ret = rte_ring_enqueue_burst(ring_out, (void *) pkts,
-								nb_rx_pkts);
+				ret = rte_ring_enqueue_burst(ring_out,
+						(void *)pkts, nb_rx_pkts, NULL);
 				app_stats.rx.enqueue_pkts += ret;
 				if (unlikely(ret < nb_rx_pkts)) {
 					app_stats.rx.enqueue_failed_pkts +=
@@ -473,7 +473,8 @@ worker_thread(void *args_ptr)
 			burst_buffer[i++]->port ^= xor_val;
 
 		/* enqueue the modified mbufs to workers_to_tx ring */
-		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, burst_size);
+		ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
+				burst_size, NULL);
 		__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
 		if (unlikely(ret < burst_size)) {
 			/* Return the mbufs to their respective pool, dropping packets */
diff --git a/examples/qos_sched/app_thread.c b/examples/qos_sched/app_thread.c
index dab4594..0c81a15 100644
--- a/examples/qos_sched/app_thread.c
+++ b/examples/qos_sched/app_thread.c
@@ -107,7 +107,7 @@ app_rx_thread(struct thread_conf **confs)
 			}
 
 			if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring,
-					(void **)rx_mbufs, nb_rx) == 0)) {
+					(void **)rx_mbufs, nb_rx, NULL) == 0)) {
 				for(i = 0; i < nb_rx; i++) {
 					rte_pktmbuf_free(rx_mbufs[i]);
 
@@ -231,7 +231,7 @@ app_worker_thread(struct thread_conf **confs)
 					burst_conf.qos_dequeue);
 		if (likely(nb_pkt > 0))
 			while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
-					(void **)mbufs, nb_pkt) == 0)
+					(void **)mbufs, nb_pkt, NULL) == 0)
 				; /* empty body */
 
 		conf_idx++;
diff --git a/examples/server_node_efd/server/main.c b/examples/server_node_efd/server/main.c
index 3eb7fac..597b4c2 100644
--- a/examples/server_node_efd/server/main.c
+++ b/examples/server_node_efd/server/main.c
@@ -247,7 +247,7 @@ flush_rx_queue(uint16_t node)
 
 	cl = &nodes[node];
 	if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
-			cl_rx_buf[node].count) != cl_rx_buf[node].count){
+			cl_rx_buf[node].count, NULL) != cl_rx_buf[node].count){
 		for (j = 0; j < cl_rx_buf[node].count; j++)
 			rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
 		cl->stats.rx_drop += cl_rx_buf[node].count;
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 51db006..6552199 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -808,7 +808,7 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 			/* Need to enqueue the free slots in global ring. */
 			n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
 						cached_free_slots->objs,
-						LCORE_CACHE_SIZE);
+						LCORE_CACHE_SIZE, NULL);
 			cached_free_slots->len -= n_slots;
 		}
 		/* Put index of new free slot in cache. */
diff --git a/lib/librte_mempool/rte_mempool_ring.c b/lib/librte_mempool/rte_mempool_ring.c
index 409b860..9b8fd2b 100644
--- a/lib/librte_mempool/rte_mempool_ring.c
+++ b/lib/librte_mempool/rte_mempool_ring.c
@@ -43,7 +43,7 @@ common_ring_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
 		unsigned n)
 {
 	return rte_ring_mp_enqueue_bulk(mp->pool_data,
-			obj_table, n) == 0 ? -ENOBUFS : 0;
+			obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static int
@@ -51,7 +51,7 @@ common_ring_sp_enqueue(struct rte_mempool *mp, void * const *obj_table,
 		unsigned n)
 {
 	return rte_ring_sp_enqueue_bulk(mp->pool_data,
-			obj_table, n) == 0 ? -ENOBUFS : 0;
+			obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static int
diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c
index a580a6a..d6d3e46 100644
--- a/lib/librte_pdump/rte_pdump.c
+++ b/lib/librte_pdump/rte_pdump.c
@@ -197,7 +197,7 @@ pdump_copy(struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params)
 			dup_bufs[d_pkts++] = p;
 	}
 
-	ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts);
+	ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts, NULL);
 	if (unlikely(ring_enq < d_pkts)) {
 		RTE_LOG(DEBUG, PDUMP,
 			"only %d of packets enqueued to ring\n", ring_enq);
diff --git a/lib/librte_port/rte_port_ras.c b/lib/librte_port/rte_port_ras.c
index c4bb508..4de0945 100644
--- a/lib/librte_port/rte_port_ras.c
+++ b/lib/librte_port/rte_port_ras.c
@@ -167,7 +167,7 @@ send_burst(struct rte_port_ring_writer_ras *p)
 	uint32_t nb_tx;
 
 	nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
-			p->tx_buf_count);
+			p->tx_buf_count, NULL);
 
 	RTE_PORT_RING_WRITER_RAS_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
 	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index 3b9d3d0..9fadac7 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -241,7 +241,7 @@ send_burst(struct rte_port_ring_writer *p)
 	uint32_t nb_tx;
 
 	nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
-			p->tx_buf_count);
+			p->tx_buf_count, NULL);
 
 	RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
 	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
@@ -256,7 +256,7 @@ send_burst_mp(struct rte_port_ring_writer *p)
 	uint32_t nb_tx;
 
 	nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
-			p->tx_buf_count);
+			p->tx_buf_count, NULL);
 
 	RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
 	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
@@ -318,11 +318,11 @@ rte_port_ring_writer_tx_bulk_internal(void *port,
 
 		RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
 		if (is_multi)
-			n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts,
-				n_pkts);
+			n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
+					(void **)pkts, n_pkts, NULL);
 		else
-			n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts,
-				n_pkts);
+			n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
+					(void **)pkts, n_pkts, NULL);
 
 		RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
 		for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
@@ -517,7 +517,7 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
 	uint32_t nb_tx = 0, i;
 
 	nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
-				p->tx_buf_count);
+				p->tx_buf_count, NULL);
 
 	/* We sent all the packets in a first try */
 	if (nb_tx >= p->tx_buf_count) {
@@ -527,7 +527,8 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
 
 	for (i = 0; i < p->n_retries; i++) {
 		nb_tx += rte_ring_sp_enqueue_burst(p->ring,
-				(void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+				(void **) (p->tx_buf + nb_tx),
+				p->tx_buf_count - nb_tx, NULL);
 
 		/* We sent all the packets in more than one try */
 		if (nb_tx >= p->tx_buf_count) {
@@ -550,7 +551,7 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
 	uint32_t nb_tx = 0, i;
 
 	nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
-				p->tx_buf_count);
+				p->tx_buf_count, NULL);
 
 	/* We sent all the packets in a first try */
 	if (nb_tx >= p->tx_buf_count) {
@@ -560,7 +561,8 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
 
 	for (i = 0; i < p->n_retries; i++) {
 		nb_tx += rte_ring_mp_enqueue_burst(p->ring,
-				(void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+				(void **) (p->tx_buf + nb_tx),
+				p->tx_buf_count - nb_tx, NULL);
 
 		/* We sent all the packets in more than one try */
 		if (nb_tx >= p->tx_buf_count) {
@@ -633,10 +635,12 @@ rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
 		RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
 		if (is_multi)
 			n_pkts_ok =
-				rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+				rte_ring_mp_enqueue_burst(p->ring,
+						(void **)pkts, n_pkts, NULL);
 		else
 			n_pkts_ok =
-				rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+				rte_ring_sp_enqueue_burst(p->ring,
+						(void **)pkts, n_pkts, NULL);
 
 		if (n_pkts_ok >= n_pkts)
 			return 0;
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index d4d44ce..2f8995c 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -354,20 +354,16 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
  */
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
+			 unsigned int n, enum rte_ring_queue_behavior behavior,
+			 unsigned int *free_space)
 {
 	uint32_t prod_head, prod_next;
 	uint32_t cons_tail, free_entries;
-	const unsigned max = n;
+	const unsigned int max = n;
 	int success;
 	unsigned int i;
 	uint32_t mask = r->mask;
 
-	/* Avoid the unnecessary cmpset operation below, which is also
-	 * potentially harmful when n equals 0. */
-	if (n == 0)
-		return 0;
-
 	/* move prod.head atomically */
 	do {
 		/* Reset n to the initial burst count */
@@ -382,16 +378,12 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		free_entries = (mask + cons_tail - prod_head);
 
 		/* check that we have enough room in ring */
-		if (unlikely(n > free_entries)) {
-			if (behavior == RTE_RING_QUEUE_FIXED)
-				return 0;
-			else {
-				/* No free entry available */
-				if (unlikely(free_entries == 0))
-					return 0;
-				n = free_entries;
-			}
-		}
+		if (unlikely(n > free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : free_entries;
+
+		if (n == 0)
+			goto end;
 
 		prod_next = prod_head + n;
 		success = rte_atomic32_cmpset(&r->prod.head, prod_head,
@@ -410,6 +402,9 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		rte_pause();
 
 	r->prod.tail = prod_next;
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
 	return n;
 }
 
@@ -435,7 +430,8 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
+			 unsigned int n, enum rte_ring_queue_behavior behavior,
+			 unsigned int *free_space)
 {
 	uint32_t prod_head, cons_tail;
 	uint32_t prod_next, free_entries;
@@ -451,16 +447,12 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	free_entries = mask + cons_tail - prod_head;
 
 	/* check that we have enough room in ring */
-	if (unlikely(n > free_entries)) {
-		if (behavior == RTE_RING_QUEUE_FIXED)
-			return 0;
-		else {
-			/* No free entry available */
-			if (unlikely(free_entries == 0))
-				return 0;
-			n = free_entries;
-		}
-	}
+	if (unlikely(n > free_entries))
+		n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : free_entries;
+
+	if (n == 0)
+		goto end;
+
 
 	prod_next = prod_head + n;
 	r->prod.head = prod_next;
@@ -470,6 +462,9 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	rte_smp_wmb();
 
 	r->prod.tail = prod_next;
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
 	return n;
 }
 
@@ -639,9 +634,10 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-			 unsigned n)
+			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+			free_space);
 }
 
 /**
@@ -658,9 +654,10 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-			 unsigned n)
+			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+			free_space);
 }
 
 /**
@@ -681,12 +678,12 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-		      unsigned n)
+		      unsigned int n, unsigned int *free_space)
 {
 	if (r->prod.sp_enqueue)
-		return rte_ring_sp_enqueue_bulk(r, obj_table, n);
+		return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
 	else
-		return rte_ring_mp_enqueue_bulk(r, obj_table, n);
+		return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
 }
 
 /**
@@ -706,7 +703,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 static inline int __attribute__((always_inline))
 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
 {
-	return rte_ring_mp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+	return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -723,7 +720,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
 {
-	return rte_ring_sp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+	return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -744,7 +741,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_enqueue(struct rte_ring *r, void *obj)
 {
-	return rte_ring_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+	return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -990,9 +987,10 @@ struct rte_ring *rte_ring_lookup(const char *name);
  */
 static inline unsigned __attribute__((always_inline))
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-			 unsigned n)
+			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_ring_mp_do_enqueue(r, obj_table, n,
+			RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
 /**
@@ -1009,9 +1007,10 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  */
 static inline unsigned __attribute__((always_inline))
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-			 unsigned n)
+			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_ring_sp_do_enqueue(r, obj_table, n,
+			RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
 /**
@@ -1032,12 +1031,12 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  */
 static inline unsigned __attribute__((always_inline))
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-		      unsigned n)
+		      unsigned int n, unsigned int *free_space)
 {
 	if (r->prod.sp_enqueue)
-		return rte_ring_sp_enqueue_burst(r, obj_table, n);
+		return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
 	else
-		return rte_ring_mp_enqueue_burst(r, obj_table, n);
+		return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
 }
 
 /**
-- 
2.9.3



More information about the dev mailing list