[dpdk-dev,1/5] ring: allow rings with non power-of-2 sizes

Message ID 20170607133620.275801-2-bruce.richardson@intel.com (mailing list archive)
State Superseded, archived
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Bruce Richardson June 7, 2017, 1:36 p.m. UTC
  The rte_rings traditionally have only supported having ring sizes as powers
of 2, with the actual usable space being the size - 1. In some cases, for
example, with an eventdev where we want to precisely control queue depths
for latency, we need to allow ring sizes which are not powers of two so we
add in an additional ring capacity value to allow that. For existing rings,
this value will be size-1, i.e. the same as the mask, but if the new
EXACT_SZ flag is passed on ring creation, the ring will have exactly the
usable space requested, although the underlying memory size may be bigger.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_ring.c | 26 ++++++++++++--
 lib/librte_ring/rte_ring.h | 88 +++++++++++++++++++++++++++++-----------------
 2 files changed, 78 insertions(+), 36 deletions(-)
  

Comments

Olivier Matz June 30, 2017, 9:40 a.m. UTC | #1
Hi Bruce,

Few comments inline.

On Wed,  7 Jun 2017 14:36:16 +0100, Bruce Richardson <bruce.richardson@intel.com> wrote:
> The rte_rings traditionally have only supported having ring sizes as powers
> of 2, with the actual usable space being the size - 1. In some cases, for
> example, with an eventdev where we want to precisely control queue depths
> for latency, we need to allow ring sizes which are not powers of two so we
> add in an additional ring capacity value to allow that. For existing rings,
> this value will be size-1, i.e. the same as the mask, but if the new
> EXACT_SZ flag is passed on ring creation, the ring will have exactly the
> usable space requested, although the underlying memory size may be bigger.
> 
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>

Specifying the exact wanted size looks to be a better API than the current
one, which is to give the power of two above the wanted value, knowing there
will be only size-1 elements available.

What would you think about deprecating ring init/creation without this
flag in a few versions? Then, later, we could remove this flag and
the new behavior would become the default.


> ---
>  lib/librte_ring/rte_ring.c | 26 ++++++++++++--
>  lib/librte_ring/rte_ring.h | 88 +++++++++++++++++++++++++++++-----------------
>  2 files changed, 78 insertions(+), 36 deletions(-)
> 
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> index 5f98c33..b8047ee 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -140,8 +140,22 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
>  	r->flags = flags;
>  	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
>  	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> -	r->size = count;
> -	r->mask = count - 1;
> +
> +	if (flags & RING_F_EXACT_SZ) {
> +		r->size = rte_align32pow2(count + 1);
> +		r->mask = r->size - 1;
> +		r->capacity = count;
> +	} else {
> +		if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
> +			RTE_LOG(ERR, RING,
> +				"Requested size is invalid, must be power of 2, and not exceed the size limit %u\n",
> +				RTE_RING_SZ_MASK);
> +			return -EINVAL;
> +		}
> +		r->size = count;
> +		r->mask = count - 1;
> +		r->capacity = r->mask;
> +	}
>  	r->prod.head = r->cons.head = 0;
>  	r->prod.tail = r->cons.tail = 0;
>  
> @@ -160,10 +174,15 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
>  	ssize_t ring_size;
>  	int mz_flags = 0;
>  	struct rte_ring_list* ring_list = NULL;
> +	const unsigned int requested_count = count;
>  	int ret;
>  
>  	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
>  
> +	/* for an exact size ring, round up from count to a power of two */
> +	if (flags & RING_F_EXACT_SZ)
> +		count = rte_align32pow2(count + 1);
> +
>  	ring_size = rte_ring_get_memsize(count);
>  	if (ring_size < 0) {
>  		rte_errno = ring_size;
> @@ -194,7 +213,7 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
>  		r = mz->addr;
>  		/* no need to check return value here, we already checked the
>  		 * arguments above */
> -		rte_ring_init(r, name, count, flags);
> +		rte_ring_init(r, name, requested_count, flags);
>  
>  		te->data = (void *) r;
>  		r->memzone = mz;
> @@ -262,6 +281,7 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
>  	fprintf(f, "ring <%s>@%p\n", r->name, r);
>  	fprintf(f, "  flags=%x\n", r->flags);
>  	fprintf(f, "  size=%"PRIu32"\n", r->size);
> +	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
>  	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
>  	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
>  	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index 97f025a..494d31f 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -153,6 +153,7 @@ struct rte_ring {
>  			/**< Memzone, if any, containing the rte_ring */
>  	uint32_t size;           /**< Size of ring. */
>  	uint32_t mask;           /**< Mask (size-1) of ring. */
> +	uint32_t capacity;       /**< Usable size of ring */
>  
>  	/** Ring producer status. */
>  	struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN);
> @@ -163,6 +164,15 @@ struct rte_ring {
>  
>  #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
>  #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
> +/**
> + * Ring is to hold exactly requested number of entries.
> + * Without this flag set, the ring size requested must be a power of 2, and the
> + * usable space will be that size - 1. With the flag, the requested size will
> + * be rounded up to the next power of two, but the usable space will be exactly
> + * that requested. Worst case, if a power-of-2 size is requested, half the
> + * ring space will be wasted.
> + */
> +#define RING_F_EXACT_SZ 0x0004
>  #define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask */
>  
>  /* @internal defines for passing to the enqueue dequeue worker functions */
> @@ -389,7 +399,7 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>  		uint32_t *old_head, uint32_t *new_head,
>  		uint32_t *free_entries)
>  {
> -	const uint32_t mask = r->mask;
> +	const uint32_t capacity = r->capacity;
>  	unsigned int max = n;
>  	int success;
>  
> @@ -399,11 +409,13 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>  
>  		*old_head = r->prod.head;
>  		const uint32_t cons_tail = r->cons.tail;
> -		/* The subtraction is done between two unsigned 32bits value
> +		/*
> +		 *  The subtraction is done between two unsigned 32bits value
>  		 * (the result is always modulo 32 bits even if we have
>  		 * *old_head > cons_tail). So 'free_entries' is always between 0
> -		 * and size(ring)-1. */
> -		*free_entries = (mask + cons_tail - *old_head);
> +		 * and capacity (which is < size).
> +		 */
> +		*free_entries = (capacity + cons_tail - *old_head);
>  
>  		/* check that we have enough room in ring */
>  		if (unlikely(n > *free_entries))
> @@ -845,69 +857,63 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
>  }
>  
>  /**
> - * Test if a ring is full.
> + * Return the number of entries in a ring.
>   *
>   * @param r
>   *   A pointer to the ring structure.
>   * @return
> - *   - 1: The ring is full.
> - *   - 0: The ring is not full.
> + *   The number of entries in the ring.
>   */
> -static inline int
> -rte_ring_full(const struct rte_ring *r)
> +static inline unsigned
> +rte_ring_count(const struct rte_ring *r)
>  {
>  	uint32_t prod_tail = r->prod.tail;
>  	uint32_t cons_tail = r->cons.tail;
> -	return ((cons_tail - prod_tail - 1) & r->mask) == 0;
> +	return (prod_tail - cons_tail) & r->mask;
>  }

When used on a mc/mp ring, this function can now return a value
which is higher than the ring capacity. Even if this function is
not really accurate when used while the ring is use, I think we
should maximize the result with r->capacity.

This will also avoid rte_ring_free_count() to return a overflowed
value.


>  
>  /**
> - * Test if a ring is empty.
> + * Return the number of free entries in a ring.
>   *
>   * @param r
>   *   A pointer to the ring structure.
>   * @return
> - *   - 1: The ring is empty.
> - *   - 0: The ring is not empty.
> + *   The number of free entries in the ring.
>   */
> -static inline int
> -rte_ring_empty(const struct rte_ring *r)
> +static inline unsigned
> +rte_ring_free_count(const struct rte_ring *r)
>  {
> -	uint32_t prod_tail = r->prod.tail;
> -	uint32_t cons_tail = r->cons.tail;
> -	return !!(cons_tail == prod_tail);
> +	return r->capacity - rte_ring_count(r);
>  }
>  
>  /**
> - * Return the number of entries in a ring.
> + * Test if a ring is full.
>   *
>   * @param r
>   *   A pointer to the ring structure.
>   * @return
> - *   The number of entries in the ring.
> + *   - 1: The ring is full.
> + *   - 0: The ring is not full.
>   */
> -static inline unsigned
> -rte_ring_count(const struct rte_ring *r)
> +static inline int
> +rte_ring_full(const struct rte_ring *r)
>  {
> -	uint32_t prod_tail = r->prod.tail;
> -	uint32_t cons_tail = r->cons.tail;
> -	return (prod_tail - cons_tail) & r->mask;
> +	return rte_ring_free_count(r) == 0;
>  }
>  
>  /**
> - * Return the number of free entries in a ring.
> + * Test if a ring is empty.
>   *
>   * @param r
>   *   A pointer to the ring structure.
>   * @return
> - *   The number of free entries in the ring.
> + *   - 1: The ring is empty.
> + *   - 0: The ring is not empty.
>   */
> -static inline unsigned
> -rte_ring_free_count(const struct rte_ring *r)
> +static inline int
> +rte_ring_empty(const struct rte_ring *r)
>  {
> -	uint32_t prod_tail = r->prod.tail;
> -	uint32_t cons_tail = r->cons.tail;
> -	return (cons_tail - prod_tail - 1) & r->mask;
> +	return rte_ring_count(r) == 0;
>  }
>  
>  /**
> @@ -916,7 +922,9 @@ rte_ring_free_count(const struct rte_ring *r)
>   * @param r
>   *   A pointer to the ring structure.
>   * @return
> - *   The number of elements which can be stored in the ring.
> + *   The size of the data store used by the ring.
> + *   NOTE: this is not the same as the usable space in the ring. To query that
> + *   use ``rte_ring_get_capacity()``.
>   */
>  static inline unsigned int
>  rte_ring_get_size(const struct rte_ring *r)
> @@ -925,6 +933,20 @@ rte_ring_get_size(const struct rte_ring *r)
>  }
>  
>  /**
> + * Return the number of elements which can be stored in the ring.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @return
> + *   The usable size of the ring.
> + */
> +static inline unsigned int
> +rte_ring_get_capacity(const struct rte_ring *r)
> +{
> +	return r->capacity;
> +}
> +
> +/**
>   * Dump the status of all rings on the console
>   *
>   * @param f

I think the users of rte_ring_get_size() use this API to get the
number of elements that can fit in the ring. The patch changes the
semantic of this function (even if the result is the same when
EXACT_SZ is not passed). This is the same for ring->size.

Wouldn't it be better to rename all current occurences of "size" as
"storage_size", and introduce a new "size", which is what you call
"capacity".

I'm asking the question but I'm not that convinced myself...
nevertheless I find storage_size/size less confusing than
size/capacity.


Olivier
  
Bruce Richardson June 30, 2017, 11:32 a.m. UTC | #2
On Fri, Jun 30, 2017 at 11:40:46AM +0200, Olivier Matz wrote:
> Hi Bruce,
> 
> Few comments inline.
> 
> On Wed,  7 Jun 2017 14:36:16 +0100, Bruce Richardson <bruce.richardson@intel.com> wrote:
> > The rte_rings traditionally have only supported having ring sizes as powers
> > of 2, with the actual usable space being the size - 1. In some cases, for
> > example, with an eventdev where we want to precisely control queue depths
> > for latency, we need to allow ring sizes which are not powers of two so we
> > add in an additional ring capacity value to allow that. For existing rings,
> > this value will be size-1, i.e. the same as the mask, but if the new
> > EXACT_SZ flag is passed on ring creation, the ring will have exactly the
> > usable space requested, although the underlying memory size may be bigger.
> > 
> > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> 
> Specifying the exact wanted size looks to be a better API than the current
> one, which is to give the power of two above the wanted value, knowing there
> will be only size-1 elements available.
> 
> What would you think about deprecating ring init/creation without this
> flag in a few versions? Then, later, we could remove this flag and
> the new behavior would become the default.
>

I'm not really sure it's necessary. For the vast majority of cases what
we have now is fine, and it ensures that we maximise the ring space. I'd
tend to keep the new behaviour for those cases where we really need it.

It's a trade off between what we hide and expose:
* current scheme hides the fact that you get one entry less than you ask
  for, but the memory space is as expected.
* new scheme gives you exactly the entries you ask for, but hides the
  fact that you could be using up to double the memory space for the
  ring.

> 
> > ---
> >  lib/librte_ring/rte_ring.c | 26 ++++++++++++--
> >  lib/librte_ring/rte_ring.h | 88 +++++++++++++++++++++++++++++-----------------
> >  2 files changed, 78 insertions(+), 36 deletions(-)
> > 
> > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > index 5f98c33..b8047ee 100644
> > --- a/lib/librte_ring/rte_ring.c
> > +++ b/lib/librte_ring/rte_ring.c
> > @@ -140,8 +140,22 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> >  	r->flags = flags;
> >  	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> >  	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> > -	r->size = count;
> > -	r->mask = count - 1;
> > +
> > +	if (flags & RING_F_EXACT_SZ) {
> > +		r->size = rte_align32pow2(count + 1);
> > +		r->mask = r->size - 1;
> > +		r->capacity = count;
> > +	} else {
> > +		if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
> > +			RTE_LOG(ERR, RING,
> > +				"Requested size is invalid, must be power of 2, and not exceed the size limit %u\n",
> > +				RTE_RING_SZ_MASK);
> > +			return -EINVAL;
> > +		}
> > +		r->size = count;
> > +		r->mask = count - 1;
> > +		r->capacity = r->mask;
> > +	}
> >  	r->prod.head = r->cons.head = 0;
> >  	r->prod.tail = r->cons.tail = 0;
> >  
> > @@ -160,10 +174,15 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
> >  	ssize_t ring_size;
> >  	int mz_flags = 0;
> >  	struct rte_ring_list* ring_list = NULL;
> > +	const unsigned int requested_count = count;
> >  	int ret;
> >  
> >  	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
> >  
> > +	/* for an exact size ring, round up from count to a power of two */
> > +	if (flags & RING_F_EXACT_SZ)
> > +		count = rte_align32pow2(count + 1);
> > +
> >  	ring_size = rte_ring_get_memsize(count);
> >  	if (ring_size < 0) {
> >  		rte_errno = ring_size;
> > @@ -194,7 +213,7 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
> >  		r = mz->addr;
> >  		/* no need to check return value here, we already checked the
> >  		 * arguments above */
> > -		rte_ring_init(r, name, count, flags);
> > +		rte_ring_init(r, name, requested_count, flags);
> >  
> >  		te->data = (void *) r;
> >  		r->memzone = mz;
> > @@ -262,6 +281,7 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
> >  	fprintf(f, "ring <%s>@%p\n", r->name, r);
> >  	fprintf(f, "  flags=%x\n", r->flags);
> >  	fprintf(f, "  size=%"PRIu32"\n", r->size);
> > +	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
> >  	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
> >  	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
> >  	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > index 97f025a..494d31f 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -153,6 +153,7 @@ struct rte_ring {
> >  			/**< Memzone, if any, containing the rte_ring */
> >  	uint32_t size;           /**< Size of ring. */
> >  	uint32_t mask;           /**< Mask (size-1) of ring. */
> > +	uint32_t capacity;       /**< Usable size of ring */
> >  
> >  	/** Ring producer status. */
> >  	struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN);
> > @@ -163,6 +164,15 @@ struct rte_ring {
> >  
> >  #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
> >  #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
> > +/**
> > + * Ring is to hold exactly requested number of entries.
> > + * Without this flag set, the ring size requested must be a power of 2, and the
> > + * usable space will be that size - 1. With the flag, the requested size will
> > + * be rounded up to the next power of two, but the usable space will be exactly
> > + * that requested. Worst case, if a power-of-2 size is requested, half the
> > + * ring space will be wasted.
> > + */
> > +#define RING_F_EXACT_SZ 0x0004
> >  #define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask */
> >  
> >  /* @internal defines for passing to the enqueue dequeue worker functions */
> > @@ -389,7 +399,7 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> >  		uint32_t *old_head, uint32_t *new_head,
> >  		uint32_t *free_entries)
> >  {
> > -	const uint32_t mask = r->mask;
> > +	const uint32_t capacity = r->capacity;
> >  	unsigned int max = n;
> >  	int success;
> >  
> > @@ -399,11 +409,13 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> >  
> >  		*old_head = r->prod.head;
> >  		const uint32_t cons_tail = r->cons.tail;
> > -		/* The subtraction is done between two unsigned 32bits value
> > +		/*
> > +		 *  The subtraction is done between two unsigned 32bits value
> >  		 * (the result is always modulo 32 bits even if we have
> >  		 * *old_head > cons_tail). So 'free_entries' is always between 0
> > -		 * and size(ring)-1. */
> > -		*free_entries = (mask + cons_tail - *old_head);
> > +		 * and capacity (which is < size).
> > +		 */
> > +		*free_entries = (capacity + cons_tail - *old_head);
> >  
> >  		/* check that we have enough room in ring */
> >  		if (unlikely(n > *free_entries))
> > @@ -845,69 +857,63 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
> >  }
> >  
> >  /**
> > - * Test if a ring is full.
> > + * Return the number of entries in a ring.
> >   *
> >   * @param r
> >   *   A pointer to the ring structure.
> >   * @return
> > - *   - 1: The ring is full.
> > - *   - 0: The ring is not full.
> > + *   The number of entries in the ring.
> >   */
> > -static inline int
> > -rte_ring_full(const struct rte_ring *r)
> > +static inline unsigned
> > +rte_ring_count(const struct rte_ring *r)
> >  {
> >  	uint32_t prod_tail = r->prod.tail;
> >  	uint32_t cons_tail = r->cons.tail;
> > -	return ((cons_tail - prod_tail - 1) & r->mask) == 0;
> > +	return (prod_tail - cons_tail) & r->mask;
> >  }
> 
> When used on a mc/mp ring, this function can now return a value
> which is higher than the ring capacity. Even if this function is
> not really accurate when used while the ring is use, I think we
> should maximize the result with r->capacity.
> 
> This will also avoid rte_ring_free_count() to return a overflowed
> value.
> 

How does it return an overflowing value? I think in the MP/MC case the
tests made each time around the loop for cmpset should ensure we never
overflow.

> 
> >  
> >  /**
> > - * Test if a ring is empty.
> > + * Return the number of free entries in a ring.
> >   *
> >   * @param r
> >   *   A pointer to the ring structure.
> >   * @return
> > - *   - 1: The ring is empty.
> > - *   - 0: The ring is not empty.
> > + *   The number of free entries in the ring.
> >   */
> > -static inline int
> > -rte_ring_empty(const struct rte_ring *r)
> > +static inline unsigned
> > +rte_ring_free_count(const struct rte_ring *r)
> >  {
> > -	uint32_t prod_tail = r->prod.tail;
> > -	uint32_t cons_tail = r->cons.tail;
> > -	return !!(cons_tail == prod_tail);
> > +	return r->capacity - rte_ring_count(r);
> >  }
> >  
> >  /**
> > - * Return the number of entries in a ring.
> > + * Test if a ring is full.
> >   *
> >   * @param r
> >   *   A pointer to the ring structure.
> >   * @return
> > - *   The number of entries in the ring.
> > + *   - 1: The ring is full.
> > + *   - 0: The ring is not full.
> >   */
> > -static inline unsigned
> > -rte_ring_count(const struct rte_ring *r)
> > +static inline int
> > +rte_ring_full(const struct rte_ring *r)
> >  {
> > -	uint32_t prod_tail = r->prod.tail;
> > -	uint32_t cons_tail = r->cons.tail;
> > -	return (prod_tail - cons_tail) & r->mask;
> > +	return rte_ring_free_count(r) == 0;
> >  }
> >  
> >  /**
> > - * Return the number of free entries in a ring.
> > + * Test if a ring is empty.
> >   *
> >   * @param r
> >   *   A pointer to the ring structure.
> >   * @return
> > - *   The number of free entries in the ring.
> > + *   - 1: The ring is empty.
> > + *   - 0: The ring is not empty.
> >   */
> > -static inline unsigned
> > -rte_ring_free_count(const struct rte_ring *r)
> > +static inline int
> > +rte_ring_empty(const struct rte_ring *r)
> >  {
> > -	uint32_t prod_tail = r->prod.tail;
> > -	uint32_t cons_tail = r->cons.tail;
> > -	return (cons_tail - prod_tail - 1) & r->mask;
> > +	return rte_ring_count(r) == 0;
> >  }
> >  
> >  /**
> > @@ -916,7 +922,9 @@ rte_ring_free_count(const struct rte_ring *r)
> >   * @param r
> >   *   A pointer to the ring structure.
> >   * @return
> > - *   The number of elements which can be stored in the ring.
> > + *   The size of the data store used by the ring.
> > + *   NOTE: this is not the same as the usable space in the ring. To query that
> > + *   use ``rte_ring_get_capacity()``.
> >   */
> >  static inline unsigned int
> >  rte_ring_get_size(const struct rte_ring *r)
> > @@ -925,6 +933,20 @@ rte_ring_get_size(const struct rte_ring *r)
> >  }
> >  
> >  /**
> > + * Return the number of elements which can be stored in the ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @return
> > + *   The usable size of the ring.
> > + */
> > +static inline unsigned int
> > +rte_ring_get_capacity(const struct rte_ring *r)
> > +{
> > +	return r->capacity;
> > +}
> > +
> > +/**
> >   * Dump the status of all rings on the console
> >   *
> >   * @param f
> 
> I think the users of rte_ring_get_size() use this API to get the
> number of elements that can fit in the ring. The patch changes the
> semantic of this function (even if the result is the same when
> EXACT_SZ is not passed). This is the same for ring->size.
> 
> Wouldn't it be better to rename all current occurences of "size" as
> "storage_size", and introduce a new "size", which is what you call
> "capacity".
> 
> I'm asking the question but I'm not that convinced myself...
> nevertheless I find storage_size/size less confusing than
> size/capacity.
> 
I went back and forward on this myself a bit too. However, the reason I
went for this approach is purely one of backwards compatibility. For
legacy apps unaware of the new apps, there is no change in behaviour or
return value for the size function. For apps that are aware of this
change there is the new function provided.
If we did change size to be storage size, then you could end up getting
off-by-one errors in apps that haven't been updated.

Regards,
/Bruce
  
Olivier Matz June 30, 2017, 12:24 p.m. UTC | #3
On Fri, 30 Jun 2017 12:32:27 +0100, Bruce Richardson
<bruce.richardson@intel.com> wrote:
> On Fri, Jun 30, 2017 at 11:40:46AM +0200, Olivier Matz wrote:
> > Hi Bruce,
> > 
> > Few comments inline.
> > 
> > On Wed,  7 Jun 2017 14:36:16 +0100, Bruce Richardson <bruce.richardson@intel.com> wrote:  
> > > The rte_rings traditionally have only supported having ring sizes as powers
> > > of 2, with the actual usable space being the size - 1. In some cases, for
> > > example, with an eventdev where we want to precisely control queue depths
> > > for latency, we need to allow ring sizes which are not powers of two so we
> > > add in an additional ring capacity value to allow that. For existing rings,
> > > this value will be size-1, i.e. the same as the mask, but if the new
> > > EXACT_SZ flag is passed on ring creation, the ring will have exactly the
> > > usable space requested, although the underlying memory size may be bigger.
> > > 
> > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>  
> > 
> > Specifying the exact wanted size looks to be a better API than the current
> > one, which is to give the power of two above the wanted value, knowing there
> > will be only size-1 elements available.
> > 
> > What would you think about deprecating ring init/creation without this
> > flag in a few versions? Then, later, we could remove this flag and
> > the new behavior would become the default.
> >  
> 
> I'm not really sure it's necessary. For the vast majority of cases what
> we have now is fine, and it ensures that we maximise the ring space. I'd
> tend to keep the new behaviour for those cases where we really need it.
> 
> It's a trade off between what we hide and expose:
> * current scheme hides the fact that you get one entry less than you ask
>   for, but the memory space is as expected.
> * new scheme gives you exactly the entries you ask for, but hides the
>   fact that you could be using up to double the memory space for the
>   ring.

Yes, having 2 different behavior is precisely what I don't really like.
Giving the number of entries the user asks for is straightforward, which
is a good thing for an API. And hiding the extra consumed memory looks
acceptable, this can be documented.

That said, if we decide to deprecate it, there's no need to do it right
now.


> > > @@ -845,69 +857,63 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
> > >  }
> > >  
> > >  /**
> > > - * Test if a ring is full.
> > > + * Return the number of entries in a ring.
> > >   *
> > >   * @param r
> > >   *   A pointer to the ring structure.
> > >   * @return
> > > - *   - 1: The ring is full.
> > > - *   - 0: The ring is not full.
> > > + *   The number of entries in the ring.
> > >   */
> > > -static inline int
> > > -rte_ring_full(const struct rte_ring *r)
> > > +static inline unsigned
> > > +rte_ring_count(const struct rte_ring *r)
> > >  {
> > >  	uint32_t prod_tail = r->prod.tail;
> > >  	uint32_t cons_tail = r->cons.tail;
> > > -	return ((cons_tail - prod_tail - 1) & r->mask) == 0;
> > > +	return (prod_tail - cons_tail) & r->mask;
> > >  }  
> > 
> > When used on a mc/mp ring, this function can now return a value
> > which is higher than the ring capacity. Even if this function is
> > not really accurate when used while the ring is use, I think we
> > should maximize the result with r->capacity.
> > 
> > This will also avoid rte_ring_free_count() to return a overflowed
> > value.
> >   
> 
> How does it return an overflowing value? I think in the MP/MC case the
> tests made each time around the loop for cmpset should ensure we never
> overflow.

Here we are reading r->prod.tail and r->cons.tail without
synchronization, nor memory barriers. So basically, there is no
guarantee that the values are consistent.

Before, the returned value could be wrong, but always in the
interval [0, mask].

Now, rte_ring_count() is still in the interval [0, mask], but the
capacity of the ring is lower than mask. If rte_ring_count() returns
mask, it would cause rte_ring_free_count() to return a value lower than
0 (overflowed since the result is unsigned).


> > > @@ -916,7 +922,9 @@ rte_ring_free_count(const struct rte_ring *r)
> > >   * @param r
> > >   *   A pointer to the ring structure.
> > >   * @return
> > > - *   The number of elements which can be stored in the ring.
> > > + *   The size of the data store used by the ring.
> > > + *   NOTE: this is not the same as the usable space in the ring. To query that
> > > + *   use ``rte_ring_get_capacity()``.
> > >   */
> > >  static inline unsigned int
> > >  rte_ring_get_size(const struct rte_ring *r)
> > > @@ -925,6 +933,20 @@ rte_ring_get_size(const struct rte_ring *r)
> > >  }
> > >  
> > >  /**
> > > + * Return the number of elements which can be stored in the ring.
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @return
> > > + *   The usable size of the ring.
> > > + */
> > > +static inline unsigned int
> > > +rte_ring_get_capacity(const struct rte_ring *r)
> > > +{
> > > +	return r->capacity;
> > > +}
> > > +
> > > +/**
> > >   * Dump the status of all rings on the console
> > >   *
> > >   * @param f  
> > 
> > I think the users of rte_ring_get_size() use this API to get the
> > number of elements that can fit in the ring. The patch changes the
> > semantic of this function (even if the result is the same when
> > EXACT_SZ is not passed). This is the same for ring->size.
> > 
> > Wouldn't it be better to rename all current occurences of "size" as
> > "storage_size", and introduce a new "size", which is what you call
> > "capacity".
> > 
> > I'm asking the question but I'm not that convinced myself...
> > nevertheless I find storage_size/size less confusing than
> > size/capacity.
> >   
> I went back and forward on this myself a bit too. However, the reason I
> went for this approach is purely one of backwards compatibility. For
> legacy apps unaware of the new apps, there is no change in behaviour or
> return value for the size function. For apps that are aware of this
> change there is the new function provided.
> If we did change size to be storage size, then you could end up getting
> off-by-one errors in apps that haven't been updated.

OK.


Olivier
  
Bruce Richardson June 30, 2017, 1:59 p.m. UTC | #4
On Fri, Jun 30, 2017 at 02:24:38PM +0200, Olivier Matz wrote:
> On Fri, 30 Jun 2017 12:32:27 +0100, Bruce Richardson
> <bruce.richardson@intel.com> wrote:
> > On Fri, Jun 30, 2017 at 11:40:46AM +0200, Olivier Matz wrote:
> > > Hi Bruce,
> > > 
> > > Few comments inline.
> > > 
> > > On Wed,  7 Jun 2017 14:36:16 +0100, Bruce Richardson <bruce.richardson@intel.com> wrote:  
> > > > The rte_rings traditionally have only supported having ring sizes as powers
> > > > of 2, with the actual usable space being the size - 1. In some cases, for
> > > > example, with an eventdev where we want to precisely control queue depths
> > > > for latency, we need to allow ring sizes which are not powers of two so we
> > > > add in an additional ring capacity value to allow that. For existing rings,
> > > > this value will be size-1, i.e. the same as the mask, but if the new
> > > > EXACT_SZ flag is passed on ring creation, the ring will have exactly the
> > > > usable space requested, although the underlying memory size may be bigger.
> > > > 
> > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>  
> > > 
> > > Specifying the exact wanted size looks to be a better API than the current
> > > one, which is to give the power of two above the wanted value, knowing there
> > > will be only size-1 elements available.
> > > 
> > > What would you think about deprecating ring init/creation without this
> > > flag in a few versions? Then, later, we could remove this flag and
> > > the new behavior would become the default.
> > >  
> > 
> > I'm not really sure it's necessary. For the vast majority of cases what
> > we have now is fine, and it ensures that we maximise the ring space. I'd
> > tend to keep the new behaviour for those cases where we really need it.
> > 
> > It's a trade off between what we hide and expose:
> > * current scheme hides the fact that you get one entry less than you ask
> >   for, but the memory space is as expected.
> > * new scheme gives you exactly the entries you ask for, but hides the
> >   fact that you could be using up to double the memory space for the
> >   ring.
> 
> Yes, having 2 different behavior is precisely what I don't really like.
> Giving the number of entries the user asks for is straightforward, which
> is a good thing for an API. And hiding the extra consumed memory looks
> acceptable, this can be documented.
> 
> That said, if we decide to deprecate it, there's no need to do it right
> now.
> 
> 
> > > > @@ -845,69 +857,63 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
> > > >  }
> > > >  
> > > >  /**
> > > > - * Test if a ring is full.
> > > > + * Return the number of entries in a ring.
> > > >   *
> > > >   * @param r
> > > >   *   A pointer to the ring structure.
> > > >   * @return
> > > > - *   - 1: The ring is full.
> > > > - *   - 0: The ring is not full.
> > > > + *   The number of entries in the ring.
> > > >   */
> > > > -static inline int
> > > > -rte_ring_full(const struct rte_ring *r)
> > > > +static inline unsigned
> > > > +rte_ring_count(const struct rte_ring *r)
> > > >  {
> > > >  	uint32_t prod_tail = r->prod.tail;
> > > >  	uint32_t cons_tail = r->cons.tail;
> > > > -	return ((cons_tail - prod_tail - 1) & r->mask) == 0;
> > > > +	return (prod_tail - cons_tail) & r->mask;
> > > >  }  
> > > 
> > > When used on a mc/mp ring, this function can now return a value
> > > which is higher than the ring capacity. Even if this function is
> > > not really accurate when used while the ring is use, I think we
> > > should maximize the result with r->capacity.
> > > 
> > > This will also avoid rte_ring_free_count() to return a overflowed
> > > value.
> > >   
> > 
> > How does it return an overflowing value? I think in the MP/MC case the
> > tests made each time around the loop for cmpset should ensure we never
> > overflow.
> 
> Here we are reading r->prod.tail and r->cons.tail without
> synchronization, nor memory barriers. So basically, there is no
> guarantee that the values are consistent.
> 
> Before, the returned value could be wrong, but always in the
> interval [0, mask].
> 
> Now, rte_ring_count() is still in the interval [0, mask], but the
> capacity of the ring is lower than mask. If rte_ring_count() returns
> mask, it would cause rte_ring_free_count() to return a value lower than
> 0 (overflowed since the result is unsigned).
> 
> 
I'm still not convinced that this can actually happen, but just in case,
I'll add in the extra check in V2.

/Bruce
  

Patch

diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index 5f98c33..b8047ee 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -140,8 +140,22 @@  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	r->flags = flags;
 	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
 	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
-	r->size = count;
-	r->mask = count - 1;
+
+	if (flags & RING_F_EXACT_SZ) {
+		r->size = rte_align32pow2(count + 1);
+		r->mask = r->size - 1;
+		r->capacity = count;
+	} else {
+		if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
+			RTE_LOG(ERR, RING,
+				"Requested size is invalid, must be power of 2, and not exceed the size limit %u\n",
+				RTE_RING_SZ_MASK);
+			return -EINVAL;
+		}
+		r->size = count;
+		r->mask = count - 1;
+		r->capacity = r->mask;
+	}
 	r->prod.head = r->cons.head = 0;
 	r->prod.tail = r->cons.tail = 0;
 
@@ -160,10 +174,15 @@  rte_ring_create(const char *name, unsigned count, int socket_id,
 	ssize_t ring_size;
 	int mz_flags = 0;
 	struct rte_ring_list* ring_list = NULL;
+	const unsigned int requested_count = count;
 	int ret;
 
 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
 
+	/* for an exact size ring, round up from count to a power of two */
+	if (flags & RING_F_EXACT_SZ)
+		count = rte_align32pow2(count + 1);
+
 	ring_size = rte_ring_get_memsize(count);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
@@ -194,7 +213,7 @@  rte_ring_create(const char *name, unsigned count, int socket_id,
 		r = mz->addr;
 		/* no need to check return value here, we already checked the
 		 * arguments above */
-		rte_ring_init(r, name, count, flags);
+		rte_ring_init(r, name, requested_count, flags);
 
 		te->data = (void *) r;
 		r->memzone = mz;
@@ -262,6 +281,7 @@  rte_ring_dump(FILE *f, const struct rte_ring *r)
 	fprintf(f, "ring <%s>@%p\n", r->name, r);
 	fprintf(f, "  flags=%x\n", r->flags);
 	fprintf(f, "  size=%"PRIu32"\n", r->size);
+	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
 	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
 	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
 	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 97f025a..494d31f 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -153,6 +153,7 @@  struct rte_ring {
 			/**< Memzone, if any, containing the rte_ring */
 	uint32_t size;           /**< Size of ring. */
 	uint32_t mask;           /**< Mask (size-1) of ring. */
+	uint32_t capacity;       /**< Usable size of ring */
 
 	/** Ring producer status. */
 	struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN);
@@ -163,6 +164,15 @@  struct rte_ring {
 
 #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
 #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+/**
+ * Ring is to hold exactly requested number of entries.
+ * Without this flag set, the ring size requested must be a power of 2, and the
+ * usable space will be that size - 1. With the flag, the requested size will
+ * be rounded up to the next power of two, but the usable space will be exactly
+ * that requested. Worst case, if a power-of-2 size is requested, half the
+ * ring space will be wasted.
+ */
+#define RING_F_EXACT_SZ 0x0004
 #define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask */
 
 /* @internal defines for passing to the enqueue dequeue worker functions */
@@ -389,7 +399,7 @@  __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
 		uint32_t *old_head, uint32_t *new_head,
 		uint32_t *free_entries)
 {
-	const uint32_t mask = r->mask;
+	const uint32_t capacity = r->capacity;
 	unsigned int max = n;
 	int success;
 
@@ -399,11 +409,13 @@  __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
 
 		*old_head = r->prod.head;
 		const uint32_t cons_tail = r->cons.tail;
-		/* The subtraction is done between two unsigned 32bits value
+		/*
+		 *  The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
-		 * and size(ring)-1. */
-		*free_entries = (mask + cons_tail - *old_head);
+		 * and capacity (which is < size).
+		 */
+		*free_entries = (capacity + cons_tail - *old_head);
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *free_entries))
@@ -845,69 +857,63 @@  rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 }
 
 /**
- * Test if a ring is full.
+ * Return the number of entries in a ring.
  *
  * @param r
  *   A pointer to the ring structure.
  * @return
- *   - 1: The ring is full.
- *   - 0: The ring is not full.
+ *   The number of entries in the ring.
  */
-static inline int
-rte_ring_full(const struct rte_ring *r)
+static inline unsigned
+rte_ring_count(const struct rte_ring *r)
 {
 	uint32_t prod_tail = r->prod.tail;
 	uint32_t cons_tail = r->cons.tail;
-	return ((cons_tail - prod_tail - 1) & r->mask) == 0;
+	return (prod_tail - cons_tail) & r->mask;
 }
 
 /**
- * Test if a ring is empty.
+ * Return the number of free entries in a ring.
  *
  * @param r
  *   A pointer to the ring structure.
  * @return
- *   - 1: The ring is empty.
- *   - 0: The ring is not empty.
+ *   The number of free entries in the ring.
  */
-static inline int
-rte_ring_empty(const struct rte_ring *r)
+static inline unsigned
+rte_ring_free_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return !!(cons_tail == prod_tail);
+	return r->capacity - rte_ring_count(r);
 }
 
 /**
- * Return the number of entries in a ring.
+ * Test if a ring is full.
  *
  * @param r
  *   A pointer to the ring structure.
  * @return
- *   The number of entries in the ring.
+ *   - 1: The ring is full.
+ *   - 0: The ring is not full.
  */
-static inline unsigned
-rte_ring_count(const struct rte_ring *r)
+static inline int
+rte_ring_full(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return (prod_tail - cons_tail) & r->mask;
+	return rte_ring_free_count(r) == 0;
 }
 
 /**
- * Return the number of free entries in a ring.
+ * Test if a ring is empty.
  *
  * @param r
  *   A pointer to the ring structure.
  * @return
- *   The number of free entries in the ring.
+ *   - 1: The ring is empty.
+ *   - 0: The ring is not empty.
  */
-static inline unsigned
-rte_ring_free_count(const struct rte_ring *r)
+static inline int
+rte_ring_empty(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	return (cons_tail - prod_tail - 1) & r->mask;
+	return rte_ring_count(r) == 0;
 }
 
 /**
@@ -916,7 +922,9 @@  rte_ring_free_count(const struct rte_ring *r)
  * @param r
  *   A pointer to the ring structure.
  * @return
- *   The number of elements which can be stored in the ring.
+ *   The size of the data store used by the ring.
+ *   NOTE: this is not the same as the usable space in the ring. To query that
+ *   use ``rte_ring_get_capacity()``.
  */
 static inline unsigned int
 rte_ring_get_size(const struct rte_ring *r)
@@ -925,6 +933,20 @@  rte_ring_get_size(const struct rte_ring *r)
 }
 
 /**
+ * Return the number of elements which can be stored in the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   The usable size of the ring.
+ */
+static inline unsigned int
+rte_ring_get_capacity(const struct rte_ring *r)
+{
+	return r->capacity;
+}
+
+/**
  * Dump the status of all rings on the console
  *
  * @param f