@@ -120,6 +120,9 @@ API Changes
* removed the build-time setting ``CONFIG_RTE_RING_PAUSE_REP_COUNT``
* removed the function ``rte_ring_set_water_mark`` as part of a general
removal of watermarks support in the library.
+ * added an extra parameter to the burst/bulk enqueue functions to
+ return the number of free spaces in the ring after enqueue. This can
+ be used by an application to implement its own watermark functionality.
* changed the return value of the enqueue and dequeue bulk functions to
match that of the burst equivalents. In all cases, ring functions which
operate on multiple packets now return the number of elements enqueued
@@ -102,7 +102,7 @@ eth_ring_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
void **ptrs = (void *)&bufs[0];
struct ring_queue *r = q;
const uint16_t nb_tx = (uint16_t)rte_ring_enqueue_burst(r->rng,
- ptrs, nb_bufs);
+ ptrs, nb_bufs, NULL);
if (r->rng->flags & RING_F_SP_ENQ) {
r->tx_pkts.cnt += nb_tx;
r->err_pkts.cnt += nb_bufs - nb_tx;
@@ -238,7 +238,8 @@ lcore_rx(struct lcore_params *p)
continue;
}
- uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
+ uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs,
+ nb_ret, NULL);
app_stats.rx.enqueued_pkts += sent;
if (unlikely(sent < nb_ret)) {
RTE_LOG_DP(DEBUG, DISTRAPP,
@@ -144,7 +144,8 @@ app_lcore_io_rx_buffer_to_send (
ret = rte_ring_sp_enqueue_bulk(
lp->rx.rings[worker],
(void **) lp->rx.mbuf_out[worker].array,
- bsz);
+ bsz,
+ NULL);
if (unlikely(ret == 0)) {
uint32_t k;
@@ -310,7 +311,8 @@ app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers)
ret = rte_ring_sp_enqueue_bulk(
lp->rx.rings[worker],
(void **) lp->rx.mbuf_out[worker].array,
- lp->rx.mbuf_out[worker].n_mbufs);
+ lp->rx.mbuf_out[worker].n_mbufs,
+ NULL);
if (unlikely(ret == 0)) {
uint32_t k;
@@ -553,7 +555,8 @@ app_lcore_worker(
ret = rte_ring_sp_enqueue_bulk(
lp->rings_out[port],
(void **) lp->mbuf_out[port].array,
- bsz_wr);
+ bsz_wr,
+ NULL);
#if APP_STATS
lp->rings_out_iters[port] ++;
@@ -605,7 +608,8 @@ app_lcore_worker_flush(struct app_lcore_params_worker *lp)
ret = rte_ring_sp_enqueue_bulk(
lp->rings_out[port],
(void **) lp->mbuf_out[port].array,
- lp->mbuf_out[port].n_mbufs);
+ lp->mbuf_out[port].n_mbufs,
+ NULL);
if (unlikely(ret == 0)) {
uint32_t k;
@@ -227,7 +227,7 @@ flush_rx_queue(uint16_t client)
cl = &clients[client];
if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[client].buffer,
- cl_rx_buf[client].count) == 0){
+ cl_rx_buf[client].count, NULL) == 0){
for (j = 0; j < cl_rx_buf[client].count; j++)
rte_pktmbuf_free(cl_rx_buf[client].buffer[j]);
cl->stats.rx_drop += cl_rx_buf[client].count;
@@ -421,8 +421,8 @@ rx_thread(struct rte_ring *ring_out)
pkts[i++]->seqn = seqn++;
/* enqueue to rx_to_workers ring */
- ret = rte_ring_enqueue_burst(ring_out, (void *) pkts,
- nb_rx_pkts);
+ ret = rte_ring_enqueue_burst(ring_out,
+ (void *)pkts, nb_rx_pkts, NULL);
app_stats.rx.enqueue_pkts += ret;
if (unlikely(ret < nb_rx_pkts)) {
app_stats.rx.enqueue_failed_pkts +=
@@ -473,7 +473,8 @@ worker_thread(void *args_ptr)
burst_buffer[i++]->port ^= xor_val;
/* enqueue the modified mbufs to workers_to_tx ring */
- ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer, burst_size);
+ ret = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
+ burst_size, NULL);
__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, ret);
if (unlikely(ret < burst_size)) {
/* Return the mbufs to their respective pool, dropping packets */
@@ -107,7 +107,7 @@ app_rx_thread(struct thread_conf **confs)
}
if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring,
- (void **)rx_mbufs, nb_rx) == 0)) {
+ (void **)rx_mbufs, nb_rx, NULL) == 0)) {
for(i = 0; i < nb_rx; i++) {
rte_pktmbuf_free(rx_mbufs[i]);
@@ -231,7 +231,7 @@ app_worker_thread(struct thread_conf **confs)
burst_conf.qos_dequeue);
if (likely(nb_pkt > 0))
while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
- (void **)mbufs, nb_pkt) == 0)
+ (void **)mbufs, nb_pkt, NULL) == 0)
; /* empty body */
conf_idx++;
@@ -247,7 +247,7 @@ flush_rx_queue(uint16_t node)
cl = &nodes[node];
if (rte_ring_enqueue_bulk(cl->rx_q, (void **)cl_rx_buf[node].buffer,
- cl_rx_buf[node].count) != cl_rx_buf[node].count){
+ cl_rx_buf[node].count, NULL) != cl_rx_buf[node].count){
for (j = 0; j < cl_rx_buf[node].count; j++)
rte_pktmbuf_free(cl_rx_buf[node].buffer[j]);
cl->stats.rx_drop += cl_rx_buf[node].count;
@@ -808,7 +808,7 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
/* Need to enqueue the free slots in global ring. */
n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
cached_free_slots->objs,
- LCORE_CACHE_SIZE);
+ LCORE_CACHE_SIZE, NULL);
cached_free_slots->len -= n_slots;
}
/* Put index of new free slot in cache. */
@@ -43,7 +43,7 @@ common_ring_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
unsigned n)
{
return rte_ring_mp_enqueue_bulk(mp->pool_data,
- obj_table, n) == 0 ? -ENOBUFS : 0;
+ obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
}
static int
@@ -51,7 +51,7 @@ common_ring_sp_enqueue(struct rte_mempool *mp, void * const *obj_table,
unsigned n)
{
return rte_ring_sp_enqueue_bulk(mp->pool_data,
- obj_table, n) == 0 ? -ENOBUFS : 0;
+ obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
}
static int
@@ -197,7 +197,7 @@ pdump_copy(struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params)
dup_bufs[d_pkts++] = p;
}
- ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts);
+ ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts, NULL);
if (unlikely(ring_enq < d_pkts)) {
RTE_LOG(DEBUG, PDUMP,
"only %d of packets enqueued to ring\n", ring_enq);
@@ -167,7 +167,7 @@ send_burst(struct rte_port_ring_writer_ras *p)
uint32_t nb_tx;
nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
RTE_PORT_RING_WRITER_RAS_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
for ( ; nb_tx < p->tx_buf_count; nb_tx++)
@@ -241,7 +241,7 @@ send_burst(struct rte_port_ring_writer *p)
uint32_t nb_tx;
nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
for ( ; nb_tx < p->tx_buf_count; nb_tx++)
@@ -256,7 +256,7 @@ send_burst_mp(struct rte_port_ring_writer *p)
uint32_t nb_tx;
nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
for ( ; nb_tx < p->tx_buf_count; nb_tx++)
@@ -318,11 +318,11 @@ rte_port_ring_writer_tx_bulk_internal(void *port,
RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
if (is_multi)
- n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts,
- n_pkts);
+ n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
+ (void **)pkts, n_pkts, NULL);
else
- n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts,
- n_pkts);
+ n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
+ (void **)pkts, n_pkts, NULL);
RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
@@ -517,7 +517,7 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
uint32_t nb_tx = 0, i;
nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
/* We sent all the packets in a first try */
if (nb_tx >= p->tx_buf_count) {
@@ -527,7 +527,8 @@ send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
for (i = 0; i < p->n_retries; i++) {
nb_tx += rte_ring_sp_enqueue_burst(p->ring,
- (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+ (void **) (p->tx_buf + nb_tx),
+ p->tx_buf_count - nb_tx, NULL);
/* We sent all the packets in more than one try */
if (nb_tx >= p->tx_buf_count) {
@@ -550,7 +551,7 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
uint32_t nb_tx = 0, i;
nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
- p->tx_buf_count);
+ p->tx_buf_count, NULL);
/* We sent all the packets in a first try */
if (nb_tx >= p->tx_buf_count) {
@@ -560,7 +561,8 @@ send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
for (i = 0; i < p->n_retries; i++) {
nb_tx += rte_ring_mp_enqueue_burst(p->ring,
- (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+ (void **) (p->tx_buf + nb_tx),
+ p->tx_buf_count - nb_tx, NULL);
/* We sent all the packets in more than one try */
if (nb_tx >= p->tx_buf_count) {
@@ -633,10 +635,12 @@ rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
if (is_multi)
n_pkts_ok =
- rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+ rte_ring_mp_enqueue_burst(p->ring,
+ (void **)pkts, n_pkts, NULL);
else
n_pkts_ok =
- rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+ rte_ring_sp_enqueue_burst(p->ring,
+ (void **)pkts, n_pkts, NULL);
if (n_pkts_ok >= n_pkts)
return 0;
@@ -357,20 +357,16 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
*/
static inline unsigned int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
- unsigned n, enum rte_ring_queue_behavior behavior)
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
{
uint32_t prod_head, prod_next;
uint32_t cons_tail, free_entries;
- const unsigned max = n;
+ const unsigned int max = n;
int success;
unsigned int i;
uint32_t mask = r->mask;
- /* Avoid the unnecessary cmpset operation below, which is also
- * potentially harmful when n equals 0. */
- if (n == 0)
- return 0;
-
/* move prod.head atomically */
do {
/* Reset n to the initial burst count */
@@ -385,16 +381,12 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
free_entries = (mask + cons_tail - prod_head);
/* check that we have enough room in ring */
- if (unlikely(n > free_entries)) {
- if (behavior == RTE_RING_QUEUE_FIXED)
- return 0;
- else {
- /* No free entry available */
- if (unlikely(free_entries == 0))
- return 0;
- n = free_entries;
- }
- }
+ if (unlikely(n > free_entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ?
+ 0 : free_entries;
+
+ if (n == 0)
+ goto end;
prod_next = prod_head + n;
success = rte_atomic32_cmpset(&r->prod.head, prod_head,
@@ -413,6 +405,9 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
rte_pause();
r->prod.tail = prod_next;
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
return n;
}
@@ -434,7 +429,8 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
*/
static inline unsigned int __attribute__((always_inline))
__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
- unsigned n, enum rte_ring_queue_behavior behavior)
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
{
uint32_t prod_head, cons_tail;
uint32_t prod_next, free_entries;
@@ -450,16 +446,12 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
free_entries = mask + cons_tail - prod_head;
/* check that we have enough room in ring */
- if (unlikely(n > free_entries)) {
- if (behavior == RTE_RING_QUEUE_FIXED)
- return 0;
- else {
- /* No free entry available */
- if (unlikely(free_entries == 0))
- return 0;
- n = free_entries;
- }
- }
+ if (unlikely(n > free_entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : free_entries;
+
+ if (n == 0)
+ goto end;
+
prod_next = prod_head + n;
r->prod.head = prod_next;
@@ -469,6 +461,9 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
rte_smp_wmb();
r->prod.tail = prod_next;
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
return n;
}
@@ -628,9 +623,10 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
*/
static inline unsigned int __attribute__((always_inline))
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
- unsigned n)
+ unsigned int n, unsigned int *free_space)
{
- return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+ return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ free_space);
}
/**
@@ -647,9 +643,10 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
*/
static inline unsigned int __attribute__((always_inline))
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
- unsigned n)
+ unsigned int n, unsigned int *free_space)
{
- return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+ return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+ free_space);
}
/**
@@ -670,12 +667,12 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
*/
static inline unsigned int __attribute__((always_inline))
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
- unsigned n)
+ unsigned int n, unsigned int *free_space)
{
if (r->prod.sp_enqueue)
- return rte_ring_sp_enqueue_bulk(r, obj_table, n);
+ return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
else
- return rte_ring_mp_enqueue_bulk(r, obj_table, n);
+ return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
}
/**
@@ -695,7 +692,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
static inline int __attribute__((always_inline))
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
- return rte_ring_mp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+ return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}
/**
@@ -712,7 +709,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
static inline int __attribute__((always_inline))
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{
- return rte_ring_sp_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+ return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}
/**
@@ -733,7 +730,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
static inline int __attribute__((always_inline))
rte_ring_enqueue(struct rte_ring *r, void *obj)
{
- return rte_ring_enqueue_bulk(r, &obj, 1) ? 0 : -ENOBUFS;
+ return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}
/**
@@ -979,9 +976,10 @@ struct rte_ring *rte_ring_lookup(const char *name);
*/
static inline unsigned __attribute__((always_inline))
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
- unsigned n)
+ unsigned int n, unsigned int *free_space)
{
- return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+ return __rte_ring_mp_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE, free_space);
}
/**
@@ -998,9 +996,10 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
*/
static inline unsigned __attribute__((always_inline))
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
- unsigned n)
+ unsigned int n, unsigned int *free_space)
{
- return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+ return __rte_ring_sp_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE, free_space);
}
/**
@@ -1021,12 +1020,12 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
*/
static inline unsigned __attribute__((always_inline))
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
- unsigned n)
+ unsigned int n, unsigned int *free_space)
{
if (r->prod.sp_enqueue)
- return rte_ring_sp_enqueue_burst(r, obj_table, n);
+ return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
else
- return rte_ring_mp_enqueue_burst(r, obj_table, n);
+ return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
}
/**
@@ -546,7 +546,8 @@ app_main_loop_rx_metadata(void) {
ret = rte_ring_sp_enqueue_bulk(
app.rings_rx[i],
(void **) app.mbuf_rx.array,
- n_mbufs);
+ n_mbufs,
+ NULL);
} while (ret == 0);
}
}
@@ -97,7 +97,7 @@ app_main_loop_rx(void) {
ret = rte_ring_sp_enqueue_bulk(
app.rings_rx[i],
(void **) app.mbuf_rx.array,
- n_mbufs);
+ n_mbufs, NULL);
} while (ret == 0);
}
}
@@ -130,7 +130,8 @@ app_main_loop_worker(void) {
ret = rte_ring_sp_enqueue_bulk(
app.rings_tx[i ^ 1],
(void **) worker_mbuf->array,
- app.burst_size_worker_write);
+ app.burst_size_worker_write,
+ NULL);
} while (ret == 0);
}
}
@@ -206,7 +206,8 @@ slave_get_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
static int
slave_put_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
{
- return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf, size);
+ return rte_ring_enqueue_burst(slave->rx_queue, (void **)buf,
+ size, NULL);
}
static uint16_t
@@ -98,7 +98,7 @@ test_single_enqueue_dequeue(void)
const uint64_t sc_start = rte_rdtsc_precise();
rte_compiler_barrier();
for (i = 0; i < iterations; i++) {
- rte_ring_enqueue_bulk(r, &burst, 1);
+ rte_ring_enqueue_bulk(r, &burst, 1, NULL);
rte_ring_dequeue_bulk(r, &burst, 1);
}
const uint64_t sc_end = rte_rdtsc_precise();
@@ -131,7 +131,8 @@ test_bulk_enqueue_dequeue(void)
for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
const uint64_t sc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_sp_enqueue_bulk(r, (void *)burst, bulk_sizes[sz]);
+ rte_ring_sp_enqueue_bulk(r, (void *)burst,
+ bulk_sizes[sz], NULL);
rte_ring_sc_dequeue_bulk(r, (void *)burst, bulk_sizes[sz]);
}
const uint64_t sc_end = rte_rdtsc();
@@ -117,11 +117,12 @@ test_ring_basic_full_empty(void * const src[], void *dst[])
rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
printf("%s: iteration %u, random shift: %u;\n",
__func__, i, rand);
- TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand) != 0);
+ TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rand,
+ NULL) != 0);
TEST_RING_VERIFY(rte_ring_dequeue_bulk(r, dst, rand) == rand);
/* fill the ring */
- TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz) != 0);
+ TEST_RING_VERIFY(rte_ring_enqueue_bulk(r, src, rsz, NULL) != 0);
TEST_RING_VERIFY(0 == rte_ring_free_count(r));
TEST_RING_VERIFY(rsz == rte_ring_count(r));
TEST_RING_VERIFY(rte_ring_full(r));
@@ -167,19 +168,19 @@ test_ring_basic(void)
cur_dst = dst;
printf("enqueue 1 obj\n");
- ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
+ ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1, NULL);
cur_src += 1;
if (ret == 0)
goto fail;
printf("enqueue 2 objs\n");
- ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
+ ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2, NULL);
cur_src += 2;
if (ret == 0)
goto fail;
printf("enqueue MAX_BULK objs\n");
- ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
+ ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if (ret == 0)
goto fail;
@@ -213,19 +214,19 @@ test_ring_basic(void)
cur_dst = dst;
printf("enqueue 1 obj\n");
- ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1, NULL);
cur_src += 1;
if (ret == 0)
goto fail;
printf("enqueue 2 objs\n");
- ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2, NULL);
cur_src += 2;
if (ret == 0)
goto fail;
printf("enqueue MAX_BULK objs\n");
- ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if (ret == 0)
goto fail;
@@ -260,7 +261,7 @@ test_ring_basic(void)
printf("fill and empty the ring\n");
for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
- ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if (ret == 0)
goto fail;
@@ -290,13 +291,13 @@ test_ring_basic(void)
cur_src = src;
cur_dst = dst;
- ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+ ret = rte_ring_enqueue_bulk(r, cur_src, num_elems, NULL);
cur_src += num_elems;
if (ret == 0) {
printf("Cannot enqueue\n");
goto fail;
}
- ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+ ret = rte_ring_enqueue_bulk(r, cur_src, num_elems, NULL);
cur_src += num_elems;
if (ret == 0) {
printf("Cannot enqueue\n");
@@ -371,19 +372,19 @@ test_ring_burst_basic(void)
printf("Test SP & SC basic functions \n");
printf("enqueue 1 obj\n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, 1, NULL);
cur_src += 1;
if ((ret & RTE_RING_SZ_MASK) != 1)
goto fail;
printf("enqueue 2 objs\n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
printf("enqueue MAX_BULK objs\n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK) ;
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
goto fail;
@@ -419,7 +420,7 @@ test_ring_burst_basic(void)
printf("Test enqueue without enough memory space \n");
for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
- ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
goto fail;
@@ -427,14 +428,14 @@ test_ring_burst_basic(void)
}
printf("Enqueue 2 objects, free entries = MAX_BULK - 2 \n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
printf("Enqueue the remaining entries = MAX_BULK - 2 \n");
/* Always one free entry left */
- ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK - 3;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
goto fail;
@@ -444,7 +445,7 @@ test_ring_burst_basic(void)
goto fail;
printf("Test enqueue for a full entry \n");
- ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
if ((ret & RTE_RING_SZ_MASK) != 0)
goto fail;
@@ -486,19 +487,19 @@ test_ring_burst_basic(void)
printf("Test MP & MC basic functions \n");
printf("enqueue 1 obj\n");
- ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, 1, NULL);
cur_src += 1;
if ((ret & RTE_RING_SZ_MASK) != 1)
goto fail;
printf("enqueue 2 objs\n");
- ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
printf("enqueue MAX_BULK objs\n");
- ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
goto fail;
@@ -534,7 +535,7 @@ test_ring_burst_basic(void)
printf("fill and empty the ring\n");
for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
- ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
goto fail;
@@ -557,19 +558,19 @@ test_ring_burst_basic(void)
printf("Test enqueue without enough memory space \n");
for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
- ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
goto fail;
}
/* Available memory space for the exact MAX_BULK objects */
- ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
- ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK, NULL);
cur_src += MAX_BULK - 3;
if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
goto fail;
@@ -607,7 +608,7 @@ test_ring_burst_basic(void)
printf("Covering rte_ring_enqueue_burst functions \n");
- ret = rte_ring_enqueue_burst(r, cur_src, 2);
+ ret = rte_ring_enqueue_burst(r, cur_src, 2, NULL);
cur_src += 2;
if ((ret & RTE_RING_SZ_MASK) != 2)
goto fail;
@@ -746,7 +747,7 @@ test_ring_basic_ex(void)
}
/* Covering the ring burst operation */
- ret = rte_ring_enqueue_burst(rp, obj, 2);
+ ret = rte_ring_enqueue_burst(rp, obj, 2, NULL);
if ((ret & RTE_RING_SZ_MASK) != 2) {
printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
goto fail_test;
@@ -195,13 +195,13 @@ enqueue_bulk(void *p)
const uint64_t sp_start = rte_rdtsc();
for (i = 0; i < iterations; i++)
- while (rte_ring_sp_enqueue_bulk(r, burst, size) == 0)
+ while (rte_ring_sp_enqueue_bulk(r, burst, size, NULL) == 0)
rte_pause();
const uint64_t sp_end = rte_rdtsc();
const uint64_t mp_start = rte_rdtsc();
for (i = 0; i < iterations; i++)
- while (rte_ring_mp_enqueue_bulk(r, burst, size) == 0)
+ while (rte_ring_mp_enqueue_bulk(r, burst, size, NULL) == 0)
rte_pause();
const uint64_t mp_end = rte_rdtsc();
@@ -323,14 +323,16 @@ test_burst_enqueue_dequeue(void)
for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
const uint64_t sc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_sp_enqueue_burst(r, burst, bulk_sizes[sz]);
+ rte_ring_sp_enqueue_burst(r, burst,
+ bulk_sizes[sz], NULL);
rte_ring_sc_dequeue_burst(r, burst, bulk_sizes[sz]);
}
const uint64_t sc_end = rte_rdtsc();
const uint64_t mc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_mp_enqueue_burst(r, burst, bulk_sizes[sz]);
+ rte_ring_mp_enqueue_burst(r, burst,
+ bulk_sizes[sz], NULL);
rte_ring_mc_dequeue_burst(r, burst, bulk_sizes[sz]);
}
const uint64_t mc_end = rte_rdtsc();
@@ -357,14 +359,16 @@ test_bulk_enqueue_dequeue(void)
for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
const uint64_t sc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_sp_enqueue_bulk(r, burst, bulk_sizes[sz]);
+ rte_ring_sp_enqueue_bulk(r, burst,
+ bulk_sizes[sz], NULL);
rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[sz]);
}
const uint64_t sc_end = rte_rdtsc();
const uint64_t mc_start = rte_rdtsc();
for (i = 0; i < iterations; i++) {
- rte_ring_mp_enqueue_bulk(r, burst, bulk_sizes[sz]);
+ rte_ring_mp_enqueue_bulk(r, burst,
+ bulk_sizes[sz], NULL);
rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[sz]);
}
const uint64_t mc_end = rte_rdtsc();
@@ -80,7 +80,7 @@ test_port_ring_reader(void)
mbuf[0] = (void *)rte_pktmbuf_alloc(pool);
expected_pkts = rte_ring_sp_enqueue_burst(port_ring_reader_params.ring,
- mbuf, 1);
+ mbuf, 1, NULL);
received_pkts = rte_port_ring_reader_ops.f_rx(port, res_mbuf, 1);
if (received_pkts < expected_pkts)
@@ -93,7 +93,7 @@ test_port_ring_reader(void)
mbuf[i] = rte_pktmbuf_alloc(pool);
expected_pkts = rte_ring_sp_enqueue_burst(port_ring_reader_params.ring,
- (void * const *) mbuf, RTE_PORT_IN_BURST_SIZE_MAX);
+ (void * const *) mbuf, RTE_PORT_IN_BURST_SIZE_MAX, NULL);
received_pkts = rte_port_ring_reader_ops.f_rx(port, res_mbuf,
RTE_PORT_IN_BURST_SIZE_MAX);
@@ -380,7 +380,7 @@ virtual_ethdev_tx_burst_success(void *queue, struct rte_mbuf **bufs,
nb_pkts = 0;
else
nb_pkts = rte_ring_enqueue_burst(dev_private->tx_queue, (void **)bufs,
- nb_pkts);
+ nb_pkts, NULL);
/* increment opacket count */
dev_private->eth_stats.opackets += nb_pkts;
@@ -496,7 +496,7 @@ virtual_ethdev_add_mbufs_to_rx_queue(uint8_t port_id,
vrtl_eth_dev->data->dev_private;
return rte_ring_enqueue_burst(dev_private->rx_queue, (void **)pkt_burst,
- burst_length);
+ burst_length, NULL);
}
int