@@ -101,7 +101,7 @@ EAL_REGISTER_TAILQ(rte_common_ring_tailq)
/* return the size of memory occupied by a ring */
ssize_t
-rte_common_ring_get_memsize(unsigned count)
+rte_common_ring_get_memsize(unsigned count, unsigned int elem_sz)
{
ssize_t sz;
@@ -113,14 +113,14 @@ rte_common_ring_get_memsize(unsigned count)
return -EINVAL;
}
- sz = sizeof(struct rte_ring) + count * sizeof(void *);
+ sz = sizeof(struct rte_ring) + (size_t)count * elem_sz; /* widen: avoid 32-bit overflow */
sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
return sz;
}
int
rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
- unsigned flags)
+ unsigned flags, unsigned int elem_sz)
{
int ret;
@@ -146,6 +146,7 @@ rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
if (ret < 0 || ret >= (int)sizeof(r->name))
return -ENAMETOOLONG;
r->flags = flags;
+ r->elem_sz = elem_sz;
r->prod.watermark = count;
r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);
r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);
@@ -160,7 +161,7 @@ rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
/* create the ring */
struct rte_ring *
rte_common_ring_create(const char *name, unsigned count, int socket_id,
- unsigned flags)
+ unsigned flags, unsigned int elem_sz)
{
char mz_name[RTE_MEMZONE_NAMESIZE];
struct rte_ring *r;
@@ -173,7 +174,7 @@ rte_common_ring_create(const char *name, unsigned count, int socket_id,
ring_list = RTE_TAILQ_CAST(rte_common_ring_tailq.head, rte_common_ring_list);
- ring_size = rte_common_ring_get_memsize(count);
+ ring_size = rte_common_ring_get_memsize(count, elem_sz);
if (ring_size < 0) {
rte_errno = ring_size;
return NULL;
@@ -203,7 +204,7 @@ rte_common_ring_create(const char *name, unsigned count, int socket_id,
r = mz->addr;
/* no need to check return value here, we already checked the
* arguments above */
- rte_common_ring_init(r, name, count, flags);
+ rte_common_ring_init(r, name, count, flags, elem_sz);
te->data = (void *) r;
r->memzone = mz;
@@ -293,6 +294,7 @@ rte_common_ring_dump(FILE *f, const struct rte_ring *r)
fprintf(f, "ring <%s>@%p\n", r->name, r);
fprintf(f, " flags=%x\n", r->flags);
+ fprintf(f, " elem_sz=%u\n", r->elem_sz);
fprintf(f, " size=%"PRIu32"\n", r->prod.size);
fprintf(f, " ct=%"PRIu32"\n", r->cons.tail);
fprintf(f, " ch=%"PRIu32"\n", r->cons.head);
@@ -101,6 +101,7 @@ extern "C" {
#include <rte_atomic.h>
#include <rte_branch_prediction.h>
#include <rte_memzone.h>
+#include <rte_vect.h>
#define RTE_TAILQ_RING_NAME "RTE_RING"
@@ -157,6 +158,7 @@ struct rte_ring {
*/
char name[RTE_MEMZONE_NAMESIZE]; /**< Name of the ring. */
int flags; /**< Flags supplied at creation. */
+ unsigned int elem_sz; /**< Size of one ring entry, in bytes. */
const struct rte_memzone *memzone;
/**< Memzone, if any, containing the rte_ring */
@@ -232,7 +234,7 @@ struct rte_ring {
* - The memory size needed for the ring on success.
* - -EINVAL if count is not a power of 2.
*/
-ssize_t rte_common_ring_get_memsize(unsigned count);
+ssize_t rte_common_ring_get_memsize(unsigned count, unsigned int elem_sz);
/**
* Initialize a ring structure.
@@ -269,7 +271,7 @@ ssize_t rte_common_ring_get_memsize(unsigned count);
* 0 on success, or a negative value on error.
*/
int rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
- unsigned flags);
+ unsigned flags, unsigned int elem_sz);
/**
* Create a new ring named *name* in memory.
@@ -311,7 +313,8 @@ int rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
* - ENOMEM - no appropriate memory area found in which to create memzone
*/
struct rte_ring *rte_common_ring_create(const char *name, unsigned count,
- int socket_id, unsigned flags);
+ int socket_id, unsigned flags,
+ unsigned int elem_sz);
/**
* De-allocate all memory used by the ring.
*
@@ -354,25 +357,50 @@ void rte_common_ring_dump(FILE *f, const struct rte_ring *r);
* Placed here since identical code needed in both
* single and multi producer enqueue functions */
#define ENQUEUE_PTRS() do { \
+ void * const *objs = obj_table; \
const uint32_t size = r->prod.size; \
uint32_t idx = prod_head & mask; \
if (likely(idx + n < size)) { \
for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
- r->ring[idx] = obj_table[i]; \
- r->ring[idx+1] = obj_table[i+1]; \
- r->ring[idx+2] = obj_table[i+2]; \
- r->ring[idx+3] = obj_table[i+3]; \
+ r->ring[idx] = objs[i]; \
+ r->ring[idx+1] = objs[i+1]; \
+ r->ring[idx+2] = objs[i+2]; \
+ r->ring[idx+3] = objs[i+3]; \
} \
switch (n & 0x3) { \
- case 3: r->ring[idx++] = obj_table[i++]; \
- case 2: r->ring[idx++] = obj_table[i++]; \
- case 1: r->ring[idx++] = obj_table[i++]; \
+ case 3: r->ring[idx++] = objs[i++]; \
+ case 2: r->ring[idx++] = objs[i++]; \
+ case 1: r->ring[idx++] = objs[i++]; \
} \
} else { \
for (i = 0; idx < size; i++, idx++)\
- r->ring[idx] = obj_table[i]; \
+ r->ring[idx] = objs[i]; \
for (idx = 0; i < n; i++, idx++) \
- r->ring[idx] = obj_table[i]; \
+ r->ring[idx] = objs[i]; \
+ } \
+} while(0)
+#define ENQUEUE_16B() do { \
+ rte_xmm_t *ring = (void *)r->ring; \
+ const rte_xmm_t *objs = obj_table; /* NOTE(review): assumes obj_table is 16B-aligned — confirm callers */ \
+ const uint32_t size = r->prod.size; \
+ uint32_t idx = prod_head & mask; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
+ ring[idx] = objs[i]; \
+ ring[idx+1] = objs[i+1]; \
+ ring[idx+2] = objs[i+2]; \
+ ring[idx+3] = objs[i+3]; \
+ } \
+ switch (n & 0x3) { \
+ case 3: ring[idx++] = objs[i++]; \
+ case 2: ring[idx++] = objs[i++]; \
+ case 1: ring[idx++] = objs[i++]; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++)\
+ ring[idx] = objs[i]; \
+ for (idx = 0; i < n; i++, idx++) \
+ ring[idx] = objs[i]; \
} \
} while(0)
@@ -380,25 +408,50 @@ void rte_common_ring_dump(FILE *f, const struct rte_ring *r);
* Placed here since identical code needed in both
* single and multi consumer dequeue functions */
#define DEQUEUE_PTRS() do { \
+ void **objs = obj_table; \
+ uint32_t idx = cons_head & mask; \
+ const uint32_t size = r->cons.size; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
+ objs[i] = r->ring[idx]; \
+ objs[i+1] = r->ring[idx+1]; \
+ objs[i+2] = r->ring[idx+2]; \
+ objs[i+3] = r->ring[idx+3]; \
+ } \
+ switch (n & 0x3) { \
+ case 3: objs[i++] = r->ring[idx++]; \
+ case 2: objs[i++] = r->ring[idx++]; \
+ case 1: objs[i++] = r->ring[idx++]; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++) \
+ objs[i] = r->ring[idx]; \
+ for (idx = 0; i < n; i++, idx++) \
+ objs[i] = r->ring[idx]; \
+ } \
+} while (0)
+#define DEQUEUE_16B() do { \
+ rte_xmm_t *ring = (void *)r->ring; \
+ rte_xmm_t *objs = obj_table; /* NOTE(review): assumes obj_table is 16B-aligned — confirm callers */ \
uint32_t idx = cons_head & mask; \
const uint32_t size = r->cons.size; \
if (likely(idx + n < size)) { \
for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
- obj_table[i] = r->ring[idx]; \
- obj_table[i+1] = r->ring[idx+1]; \
- obj_table[i+2] = r->ring[idx+2]; \
- obj_table[i+3] = r->ring[idx+3]; \
+ objs[i] = ring[idx]; \
+ objs[i+1] = ring[idx+1]; \
+ objs[i+2] = ring[idx+2]; \
+ objs[i+3] = ring[idx+3]; \
} \
switch (n & 0x3) { \
- case 3: obj_table[i++] = r->ring[idx++]; \
- case 2: obj_table[i++] = r->ring[idx++]; \
- case 1: obj_table[i++] = r->ring[idx++]; \
+ case 3: objs[i++] = ring[idx++]; \
+ case 2: objs[i++] = ring[idx++]; \
+ case 1: objs[i++] = ring[idx++]; \
} \
} else { \
for (i = 0; idx < size; i++, idx++) \
- obj_table[i] = r->ring[idx]; \
+ objs[i] = ring[idx]; \
for (idx = 0; i < n; i++, idx++) \
- obj_table[i] = r->ring[idx]; \
+ objs[i] = ring[idx]; \
} \
} while (0)
@@ -428,8 +481,9 @@ void rte_common_ring_dump(FILE *f, const struct rte_ring *r);
* - n: Actual number of objects enqueued.
*/
static inline int __attribute__((always_inline))
-__rte_common_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
- unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_mp_do_enqueue(struct rte_ring *r, const void *obj_table,
+ unsigned n, unsigned int elem_sz,
+ enum rte_ring_queue_behavior behavior)
{
uint32_t prod_head, prod_next;
uint32_t cons_tail, free_entries;
@@ -480,7 +534,10 @@ __rte_common_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
} while (unlikely(success == 0));
/* write entries in ring */
- ENQUEUE_PTRS();
+ switch (elem_sz) { /* NOTE(review): no default — unsupported elem_sz silently copies nothing */
+ case sizeof(void *): ENQUEUE_PTRS(); break;
+ case sizeof(rte_xmm_t): ENQUEUE_16B(); break;
+ }
rte_smp_wmb();
/* if we exceed the watermark */
@@ -537,8 +594,9 @@ __rte_common_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
* - n: Actual number of objects enqueued.
*/
static inline int __attribute__((always_inline))
-__rte_common_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
- unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_sp_do_enqueue(struct rte_ring *r, const void *obj_table,
+ unsigned n, unsigned int elem_sz,
+ enum rte_ring_queue_behavior behavior)
{
uint32_t prod_head, cons_tail;
uint32_t prod_next, free_entries;
@@ -575,7 +633,10 @@ __rte_common_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
r->prod.head = prod_next;
/* write entries in ring */
- ENQUEUE_PTRS();
+ switch (elem_sz) { /* NOTE(review): no default — unsupported elem_sz silently copies nothing */
+ case sizeof(void *): ENQUEUE_PTRS(); break;
+ case sizeof(rte_xmm_t): ENQUEUE_16B(); break;
+ }
rte_smp_wmb();
/* if we exceed the watermark */
@@ -621,8 +682,9 @@ __rte_common_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
*/
static inline int __attribute__((always_inline))
-__rte_common_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
- unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_mc_do_dequeue(struct rte_ring *r, void *obj_table,
+ unsigned n, unsigned int elem_sz,
+ enum rte_ring_queue_behavior behavior)
{
uint32_t cons_head, prod_tail;
uint32_t cons_next, entries;
@@ -671,7 +733,10 @@ __rte_common_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
} while (unlikely(success == 0));
/* copy in table */
- DEQUEUE_PTRS();
+ switch (elem_sz) { /* NOTE(review): no default — unsupported elem_sz silently copies nothing */
+ case sizeof(void *): DEQUEUE_PTRS(); break;
+ case sizeof(rte_xmm_t): DEQUEUE_16B(); break;
+ }
rte_smp_rmb();
/*
@@ -720,8 +785,9 @@ __rte_common_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
* - n: Actual number of objects dequeued.
*/
static inline int __attribute__((always_inline))
-__rte_common_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
- unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_sc_do_dequeue(struct rte_ring *r, void *obj_table,
+ unsigned n, unsigned int elem_sz,
+ enum rte_ring_queue_behavior behavior)
{
uint32_t cons_head, prod_tail;
uint32_t cons_next, entries;
@@ -755,7 +821,10 @@ __rte_common_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
r->cons.head = cons_next;
/* copy in table */
- DEQUEUE_PTRS();
+ switch (elem_sz) { /* NOTE(review): no default — unsupported elem_sz silently copies nothing */
+ case sizeof(void *): DEQUEUE_PTRS(); break;
+ case sizeof(rte_xmm_t): DEQUEUE_16B(); break;
+ }
rte_smp_rmb();
__RING_STAT_ADD(r, deq_success, n);
@@ -36,14 +36,14 @@
ssize_t
rte_ring_get_memsize(unsigned count)
{
- return rte_common_ring_get_memsize(count);
+ return rte_common_ring_get_memsize(count, sizeof(void *));
}
int
rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
unsigned flags)
{
- return rte_common_ring_init(r, name, count, flags);
+ return rte_common_ring_init(r, name, count, flags, sizeof(void *));
}
@@ -51,7 +51,8 @@ struct rte_ring *
rte_ring_create(const char *name, unsigned count, int socket_id,
unsigned flags)
{
- return rte_common_ring_create(name, count, socket_id, flags);
+ return rte_common_ring_create(name, count, socket_id, flags,
+ sizeof(void *));
}
void
@@ -215,7 +215,7 @@ static inline int __attribute__((always_inline))
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned n)
{
- return __rte_common_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+ return __rte_common_ring_mp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
}
/**
@@ -237,7 +237,7 @@ static inline int __attribute__((always_inline))
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned n)
{
- return __rte_common_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+ return __rte_common_ring_sp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
}
/**
@@ -356,7 +356,7 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
static inline int __attribute__((always_inline))
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
- return __rte_common_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+ return __rte_common_ring_mc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
}
/**
@@ -377,7 +377,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
static inline int __attribute__((always_inline))
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
- return __rte_common_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+ return __rte_common_ring_sc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
}
/**
@@ -568,7 +568,7 @@ static inline unsigned __attribute__((always_inline))
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned n)
{
- return __rte_common_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+ return __rte_common_ring_mp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
}
/**
@@ -587,7 +587,7 @@ static inline unsigned __attribute__((always_inline))
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned n)
{
- return __rte_common_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+ return __rte_common_ring_sp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
}
/**
@@ -636,7 +636,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
static inline unsigned __attribute__((always_inline))
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
- return __rte_common_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+ return __rte_common_ring_mc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
}
/**
@@ -656,7 +656,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
static inline unsigned __attribute__((always_inline))
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
- return __rte_common_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+ return __rte_common_ring_sc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
}
/**