[v3,2/5] ring: add a non-blocking implementation

Message ID 20190118152326.22686-3-gage.eads@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series Add non-blocking ring |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Eads, Gage Jan. 18, 2019, 3:23 p.m. UTC
  This commit adds support for non-blocking circular ring enqueue and dequeue
functions. The ring uses a 128-bit compare-and-swap instruction, and thus
is currently limited to x86_64.

The algorithm is based on the original rte ring (derived from FreeBSD's
bufring.h) and inspired by Michael and Scott's non-blocking concurrent
queue. Importantly, it adds a modification counter to each ring entry to
ensure only one thread can write to an unused entry.
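
Concretely, each slot of a non-blocking ring is a 16-byte {pointer, counter}
pair; this mirrors the nb_ring_entry structure added in the diff below:

    struct nb_ring_entry {
            void *ptr;    /* data pointer */
            uint64_t cnt; /* modification counter; an unused slot at index
                           * 'tail' is expected to hold cnt == tail
                           */
    };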

-----
Algorithm:

Multi-producer non-blocking enqueue:
1. Move the producer head index 'n' locations forward, effectively
   reserving 'n' locations.
2. For each pointer:
 a. Read the producer tail index, then ring[tail]. If ring[tail]'s
    modification counter isn't 'tail', retry.
 b. Construct the new entry: {pointer, tail + ring size}
 c. Compare-and-swap the old entry with the new. If unsuccessful, the
    next loop iteration will try to enqueue this pointer again.
 d. Compare-and-swap the tail index with 'tail + 1', whether or not step 2c
    succeeded. This guarantees threads can make forward progress.
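
Sketched in C, the per-pointer loop (step 2) looks roughly like the following.
This is an illustration only, reusing the nb_ring_entry type, the
rte_atomic128_cmpset() helper, and the prod_64 fields from the diff below;
head reservation and size checks are omitted:

    struct nb_ring_entry *base = (struct nb_ring_entry *)&r[1];
    unsigned int i = 0;

    while (i < n) {
            uint64_t tail = r->prod_64.tail;                      /* step 2a */
            struct nb_ring_entry *slot = &base[tail & r->mask];
            struct nb_ring_entry old_ent = *slot, new_ent;

            if (old_ent.cnt != tail)    /* slot already written; retry 2a */
                    continue;

            new_ent.ptr = obj_table[i];                           /* step 2b */
            new_ent.cnt = tail + r->size;

            if (rte_atomic128_cmpset((volatile rte_int128_t *)slot,  /* 2c */
                                     (rte_int128_t *)&old_ent,
                                     (rte_int128_t *)&new_ent))
                    i++;

            /* step 2d: advance the tail whether or not 2c succeeded */
            __sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail + 1);
    }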

Multi-consumer non-blocking dequeue:
1. Move the consumer head index 'n' locations forward, effectively
   reserving 'n' pointers to be dequeued.
2. Copy 'n' pointers into the caller's object table (ignoring the
   modification counter), starting from ring[tail], then compare-and-swap
   the tail index with 'tail + n'.  If unsuccessful, repeat step 2.
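
The dequeue's copy-and-commit loop (step 2) reduces to the following sketch,
reusing the DEQUEUE_PTRS_NB macro and cons_64 fields from the diff below:

    while (1) {
            uint64_t tail = r->cons_64.tail;

            /* Copy n pointers starting at ring[tail]; if another thread
             * wins the CAS below, this copy is simply discarded and redone.
             */
            DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);

            if (__sync_bool_compare_and_swap(&r->cons_64.tail, tail, tail + n))
                    break;
    }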

-----
Discussion:

There are two cases where the ABA problem is mitigated:
1. Enqueueing a pointer to the ring: without a modification counter
   tied to the tail index, the index could become stale by the time the
   enqueue happens, causing it to overwrite valid data. Tying the
   counter to the tail index gives us an expected value (as opposed to,
   say, a monotonically incrementing counter).

   Since the counter will eventually wrap, there is potential for the ABA
   problem. However, using a 64-bit counter makes this likelihood
   effectively zero.

2. Updating a tail index: the ABA problem can occur if the thread is
   preempted and the tail index wraps around. However, using 64-bit indexes
   makes this likelihood effectively zero.
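   (For scale: even at a hypothetical rate of one tail update per nanosecond,
   a 64-bit index takes 2^64 ns, roughly 585 years, to wrap, so a preempted
   thread reappearing at the same index value is not a practical concern.)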

With no contention, an enqueue of n pointers uses (1 + 2n) CAS operations
and a dequeue of n pointers uses 2. This algorithm has worse average-case
performance than the regular rte ring (particularly for a highly contended
ring with large bulk accesses), however:
- For applications with preemptible pthreads, the regular rte ring's
  worst-case performance (i.e. one thread being preempted in the
  update_tail() critical section) is much worse than the non-blocking
  ring's.
- Software caching can mitigate the average-case performance penalty for
  ring-based algorithms; for example, a non-blocking ring based mempool (a
  likely use case for this ring) with per-thread caching.

The non-blocking ring is enabled via a new flag, RING_F_NB. Because the
ring's memsize is now a function of its flags (the non-blocking ring
requires 128b for each entry), this commit adds a new argument ('flags') to
rte_ring_get_memsize(). An API deprecation notice will be sent in a
separate commit.
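
For illustration, creating such a ring only requires passing the new flag
(hypothetical usage; RING_F_NB and the extra 'flags' argument to
rte_ring_get_memsize() are the only new pieces here):

    #include <rte_ring.h>
    #include <rte_lcore.h>

    static struct rte_ring *
    create_nb_ring(void)
    {
            /* memsize now depends on flags: RING_F_NB needs 16B per slot */
            ssize_t sz = rte_ring_get_memsize(1024, RING_F_NB);
            (void)sz; /* informational; rte_ring_create() sizes the memzone */

            /* fails with rte_errno = EINVAL on non-x86_64 platforms */
            return rte_ring_create("nb_ring", 1024, rte_socket_id(), RING_F_NB);
    }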

For ease-of-use, existing ring enqueue and dequeue functions work on both
regular and non-blocking rings. This introduces an additional branch in
the datapath, but this should be a highly predictable branch.
ring_perf_autotest shows a negligible performance impact; it's hard to
distinguish a real difference versus system noise.

             Test                 | ring_perf_autotest cycles
                                  |   (with branch - without branch)
------------------------------------------------------------------
SP/SC single enq/dequeue          | 0.33
MP/MC single enq/dequeue          | -4.00
SP/SC burst enq/dequeue (size 8)  | 0.00
MP/MC burst enq/dequeue (size 8)  | 0.00
SP/SC burst enq/dequeue (size 32) | 0.00
MP/MC burst enq/dequeue (size 32) | 0.00
SC empty dequeue                  | 1.00
MC empty dequeue                  | 0.00

Single lcore:
SP/SC bulk enq/dequeue (size 8)   | 0.49
MP/MC bulk enq/dequeue (size 8)   | 0.08
SP/SC bulk enq/dequeue (size 32)  | 0.07
MP/MC bulk enq/dequeue (size 32)  | 0.09

Two physical cores:
SP/SC bulk enq/dequeue (size 8)   | 0.19
MP/MC bulk enq/dequeue (size 8)   | -0.37
SP/SC bulk enq/dequeue (size 32)  | 0.09
MP/MC bulk enq/dequeue (size 32)  | -0.05

Two NUMA nodes:
SP/SC bulk enq/dequeue (size 8)   | -1.96
MP/MC bulk enq/dequeue (size 8)   | 0.88
SP/SC bulk enq/dequeue (size 32)  | 0.10
MP/MC bulk enq/dequeue (size 32)  | 0.46

Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
running on isolcpus cores with a tickless scheduler. Each test was run three
times and the results averaged.

Signed-off-by: Gage Eads <gage.eads@intel.com>
---
 lib/librte_ring/rte_ring.c           |  72 ++++-
 lib/librte_ring/rte_ring.h           | 550 +++++++++++++++++++++++++++++++++--
 lib/librte_ring/rte_ring_version.map |   7 +
 3 files changed, 587 insertions(+), 42 deletions(-)
  

Comments

Ola Liljedahl Jan. 22, 2019, 10:12 a.m. UTC | #1
On Fri, 2019-01-18 at 09:23 -0600, Gage Eads wrote:
> This commit adds support for non-blocking circular ring enqueue and
> dequeue
> functions. The ring uses a 128-bit compare-and-swap instruction, and
> thus
> is currently limited to x86_64.
>
> The algorithm is based on the original rte ring (derived from
> FreeBSD's
> bufring.h) and inspired by Michael and Scott's non-blocking
> concurrent
> queue. Importantly, it adds a modification counter to each ring entry
> to
> ensure only one thread can write to an unused entry.
> -----
> Algorithm:
>
> Multi-producer non-blocking enqueue:
> 1. Move the producer head index 'n' locations forward, effectively
>    reserving 'n' locations.
> 2. For each pointer:
>  a. Read the producer tail index, then ring[tail]. If ring[tail]'s
>     modification counter isn't 'tail', retry.
>  b. Construct the new entry: {pointer, tail + ring size}
>  c. Compare-and-swap the old entry with the new. If unsuccessful, the
>     next loop iteration will try to enqueue this pointer again.
>  d. Compare-and-swap the tail index with 'tail + 1', whether or not
> step 2c
>     succeeded. This guarantees threads can make forward progress.
>
> Multi-consumer non-blocking dequeue:
> 1. Move the consumer head index 'n' locations forward, effectively
>    reserving 'n' pointers to be dequeued.
> 2. Copy 'n' pointers into the caller's object table (ignoring the
>    modification counter), starting from ring[tail], then compare-and-
> swap
>    the tail index with 'tail + n'.  If unsuccessful, repeat step 2.
>
> -----
> Discussion:
>
> There are two cases where the ABA problem is mitigated:
> 1. Enqueueing a pointer to the ring: without a modification counter
>    tied to the tail index, the index could become stale by the time
> the
>    enqueue happens, causing it to overwrite valid data. Tying the
>    counter to the tail index gives us an expected value (as opposed
> to,
>    say, a monotonically incrementing counter).
>
>    Since the counter will eventually wrap, there is potential for the
> ABA
>    problem. However, using a 64-bit counter makes this likelihood
>    effectively zero.
>
> 2. Updating a tail index: the ABA problem can occur if the thread is
>    preempted and the tail index wraps around. However, using 64-bit
> indexes
>    makes this likelihood effectively zero.
>
> With no contention, an enqueue of n pointers uses (1 + 2n) CAS
> operations
> and a dequeue of n pointers uses 2. This algorithm has worse average-
> case
> performance than the regular rte ring (particularly a highly-
> contended ring
> with large bulk accesses), however:
> - For applications with preemptible pthreads, the regular rte ring's
>   worst-case performance (i.e. one thread being preempted in the
>   update_tail() critical section) is much worse than the non-blocking
>   ring's.
> - Software caching can mitigate the average case performance for
> ring-based
>   algorithms. For example, a non-blocking ring based mempool (a
> likely use
>   case for this ring) with per-thread caching.
>
> The non-blocking ring is enabled via a new flag, RING_F_NB. Because
> the
> ring's memsize is now a function of its flags (the non-blocking ring
> requires 128b for each entry), this commit adds a new argument
> ('flags') to
> rte_ring_get_memsize(). An API deprecation notice will be sent in a
> separate commit.
>
> For ease-of-use, existing ring enqueue and dequeue functions work on
> both
> regular and non-blocking rings. This introduces an additional branch
> in
> the datapath, but this should be a highly predictable branch.
> ring_perf_autotest shows a negligible performance impact; it's hard
> to
> distinguish a real difference versus system noise.
>
>                                   | ring_perf_autotest cycles with
> branch -
>              Test                 |   ring_perf_autotest cycles
> without
> ------------------------------------------------------------------
> SP/SC single enq/dequeue          | 0.33
> MP/MC single enq/dequeue          | -4.00
> SP/SC burst enq/dequeue (size 8)  | 0.00
> MP/MC burst enq/dequeue (size 8)  | 0.00
> SP/SC burst enq/dequeue (size 32) | 0.00
> MP/MC burst enq/dequeue (size 32) | 0.00
> SC empty dequeue                  | 1.00
> MC empty dequeue                  | 0.00
>
> Single lcore:
> SP/SC bulk enq/dequeue (size 8)   | 0.49
> MP/MC bulk enq/dequeue (size 8)   | 0.08
> SP/SC bulk enq/dequeue (size 32)  | 0.07
> MP/MC bulk enq/dequeue (size 32)  | 0.09
>
> Two physical cores:
> SP/SC bulk enq/dequeue (size 8)   | 0.19
> MP/MC bulk enq/dequeue (size 8)   | -0.37
> SP/SC bulk enq/dequeue (size 32)  | 0.09
> MP/MC bulk enq/dequeue (size 32)  | -0.05
>
> Two NUMA nodes:
> SP/SC bulk enq/dequeue (size 8)   | -1.96
> MP/MC bulk enq/dequeue (size 8)   | 0.88
> SP/SC bulk enq/dequeue (size 32)  | 0.10
> MP/MC bulk enq/dequeue (size 32)  | 0.46
>
> Test setup: x86_64 build with default config, dual-socket Xeon E5-
> 2699 v4,
> running on isolcpus cores with a tickless scheduler. Each test run
> three
> times and the results averaged.
>
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> ---
>  lib/librte_ring/rte_ring.c           |  72 ++++-
>  lib/librte_ring/rte_ring.h           | 550 +++++++++++++++++++++++++++++++++--
>  lib/librte_ring/rte_ring_version.map |   7 +
>  3 files changed, 587 insertions(+), 42 deletions(-)
>
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> index d215acecc..f3378dccd 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>
>  /* return the size of memory occupied by a ring */
>  ssize_t
> -rte_ring_get_memsize(unsigned count)
> +rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
>  {
> -ssize_t sz;
> +ssize_t sz, elt_sz;
>
>  /* count must be a power of 2 */
>  if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> @@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
>  return -EINVAL;
>  }
>
> -sz = sizeof(struct rte_ring) + count * sizeof(void *);
> +elt_sz = (flags & RING_F_NB) ? 2 * sizeof(void *) :
> sizeof(void *);
> +
> +sz = sizeof(struct rte_ring) + count * elt_sz;
>  sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
>  return sz;
>  }
> +BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
> +MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
> +       unsigned int flags),
> +  rte_ring_get_memsize_v1905);
> +
> +ssize_t
> +rte_ring_get_memsize_v20(unsigned int count)
> +{
> +return rte_ring_get_memsize_v1905(count, 0);
> +}
> +VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
>
>  int
>  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> @@ -82,8 +95,6 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>  if (ret < 0 || ret >= (int)sizeof(r->name))
>  return -ENAMETOOLONG;
>  r->flags = flags;
> -r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP :
> __IS_MP;
> -r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC :
> __IS_MC;
>
>  if (flags & RING_F_EXACT_SZ) {
>  r->size = rte_align32pow2(count + 1);
> @@ -100,8 +111,30 @@ rte_ring_init(struct rte_ring *r, const char
> *name, unsigned count,
>  r->mask = count - 1;
>  r->capacity = r->mask;
>  }
> -r->prod.head = r->cons.head = 0;
> -r->prod.tail = r->cons.tail = 0;
> +
> +if (flags & RING_F_NB) {
> +uint64_t i;
> +
> +r->prod_64.single = (flags & RING_F_SP_ENQ) ?
> __IS_SP : __IS_MP;
> +r->cons_64.single = (flags & RING_F_SC_DEQ) ?
> __IS_SC : __IS_MC;
> +r->prod_64.head = r->cons_64.head = 0;
> +r->prod_64.tail = r->cons_64.tail = 0;
> +
> +for (i = 0; i < r->size; i++) {
> +struct nb_ring_entry *ring_ptr, *base;
> +
> +base = ((struct nb_ring_entry *)&r[1]);
> +
> +ring_ptr = &base[i & r->mask];
> +
> +ring_ptr->cnt = i;
> +}
> +} else {
> +r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP :
> __IS_MP;
> +r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC :
> __IS_MC;
> +r->prod.head = r->cons.head = 0;
> +r->prod.tail = r->cons.tail = 0;
> +}
>
>  return 0;
>  }
> @@ -123,11 +156,19 @@ rte_ring_create(const char *name, unsigned
> count, int socket_id,
>
>  ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head,
> rte_ring_list);
>
> +#if !defined(RTE_ARCH_X86_64)
> +if (flags & RING_F_NB) {
> +printf("RING_F_NB is only supported on x86-64
> platforms\n");
> +rte_errno = EINVAL;
> +return NULL;
> +}
> +#endif
> +
>  /* for an exact size ring, round up from count to a power of
> two */
>  if (flags & RING_F_EXACT_SZ)
>  count = rte_align32pow2(count + 1);
>
> -ring_size = rte_ring_get_memsize(count);
> +ring_size = rte_ring_get_memsize(count, flags);
>  if (ring_size < 0) {
>  rte_errno = ring_size;
>  return NULL;
> @@ -227,10 +268,17 @@ rte_ring_dump(FILE *f, const struct rte_ring
> *r)
>  fprintf(f, "  flags=%x\n", r->flags);
>  fprintf(f, "  size=%"PRIu32"\n", r->size);
>  fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
> -fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
> -fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
> -fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> -fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
> +if (r->flags & RING_F_NB) {
> +fprintf(f, "  ct=%"PRIu64"\n", r->cons_64.tail);
> +fprintf(f, "  ch=%"PRIu64"\n", r->cons_64.head);
> +fprintf(f, "  pt=%"PRIu64"\n", r->prod_64.tail);
> +fprintf(f, "  ph=%"PRIu64"\n", r->prod_64.head);
> +} else {
> +fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
> +fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
> +fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> +fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
> +}
>  fprintf(f, "  used=%u\n", rte_ring_count(r));
>  fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
>  }
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index b270a4746..08c9de6a6 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -134,6 +134,18 @@ struct rte_ring {
>   */
>  #define RING_F_EXACT_SZ 0x0004
>  #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> +/**
> + * The ring uses non-blocking enqueue and dequeue functions. These
> functions
> + * do not have the "non-preemptive" constraint of a regular rte
> ring, and thus
> + * are suited for applications using preemptible pthreads. However,
> the
> + * non-blocking functions have worse average-case performance than
> their
> + * regular rte ring counterparts. When used as the handler for a
> mempool,
> + * per-thread caching can mitigate the performance difference by
> reducing the
> + * number (and contention) of ring accesses.
> + *
> + * This flag is only supported on x86_64 platforms.
> + */
> +#define RING_F_NB 0x0008
>
>  /* @internal defines for passing to the enqueue dequeue worker
> functions */
>  #define __IS_SP 1
> @@ -151,11 +163,15 @@ struct rte_ring {
>   *
>   * @param count
>   *   The number of elements in the ring (must be a power of 2).
> + * @param flags
> + *   The flags the ring will be created with.
>   * @return
>   *   - The memory size needed for the ring on success.
>   *   - -EINVAL if count is not a power of 2.
>   */
> -ssize_t rte_ring_get_memsize(unsigned count);
> +ssize_t rte_ring_get_memsize(unsigned int count, unsigned int
> flags);
> +ssize_t rte_ring_get_memsize_v20(unsigned int count);
> +ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int
> flags);
>
>  /**
>   * Initialize a ring structure.
> @@ -188,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
>   *    - RING_F_SC_DEQ: If this flag is set, the default behavior
> when
>   *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
>   *      is "single-consumer". Otherwise, it is "multi-consumers".
> + *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-
> power-of-2
> + *      number, but up to half the ring space may be wasted.
> + *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + *      non-blocking variants of the dequeue and enqueue functions.
>   * @return
>   *   0 on success, or a negative value on error.
>   */
> @@ -223,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const
> char *name, unsigned count,
>   *    - RING_F_SC_DEQ: If this flag is set, the default behavior
> when
>   *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
>   *      is "single-consumer". Otherwise, it is "multi-consumers".
> + *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-
> power-of-2
> + *      number, but up to half the ring space may be wasted.
> + *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + *      non-blocking variants of the dequeue and enqueue functions.
>   * @return
>   *   On success, the pointer to the new allocated ring. NULL on
> error with
>   *    rte_errno set appropriately. Possible errno values include:
>   *    - E_RTE_NO_CONFIG - function could not get pointer to
> rte_config structure
>   *    - E_RTE_SECONDARY - function was called from a secondary
> process instance
> - *    - EINVAL - count provided is not a power of 2
> + *    - EINVAL - count provided is not a power of 2, or RING_F_NB is
> used on an
> + *      unsupported platform
>   *    - ENOSPC - the maximum number of memzones has already been
> allocated
>   *    - EEXIST - a memzone with the same name already exists
>   *    - ENOMEM - no appropriate memory area found in which to create
> memzone
> @@ -284,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct
> rte_ring *r);
>  } \
>  } while (0)
>
> +/* The actual enqueue of pointers on the ring.
> + * Used only by the single-producer non-blocking enqueue function,
> but
> + * out-lined here for code readability.
> + */
> +#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do {
> \
> +unsigned int i; \
> +const uint32_t size = (r)->size; \
> +size_t idx = prod_head & (r)->mask; \
> +size_t new_cnt = prod_head + size; \
> +struct nb_ring_entry *ring = (struct nb_ring_entry
> *)ring_start; \
> +if (likely(idx + n < size)) { \
> +for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4,
> idx += 4) { \
> +ring[idx].ptr = obj_table[i]; \
> +ring[idx].cnt = new_cnt + i;  \
> +ring[idx + 1].ptr = obj_table[i + 1]; \
> +ring[idx + 1].cnt = new_cnt + i + 1;  \
> +ring[idx + 2].ptr = obj_table[i + 2]; \
> +ring[idx + 2].cnt = new_cnt + i + 2;  \
> +ring[idx + 3].ptr = obj_table[i + 3]; \
> +ring[idx + 3].cnt = new_cnt + i + 3;  \
> +} \
> +switch (n & 0x3) { \
> +case 3: \
> +ring[idx].cnt = new_cnt + i; \
> +ring[idx++].ptr = obj_table[i++]; /*
> fallthrough */ \
> +case 2: \
> +ring[idx].cnt = new_cnt + i; \
> +ring[idx++].ptr = obj_table[i++]; /*
> fallthrough */ \
> +case 1: \
> +ring[idx].cnt = new_cnt + i; \
> +ring[idx++].ptr = obj_table[i++]; \
> +} \
> +} else { \
> +for (i = 0; idx < size; i++, idx++) { \
> +ring[idx].cnt = new_cnt + i;  \
> +ring[idx].ptr = obj_table[i]; \
> +} \
> +for (idx = 0; i < n; i++, idx++) {    \
> +ring[idx].cnt = new_cnt + i;  \
> +ring[idx].ptr = obj_table[i]; \
> +} \
> +} \
> +} while (0)
> +
>  /* the actual copy of pointers on the ring to obj_table.
>   * Placed here since identical code needed in both
>   * single and multi consumer dequeue functions */
> @@ -315,6 +384,39 @@ void rte_ring_dump(FILE *f, const struct
> rte_ring *r);
>  } \
>  } while (0)
>
> +/* The actual copy of pointers on the ring to obj_table.
> + * Placed here since identical code needed in both
> + * single and multi consumer non-blocking dequeue functions.
> + */
> +#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do {
> \
> +unsigned int i; \
> +size_t idx = cons_head & (r)->mask; \
> +const uint32_t size = (r)->size; \
> +struct nb_ring_entry *ring = (struct nb_ring_entry
> *)ring_start; \
> +if (likely(idx + n < size)) { \
> +for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx
> += 4) {\
> +obj_table[i] = ring[idx].ptr; \
> +obj_table[i + 1] = ring[idx + 1].ptr; \
> +obj_table[i + 2] = ring[idx + 2].ptr; \
> +obj_table[i + 3] = ring[idx + 3].ptr; \
> +} \
> +switch (n & 0x3) { \
> +case 3: \
> +obj_table[i++] = ring[idx++].ptr; /*
> fallthrough */ \
> +case 2: \
> +obj_table[i++] = ring[idx++].ptr; /*
> fallthrough */ \
> +case 1: \
> +obj_table[i++] = ring[idx++].ptr; \
> +} \
> +} else { \
> +for (i = 0; idx < size; i++, idx++) \
> +obj_table[i] = ring[idx].ptr; \
> +for (idx = 0; i < n; i++, idx++) \
> +obj_table[i] = ring[idx].ptr; \
> +} \
> +} while (0)
> +
> +
>  /* Between load and load. there might be cpu reorder in weak model
>   * (powerpc/arm).
>   * There are 2 choices for the users
> @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct
> rte_ring *r);
>  #endif
>  #include "rte_ring_generic_64.h"
>
> +/* @internal 128-bit structure used by the non-blocking ring */
> +struct nb_ring_entry {
> +void *ptr; /**< Data pointer */
> +uint64_t cnt; /**< Modification counter */
Why not make 'cnt' uintptr_t? This way 32-bit architectures will also
be supported.
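
For illustration, the 32-bit-friendly layout being suggested might look like
this (sketch only, not part of the patch):

    struct nb_ring_entry {
            void *ptr;     /* data pointer */
            uintptr_t cnt; /* modification counter; the entry is then
                            * 2 * sizeof(void *) on every architecture, so a
                            * double-width CAS (e.g. cmpxchg8b on 32-bit x86)
                            * could be used
                            */
    };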

> +};
> +
> +/* The non-blocking ring algorithm is based on the original rte ring
> (derived
> + * from FreeBSD's bufring.h) and inspired by Michael and Scott's
> non-blocking
> + * concurrent queue.
> + */
> +
> +/**
> + * @internal
> + *   Enqueue several objects on the non-blocking ring (single-
> producer only)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> the ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has
> finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const
> *obj_table,
> +    unsigned int n,
> +    enum rte_ring_queue_behavior behavior,
> +    unsigned int *free_space)
> +{
> +uint32_t free_entries;
> +size_t head, next;
> +
> +n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> + &head, &next,
> &free_entries);
> +if (n == 0)
> +goto end;
> +
> +ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> +r->prod_64.tail += n;
Don't we need release order (or smp_wmb) between writing of the ring
pointers and the update of tail? By updating the tail pointer, we are
synchronising with a consumer.
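
For reference, one way to express the ordering being asked about (a sketch,
not part of the patch) is a release store on the tail, so the entry writes
done by ENQUEUE_PTRS_NB become visible to consumers before the new tail does:

    __atomic_store_n(&r->prod_64.tail, r->prod_64.tail + n, __ATOMIC_RELEASE);

    /* or, equivalently for this purpose: rte_smp_wmb(); r->prod_64.tail += n; */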

> +
> +end:
> +if (free_space != NULL)
> +*free_space = free_entries - n;
> +return n;
> +}
> +
> +/**
> + * @internal
> + *   Enqueue several objects on the non-blocking ring (multi-
> producer safe)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> the ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has
> finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const
> *obj_table,
> +    unsigned int n,
> +    enum rte_ring_queue_behavior behavior,
> +    unsigned int *free_space)
> +{
> +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> +RTE_SET_USED(r);
> +RTE_SET_USED(obj_table);
> +RTE_SET_USED(n);
> +RTE_SET_USED(behavior);
> +RTE_SET_USED(free_space);
> +#ifndef ALLOW_EXPERIMENTAL_API
> +printf("[%s()] RING_F_NB requires an experimental API."
> +       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> +       , __func__);
> +#endif
> +return 0;
> +#endif
> +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> +size_t head, next, tail;
> +uint32_t free_entries;
> +unsigned int i;
> +
> +n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> + &head, &next,
> &free_entries);
> +if (n == 0)
> +goto end;
> +
> +for (i = 0; i < n; /* i incremented if enqueue succeeds */)
> {
> +struct nb_ring_entry old_value, new_value;
> +struct nb_ring_entry *ring_ptr;
> +
> +/* Enqueue to the tail entry. If another thread wins the race,
> + * retry with the new tail.
> + */
> +tail = r->prod_64.tail;
> +
> +ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
This is a very ugly cast. Also I think it is unnecessary. What's
preventing this from being written without a cast? Perhaps the ring
array needs to be a union of "void *" and struct nb_ring_entry?

> +
> +old_value = *ring_ptr;
> +
> +/* If the tail entry's modification counter doesn't match the
> + * producer tail index, it's already been updated.
> + */
> +if (old_value.cnt != tail)
> +continue;
Continue restarts the loop at the condition test in the for statement,
'i' and 'n' are unchanged. Then we re-read 'prod_64.tail' and
'ring[tail]'. If some other thread never updates 'prod_64.tail', the
test here (ring[tail].cnt != tail) will still be false and we will spin
forever.
Waiting for other threads <=> blocking behaviour so this is not a non-
blocking design.

> +
> +/* Prepare the new entry. The cnt field mitigates
> the ABA
> + * problem on the ring write.
> + */
> +new_value.ptr = obj_table[i];
> +new_value.cnt = tail + r->size;
> +
> +if (rte_atomic128_cmpset((volatile rte_int128_t
> *)ring_ptr,
> + (rte_int128_t *)&old_value,
> + (rte_int128_t
> *)&new_value))
> +i++;
> +
> +/* Every thread attempts the cmpset, so they don't have to wait
> + * for the thread that successfully enqueued to the ring.
> + * Using a 64-bit tail mitigates the ABA problem here.
> + *
> + * Built-in used to handle variable-sized tail index.
> + */
But prod_64.tail is 64 bits so not really variable size?

> +__sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail + 1);
What memory order is required here? Why not use
__atomic_compare_exchange() with explicit memory order parameters?
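
For reference, the __atomic form being suggested would look roughly like this
(a sketch, not part of the patch), with release semantics on success so the
slot write is published before the tail moves:

    uint64_t expected = tail;

    __atomic_compare_exchange_n(&r->prod_64.tail, &expected, tail + 1,
                                0 /* strong */,
                                __ATOMIC_RELEASE, __ATOMIC_RELAXED);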

> +}
> +
> +end:
> +if (free_space != NULL)
> +*free_space = free_entries - n;
> +return n;
> +#endif
> +}
> +
> +/**
> + * @internal Enqueue several objects on the non-blocking ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> the ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head
> update
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has
> finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const
> *obj_table,
> + unsigned int n, enum
> rte_ring_queue_behavior behavior,
> + unsigned int is_sp, unsigned int
> *free_space)
> +{
> +if (is_sp)
> +return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
> +   behavior,
> free_space);
> +else
> +return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
> +   behavior,
> free_space);
> +}
> +
> +/**
> + * @internal
> + *   Dequeue several objects from the non-blocking ring (single-
> consumer only)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from
> the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue
> has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n
> only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
> +    unsigned int n,
> +    enum rte_ring_queue_behavior behavior,
> +    unsigned int *available)
> +{
> +size_t head, next;
> +uint32_t entries;
> +
> +n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
> + &head, &next, &entries);
> +if (n == 0)
> +goto end;
> +
> +DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> +r->cons_64.tail += n;
Memory ordering? Consumer synchronises with producer.

> +
> +end:
> +if (available != NULL)
> +*available = entries - n;
> +return n;
> +}
> +
> +/**
> + * @internal
> + *   Dequeue several objects from the non-blocking ring (multi-
> consumer safe)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from
> the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue
> has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n
> only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table,
> +    unsigned int n,
> +    enum rte_ring_queue_behavior behavior,
> +    unsigned int *available)
> +{
> +size_t head, next;
> +uint32_t entries;
> +
> +n = __rte_ring_move_cons_head_64(r, 0, n, behavior,
> + &head, &next, &entries);
> +if (n == 0)
> +goto end;
> +
> +while (1) {
> +size_t tail = r->cons_64.tail;
> +
> +/* Dequeue from the cons tail onwards. If multiple
> threads read
> + * the same pointers, the thread that successfully
> performs the
> + * CAS will keep them and the other(s) will retry.
> + */
> +DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);
> +
> +next = tail + n;
> +
> +/* Built-in used to handle variable-sized tail
> index. */
> +if (__sync_bool_compare_and_swap(&r->cons_64.tail,
> tail, next))
> +/* There is potential for the ABA problem
> here, but
> + * that is mitigated by the large (64-bit)
> tail.
> + */
> +break;
> +}
> +
> +end:
> +if (available != NULL)
> +*available = entries - n;
> +return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the non-blocking ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from
> the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue
> has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n
> only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table,
> + unsigned int n, enum rte_ring_queue_behavior
> behavior,
> + unsigned int is_sc, unsigned int *available)
> +{
> +if (is_sc)
> +return __rte_ring_do_nb_dequeue_sc(r, obj_table, n,
> +   behavior,
> available);
> +else
> +return __rte_ring_do_nb_dequeue_mc(r, obj_table, n,
> +   behavior,
> available);
> +}
> +
>  /**
>   * @internal Enqueue several objects on the ring
>   *
> @@ -438,8 +853,14 @@ static __rte_always_inline unsigned int
>  rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const
> *obj_table,
>   unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -__IS_MP, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED
> , __IS_MP,
> +free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED,
> __IS_MP,
> +     free_space);
>  }
>
>  /**
> @@ -461,8 +882,14 @@ static __rte_always_inline unsigned int
>  rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const
> *obj_table,
>   unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -__IS_SP, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED
> , __IS_SP,
> +free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED,
> __IS_SP,
> +     free_space);
>  }
>
>  /**
> @@ -488,8 +915,14 @@ static __rte_always_inline unsigned int
>  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>        unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -r->prod.single, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED
> ,
> +r->prod_64.single,
> free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED,
> +     r->prod.single,
> free_space);
>  }
>
>  /**
> @@ -572,8 +1005,14 @@ static __rte_always_inline unsigned int
>  rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -__IS_MC, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED
> , __IS_MC,
> +available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED,
> __IS_MC,
> +     available);
>  }
>
>  /**
> @@ -596,8 +1035,14 @@ static __rte_always_inline unsigned int
>  rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -__IS_SC, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED
> , __IS_SC,
> +available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED,
> __IS_SC,
> +     available);
>  }
>
>  /**
> @@ -623,8 +1068,14 @@ static __rte_always_inline unsigned int
>  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned
> int n,
>  unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n,
> RTE_RING_QUEUE_FIXED,
> -r->cons.single, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_FIXED
> ,
> +r->cons_64.single,
> available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_FIXED,
> +     r->cons.single,
> available);
>  }
>
>  /**
> @@ -699,9 +1150,13 @@ rte_ring_dequeue(struct rte_ring *r, void
> **obj_p)
>  static inline unsigned
>  rte_ring_count(const struct rte_ring *r)
>  {
> -uint32_t prod_tail = r->prod.tail;
> -uint32_t cons_tail = r->cons.tail;
> -uint32_t count = (prod_tail - cons_tail) & r->mask;
> +uint32_t count;
> +
> +if (r->flags & RING_F_NB)
> +count = (r->prod_64.tail - r->cons_64.tail) & r-
> >mask;
> +else
> +count = (r->prod.tail - r->cons.tail) & r->mask;
> +
>  return (count > r->capacity) ? r->capacity : count;
>  }
>
> @@ -821,8 +1276,14 @@ static __rte_always_inline unsigned
>  rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const
> *obj_table,
>   unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE, __IS_MP,
> free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIA
> BLE,
> +__IS_MP,
> free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE
> ,
> +     __IS_MP, free_space);
>  }
>
>  /**
> @@ -844,8 +1305,14 @@ static __rte_always_inline unsigned
>  rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const
> *obj_table,
>   unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE, __IS_SP,
> free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIA
> BLE,
> +__IS_SP,
> free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE
> ,
> +     __IS_SP, free_space);
>  }
>
>  /**
> @@ -871,8 +1338,14 @@ static __rte_always_inline unsigned
>  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>        unsigned int n, unsigned int *free_space)
>  {
> -return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_VARIABLE,
> -r->prod.single, free_space);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIA
> BLE,
> +r->prod_64.single,
> free_space);
> +else
> +return __rte_ring_do_enqueue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE
> ,
> +     r->prod.single,
> free_space);
>  }
>
>  /**
> @@ -899,8 +1372,14 @@ static __rte_always_inline unsigned
>  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE, __IS_MC,
> available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIA
> BLE,
> +__IS_MC, available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE
> ,
> +     __IS_MC, available);
>  }
>
>  /**
> @@ -924,8 +1403,14 @@ static __rte_always_inline unsigned
>  rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE, __IS_SC,
> available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIA
> BLE,
> +__IS_SC, available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE
> ,
> +     __IS_SC, available);
>  }
>
>  /**
> @@ -951,9 +1436,14 @@ static __rte_always_inline unsigned
>  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
>  unsigned int n, unsigned int *available)
>  {
> -return __rte_ring_do_dequeue(r, obj_table, n,
> -RTE_RING_QUEUE_VARIABLE,
> -r->cons.single, available);
> +if (r->flags & RING_F_NB)
> +return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +RTE_RING_QUEUE_VARIA
> BLE,
> +r->cons_64.single,
> available);
> +else
> +return __rte_ring_do_dequeue(r, obj_table, n,
> +     RTE_RING_QUEUE_VARIABLE
> ,
> +     r->cons.single,
> available);
>  }
>
>  #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_version.map
> b/lib/librte_ring/rte_ring_version.map
> index d935efd0d..8969467af 100644
> --- a/lib/librte_ring/rte_ring_version.map
> +++ b/lib/librte_ring/rte_ring_version.map
> @@ -17,3 +17,10 @@ DPDK_2.2 {
>  rte_ring_free;
>
>  } DPDK_2.0;
> +
> +DPDK_19.05 {
> +global:
> +
> +rte_ring_get_memsize;
> +
> +} DPDK_2.2;
  
Ola Liljedahl Jan. 22, 2019, 2:49 p.m. UTC | #2
(resending without the confidential footer, think I figured it out, ignore the
previous email from me in this thread)

-- Ola

On Fri, 2019-01-18 at 09:23 -0600, Gage Eads wrote:
> This commit adds support for non-blocking circular ring enqueue and dequeue
> functions. The ring uses a 128-bit compare-and-swap instruction, and thus
> is currently limited to x86_64.
> 
> The algorithm is based on the original rte ring (derived from FreeBSD's
> bufring.h) and inspired by Michael and Scott's non-blocking concurrent
> queue. Importantly, it adds a modification counter to each ring entry to
> ensure only one thread can write to an unused entry.
> 
> -----
> Algorithm:
> 
> Multi-producer non-blocking enqueue:
> 1. Move the producer head index 'n' locations forward, effectively
>    reserving 'n' locations.
> 2. For each pointer:
>  a. Read the producer tail index, then ring[tail]. If ring[tail]'s
>     modification counter isn't 'tail', retry.
>  b. Construct the new entry: {pointer, tail + ring size}
>  c. Compare-and-swap the old entry with the new. If unsuccessful, the
>     next loop iteration will try to enqueue this pointer again.
>  d. Compare-and-swap the tail index with 'tail + 1', whether or not step 2c
>     succeeded. This guarantees threads can make forward progress.
> 
> Multi-consumer non-blocking dequeue:
> 1. Move the consumer head index 'n' locations forward, effectively
>    reserving 'n' pointers to be dequeued.
> 2. Copy 'n' pointers into the caller's object table (ignoring the
>    modification counter), starting from ring[tail], then compare-and-swap
>    the tail index with 'tail + n'.  If unsuccessful, repeat step 2.
> 
> -----
> Discussion:
> 
> There are two cases where the ABA problem is mitigated:
> 1. Enqueueing a pointer to the ring: without a modification counter
>    tied to the tail index, the index could become stale by the time the
>    enqueue happens, causing it to overwrite valid data. Tying the
>    counter to the tail index gives us an expected value (as opposed to,
>    say, a monotonically incrementing counter).
> 
>    Since the counter will eventually wrap, there is potential for the ABA
>    problem. However, using a 64-bit counter makes this likelihood
>    effectively zero.
> 
> 2. Updating a tail index: the ABA problem can occur if the thread is
>    preempted and the tail index wraps around. However, using 64-bit indexes
>    makes this likelihood effectively zero.
> 
> With no contention, an enqueue of n pointers uses (1 + 2n) CAS operations
> and a dequeue of n pointers uses 2. This algorithm has worse average-case
> performance than the regular rte ring (particularly a highly-contended ring
> with large bulk accesses), however:
> - For applications with preemptible pthreads, the regular rte ring's
>   worst-case performance (i.e. one thread being preempted in the
>   update_tail() critical section) is much worse than the non-blocking
>   ring's.
> - Software caching can mitigate the average case performance for ring-based
>   algorithms. For example, a non-blocking ring based mempool (a likely use
>   case for this ring) with per-thread caching.
> 
> The non-blocking ring is enabled via a new flag, RING_F_NB. Because the
> ring's memsize is now a function of its flags (the non-blocking ring
> requires 128b for each entry), this commit adds a new argument ('flags') to
> rte_ring_get_memsize(). An API deprecation notice will be sent in a
> separate commit.
> 
> For ease-of-use, existing ring enqueue and dequeue functions work on both
> regular and non-blocking rings. This introduces an additional branch in
> the datapath, but this should be a highly predictable branch.
> ring_perf_autotest shows a negligible performance impact; it's hard to
> distinguish a real difference versus system noise.
> 
>                                   | ring_perf_autotest cycles with branch -
>              Test                 |   ring_perf_autotest cycles without
> ------------------------------------------------------------------
> SP/SC single enq/dequeue          | 0.33
> MP/MC single enq/dequeue          | -4.00
> SP/SC burst enq/dequeue (size 8)  | 0.00
> MP/MC burst enq/dequeue (size 8)  | 0.00
> SP/SC burst enq/dequeue (size 32) | 0.00
> MP/MC burst enq/dequeue (size 32) | 0.00
> SC empty dequeue                  | 1.00
> MC empty dequeue                  | 0.00
> 
> Single lcore:
> SP/SC bulk enq/dequeue (size 8)   | 0.49
> MP/MC bulk enq/dequeue (size 8)   | 0.08
> SP/SC bulk enq/dequeue (size 32)  | 0.07
> MP/MC bulk enq/dequeue (size 32)  | 0.09
> 
> Two physical cores:
> SP/SC bulk enq/dequeue (size 8)   | 0.19
> MP/MC bulk enq/dequeue (size 8)   | -0.37
> SP/SC bulk enq/dequeue (size 32)  | 0.09
> MP/MC bulk enq/dequeue (size 32)  | -0.05
> 
> Two NUMA nodes:
> SP/SC bulk enq/dequeue (size 8)   | -1.96
> MP/MC bulk enq/dequeue (size 8)   | 0.88
> SP/SC bulk enq/dequeue (size 32)  | 0.10
> MP/MC bulk enq/dequeue (size 32)  | 0.46
> 
> Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
> running on isolcpus cores with a tickless scheduler. Each test run three
> times and the results averaged.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> ---
>  lib/librte_ring/rte_ring.c           |  72 ++++-
>  lib/librte_ring/rte_ring.h           | 550 +++++++++++++++++++++++++++++++++--
>  lib/librte_ring/rte_ring_version.map |   7 +
>  3 files changed, 587 insertions(+), 42 deletions(-)
> 
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> index d215acecc..f3378dccd 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>  
>  /* return the size of memory occupied by a ring */
>  ssize_t
> -rte_ring_get_memsize(unsigned count)
> +rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
>  {
> -	ssize_t sz;
> +	ssize_t sz, elt_sz;
>  
>  	/* count must be a power of 2 */
>  	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> @@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
>  		return -EINVAL;
>  	}
>  
> -	sz = sizeof(struct rte_ring) + count * sizeof(void *);
> +	elt_sz = (flags & RING_F_NB) ? 2 * sizeof(void *) : sizeof(void *);
> +
> +	sz = sizeof(struct rte_ring) + count * elt_sz;
>  	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
>  	return sz;
>  }
> +BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
> +MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
> +					       unsigned int flags),
> +		  rte_ring_get_memsize_v1905);
> +
> +ssize_t
> +rte_ring_get_memsize_v20(unsigned int count)
> +{
> +	return rte_ring_get_memsize_v1905(count, 0);
> +}
> +VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
>  
>  int
>  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> @@ -82,8 +95,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned
> count,
>  	if (ret < 0 || ret >= (int)sizeof(r->name))
>  		return -ENAMETOOLONG;
>  	r->flags = flags;
> -	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> -	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
>  
>  	if (flags & RING_F_EXACT_SZ) {
>  		r->size = rte_align32pow2(count + 1);
> @@ -100,8 +111,30 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>  		r->mask = count - 1;
>  		r->capacity = r->mask;
>  	}
> -	r->prod.head = r->cons.head = 0;
> -	r->prod.tail = r->cons.tail = 0;
> +
> +	if (flags & RING_F_NB) {
> +		uint64_t i;
> +
> +		r->prod_64.single = (flags & RING_F_SP_ENQ) ? __IS_SP :
> __IS_MP;
> +		r->cons_64.single = (flags & RING_F_SC_DEQ) ? __IS_SC :
> __IS_MC;
> +		r->prod_64.head = r->cons_64.head = 0;
> +		r->prod_64.tail = r->cons_64.tail = 0;
> +
> +		for (i = 0; i < r->size; i++) {
> +			struct nb_ring_entry *ring_ptr, *base;
> +
> +			base = ((struct nb_ring_entry *)&r[1]);
> +
> +			ring_ptr = &base[i & r->mask];
> +
> +			ring_ptr->cnt = i;
> +		}
> +	} else {
> +		r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
> +		r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
> +		r->prod.head = r->cons.head = 0;
> +		r->prod.tail = r->cons.tail = 0;
> +	}
>  
>  	return 0;
>  }
> @@ -123,11 +156,19 @@ rte_ring_create(const char *name, unsigned count, int
> socket_id,
>  
>  	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
>  
> +#if !defined(RTE_ARCH_X86_64)
> +	if (flags & RING_F_NB) {
> +		printf("RING_F_NB is only supported on x86-64 platforms\n");
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +#endif
> +
>  	/* for an exact size ring, round up from count to a power of two */
>  	if (flags & RING_F_EXACT_SZ)
>  		count = rte_align32pow2(count + 1);
>  
> -	ring_size = rte_ring_get_memsize(count);
> +	ring_size = rte_ring_get_memsize(count, flags);
>  	if (ring_size < 0) {
>  		rte_errno = ring_size;
>  		return NULL;
> @@ -227,10 +268,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
>  	fprintf(f, "  flags=%x\n", r->flags);
>  	fprintf(f, "  size=%"PRIu32"\n", r->size);
>  	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
> -	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
> -	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
> -	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> -	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
> +	if (r->flags & RING_F_NB) {
> +		fprintf(f, "  ct=%"PRIu64"\n", r->cons_64.tail);
> +		fprintf(f, "  ch=%"PRIu64"\n", r->cons_64.head);
> +		fprintf(f, "  pt=%"PRIu64"\n", r->prod_64.tail);
> +		fprintf(f, "  ph=%"PRIu64"\n", r->prod_64.head);
> +	} else {
> +		fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
> +		fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
> +		fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
> +		fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
> +	}
>  	fprintf(f, "  used=%u\n", rte_ring_count(r));
>  	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
>  }
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index b270a4746..08c9de6a6 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -134,6 +134,18 @@ struct rte_ring {
>   */
>  #define RING_F_EXACT_SZ 0x0004
>  #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
> +/**
> + * The ring uses non-blocking enqueue and dequeue functions. These functions
> + * do not have the "non-preemptive" constraint of a regular rte ring, and
> thus
> + * are suited for applications using preemptible pthreads. However, the
> + * non-blocking functions have worse average-case performance than their
> + * regular rte ring counterparts. When used as the handler for a mempool,
> + * per-thread caching can mitigate the performance difference by reducing the
> + * number (and contention) of ring accesses.
> + *
> + * This flag is only supported on x86_64 platforms.
> + */
> +#define RING_F_NB 0x0008
>  
>  /* @internal defines for passing to the enqueue dequeue worker functions */
>  #define __IS_SP 1
> @@ -151,11 +163,15 @@ struct rte_ring {
>   *
>   * @param count
>   *   The number of elements in the ring (must be a power of 2).
> + * @param flags
> + *   The flags the ring will be created with.
>   * @return
>   *   - The memory size needed for the ring on success.
>   *   - -EINVAL if count is not a power of 2.
>   */
> -ssize_t rte_ring_get_memsize(unsigned count);
> +ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
> +ssize_t rte_ring_get_memsize_v20(unsigned int count);
> +ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
>  
>  /**
>   * Initialize a ring structure.
> @@ -188,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
>   *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
>   *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
>   *      is "single-consumer". Otherwise, it is "multi-consumers".
> + *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
> + *      number, but up to half the ring space may be wasted.
> + *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + *      non-blocking variants of the dequeue and enqueue functions.
>   * @return
>   *   0 on success, or a negative value on error.
>   */
> @@ -223,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>   *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
>   *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
>   *      is "single-consumer". Otherwise, it is "multi-consumers".
> + *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
> + *      number, but up to half the ring space may be wasted.
> + *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
> + *      non-blocking variants of the dequeue and enqueue functions.
>   * @return
>   *   On success, the pointer to the new allocated ring. NULL on error with
>   *    rte_errno set appropriately. Possible errno values include:
>   *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> structure
>   *    - E_RTE_SECONDARY - function was called from a secondary process
> instance
> - *    - EINVAL - count provided is not a power of 2
> + *    - EINVAL - count provided is not a power of 2, or RING_F_NB is used on
> an
> + *      unsupported platform
>   *    - ENOSPC - the maximum number of memzones has already been allocated
>   *    - EEXIST - a memzone with the same name already exists
>   *    - ENOMEM - no appropriate memory area found in which to create memzone
> @@ -284,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>  	} \
>  } while (0)
>  
> +/* The actual enqueue of pointers on the ring.
> + * Used only by the single-producer non-blocking enqueue function, but
> + * out-lined here for code readability.
> + */
> +#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do { \
> +	unsigned int i; \
> +	const uint32_t size = (r)->size; \
> +	size_t idx = prod_head & (r)->mask; \
> +	size_t new_cnt = prod_head + size; \
> +	struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) {
> \
> +			ring[idx].ptr = obj_table[i]; \
> +			ring[idx].cnt = new_cnt + i;  \
> +			ring[idx + 1].ptr = obj_table[i + 1]; \
> +			ring[idx + 1].cnt = new_cnt + i + 1;  \
> +			ring[idx + 2].ptr = obj_table[i + 2]; \
> +			ring[idx + 2].cnt = new_cnt + i + 2;  \
> +			ring[idx + 3].ptr = obj_table[i + 3]; \
> +			ring[idx + 3].cnt = new_cnt + i + 3;  \
> +		} \
> +		switch (n & 0x3) { \
> +		case 3: \
> +			ring[idx].cnt = new_cnt + i; \
> +			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
> +		case 2: \
> +			ring[idx].cnt = new_cnt + i; \
> +			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
> +		case 1: \
> +			ring[idx].cnt = new_cnt + i; \
> +			ring[idx++].ptr = obj_table[i++]; \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++) { \
> +			ring[idx].cnt = new_cnt + i;  \
> +			ring[idx].ptr = obj_table[i]; \
> +		} \
> +		for (idx = 0; i < n; i++, idx++) {    \
> +			ring[idx].cnt = new_cnt + i;  \
> +			ring[idx].ptr = obj_table[i]; \
> +		} \
> +	} \
> +} while (0)
> +
>  /* the actual copy of pointers on the ring to obj_table.
>   * Placed here since identical code needed in both
>   * single and multi consumer dequeue functions */
> @@ -315,6 +384,39 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>  	} \
>  } while (0)
>  
> +/* The actual copy of pointers on the ring to obj_table.
> + * Placed here since identical code needed in both
> + * single and multi consumer non-blocking dequeue functions.
> + */
> +#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do { \
> +	unsigned int i; \
> +	size_t idx = cons_head & (r)->mask; \
> +	const uint32_t size = (r)->size; \
> +	struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> +			obj_table[i] = ring[idx].ptr; \
> +			obj_table[i + 1] = ring[idx + 1].ptr; \
> +			obj_table[i + 2] = ring[idx + 2].ptr; \
> +			obj_table[i + 3] = ring[idx + 3].ptr; \
> +		} \
> +		switch (n & 0x3) { \
> +		case 3: \
> +			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
> +		case 2: \
> +			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
> +		case 1: \
> +			obj_table[i++] = ring[idx++].ptr; \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++) \
> +			obj_table[i] = ring[idx].ptr; \
> +		for (idx = 0; i < n; i++, idx++) \
> +			obj_table[i] = ring[idx].ptr; \
> +	} \
> +} while (0)
> +
> +
>  /* Between load and load. there might be cpu reorder in weak model
>   * (powerpc/arm).
>   * There are 2 choices for the users
> @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
>  #endif
>  #include "rte_ring_generic_64.h"
>  
> +/* @internal 128-bit structure used by the non-blocking ring */
> +struct nb_ring_entry {
> +	void *ptr; /**< Data pointer */
> +	uint64_t cnt; /**< Modification counter */
Why not make 'cnt' uintptr_t? This way 32-bit architectures will also
be supported. I think there are some claims that DPDK still supports e.g. ARMv7a
and possibly also 32-bit x86?
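For illustration, roughly this (just a sketch; on a 64-bit target it is identical to the
proposed layout, while on 32-bit the entry shrinks to 8 bytes so a 64-bit CAS would be enough):

	struct nb_ring_entry {
		void *ptr;     /**< Data pointer */
		uintptr_t cnt; /**< Modification counter, pointer-sized */
	};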

> +};
> +
> +/* The non-blocking ring algorithm is based on the original rte ring (derived
> + * from FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking
> + * concurrent queue.
> + */
> +
> +/**
> + * @internal
> + *   Enqueue several objects on the non-blocking ring (single-producer only)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
> +			    unsigned int n,
> +			    enum rte_ring_queue_behavior behavior,
> +			    unsigned int *free_space)
> +{
> +	uint32_t free_entries;
> +	size_t head, next;
> +
> +	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> +					 &head, &next, &free_entries);
> +	if (n == 0)
> +		goto end;
> +
> +	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> +	r->prod_64.tail += n;
Don't we need release order when (or smp_wmb between) writing of the ring
pointers and the update of tail? By updating the tail pointer, we are
synchronising with a consumer.

I prefer using __atomic operations even for loads and stores. You can then see which
parts of the code synchronise with each other, e.g. a store-release to some
location synchronises with load-acquire from the same location. If you don't
know how different threads synchronise with each other, you are very likely to
make mistakes.
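E.g. for the single-producer tail update above, something along these lines (only a
sketch, assuming prod_64.tail is a plain 64-bit integer):

	/* Order the ENQUEUE_PTRS_NB stores before publishing the new tail;
	 * pairs with an acquire load of prod_64.tail on the consumer side.
	 */
	__atomic_store_n(&r->prod_64.tail, r->prod_64.tail + n,
			 __ATOMIC_RELEASE);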

> +
> +end:
> +	if (free_space != NULL)
> +		*free_space = free_entries - n;
> +	return n;
> +}
> +
> +/**
> + * @internal
> + *   Enqueue several objects on the non-blocking ring (multi-producer safe)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
> +			    unsigned int n,
> +			    enum rte_ring_queue_behavior behavior,
> +			    unsigned int *free_space)
> +{
> +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> +	RTE_SET_USED(r);
> +	RTE_SET_USED(obj_table);
> +	RTE_SET_USED(n);
> +	RTE_SET_USED(behavior);
> +	RTE_SET_USED(free_space);
> +#ifndef ALLOW_EXPERIMENTAL_API
> +	printf("[%s()] RING_F_NB requires an experimental API."
> +	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> +	       , __func__);
> +#endif
> +	return 0;
> +#endif
> +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> +	size_t head, next, tail;
> +	uint32_t free_entries;
> +	unsigned int i;
> +
> +	n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> +					 &head, &next, &free_entries);
> +	if (n == 0)
> +		goto end;
> +
> +	for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
> +		struct nb_ring_entry old_value, new_value;
> +		struct nb_ring_entry *ring_ptr;
> +
> +		/* Enqueue to the tail entry. If another thread wins the race,
> +		 * retry with the new tail.
> +		 */
> +		tail = r->prod_64.tail;
> +
> +		ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
This is an ugly expression and cast. Also I think it is unnecessary. What's
preventing this from being written without a cast? Perhaps the ring array needs
to be a union of "void *" and struct nb_ring_entry?

> +
> +		old_value = *ring_ptr;
> +
> +		/* If the tail entry's modification counter doesn't match the
> +		 * producer tail index, it's already been updated.
> +		 */
> +		if (old_value.cnt != tail)
> +			continue;
Continue restarts the loop at the condition test in the for statement,
'i' and 'n' are unchanged. Then we re-read 'prod_64.tail' and
'ring[tail & mask]'. If some other thread never updates 'prod_64.tail', the
test here (ring[tail].cnt != tail) will still be true and we will spin
forever.

Waiting for other threads <=> blocking behaviour so this is not a non-
blocking design.

> +
> +		/* Prepare the new entry. The cnt field mitigates the ABA
> +		 * problem on the ring write.
> +		 */
> +		new_value.ptr = obj_table[i];
> +		new_value.cnt = tail + r->size;
> +
> +		if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
> +					 (rte_int128_t *)&old_value,
> +					 (rte_int128_t *)&new_value))
> +			i++;
> +
> +		/* Every thread attempts the cmpset, so they don't have to wait
> +		 * for the thread that successfully enqueued to the ring.
> +		 * Using a 64-bit tail mitigates the ABA problem here.
> +		 *
> +		 * Built-in used to handle variable-sized tail index.
> +		 */
But prod_64.tail is 64 bits so not really variable size?

> +		__sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail + 1);
What memory order is required here? Why not use
__atomic_compare_exchange() with explicit memory order parameters?
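E.g. (the orders here are just an example of making them explicit, not a claim about
what the algorithm requires; assumes prod_64.tail is a uint64_t):

	uint64_t expected = tail;

	/* Advance the producer tail by one slot, whether or not the
	 * 128-bit CAS above succeeded.
	 */
	__atomic_compare_exchange_n(&r->prod_64.tail, &expected, tail + 1,
				    0 /* strong */,
				    __ATOMIC_RELEASE, __ATOMIC_RELAXED);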

> +	}
> +
> +end:
> +	if (free_space != NULL)
> +		*free_space = free_entries - n;
> +	return n;
> +#endif
> +}
> +
> +/**
> + * @internal Enqueue several objects on the non-blocking ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, enum rte_ring_queue_behavior behavior,
> +			 unsigned int is_sp, unsigned int *free_space)
> +{
> +	if (is_sp)
> +		return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
> +						   behavior, free_space);
> +	else
> +		return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
> +						   behavior, free_space);
> +}
> +
> +/**
> + * @internal
> + *   Dequeue several objects from the non-blocking ring (single-consumer only)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
> +			    unsigned int n,
> +			    enum rte_ring_queue_behavior behavior,
> +			    unsigned int *available)
> +{
> +	size_t head, next;
> +	uint32_t entries;
> +
> +	n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
> +					 &head, &next, &entries);
> +	if (n == 0)
> +		goto end;
> +
> +	DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> +
> +	r->cons_64.tail += n;
Memory ordering? Consumer synchronises with producer.

> +
> +end:
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
> +
> +/**
> + * @internal
> + *   Dequeue several objects from the non-blocking ring (multi-consumer safe)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table,
> +			    unsigned int n,
> +			    enum rte_ring_queue_behavior behavior,
> +			    unsigned int *available)
> +{
> +	size_t head, next;
> +	uint32_t entries;
> +
> +	n = __rte_ring_move_cons_head_64(r, 0, n, behavior,
> +					 &head, &next, &entries);
> +	if (n == 0)
> +		goto end;
> +
> +	while (1) {
> +		size_t tail = r->cons_64.tail;
> +
> +		/* Dequeue from the cons tail onwards. If multiple threads read
> +		 * the same pointers, the thread that successfully performs the
> +		 * CAS will keep them and the other(s) will retry.
> +		 */
> +		DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);
> +
> +		next = tail + n;
> +
> +		/* Built-in used to handle variable-sized tail index. */
> +		if (__sync_bool_compare_and_swap(&r->cons_64.tail, tail, next))
> +			/* There is potential for the ABA problem here, but
> +			 * that is mitigated by the large (64-bit) tail.
> +			 */
> +			break;
> +	}
> +
> +end:
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the non-blocking ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + *   returns the number of remaining ring entries after the dequeue has finished
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table,
> +		 unsigned int n, enum rte_ring_queue_behavior behavior,
> +		 unsigned int is_sc, unsigned int *available)
> +{
> +	if (is_sc)
> +		return __rte_ring_do_nb_dequeue_sc(r, obj_table, n,
> +						   behavior, available);
> +	else
> +		return __rte_ring_do_nb_dequeue_mc(r, obj_table, n,
> +						   behavior, available);
> +}
> +
>  /**
>   * @internal Enqueue several objects on the ring
>   *
> @@ -438,8 +853,14 @@ static __rte_always_inline unsigned int
>  rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			__IS_MP, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED, __IS_MP,
> +						free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED, __IS_MP,
> +					     free_space);
>  }
>  
>  /**
> @@ -461,8 +882,14 @@ static __rte_always_inline unsigned int
>  rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			__IS_SP, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED, __IS_SP,
> +						free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED, __IS_SP,
> +					     free_space);
>  }
>  
>  /**
> @@ -488,8 +915,14 @@ static __rte_always_inline unsigned int
>  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
>  		      unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			r->prod.single, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED,
> +						r->prod_64.single, free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED,
> +					     r->prod.single, free_space);
>  }
>  
>  /**
> @@ -572,8 +1005,14 @@ static __rte_always_inline unsigned int
>  rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			__IS_MC, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED, __IS_MC,
> +						available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED, __IS_MC,
> +					     available);
>  }
>  
>  /**
> @@ -596,8 +1035,14 @@ static __rte_always_inline unsigned int
>  rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -			__IS_SC, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED, __IS_SC,
> +						available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED, __IS_SC,
> +					     available);
>  }
>  
>  /**
> @@ -623,8 +1068,14 @@ static __rte_always_inline unsigned int
>  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
>  		unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
> -				r->cons.single, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_FIXED,
> +						r->cons_64.single, available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_FIXED,
> +					     r->cons.single, available);
>  }
>  
>  /**
> @@ -699,9 +1150,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
>  static inline unsigned
>  rte_ring_count(const struct rte_ring *r)
>  {
> -	uint32_t prod_tail = r->prod.tail;
> -	uint32_t cons_tail = r->cons.tail;
> -	uint32_t count = (prod_tail - cons_tail) & r->mask;
> +	uint32_t count;
> +
> +	if (r->flags & RING_F_NB)
> +		count = (r->prod_64.tail - r->cons_64.tail) & r->mask;
> +	else
> +		count = (r->prod.tail - r->cons.tail) & r->mask;
> +
>  	return (count > r->capacity) ? r->capacity : count;
>  }
>  
> @@ -821,8 +1276,14 @@ static __rte_always_inline unsigned
>  rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						__IS_MP, free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     __IS_MP, free_space);
>  }
>  
>  /**
> @@ -844,8 +1305,14 @@ static __rte_always_inline unsigned
>  rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  			 unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						__IS_SP, free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     __IS_SP, free_space);
>  }
>  
>  /**
> @@ -871,8 +1338,14 @@ static __rte_always_inline unsigned
>  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  		      unsigned int n, unsigned int *free_space)
>  {
> -	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
> -			r->prod.single, free_space);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_enqueue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						r->prod_64.single, free_space);
> +	else
> +		return __rte_ring_do_enqueue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     r->prod.single, free_space);
>  }
>  
>  /**
> @@ -899,8 +1372,14 @@ static __rte_always_inline unsigned
>  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						__IS_MC, available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     __IS_MC, available);
>  }
>  
>  /**
> @@ -924,8 +1403,14 @@ static __rte_always_inline unsigned
>  rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n,
> -			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						__IS_SC, available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     __IS_SC, available);
>  }
>  
>  /**
> @@ -951,9 +1436,14 @@ static __rte_always_inline unsigned
>  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
>  		unsigned int n, unsigned int *available)
>  {
> -	return __rte_ring_do_dequeue(r, obj_table, n,
> -				RTE_RING_QUEUE_VARIABLE,
> -				r->cons.single, available);
> +	if (r->flags & RING_F_NB)
> +		return __rte_ring_do_nb_dequeue(r, obj_table, n,
> +						RTE_RING_QUEUE_VARIABLE,
> +						r->cons_64.single, available);
> +	else
> +		return __rte_ring_do_dequeue(r, obj_table, n,
> +					     RTE_RING_QUEUE_VARIABLE,
> +					     r->cons.single, available);
>  }
>  
>  #ifdef __cplusplus
> diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
> index d935efd0d..8969467af 100644
> --- a/lib/librte_ring/rte_ring_version.map
> +++ b/lib/librte_ring/rte_ring_version.map
> @@ -17,3 +17,10 @@ DPDK_2.2 {
>  	rte_ring_free;
>  
>  } DPDK_2.0;
> +
> +DPDK_19.05 {
> +	global:
> +
> +	rte_ring_get_memsize;
> +
> +} DPDK_2.2;
  
Eads, Gage Jan. 22, 2019, 9:31 p.m. UTC | #3
Hi Ola,

<snip>

> > @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct
> > rte_ring *r);
> >  #endif
> >  #include "rte_ring_generic_64.h"
> >
> > +/* @internal 128-bit structure used by the non-blocking ring */
> > +struct nb_ring_entry {
> > +	void *ptr; /**< Data pointer */
> > +	uint64_t cnt; /**< Modification counter */
> Why not make 'cnt' uintptr_t? This way 32-bit architectures will also be
> supported. I think there are some claims that DPDK still supports e.g. ARMv7a
> and possibly also 32-bit x86?

I chose a 64-bit modification counter because (practically speaking) the ABA problem will not occur with such a large counter -- definitely not within my lifetime. See the "Discussion" section of the commit message for more information.

With a 32-bit counter, there is a very (very) low likelihood of it, but it is possible. Personally, I don't feel comfortable providing such code, because a) I doubt all users would understand the implementation well enough to do the risk/reward analysis, and b) such a bug would be near impossible to reproduce and root-cause if it did occur.
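For a rough sense of scale (back-of-the-envelope, not from the patch itself): at 10^9 tail updates per second, a 64-bit counter needs 2^64 / 10^9 seconds -- roughly 585 years -- before it can return to a previously observed value, whereas a 32-bit counter can wrap back after about 4.3 seconds of preemption at the wrong moment.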

> 
> > +};
> > +
> > +/* The non-blocking ring algorithm is based on the original rte ring
> > +(derived
> > + * from FreeBSD's bufring.h) and inspired by Michael and Scott's
> > +non-blocking
> > + * concurrent queue.
> > + */
> > +
> > +/**
> > + * @internal
> > + *   Enqueue several objects on the non-blocking ring
> > +(single-producer only)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> > +ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> > +the ring
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has
> > +finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
> > +			    unsigned int n,
> > +			    enum rte_ring_queue_behavior behavior,
> > +			    unsigned int *free_space)
> > +{
> > +	uint32_t free_entries;
> > +	size_t head, next;
> > +
> > +	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> > +					 &head, &next, &free_entries);
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > +
> > +	r->prod_64.tail += n;
> Don't we need release order when (or smp_wmb between) writing of the ring
> pointers and the update of tail? By updating the tail pointer, we are
> synchronising with a consumer.
> 
> I prefer using __atomic operations even for load and store. You can see which
> parts of the code that synchronise with each other, e.g. store-release to some
> location synchronises with load-acquire from the same location. If you don't
> know how different threads synchronise with each other, you are very likely to
> make mistakes.
> 

You can tell this code was written when I thought x86-64 was the only viable target :). Yes, you are correct.

With regards to using __atomic intrinsics, I'm planning on taking a similar approach to the functions duplicated in rte_ring_generic.h and rte_ring_c11_mem.h: one version that uses rte_atomic functions (and thus stricter memory ordering) and one that uses __atomic intrinsics (and thus can benefit from more relaxed memory ordering).

> > +
> > +end:
> > +	if (free_space != NULL)
> > +		*free_space = free_entries - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal
> > + *   Enqueue several objects on the non-blocking ring (multi-producer
> > +safe)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> > +ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> > +the ring
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has
> > +finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
> > +			    unsigned int n,
> > +			    enum rte_ring_queue_behavior behavior,
> > +			    unsigned int *free_space)
> > +{
> > +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> > +	RTE_SET_USED(r);
> > +	RTE_SET_USED(obj_table);
> > +	RTE_SET_USED(n);
> > +	RTE_SET_USED(behavior);
> > +	RTE_SET_USED(free_space);
> > +#ifndef ALLOW_EXPERIMENTAL_API
> > +	printf("[%s()] RING_F_NB requires an experimental API."
> > +	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> > +	       , __func__);
> > +#endif
> > +	return 0;
> > +#endif
> > +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> > +	size_t head, next, tail;
> > +	uint32_t free_entries;
> > +	unsigned int i;
> > +
> > +	n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> > +					 &head, &next, &free_entries);
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
> > +		struct nb_ring_entry old_value, new_value;
> > +		struct nb_ring_entry *ring_ptr;
> > +
> > +		/* Enqueue to the tail entry. If another thread wins the
> > race,
> > +		 * retry with the new tail.
> > +		 */
> > +		tail = r->prod_64.tail;
> > +
> > +		ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
> This is an ugly expression and cast. Also I think it is unnecessary. What's
> preventing this from being written without a cast? Perhaps the ring array needs
> to be a union of "void *" and struct nb_ring_entry?

The cast is necessary for the correct pointer arithmetic (let "uintptr_t base == &r[1]"):
- With cast: ring_ptr = base + sizeof(struct nb_ring_entry) * (tail & r->mask);
- W/o cast: ring_ptr = base + sizeof(struct rte_ring) * (tail & r->mask);

FWIW, this is essentially the same as is done with the second argument (&r[1]) to ENQUEUE_PTRS and DEQUEUE_PTRS, but there it's split across multiple lines of code. The equivalent here would be:
 
struct nb_ring_entry *ring_base = (struct nb_ring_entry*)&r[1];
ring_ptr = &ring_base[tail & r->mask];

Which is more legible, I think.

There is no ring array structure in which to add a union; the ring array is a contiguous chunk of memory that immediately follows after the end of a struct rte_ring. We interpret the memory there according to the ring entry data type (void * for regular rings and struct nb_ring_entry for non-blocking rings).

> 
> > +
> > +		old_value = *ring_ptr;
> > +
> > +		/* If the tail entry's modification counter doesn't match the
> > +		 * producer tail index, it's already been updated.
> > +		 */
> > +		if (old_value.cnt != tail)
> > +			continue;
> Continue restarts the loop at the condition test in the for statement, 'i' and 'n'
> are unchanged. Then we re-read 'prod_64.tail' and 'ring[tail & mask]'. If some
> other thread never updates 'prod_64.tail', the test here (ring[tail].cnt != tail) will
> still be true and we will spin forever.
> 
> Waiting for other threads <=> blocking behaviour so this is not a non- blocking
> design.
> 

You're absolutely right. The if-statement was added as an optimization to avoid 128-bit cmpset operations that are known to fail, but in this form it violates the non-blocking design.

I see two solutions: 1) drop the if-statement altogether, or 2) attempt to update prod_64.tail before continuing. Both require every thread to attempt to update prod_64.tail on every iteration, but #2 will result in fewer failed 128-bit cmpsets.
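A rough fragment of what #2 would look like inside the loop above (untested):

		/* If the tail entry's modification counter doesn't match the
		 * producer tail index, another enqueue already landed in this
		 * slot. Help advance the tail before retrying, so forward
		 * progress never depends on the winning thread being scheduled.
		 */
		if (old_value.cnt != tail) {
			__sync_bool_compare_and_swap(&r->prod_64.tail,
						     tail, tail + 1);
			continue;
		}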

> > +
> > +		/* Prepare the new entry. The cnt field mitigates the ABA
> > +		 * problem on the ring write.
> > +		 */
> > +		new_value.ptr = obj_table[i];
> > +		new_value.cnt = tail + r->size;
> > +
> > +		if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
> > +					 (rte_int128_t *)&old_value,
> > +					 (rte_int128_t *)&new_value))
> > +			i++;
> > +
> > +		/* Every thread attempts the cmpset, so they don't have to
> > wait
> > +		 * for the thread that successfully enqueued to the ring.
> > +		 * Using a 64-bit tail mitigates the ABA problem here.
> > +		 *
> > +		 * Built-in used to handle variable-sized tail index.
> > +		 */
> But prod_64.tail is 64 bits so not really variable size?
> 

(See next comment)

> > +		__sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail +
> > 1);
> What memory order is required here? Why not use
> __atomic_compare_exchange() with explicit memory order parameters?
> 

This is an artifact from an older patchset that used uintptr_t, written before I learned that other platforms could support this non-blocking algorithm (hence the __sync-type intrinsic seemed sufficient at the time).

At any rate, as described earlier in this response, I plan on writing these functions using the __atomic builtins in the next patchset.

> > +	}
> > +
> > +end:
> > +	if (free_space != NULL)
> > +		*free_space = free_entries - n;
> > +	return n;
> > +#endif
> > +}
> > +
> > +/**
> > + * @internal Enqueue several objects on the non-blocking ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> > +ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> > +the ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer head
> > +update
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has
> > +finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
> > +			 unsigned int n, enum rte_ring_queue_behavior
> > behavior,
> > +			 unsigned int is_sp, unsigned int *free_space) {
> > +	if (is_sp)
> > +		return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
> > +						   behavior, free_space);
> > +	else
> > +		return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
> > +						   behavior, free_space);
> > +}
> > +
> > +/**
> > + * @internal
> > + *   Dequeue several objects from the non-blocking ring
> > +(single-consumer
> > only)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to pull from the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from
> > + the ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> > + the ring
> > + * @param available
> > + *   returns the number of remaining ring entries after the dequeue
> > + has
> > finished
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
> > +			    unsigned int n,
> > +			    enum rte_ring_queue_behavior behavior,
> > +			    unsigned int *available)
> > +{
> > +	size_t head, next;
> > +	uint32_t entries;
> > +
> > +	n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
> > +					 &head, &next, &entries);
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > +
> > +	r->cons_64.tail += n;
> Memory ordering? Consumer synchronises with producer.
> 

Agreed, that is missing here. Will fix.

Thanks,
Gage

> --
> Ola Liljedahl, Networking System Architect, Arm Phone +46706866373, Skype
> ola.liljedahl
  
Ola Liljedahl Jan. 23, 2019, 10:16 a.m. UTC | #4
On Tue, 2019-01-22 at 21:31 +0000, Eads, Gage wrote:
> Hi Ola,
> 
> <snip>
> 
> > 
> > > 
> > > @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct
> > > rte_ring *r);
> > >  #endif
> > >  #include "rte_ring_generic_64.h"
> > > 
> > > +/* @internal 128-bit structure used by the non-blocking ring */
> > > +struct nb_ring_entry {
> > > +	void *ptr; /**< Data pointer */
> > > +	uint64_t cnt; /**< Modification counter */
> > Why not make 'cnt' uintptr_t? This way 32-bit architectures will also be
> > supported. I think there are some claims that DPDK still supports e.g.
> > ARMv7a
> > and possibly also 32-bit x86?
> I chose a 64-bit modification counter because (practically speaking) the ABA
> problem will not occur with such a large counter -- definitely not within my
> lifetime. See the "Discussion" section of the commit message for more
> information.
> 
> With a 32-bit counter, there is a very (very) low likelihood of it, but it is
> possible. Personally, I don't feel comfortable providing such code, because a)
> I doubt all users would understand the implementation well enough to do the
> risk/reward analysis, and b) such a bug would be near impossible to reproduce
> and root-cause if it did occur.
With a 64-bit counter (and 32-bit pointer), 32-bit architectures (e.g. ARMv7a
and probably x86 as well) won't be able to support this as they at best support
64-bit CAS (ARMv7a has LDREXD/STREXD). So you are essentially putting a 64-bit
(and 128-bit CAS) requirement on the implementation.

> 
> > 
> > 
> > > 
> > > +};
> > > +
> > > +/* The non-blocking ring algorithm is based on the original rte ring
> > > +(derived
> > > + * from FreeBSD's bufring.h) and inspired by Michael and Scott's
> > > +non-blocking
> > > + * concurrent queue.
> > > + */
> > > +
> > > +/**
> > > + * @internal
> > > + *   Enqueue several objects on the non-blocking ring
> > > +(single-producer only)
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param n
> > > + *   The number of objects to add in the ring from the obj_table.
> > > + * @param behavior
> > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> > > +ring
> > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> > > +the ring
> > > + * @param free_space
> > > + *   returns the amount of space after the enqueue operation has
> > > +finished
> > > + * @return
> > > + *   Actual number of objects enqueued.
> > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
> > > +			    unsigned int n,
> > > +			    enum rte_ring_queue_behavior behavior,
> > > +			    unsigned int *free_space)
> > > +{
> > > +	uint32_t free_entries;
> > > +	size_t head, next;
> > > +
> > > +	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> > > +					 &head, &next, &free_entries);
> > > +	if (n == 0)
> > > +		goto end;
> > > +
> > > +	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > > +
> > > +	r->prod_64.tail += n;
> > Don't we need release order when (or smp_wmb between) writing of the ring
> > pointers and the update of tail? By updating the tail pointer, we are
> > synchronising with a consumer.
> > 
> > I prefer using __atomic operations even for load and store. You can see
> > which
> > parts of the code that synchronise with each other, e.g. store-release to
> > some
> > location synchronises with load-acquire from the same location. If you don't
> > know how different threads synchronise with each other, you are very likely
> > to
> > make mistakes.
> > 
> You can tell this code was written when I thought x86-64 was the only viable
> target :). Yes, you are correct.
> 
> With regards to using __atomic intrinsics, I'm planning on taking a similar
> approach to the functions duplicated in rte_ring_generic.h and
> rte_ring_c11_mem.h: one version that uses rte_atomic functions (and thus
> stricter memory ordering) and one that uses __atomic intrinsics (and thus can
> benefit from more relaxed memory ordering).
What's the advantage of having two different implementations? What is the
disadvantage?

The existing ring buffer code originally had only the "legacy" implementation
which was kept when the __atomic implementation was added. The reason claimed
was that some older compilers for x86 do not support GCC __atomic builtins. But
I thought there was consensus that new functionality could have only __atomic
implementations.

Does the non-blocking ring buffer implementation have to support these older
compilers? Will the applications that require these older compilers be updated to
utilise the non-blocking ring buffer?

> 
> > 
> > > 
> > > +
> > > +end:
> > > +	if (free_space != NULL)
> > > +		*free_space = free_entries - n;
> > > +	return n;
> > > +}
> > > +
> > > +/**
> > > + * @internal
> > > + *   Enqueue several objects on the non-blocking ring (multi-producer
> > > +safe)
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param n
> > > + *   The number of objects to add in the ring from the obj_table.
> > > + * @param behavior
> > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> > > +ring
> > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> > > +the ring
> > > + * @param free_space
> > > + *   returns the amount of space after the enqueue operation has
> > > +finished
> > > + * @return
> > > + *   Actual number of objects enqueued.
> > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
> > > +			    unsigned int n,
> > > +			    enum rte_ring_queue_behavior behavior,
> > > +			    unsigned int *free_space)
> > > +{
> > > +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> > > +	RTE_SET_USED(r);
> > > +	RTE_SET_USED(obj_table);
> > > +	RTE_SET_USED(n);
> > > +	RTE_SET_USED(behavior);
> > > +	RTE_SET_USED(free_space);
> > > +#ifndef ALLOW_EXPERIMENTAL_API
> > > +	printf("[%s()] RING_F_NB requires an experimental API."
> > > +	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> > > +	       , __func__);
> > > +#endif
> > > +	return 0;
> > > +#endif
> > > +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> > > +	size_t head, next, tail;
> > > +	uint32_t free_entries;
> > > +	unsigned int i;
> > > +
> > > +	n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> > > +					 &head, &next, &free_entries);
> > > +	if (n == 0)
> > > +		goto end;
> > > +
> > > +	for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
> > > +		struct nb_ring_entry old_value, new_value;
> > > +		struct nb_ring_entry *ring_ptr;
> > > +
> > > +		/* Enqueue to the tail entry. If another thread wins the
> > > race,
> > > +		 * retry with the new tail.
> > > +		 */
> > > +		tail = r->prod_64.tail;
> > > +
> > > +		ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r-
> > > >mask];
> > This is an ugly expression and cast. Also I think it is unnecessary. What's
> > preventing this from being written without a cast? Perhaps the ring array
> > needs
> > to be a union of "void *" and struct nb_ring_entry?
> The cast is necessary for the correct pointer arithmetic (let "uintptr_t base
> == &r[1]"):
Yes I know the C language.

> - With cast: ring_ptr = base + sizeof(struct nb_ring_entry) * (tail & r-
> >mask);
> - W/o cast: ring_ptr = base + sizeof(struct rte_ring) * (tail & r->mask);
> 
> FWIW, this is essentially the same as is done with the second argument (&r[1])
> to ENQUEUE_PTRS and DEQUEUE_PTRS, but there it's split across multiple lines
> of code. The equivalent here would be:
>  
> struct nb_ring_entry *ring_base = (struct nb_ring_entry*)&r[1];
> ring_ptr = &ring_base[tail & r->mask];
> 
> Which is more legible, I think.
The RTE ring buffer code is not very legible to start with.

> 
> There is no ring array structure in which to add a union; the ring array is a
> contiguous chunk of memory that immediately follows after the end of a struct
> rte_ring. We interpret the memory there according to the ring entry data type
> (void * for regular rings and struct nb_ring_entry for non-blocking rings).
My worry is that we are abusing the C language and creating a monster of fragile
C code that will be more and more difficult to understand and to maintain. At
some point you have to think the question "Are we doing the right thing?".

> 
> > 
> > 
> > > 
> > > +
> > > +		old_value = *ring_ptr;
> > > +
> > > +		/* If the tail entry's modification counter doesn't match
> > > the
> > > +		 * producer tail index, it's already been updated.
> > > +		 */
> > > +		if (old_value.cnt != tail)
> > > +			continue;
> > Continue restarts the loop at the condition test in the for statement, 'i'
> > and 'n'
> > are unchanged. Then we re-read 'prod_64.tail' and 'ring[tail & mask]'. If
> > some
> > other thread never updates 'prod_64.tail', the test here (ring[tail].cnt !=
> > tail) will
> > still be true and we will spin forever.
> > 
> > Waiting for other threads <=> blocking behaviour so this is not a non-
> > blocking
> > design.
> > 
> You're absolutely right. The if-statement was added as optimization to avoid
> 128-bit cmpset operations that are known to fail, but in this form it violates
> the non-blocking design.
> 
> I see two solutions: 1) drop the if-statement altogether, or 2) attempt to
> update prod_64.tail before continuing. Both require every thread to attempt to
> update prod_64.tail on every iteration, but #2 will result in fewer failed
> 128-bit cmpsets.
> 
> > 
> > > 
> > > +
> > > +		/* Prepare the new entry. The cnt field mitigates the ABA
> > > +		 * problem on the ring write.
> > > +		 */
> > > +		new_value.ptr = obj_table[i];
> > > +		new_value.cnt = tail + r->size;
> > > +
> > > +		if (rte_atomic128_cmpset((volatile rte_int128_t
> > > *)ring_ptr,
> > > +					 (rte_int128_t *)&old_value,
> > > +					 (rte_int128_t *)&new_value))
> > > +			i++;
> > > +
> > > +		/* Every thread attempts the cmpset, so they don't have
> > > to
> > > wait
> > > +		 * for the thread that successfully enqueued to the ring.
> > > +		 * Using a 64-bit tail mitigates the ABA problem here.
> > > +		 *
> > > +		 * Built-in used to handle variable-sized tail index.
> > > +		 */
> > But prod_64.tail is 64 bits so not really variable size?
> > 
> (See next comment)
> 
> > 
> > > 
> > > +		__sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail
> > > +
> > > 1);
> > What memory order is required here? Why not use
> > __atomic_compare_exchange() with explicit memory order parameters?
> > 
> This is an artifact from an older patchset that used uintptr_t, and before I
> learned that other platforms could support this non-blocking algorithm (hence
> the __sync type intrinsic was sufficient).
> 
> At any rate, as described earlier in this response, I plan on writing these
> functions using the __atomic builtins in the next patchset.
Great.

> 
> > 
> > > 
> > > +	}
> > > +
> > > +end:
> > > +	if (free_space != NULL)
> > > +		*free_space = free_entries - n;
> > > +	return n;
> > > +#endif
> > > +}
> > > +
> > > +/**
> > > + * @internal Enqueue several objects on the non-blocking ring
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param n
> > > + *   The number of objects to add in the ring from the obj_table.
> > > + * @param behavior
> > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> > > +ring
> > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> > > +the ring
> > > + * @param is_sp
> > > + *   Indicates whether to use single producer or multi-producer head
> > > +update
> > > + * @param free_space
> > > + *   returns the amount of space after the enqueue operation has
> > > +finished
> > > + * @return
> > > + *   Actual number of objects enqueued.
> > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
> > > +			 unsigned int n, enum rte_ring_queue_behavior
> > > behavior,
> > > +			 unsigned int is_sp, unsigned int *free_space) {
> > > +	if (is_sp)
> > > +		return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
> > > +						   behavior, free_space);
> > > +	else
> > > +		return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
> > > +						   behavior, free_space);
> > > +}
> > > +
> > > +/**
> > > + * @internal
> > > + *   Dequeue several objects from the non-blocking ring
> > > +(single-consumer
> > > only)
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param n
> > > + *   The number of objects to pull from the ring.
> > > + * @param behavior
> > > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from
> > > + the ring
> > > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> > > + the ring
> > > + * @param available
> > > + *   returns the number of remaining ring entries after the dequeue
> > > + has
> > > finished
> > > + * @return
> > > + *   - Actual number of objects dequeued.
> > > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
> > > +			    unsigned int n,
> > > +			    enum rte_ring_queue_behavior behavior,
> > > +			    unsigned int *available)
> > > +{
> > > +	size_t head, next;
> > > +	uint32_t entries;
> > > +
> > > +	n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
> > > +					 &head, &next, &entries);
> > > +	if (n == 0)
> > > +		goto end;
> > > +
> > > +	DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > > +
> > > +	r->cons_64.tail += n;
> > Memory ordering? Consumer synchronises with producer.
> > 
> Agreed, that is missing here. Will fix.
> 
> Thanks,
> Gage
> 
> > 
> > --
> > Ola Liljedahl, Networking System Architect, Arm Phone +46706866373, Skype
> > ola.liljedahl
  
Eads, Gage Jan. 25, 2019, 5:21 p.m. UTC | #5
> -----Original Message-----
> From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> Sent: Wednesday, January 23, 2019 4:16 AM
> To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> arybchenko@solarflare.com; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>
> Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking implementation
> 
> On Tue, 2019-01-22 at 21:31 +0000, Eads, Gage wrote:
> > Hi Ola,
> >
> > <snip>
> >
> > >
> > > >
> > > > @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct
> > > > rte_ring *r);
> > > >  #endif
> > > >  #include "rte_ring_generic_64.h"
> > > >
> > > > +/* @internal 128-bit structure used by the non-blocking ring */
> > > > +struct nb_ring_entry {
> > > > +	void *ptr; /**< Data pointer */
> > > > +	uint64_t cnt; /**< Modification counter */
> > > Why not make 'cnt' uintptr_t? This way 32-bit architectures will
> > > also be supported. I think there are some claims that DPDK still supports e.g.
> > > ARMv7a
> > > and possibly also 32-bit x86?
> > I chose a 64-bit modification counter because (practically speaking)
> > the ABA problem will not occur with such a large counter -- definitely
> > not within my lifetime. See the "Discussion" section of the commit
> > message for more information.
> >
> > With a 32-bit counter, there is a very (very) low likelihood of it,
> > but it is possible. Personally, I don't feel comfortable providing
> > such code, because a) I doubt all users would understand the
> > implementation well enough to do the risk/reward analysis, and b) such
> > a bug would be near impossible to reproduce and root-cause if it did occur.
> With a 64-bit counter (and 32-bit pointer), 32-bit architectures (e.g. ARMv7a and
> probably x86 as well) won't be able to support this as they at best support 64-bit
> CAS (ARMv7a has LDREXD/STREXD). So you are essentially putting a 64-bit (and
> 128-bit CAS) requirement on the implementation.
> 

Yes, I am. I tried to make that clear in the cover letter.

> >
> > >
> > >
> > > >
> > > > +};
> > > > +
> > > > +/* The non-blocking ring algorithm is based on the original rte
> > > > +ring (derived
> > > > + * from FreeBSD's bufring.h) and inspired by Michael and Scott's
> > > > +non-blocking
> > > > + * concurrent queue.
> > > > + */
> > > > +
> > > > +/**
> > > > + * @internal
> > > > + *   Enqueue several objects on the non-blocking ring
> > > > +(single-producer only)
> > > > + *
> > > > + * @param r
> > > > + *   A pointer to the ring structure.
> > > > + * @param obj_table
> > > > + *   A pointer to a table of void * pointers (objects).
> > > > + * @param n
> > > > + *   The number of objects to add in the ring from the obj_table.
> > > > + * @param behavior
> > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to
> > > > +the ring
> > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> > > > +to the ring
> > > > + * @param free_space
> > > > + *   returns the amount of space after the enqueue operation has
> > > > +finished
> > > > + * @return
> > > > + *   Actual number of objects enqueued.
> > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > + */
> > > > +static __rte_always_inline unsigned int
> > > > +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
> > > > +			    unsigned int n,
> > > > +			    enum rte_ring_queue_behavior behavior,
> > > > +			    unsigned int *free_space)
> > > > +{
> > > > +	uint32_t free_entries;
> > > > +	size_t head, next;
> > > > +
> > > > +	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> > > > +					 &head, &next, &free_entries);
> > > > +	if (n == 0)
> > > > +		goto end;
> > > > +
> > > > +	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > > > +
> > > > +	r->prod_64.tail += n;
> > > Don't we need release order when (or smp_wmb between) writing of the
> > > ring pointers and the update of tail? By updating the tail pointer,
> > > we are synchronising with a consumer.
> > >
> > > I prefer using __atomic operations even for load and store. You can
> > > see which parts of the code that synchronise with each other, e.g.
> > > store-release to some location synchronises with load-acquire from
> > > the same location. If you don't know how different threads
> > > synchronise with each other, you are very likely to make mistakes.
> > >
> > You can tell this code was written when I thought x86-64 was the only
> > viable target :). Yes, you are correct.
> >
> > With regards to using __atomic intrinsics, I'm planning on taking a
> > similar approach to the functions duplicated in rte_ring_generic.h and
> > rte_ring_c11_mem.h: one version that uses rte_atomic functions (and
> > thus stricter memory ordering) and one that uses __atomic intrinsics
> > (and thus can benefit from more relaxed memory ordering).
> What's the advantage of having two different implementations? What is the
> disadvantage?
> 
> The existing ring buffer code originally had only the "legacy" implementation
> which was kept when the __atomic implementation was added. The reason
> claimed was that some older compilers for x86 do not support GCC __atomic
> builtins. But I thought there was consensus that new functionality could have
> only __atomic implementations.
> 

When CONFIG_RTE_RING_USE_C11_MEM_MODEL was introduced, it was left disabled for thunderx[1] for performance reasons. Assuming that hasn't changed, the advantage to having two versions is to best support all of DPDK's platforms. The disadvantage is of course duplicated code and the additional maintenance burden.

That said, if the thunderx maintainers are ok with it, I'm certainly open to only doing the __atomic version. Note that even in the __atomic version, based on Honnapa's findings[2], using a DPDK-defined rte_atomic128_cmpset() (with additional arguments to support machines with weak consistency) appears to be a better option than __atomic_compare_exchange_16.
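To make "additional arguments" a bit more concrete, I'm picturing something like the following (hypothetical prototype only, not an existing API):

	/* 128-bit compare-and-set taking explicit success/failure memory
	 * orders, so weakly ordered machines aren't forced to full barriers.
	 */
	static inline int
	rte_atomic128_cmpset(volatile rte_int128_t *dst, rte_int128_t *exp,
			     const rte_int128_t *src, unsigned int weak,
			     int success, int failure);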

I couldn't find the discussion about new functionality using __atomic going forward -- can you send a link?

[1] https://mails.dpdk.org/archives/dev/2017-December/082853.html
[2] http://mails.dpdk.org/archives/dev/2019-January/124002.html

> Does the non-blocking ring buffer implementation have to support these older
> compilers? Will the applications that require these older compiler be updated to
> utilise the non-blocking ring buffer?
> 

(See above -- compiler versions wasn't a consideration here.)

> >
> > >
> > > >
> > > > +
> > > > +end:
> > > > +	if (free_space != NULL)
> > > > +		*free_space = free_entries - n;
> > > > +	return n;
> > > > +}
> > > > +
> > > > +/**
> > > > + * @internal
> > > > + *   Enqueue several objects on the non-blocking ring
> > > > +(multi-producer
> > > > +safe)
> > > > + *
> > > > + * @param r
> > > > + *   A pointer to the ring structure.
> > > > + * @param obj_table
> > > > + *   A pointer to a table of void * pointers (objects).
> > > > + * @param n
> > > > + *   The number of objects to add in the ring from the obj_table.
> > > > + * @param behavior
> > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to
> > > > +the ring
> > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> > > > +to the ring
> > > > + * @param free_space
> > > > + *   returns the amount of space after the enqueue operation has
> > > > +finished
> > > > + * @return
> > > > + *   Actual number of objects enqueued.
> > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > + */
> > > > +static __rte_always_inline unsigned int
> > > > +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const
> *obj_table,
> > > > +			    unsigned int n,
> > > > +			    enum rte_ring_queue_behavior behavior,
> > > > +			    unsigned int *free_space)
> > > > +{
> > > > +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> > > > +	RTE_SET_USED(r);
> > > > +	RTE_SET_USED(obj_table);
> > > > +	RTE_SET_USED(n);
> > > > +	RTE_SET_USED(behavior);
> > > > +	RTE_SET_USED(free_space);
> > > > +#ifndef ALLOW_EXPERIMENTAL_API
> > > > +	printf("[%s()] RING_F_NB requires an experimental API."
> > > > +	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> > > > +	       , __func__);
> > > > +#endif
> > > > +	return 0;
> > > > +#endif
> > > > +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> > > > +	size_t head, next, tail;
> > > > +	uint32_t free_entries;
> > > > +	unsigned int i;
> > > > +
> > > > +	n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> > > > +					 &head, &next, &free_entries);
> > > > +	if (n == 0)
> > > > +		goto end;
> > > > +
> > > > +	for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
> > > > +		struct nb_ring_entry old_value, new_value;
> > > > +		struct nb_ring_entry *ring_ptr;
> > > > +
> > > > +		/* Enqueue to the tail entry. If another thread wins the
> > > > race,
> > > > +		 * retry with the new tail.
> > > > +		 */
> > > > +		tail = r->prod_64.tail;
> > > > +
> > > > +		ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r-
> > > > >mask];
> > > This is an ugly expression and cast. Also I think it is unnecessary.
> > > What's preventing this from being written without a cast? Perhaps
> > > the ring array needs to be a union of "void *" and struct
> > > nb_ring_entry?
> > The cast is necessary for the correct pointer arithmetic (let
> > "uintptr_t base == &r[1]"):
> Yes I know the C language.
> 
> > - With cast: ring_ptr = base + sizeof(struct nb_ring_entry) * (tail &
> > r-
> > >mask);
> > - W/o cast: ring_ptr = base + sizeof(struct rte_ring) * (tail &
> > r->mask);
> >
> > FWIW, this is essentially the same as is done with the second argument
> > (&r[1]) to ENQUEUE_PTRS and DEQUEUE_PTRS, but there it's split across
> > multiple lines of code. The equivalent here would be:
> >
> > struct nb_ring_entry *ring_base = (struct nb_ring_entry*)&r[1];
> > ring_ptr = ring_base[tail & r->mask];
> >
> > Which is more legible, I think.
> The RTE ring buffer code is not very legible to start with.
> 
> >
> > There is no ring array structure in which to add a union; the ring
> > array is a contiguous chunk of memory that immediately follows after
> > the end of a struct rte_ring. We interpret the memory there according
> > to the ring entry data type (void * for regular rings and struct nb_ring_entry for
> non-blocking rings).
> My worry is that we are abusing the C language and creating a monster of
> fragile C code that will be more and more difficult to understand and to
> maintain. At some point you have to think the question "Are we doing the right
> thing?".
>

I'm not aware of any fragility/maintainability issues in the ring code (though perhaps the maintainers have a different view!), and personally I find the code fairly legible. If you have a specific suggestion, I'll look into incorporating it.

Thanks,
Gage

</snip>
  
Ola Liljedahl Jan. 28, 2019, 10:35 a.m. UTC | #6
On Fri, 2019-01-25 at 17:21 +0000, Eads, Gage wrote:
> 
> > 
> > -----Original Message-----
> > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > Sent: Wednesday, January 23, 2019 4:16 AM
> > To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> > Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> > <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> > arybchenko@solarflare.com; Ananyev, Konstantin
> > <konstantin.ananyev@intel.com>
> > Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking
> > implementation
> > 
> > On Tue, 2019-01-22 at 21:31 +0000, Eads, Gage wrote:
> > > 
> > > Hi Ola,
> > > 
> > > <snip>
> > > 
> > > > 
> > > > 
> > > > > 
> > > > > 
> > > > > @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct
> > > > > rte_ring *r);
> > > > >  #endif
> > > > >  #include "rte_ring_generic_64.h"
> > > > > 
> > > > > +/* @internal 128-bit structure used by the non-blocking ring */
> > > > > +struct nb_ring_entry {
> > > > > +	void *ptr; /**< Data pointer */
> > > > > +	uint64_t cnt; /**< Modification counter */
> > > > Why not make 'cnt' uintptr_t? This way 32-bit architectures will
> > > > also be supported. I think there are some claims that DPDK still
> > > > supports e.g.
> > > > ARMv7a
> > > > and possibly also 32-bit x86?
> > > I chose a 64-bit modification counter because (practically speaking)
> > > the ABA problem will not occur with such a large counter -- definitely
> > > not within my lifetime. See the "Discussion" section of the commit
> > > message for more information.
> > > 
> > > With a 32-bit counter, there is a very (very) low likelihood of it,
> > > but it is possible. Personally, I don't feel comfortable providing
> > > such code, because a) I doubt all users would understand the
> > > implementation well enough to do the risk/reward analysis, and b) such
> > > a bug would be near impossible to reproduce and root-cause if it did
> > > occur.
> > With a 64-bit counter (and 32-bit pointer), 32-bit architectures (e.g.
> > ARMv7a and
> > probably x86 as well) won't be able to support this as they at best support
> > 64-bit
> > CAS (ARMv7a has LDREXD/STREXD). So you are essentially putting a 64-bit (and
> > 128-bit CAS) requirement on the implementation.
> > 
> Yes, I am. I tried to make that clear in the cover letter.
> 
> > 
> > > 
> > > 
> > > > 
> > > > 
> > > > 
> > > > > 
> > > > > 
> > > > > +};
> > > > > +
> > > > > +/* The non-blocking ring algorithm is based on the original rte
> > > > > +ring (derived
> > > > > + * from FreeBSD's bufring.h) and inspired by Michael and Scott's
> > > > > +non-blocking
> > > > > + * concurrent queue.
> > > > > + */
> > > > > +
> > > > > +/**
> > > > > + * @internal
> > > > > + *   Enqueue several objects on the non-blocking ring
> > > > > +(single-producer only)
> > > > > + *
> > > > > + * @param r
> > > > > + *   A pointer to the ring structure.
> > > > > + * @param obj_table
> > > > > + *   A pointer to a table of void * pointers (objects).
> > > > > + * @param n
> > > > > + *   The number of objects to add in the ring from the obj_table.
> > > > > + * @param behavior
> > > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to
> > > > > +the ring
> > > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> > > > > +to the ring
> > > > > + * @param free_space
> > > > > + *   returns the amount of space after the enqueue operation has
> > > > > +finished
> > > > > + * @return
> > > > > + *   Actual number of objects enqueued.
> > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > > + */
> > > > > +static __rte_always_inline unsigned int
> > > > > +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const
> > > > > *obj_table,
> > > > > +			    unsigned int n,
> > > > > +			    enum rte_ring_queue_behavior behavior,
> > > > > +			    unsigned int *free_space)
> > > > > +{
> > > > > +	uint32_t free_entries;
> > > > > +	size_t head, next;
> > > > > +
> > > > > +	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> > > > > +					 &head, &next,
> > > > > &free_entries);
> > > > > +	if (n == 0)
> > > > > +		goto end;
> > > > > +
> > > > > +	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > > > > +
> > > > > +	r->prod_64.tail += n;
> > > > Don't we need release order when (or smp_wmb between) writing of the
> > > > ring pointers and the update of tail? By updating the tail pointer,
> > > > we are synchronising with a consumer.
> > > > 
> > > > I prefer using __atomic operations even for load and store. You can
> > > > see which parts of the code that synchronise with each other, e.g.
> > > > store-release to some location synchronises with load-acquire from
> > > > the same location. If you don't know how different threads
> > > > synchronise with each other, you are very likely to make mistakes.
> > > > 
> > > You can tell this code was written when I thought x86-64 was the only
> > > viable target :). Yes, you are correct.
> > > 
> > > With regards to using __atomic intrinsics, I'm planning on taking a
> > > similar approach to the functions duplicated in rte_ring_generic.h and
> > > rte_ring_c11_mem.h: one version that uses rte_atomic functions (and
> > > thus stricter memory ordering) and one that uses __atomic intrinsics
> > > (and thus can benefit from more relaxed memory ordering).
From a code point of view, I strongly prefer the atomic operations to be visible
in the top-level code, not hidden in subroutines. For correctness, it is vital
that memory accesses are performed with the required ordering and that acquire
and release match up. Hiding e.g. load-acquire and store-release in
subroutines (in a different file!) makes this difficult. There have already been
such bugs found in rte_ring.
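
As a minimal, self-contained sketch of what "visible in the top-level code"
means (hypothetical names, not the actual rte_ring code): the store-release in
the producer and the load-acquire in the consumer target the same location, and
keeping both in the top-level code makes it easy to check that they match up.

#include <stdint.h>
#include <stddef.h>

#define DEMO_RING_SIZE 1024	/* power of two */

struct demo_ring {
	uint64_t prod_tail;	/* written by producer, read by consumer */
	void *slots[DEMO_RING_SIZE];
};

/* Producer: fill the slot, then publish it with a store-release. */
static inline void
demo_publish(struct demo_ring *r, uint64_t head, void *obj)
{
	r->slots[head & (DEMO_RING_SIZE - 1)] = obj;
	__atomic_store_n(&r->prod_tail, head + 1, __ATOMIC_RELEASE);
}

/* Consumer: the load-acquire of prod_tail synchronizes with the store-release
 * above, so the slot contents are guaranteed visible whenever head != tail.
 */
static inline void *
demo_consume(struct demo_ring *r, uint64_t head)
{
	uint64_t tail = __atomic_load_n(&r->prod_tail, __ATOMIC_ACQUIRE);

	if (head == tail)
		return NULL;	/* ring empty */
	return r->slots[head & (DEMO_RING_SIZE - 1)];
}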

> > What's the advantage of having two different implementations? What is the
> > disadvantage?
> > 
> > The existing ring buffer code originally had only the "legacy"
> > implementation
> > which was kept when the __atomic implementation was added. The reason
> > claimed was that some older compilers for x86 do not support GCC __atomic
> > builtins. But I thought there was consensus that new functionality could
> > have
> > only __atomic implementations.
> > 
> When CONFIG_RTE_RING_USE_C11_MEM_MODEL was introduced, it was left disabled
> for thunderx[1] for performance reasons. Assuming that hasn't changed, the
> advantage to having two versions is to best support all of DPDK's platforms.
> The disadvantage is of course duplicated code and the additional maintenance
> burden.
The only way I see that a C11 memory model implementation can be slower than
using smp_wmb/rmb is if you need to order loads before a synchronizing store and
there are also outstanding stores which do not require ordering. smp_rmb()
handles this while store-release will also (unnecessarily) order those
outstanding stores. This situation occurs e.g. in ring buffer dequeue operations
where ring slots are read (and possibly written to thread-private memory) before
the ring slots are released (e.g. using CAS-release or store-release).

I imagine that the LSU/cache subsystem on ThunderX/OCTEON-TX also has something
to do with this problem. If there is a large number of stores pending in the
load/store unit, store-release might have to wait for a long time before the
synchronizing store can complete.
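
To make this concrete, a sketch of the two dequeue-side tail updates (field
layout simplified; this is not the actual ring code):

#include <stdint.h>
#include <rte_atomic.h>		/* rte_smp_rmb() */

/* Barrier style: a read barrier orders the slot loads, then a plain store
 * publishes the new tail; unrelated pending stores are left unordered.
 */
static inline void *
dequeue_one_barrier(void **slots, uint32_t idx, volatile uint64_t *tail,
		    uint64_t new_tail)
{
	void *obj = slots[idx];		/* critical section: reads only */

	rte_smp_rmb();
	*tail = new_tail;
	return obj;
}

/* C11 style: the store-release also orders any earlier *stores* (e.g. copies
 * into thread-private memory), which is unnecessary when the critical section
 * only reads the ring.
 */
static inline void *
dequeue_one_c11(void **slots, uint32_t idx, uint64_t *tail, uint64_t new_tail)
{
	void *obj = slots[idx];

	__atomic_store_n(tail, new_tail, __ATOMIC_RELEASE);
	return obj;
}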

> 
> That said, if the thunderx maintainers are ok with it, I'm certainly open to
> only doing the __atomic version. Note that even in the __atomic version, based
> on Honnapa's findings[2], using a DPDK-defined rte_atomic128_cmpset() (with
> additional arguments to support machines with weak consistency) appears to be
> a better option than __atomic_compare_exchange_16.
__atomic_compare_exchange_16() is not guaranteed to be lock-free. It is not
lock-free on ARM/AArch64 and the support in GCC is formally broken (can't use
cmpxchg16b to implement __atomic_load_16).

So yes, I think DPDK will have to define and implement the 128-bit atomic
compare and exchange operation (whatever it will be called). For compatibility
with ARMv8.0, we can't require the "old" value returned by a failed compare-
exchange operation to be read atomically (LDXP does not guarantee atomicity by
itself). But this is seldom a problem: many designs read the memory location
using two separate 64-bit loads (so not atomic) anyway; it is a successful
atomic compare-exchange operation which provides atomicity.
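
To make the last point concrete, a sketch of a producer-side retry loop that
tolerates a torn read of the old value (hypothetical names; the 128-bit CAS
primitive is only declared here, since the final DPDK API did not exist yet):

#include <stdint.h>

struct entry128 {
	void *ptr;
	uint64_t cnt;
};

/* Assumed primitive: atomically replace *slot with *want iff it still equals
 * *seen; returns nonzero on success.
 */
int cas128(struct entry128 *slot, const struct entry128 *seen,
	   const struct entry128 *want);

static int
try_claim_slot(struct entry128 *slot, uint64_t expected_cnt, void *obj,
	       uint64_t new_cnt)
{
	struct entry128 seen, want;

	do {
		/* Two separate 64-bit loads; not atomic, and that is fine: a
		 * torn snapshot only makes the CAS below fail and the loop
		 * retry.
		 */
		seen.ptr = slot->ptr;
		seen.cnt = slot->cnt;
		if (seen.cnt != expected_cnt)
			return 0;	/* slot already claimed by someone else */
		want.ptr = obj;
		want.cnt = new_cnt;
	} while (!cas128(slot, &seen, &want));

	return 1;
}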

> 
> I couldn't find the discussion about new functionality using __atomic going
> forward -- can you send a link?
> 
> [1] https://mails.dpdk.org/archives/dev/2017-December/082853.html
> [2] http://mails.dpdk.org/archives/dev/2019-January/124002.html
> 
> > 
> > Does the non-blocking ring buffer implementation have to support these older
> > compilers? Will the applications that require these older compiler be
> > updated to
> > utilise the non-blocking ring buffer?
> > 
> (See above -- compiler versions wasn't a consideration here.)
> 
> > 
> > > 
> > > 
> > > > 
> > > > 
> > > > > 
> > > > > 
> > > > > +
> > > > > +end:
> > > > > +	if (free_space != NULL)
> > > > > +		*free_space = free_entries - n;
> > > > > +	return n;
> > > > > +}
> > > > > +
> > > > > +/**
> > > > > + * @internal
> > > > > + *   Enqueue several objects on the non-blocking ring
> > > > > +(multi-producer
> > > > > +safe)
> > > > > + *
> > > > > + * @param r
> > > > > + *   A pointer to the ring structure.
> > > > > + * @param obj_table
> > > > > + *   A pointer to a table of void * pointers (objects).
> > > > > + * @param n
> > > > > + *   The number of objects to add in the ring from the obj_table.
> > > > > + * @param behavior
> > > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to
> > > > > +the ring
> > > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> > > > > +to the ring
> > > > > + * @param free_space
> > > > > + *   returns the amount of space after the enqueue operation has
> > > > > +finished
> > > > > + * @return
> > > > > + *   Actual number of objects enqueued.
> > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > > + */
> > > > > +static __rte_always_inline unsigned int
> > > > > +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const
> > *obj_table,
> > > 
> > > > 
> > > > > 
> > > > > +			    unsigned int n,
> > > > > +			    enum rte_ring_queue_behavior behavior,
> > > > > +			    unsigned int *free_space)
> > > > > +{
> > > > > +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> > > > > +	RTE_SET_USED(r);
> > > > > +	RTE_SET_USED(obj_table);
> > > > > +	RTE_SET_USED(n);
> > > > > +	RTE_SET_USED(behavior);
> > > > > +	RTE_SET_USED(free_space);
> > > > > +#ifndef ALLOW_EXPERIMENTAL_API
> > > > > +	printf("[%s()] RING_F_NB requires an experimental API."
> > > > > +	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> > > > > +	       , __func__);
> > > > > +#endif
> > > > > +	return 0;
> > > > > +#endif
> > > > > +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> > > > > +	size_t head, next, tail;
> > > > > +	uint32_t free_entries;
> > > > > +	unsigned int i;
> > > > > +
> > > > > +	n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> > > > > +					 &head, &next,
> > > > > &free_entries);
> > > > > +	if (n == 0)
> > > > > +		goto end;
> > > > > +
> > > > > +	for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
> > > > > +		struct nb_ring_entry old_value, new_value;
> > > > > +		struct nb_ring_entry *ring_ptr;
> > > > > +
> > > > > +		/* Enqueue to the tail entry. If another thread wins
> > > > > the
> > > > > race,
> > > > > +		 * retry with the new tail.
> > > > > +		 */
> > > > > +		tail = r->prod_64.tail;
> > > > > +
> > > > > +		ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r-
> > > > > > 
> > > > > > mask];
> > > > This is an ugly expression and cast. Also I think it is unnecessary.
> > > > What's preventing this from being written without a cast? Perhaps
> > > > the ring array needs to be a union of "void *" and struct
> > > > nb_ring_entry?
> > > The cast is necessary for the correct pointer arithmetic (let
> > > "uintptr_t base == &r[1]"):
> > Yes I know the C language.
> > 
> > > 
> > > - With cast: ring_ptr = base + sizeof(struct nb_ring_entry) * (tail &
> > > r-
> > > > 
> > > > mask);
> > > - W/o cast: ring_ptr = base + sizeof(struct rte_ring) * (tail &
> > > r->mask);
> > > 
> > > FWIW, this is essentially the same as is done with the second argument
> > > (&r[1]) to ENQUEUE_PTRS and DEQUEUE_PTRS, but there it's split across
> > > multiple lines of code. The equivalent here would be:
> > > 
> > > struct nb_ring_entry *ring_base = (struct nb_ring_entry*)&r[1];
> > > ring_ptr = ring_base[tail & r->mask];
> > > 
> > > Which is more legible, I think.
> > The RTE ring buffer code is not very legible to start with.
> > 
> > > 
> > > 
> > > There is no ring array structure in which to add a union; the ring
> > > array is a contiguous chunk of memory that immediately follows after
> > > the end of a struct rte_ring. We interpret the memory there according
> > > to the ring entry data type (void * for regular rings and struct
> > > nb_ring_entry for
> > non-blocking rings).
> > My worry is that we are abusing the C language and creating a monster of
> > fragile C code that will be more and more difficult to understand and to
> > maintain. At some point you have to think the question "Are we doing the
> > right
> > thing?".
> > 
> I'm not aware of any fragility/maintainability issues in the ring code (though
> perhaps the maintainers have a different view!), and personally I find the
> code fairly legible. If you have a specific suggestion, I'll look into
> incorporating it.
> 
> Thanks,
> Gage
> 
> </snip>
  
Jerin Jacob Kollanukkaran Jan. 28, 2019, 1:34 p.m. UTC | #7
On Fri, 2019-01-25 at 17:21 +0000, Eads, Gage wrote:
> > -----Original Message-----
> > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > Sent: Wednesday, January 23, 2019 4:16 AM
> > To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> > Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> > <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> > arybchenko@solarflare.com; Ananyev, Konstantin
> > <konstantin.ananyev@intel.com>
> > Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking
> > implementation
> > 
> > s.
> > > > 
> > > You can tell this code was written when I thought x86-64 was the
> > > only
> > > viable target :). Yes, you are correct.
> > > 
> > > With regards to using __atomic intrinsics, I'm planning on taking
> > > a
> > > similar approach to the functions duplicated in
> > > rte_ring_generic.h and
> > > rte_ring_c11_mem.h: one version that uses rte_atomic functions
> > > (and
> > > thus stricter memory ordering) and one that uses __atomic
> > > intrinsics
> > > (and thus can benefit from more relaxed memory ordering).
> > What's the advantage of having two different implementations? What
> > is the
> > disadvantage?
> > 
> > The existing ring buffer code originally had only the "legacy"
> > implementation
> > which was kept when the __atomic implementation was added. The
> > reason
> > claimed was that some older compilers for x86 do not support GCC
> > __atomic
> > builtins. But I thought there was consensus that new functionality
> > could have
> > only __atomic implementations.
> > 
> 
> When CONFIG_RTE_RING_USE_C11_MEM_MODEL was introduced, it was left
> disabled for thunderx[1] for performance reasons. Assuming that
> hasn't changed, the advantage to having two versions is to best
> support all of DPDK's platforms. The disadvantage is of course
> duplicated code and the additional maintenance burden.
> 
> That said, if the thunderx maintainers are ok with it, I'm certainly 

The ring code is such a fundamental building block for DPDK, there was a
difference in performance, and there was already legacy code, so
introducing C11_MEM_MODEL was justified IMO.

For the non-blocking implementation, I am happy to test with
three ARM64 microarchitectures and share the results for C11_MEM_MODEL
vs non-C11_MEM_MODEL performance. We may need to consider PPC here as
well. So IMO, based on the overall performance results, maybe we can
decide the new code direction.
  
Ola Liljedahl Jan. 28, 2019, 1:43 p.m. UTC | #8
On Mon, 2019-01-28 at 13:34 +0000, Jerin Jacob Kollanukkaran wrote:
> On Fri, 2019-01-25 at 17:21 +0000, Eads, Gage wrote:
> > 
> > > 
> > > -----Original Message-----
> > > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > > Sent: Wednesday, January 23, 2019 4:16 AM
> > > To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> > > Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> > > <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> > > arybchenko@solarflare.com; Ananyev, Konstantin
> > > <konstantin.ananyev@intel.com>
> > > Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking
> > > implementation
> > > 
> > > s.
> > > > 
> > > > > 
> > > > > 
> > > > You can tell this code was written when I thought x86-64 was the
> > > > only
> > > > viable target :). Yes, you are correct.
> > > > 
> > > > With regards to using __atomic intrinsics, I'm planning on taking
> > > > a
> > > > similar approach to the functions duplicated in
> > > > rte_ring_generic.h and
> > > > rte_ring_c11_mem.h: one version that uses rte_atomic functions
> > > > (and
> > > > thus stricter memory ordering) and one that uses __atomic
> > > > intrinsics
> > > > (and thus can benefit from more relaxed memory ordering).
> > > What's the advantage of having two different implementations? What
> > > is the
> > > disadvantage?
> > > 
> > > The existing ring buffer code originally had only the "legacy"
> > > implementation
> > > which was kept when the __atomic implementation was added. The
> > > reason
> > > claimed was that some older compilers for x86 do not support GCC
> > > __atomic
> > > builtins. But I thought there was consensus that new functionality
> > > could have
> > > only __atomic implementations.
> > > 
> > When CONFIG_RTE_RING_USE_C11_MEM_MODEL was introduced, it was left
> > disabled for thunderx[1] for performance reasons. Assuming that
> > hasn't changed, the advantage to having two versions is to best
> > support all of DPDK's platforms. The disadvantage is of course
> > duplicated code and the additional maintenance burden.
> > 
> > That said, if the thunderx maintainers are ok with it, I'm certainly 
> The ring code was so fundamental building block for DPDK, there was 
> difference in performance and there was already legacy code so
> introducing C11_MEM_MODEL was justified IMO. 
> 
> For the nonblocking implementation, I am happy to test with
> three ARM64 microarchitectures and share the result with C11_MEM_MODEL
> vs non C11_MEM_MODLE performance.
We should ensure the C11 memory model version enforces minimal ordering
requirements:
1) When computing the number of available slots, allow for underflow (head and
tail observed in unexpected order) instead of imposing read order with an
additional read barrier.
2) We could cheat a little and use an explicit LoadStore barrier instead of
store-release/cas-release in dequeue (which only reads the ring). At least see
if this improves performance (a sketch of this follows below). See such a patch
here:
https://github.com/ARM-software/progress64/commit/84c48e9c84100eb5b2d15e54f0dbf78dfa468805

Ideally, C/C++ would have an __ATOMIC_RELEASE_READSONLY memory model to use in
situations where the shared data was only read before being released.
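
A minimal sketch of the point-2 cheat mentioned above (illustrative only; it
mirrors the idea in the linked commit, not its actual code). Since the dequeue
critical section only reads the ring, an acquire fence -- which GCC maps to
'dmb ishld' on AArch64, i.e. LoadLoad + LoadStore ordering -- followed by a
relaxed store can stand in for a store-release. Formally this is outside the
C11 model, hence "cheat":

#include <stdint.h>

static inline void
dequeue_publish_tail(uint64_t *tail, uint64_t new_tail)
{
	/* Order the preceding ring-slot loads before the tail store; this can
	 * be cheaper than a full store-release on some microarchitectures.
	 */
	__atomic_thread_fence(__ATOMIC_ACQUIRE);
	__atomic_store_n(tail, new_tail, __ATOMIC_RELAXED);
}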

>  We may need to consider PPC also
> here. So IMO, based on the overall performance result may be can decide
> the new code direction.
Does PPC (64-bit POWER?) have support for double-word (128-bit) CAS?

>
  
Jerin Jacob Kollanukkaran Jan. 28, 2019, 2:04 p.m. UTC | #9
On Mon, 2019-01-28 at 13:43 +0000, Ola Liljedahl wrote:
> On Mon, 2019-01-28 at 13:34 +0000, Jerin Jacob Kollanukkaran wrote:
> > On Fri, 2019-01-25 at 17:21 +0000, Eads, Gage wrote:
> > > > -----Original Message-----
> > > > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > > > Sent: Wednesday, January 23, 2019 4:16 AM
> > > > To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> > > > Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> > > > <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> > > > arybchenko@solarflare.com; Ananyev, Konstantin
> > > > <konstantin.ananyev@intel.com>
> > > > Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking
> > > > implementation
> > > > 
> > > > s.
> > > > > > 
> > > > > You can tell this code was written when I thought x86-64 was
> > > > > the
> > > > > only
> > > > > viable target :). Yes, you are correct.
> > > > > 
> > > > > With regards to using __atomic intrinsics, I'm planning on
> > > > > taking
> > > > > a
> > > > > similar approach to the functions duplicated in
> > > > > rte_ring_generic.h and
> > > > > rte_ring_c11_mem.h: one version that uses rte_atomic
> > > > > functions
> > > > > (and
> > > > > thus stricter memory ordering) and one that uses __atomic
> > > > > intrinsics
> > > > > (and thus can benefit from more relaxed memory ordering).
> > > > What's the advantage of having two different implementations?
> > > > What
> > > > is the
> > > > disadvantage?
> > > > 
> > > > The existing ring buffer code originally had only the "legacy"
> > > > implementation
> > > > which was kept when the __atomic implementation was added. The
> > > > reason
> > > > claimed was that some older compilers for x86 do not support
> > > > GCC
> > > > __atomic
> > > > builtins. But I thought there was consensus that new
> > > > functionality
> > > > could have
> > > > only __atomic implementations.
> > > > 
> > > When CONFIG_RTE_RING_USE_C11_MEM_MODEL was introduced, it was
> > > left
> > > disabled for thunderx[1] for performance reasons. Assuming that
> > > hasn't changed, the advantage to having two versions is to best
> > > support all of DPDK's platforms. The disadvantage is of course
> > > duplicated code and the additional maintenance burden.
> > > 
> > > That said, if the thunderx maintainers are ok with it, I'm
> > > certainly 
> > The ring code was so fundamental building block for DPDK, there
> > was 
> > difference in performance and there was already legacy code so
> > introducing C11_MEM_MODEL was justified IMO. 
> > 
> > For the nonblocking implementation, I am happy to test with
> > three ARM64 microarchitectures and share the result with
> > C11_MEM_MODEL
> > vs non C11_MEM_MODLE performance.
> We should ensure the C11 memory model version enforces minimal
> ordering
> requirements:

I agree.

I think we should have enough test cases for performance measurement in
order to choose algorithms and quantify the other variables like C11 vs
non-C11, LDXP/STXP vs CASP, etc.
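
For example, a minimal sketch of the kind of measurement meant here (names and
parameters are illustrative, not the actual DPDK ring tests): time bulk
enqueue/dequeue pairs and report cycles per object, so C11 and non-C11 builds
can be compared on the same hardware.

#include <stdint.h>
#include <rte_cycles.h>
#include <rte_ring.h>

static double
ring_cycles_per_obj(struct rte_ring *r, void **objs, unsigned int burst,
		    unsigned int iters)
{
	uint64_t start = rte_rdtsc();
	unsigned int i;

	for (i = 0; i < iters; i++) {
		rte_ring_enqueue_bulk(r, objs, burst, NULL);
		rte_ring_dequeue_bulk(r, objs, burst, NULL);
	}

	return (double)(rte_rdtsc() - start) / ((double)iters * burst);
}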


> 1) when computing number of available slots, allow for underflow
> (head and tail
> observed in unexpected order) instead of imposing read order with an
> additional
> read barrier.
> 2) We could cheat a little and use an explicit LoadStore barrier
> instead of
>  store-release/cas-release in dequeue (which only reads the ring). At
> least see
> if this improves performance. See such a patch here:
> https://github.com/ARM-software/progress64/commit/84c48e9c84100eb5b2d15e54f0dbf7
> 8dfa468805
> 
> Ideally, C/C++ would have an __ATOMIC_RELEASE_READSONLY memory model
> to use in
> situations where the shared data was only read before being released.
> 
> >  We may need to consider PPC also
> > here. So IMO, based on the overall performance result may be can
> > decide
> > the new code direction.
> Does PPC (64-bit POWER?) have support for double-word (128-bit) CAS?

I don't know, I was referring to the C11 memory model in general for PPC.

> 
> -- 
> Ola Liljedahl, Networking System Architect, Arm
> Phone +46706866373, Skype ola.liljedahl
>
  
Ola Liljedahl Jan. 28, 2019, 2:06 p.m. UTC | #10
On Mon, 2019-01-28 at 14:04 +0000, Jerin Jacob Kollanukkaran wrote:
> > Does PPC (64-bit POWER?) have support for double-word (128-bit) CAS?
> 
> I dont know, I was telling wrt in general C11 mem model for PPC.
Sorry, I misunderstood.
  
Eads, Gage Jan. 28, 2019, 6:54 p.m. UTC | #11
> -----Original Message-----
> From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> Sent: Monday, January 28, 2019 4:36 AM
> To: jerinj@marvell.com; mczekaj@marvell.com; Eads, Gage
> <gage.eads@intel.com>; dev@dpdk.org
> Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> arybchenko@solarflare.com; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>
> Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking implementation
> 
> On Fri, 2019-01-25 at 17:21 +0000, Eads, Gage wrote:
> >
> > >
> > > -----Original Message-----
> > > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > > Sent: Wednesday, January 23, 2019 4:16 AM
> > > To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> > > Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> > > <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> > > arybchenko@solarflare.com; Ananyev, Konstantin
> > > <konstantin.ananyev@intel.com>
> > > Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking
> > > implementation
> > >
> > > On Tue, 2019-01-22 at 21:31 +0000, Eads, Gage wrote:
> > > >
> > > > Hi Ola,
> > > >
> > > > <snip>
> > > >
> > > > >
> > > > >
> > > > > >
> > > > > >
> > > > > > @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct
> > > > > > rte_ring *r);
> > > > > >  #endif
> > > > > >  #include "rte_ring_generic_64.h"
> > > > > >
> > > > > > +/* @internal 128-bit structure used by the non-blocking ring
> > > > > > +*/ struct nb_ring_entry {
> > > > > > +	void *ptr; /**< Data pointer */
> > > > > > +	uint64_t cnt; /**< Modification counter */
> > > > > Why not make 'cnt' uintptr_t? This way 32-bit architectures will
> > > > > also be supported. I think there are some claims that DPDK still
> > > > > supports e.g.
> > > > > ARMv7a
> > > > > and possibly also 32-bit x86?
> > > > I chose a 64-bit modification counter because (practically
> > > > speaking) the ABA problem will not occur with such a large counter
> > > > -- definitely not within my lifetime. See the "Discussion" section
> > > > of the commit message for more information.
> > > >
> > > > With a 32-bit counter, there is a very (very) low likelihood of
> > > > it, but it is possible. Personally, I don't feel comfortable
> > > > providing such code, because a) I doubt all users would understand
> > > > the implementation well enough to do the risk/reward analysis, and
> > > > b) such a bug would be near impossible to reproduce and root-cause
> > > > if it did occur.
> > > With a 64-bit counter (and 32-bit pointer), 32-bit architectures (e.g.
> > > ARMv7a and
> > > probably x86 as well) won't be able to support this as they at best
> > > support 64-bit CAS (ARMv7a has LDREXD/STREXD). So you are
> > > essentially putting a 64-bit (and 128-bit CAS) requirement on the
> > > implementation.
> > >
> > Yes, I am. I tried to make that clear in the cover letter.
> >
> > >
> > > >
> > > >
> > > > >
> > > > >
> > > > >
> > > > > >
> > > > > >
> > > > > > +};
> > > > > > +
> > > > > > +/* The non-blocking ring algorithm is based on the original
> > > > > > +rte ring (derived
> > > > > > + * from FreeBSD's bufring.h) and inspired by Michael and
> > > > > > +Scott's non-blocking
> > > > > > + * concurrent queue.
> > > > > > + */
> > > > > > +
> > > > > > +/**
> > > > > > + * @internal
> > > > > > + *   Enqueue several objects on the non-blocking ring
> > > > > > +(single-producer only)
> > > > > > + *
> > > > > > + * @param r
> > > > > > + *   A pointer to the ring structure.
> > > > > > + * @param obj_table
> > > > > > + *   A pointer to a table of void * pointers (objects).
> > > > > > + * @param n
> > > > > > + *   The number of objects to add in the ring from the obj_table.
> > > > > > + * @param behavior
> > > > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items
> > > > > > +to the ring
> > > > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as
> > > > > > +possible to the ring
> > > > > > + * @param free_space
> > > > > > + *   returns the amount of space after the enqueue operation
> > > > > > +has finished
> > > > > > + * @return
> > > > > > + *   Actual number of objects enqueued.
> > > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > > > + */
> > > > > > +static __rte_always_inline unsigned int
> > > > > > +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const
> > > > > > *obj_table,
> > > > > > +			    unsigned int n,
> > > > > > +			    enum rte_ring_queue_behavior behavior,
> > > > > > +			    unsigned int *free_space) {
> > > > > > +	uint32_t free_entries;
> > > > > > +	size_t head, next;
> > > > > > +
> > > > > > +	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> > > > > > +					 &head, &next,
> > > > > > &free_entries);
> > > > > > +	if (n == 0)
> > > > > > +		goto end;
> > > > > > +
> > > > > > +	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > > > > > +
> > > > > > +	r->prod_64.tail += n;
> > > > > Don't we need release order when (or smp_wmb between) writing of
> > > > > the ring pointers and the update of tail? By updating the tail
> > > > > pointer, we are synchronising with a consumer.
> > > > >
> > > > > I prefer using __atomic operations even for load and store. You
> > > > > can see which parts of the code that synchronise with each other, e.g.
> > > > > store-release to some location synchronises with load-acquire
> > > > > from the same location. If you don't know how different threads
> > > > > synchronise with each other, you are very likely to make mistakes.
> > > > >
> > > > You can tell this code was written when I thought x86-64 was the
> > > > only viable target :). Yes, you are correct.
> > > >
> > > > With regards to using __atomic intrinsics, I'm planning on taking
> > > > a similar approach to the functions duplicated in
> > > > rte_ring_generic.h and
> > > > rte_ring_c11_mem.h: one version that uses rte_atomic functions
> > > > (and thus stricter memory ordering) and one that uses __atomic
> > > > intrinsics (and thus can benefit from more relaxed memory ordering).
> From a code point of view, I strongly prefer the atomic operations to be visible
> in the top level code, not hidden in subroutines. For correctness, it is vital that
> memory accesses are performed with the required ordering and that acquire and
> release matches up. Hiding e.g. load-acquire and store-release in subroutines (in
> a different file!) make this difficult. There have already been such bugs found in
> rte_ring.
> 

After working on the acq/rel ordering this weekend, I agree. This'll be easier/cleaner if we end up only using the C11 version.

> > > What's the advantage of having two different implementations? What
> > > is the disadvantage?
> > >
> > > The existing ring buffer code originally had only the "legacy"
> > > implementation
> > > which was kept when the __atomic implementation was added. The
> > > reason claimed was that some older compilers for x86 do not support
> > > GCC __atomic builtins. But I thought there was consensus that new
> > > functionality could have only __atomic implementations.
> > >
> > When CONFIG_RTE_RING_USE_C11_MEM_MODEL was introduced, it was left
> > disabled for thunderx[1] for performance reasons. Assuming that hasn't
> > changed, the advantage to having two versions is to best support all of DPDK's
> platforms.
> > The disadvantage is of course duplicated code and the additional
> > maintenance burden.
> The only way I see that a C11 memory model implementation can be slower
> than using smp_wmb/rmb is if you need to order loads before a synchronizing
> store and there are also outstanding stores which do not require ordering.
> smp_rmb() handles this while store-release will also (unnecessarily) order those
> outstanding stores. This situation occurs e.g. in ring buffer dequeue operations
> where ring slots are read (and possibly written to thread-private memory) before
> the ring slots are release (e.g. using CAS-release or store-release).
> 
> I imagine that the LSU/cache subsystem on ThunderX/OCTEON-TX also have
> something to do with this problem. If there are a large amounts of stores
> pending in the load/store unit, store-release might have to wait for a long time
> before the synchronizing store can complete.
> 
> >
> > That said, if the thunderx maintainers are ok with it, I'm certainly
> > open to only doing the __atomic version. Note that even in the
> > __atomic version, based on Honnapa's findings[2], using a DPDK-defined
> > rte_atomic128_cmpset() (with additional arguments to support machines
> > with weak consistency) appears to be a better option than
> __atomic_compare_exchange_16.
> __atomic_compare_exchange_16() is not guaranteed to be lock-free. It is not
> lock-free on ARM/AArch64 and the support in GCC is formally broken (can't use
> cmpexchg16b to implement __atomic_load_16).
> 
> So yes, I think DPDK will have to define and implement the 128-bit atomic
> compare and exchange operation (whatever it will be called). For compatibility
> with ARMv8.0, we can't require the "old" value returned by a failed compare-
> exchange operation to be read atomically (LDXP does not guaranteed atomicity
> by itself). But this is seldom a problem, many designs read the memory location
> using two separate 64-bit loads (so not atomic) anyway, it is a successful atomic
> compare exchange operation which provides atomicity.
> 

Ok. I agree, I don't expect that to be a problem. The 128-bit CAS patch I just submitted[1] (which was developed before reading this) will have to be changed.

[1] http://mails.dpdk.org/archives/dev/2019-January/124159.html

Thanks,
Gage

</snip>
  
Eads, Gage Jan. 28, 2019, 6:59 p.m. UTC | #12
> -----Original Message-----
> From: Jerin Jacob Kollanukkaran [mailto:jerinj@marvell.com]
> Sent: Monday, January 28, 2019 7:34 AM
> To: Ola.Liljedahl@arm.com; Maciej Czekaj <mczekaj@marvell.com>; Eads, Gage
> <gage.eads@intel.com>; dev@dpdk.org
> Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd@arm.com;
> Richardson, Bruce <bruce.richardson@intel.com>; arybchenko@solarflare.com;
> Ananyev, Konstantin <konstantin.ananyev@intel.com>
> Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking implementation
> 
> On Fri, 2019-01-25 at 17:21 +0000, Eads, Gage wrote:
> > > -----Original Message-----
> > > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > > Sent: Wednesday, January 23, 2019 4:16 AM
> > > To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> > > Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> > > <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> > > arybchenko@solarflare.com; Ananyev, Konstantin
> > > <konstantin.ananyev@intel.com>
> > > Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking
> > > implementation
> > >
> > > s.
> > > > >
> > > > You can tell this code was written when I thought x86-64 was the
> > > > only viable target :). Yes, you are correct.
> > > >
> > > > With regards to using __atomic intrinsics, I'm planning on taking
> > > > a similar approach to the functions duplicated in
> > > > rte_ring_generic.h and
> > > > rte_ring_c11_mem.h: one version that uses rte_atomic functions
> > > > (and thus stricter memory ordering) and one that uses __atomic
> > > > intrinsics (and thus can benefit from more relaxed memory
> > > > ordering).
> > > What's the advantage of having two different implementations? What
> > > is the disadvantage?
> > >
> > > The existing ring buffer code originally had only the "legacy"
> > > implementation
> > > which was kept when the __atomic implementation was added. The
> > > reason claimed was that some older compilers for x86 do not support
> > > GCC __atomic builtins. But I thought there was consensus that new
> > > functionality could have only __atomic implementations.
> > >
> >
> > When CONFIG_RTE_RING_USE_C11_MEM_MODEL was introduced, it was left
> > disabled for thunderx[1] for performance reasons. Assuming that hasn't
> > changed, the advantage to having two versions is to best support all
> > of DPDK's platforms. The disadvantage is of course duplicated code and
> > the additional maintenance burden.
> >
> > That said, if the thunderx maintainers are ok with it, I'm certainly
> 
> The ring code was so fundamental building block for DPDK, there was difference
> in performance and there was already legacy code so introducing
> C11_MEM_MODEL was justified IMO.
> 
> For the nonblocking implementation, I am happy to test with three ARM64
> microarchitectures and share the result with C11_MEM_MODEL vs non
> C11_MEM_MODLE performance. We may need to consider PPC also here. So
> IMO, based on the overall performance result may be can decide the new code
> direction.

Appreciate the help. Please hold off any testing until we've had a chance to incorporate ideas from lfring, which will definitely affect performance.
  
Ola Liljedahl Jan. 28, 2019, 10:31 p.m. UTC | #13
On Mon, 2019-01-28 at 18:54 +0000, Eads, Gage wrote:
> 
> > 
> > -----Original Message-----
> > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > Sent: Monday, January 28, 2019 4:36 AM
> > To: jerinj@marvell.com; mczekaj@marvell.com; Eads, Gage
> > <gage.eads@intel.com>; dev@dpdk.org
> > Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> > <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> > arybchenko@solarflare.com; Ananyev, Konstantin
> > <konstantin.ananyev@intel.com>
> > Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking
> > implementation
> > 
> > On Fri, 2019-01-25 at 17:21 +0000, Eads, Gage wrote:
> > > 
> > > 
> > > > 
> > > > 
> > > > -----Original Message-----
> > > > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > > > Sent: Wednesday, January 23, 2019 4:16 AM
> > > > To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> > > > Cc: olivier.matz@6wind.com; stephen@networkplumber.org; nd
> > > > <nd@arm.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> > > > arybchenko@solarflare.com; Ananyev, Konstantin
> > > > <konstantin.ananyev@intel.com>
> > > > Subject: Re: [dpdk-dev] [PATCH v3 2/5] ring: add a non-blocking
> > > > implementation
> > > > 
> > > > On Tue, 2019-01-22 at 21:31 +0000, Eads, Gage wrote:
> > > > > 
> > > > > 
> > > > > Hi Ola,
> > > > > 
> > > > > <snip>
> > > > > 
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct
> > > > > > > rte_ring *r);
> > > > > > >  #endif
> > > > > > >  #include "rte_ring_generic_64.h"
> > > > > > > 
> > > > > > > +/* @internal 128-bit structure used by the non-blocking ring
> > > > > > > +*/ struct nb_ring_entry {
> > > > > > > +	void *ptr; /**< Data pointer */
> > > > > > > +	uint64_t cnt; /**< Modification counter */
> > > > > > Why not make 'cnt' uintptr_t? This way 32-bit architectures will
> > > > > > also be supported. I think there are some claims that DPDK still
> > > > > > supports e.g.
> > > > > > ARMv7a
> > > > > > and possibly also 32-bit x86?
> > > > > I chose a 64-bit modification counter because (practically
> > > > > speaking) the ABA problem will not occur with such a large counter
> > > > > -- definitely not within my lifetime. See the "Discussion" section
> > > > > of the commit message for more information.
> > > > > 
> > > > > With a 32-bit counter, there is a very (very) low likelihood of
> > > > > it, but it is possible. Personally, I don't feel comfortable
> > > > > providing such code, because a) I doubt all users would understand
> > > > > the implementation well enough to do the risk/reward analysis, and
> > > > > b) such a bug would be near impossible to reproduce and root-cause
> > > > > if it did occur.
> > > > With a 64-bit counter (and 32-bit pointer), 32-bit architectures (e.g.
> > > > ARMv7a and
> > > > probably x86 as well) won't be able to support this as they at best
> > > > support 64-bit CAS (ARMv7a has LDREXD/STREXD). So you are
> > > > essentially putting a 64-bit (and 128-bit CAS) requirement on the
> > > > implementation.
> > > > 
> > > Yes, I am. I tried to make that clear in the cover letter.
> > > 
> > > > 
> > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > +};
> > > > > > > +
> > > > > > > +/* The non-blocking ring algorithm is based on the original
> > > > > > > +rte ring (derived
> > > > > > > + * from FreeBSD's bufring.h) and inspired by Michael and
> > > > > > > +Scott's non-blocking
> > > > > > > + * concurrent queue.
> > > > > > > + */
> > > > > > > +
> > > > > > > +/**
> > > > > > > + * @internal
> > > > > > > + *   Enqueue several objects on the non-blocking ring
> > > > > > > +(single-producer only)
> > > > > > > + *
> > > > > > > + * @param r
> > > > > > > + *   A pointer to the ring structure.
> > > > > > > + * @param obj_table
> > > > > > > + *   A pointer to a table of void * pointers (objects).
> > > > > > > + * @param n
> > > > > > > + *   The number of objects to add in the ring from the obj_table.
> > > > > > > + * @param behavior
> > > > > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items
> > > > > > > +to the ring
> > > > > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as
> > > > > > > +possible to the ring
> > > > > > > + * @param free_space
> > > > > > > + *   returns the amount of space after the enqueue operation
> > > > > > > +has finished
> > > > > > > + * @return
> > > > > > > + *   Actual number of objects enqueued.
> > > > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n
> > > > > > > only.
> > > > > > > + */
> > > > > > > +static __rte_always_inline unsigned int
> > > > > > > +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const
> > > > > > > *obj_table,
> > > > > > > +			    unsigned int n,
> > > > > > > +			    enum rte_ring_queue_behavior
> > > > > > > behavior,
> > > > > > > +			    unsigned int *free_space) {
> > > > > > > +	uint32_t free_entries;
> > > > > > > +	size_t head, next;
> > > > > > > +
> > > > > > > +	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> > > > > > > +					 &head, &next,
> > > > > > > &free_entries);
> > > > > > > +	if (n == 0)
> > > > > > > +		goto end;
> > > > > > > +
> > > > > > > +	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > > > > > > +
> > > > > > > +	r->prod_64.tail += n;
> > > > > > Don't we need release order when (or smp_wmb between) writing of
> > > > > > the ring pointers and the update of tail? By updating the tail
> > > > > > pointer, we are synchronising with a consumer.
> > > > > > 
> > > > > > I prefer using __atomic operations even for load and store. You
> > > > > > can see which parts of the code that synchronise with each other,
> > > > > > e.g.
> > > > > > store-release to some location synchronises with load-acquire
> > > > > > from the same location. If you don't know how different threads
> > > > > > synchronise with each other, you are very likely to make mistakes.
> > > > > > 
> > > > > You can tell this code was written when I thought x86-64 was the
> > > > > only viable target :). Yes, you are correct.
> > > > > 
> > > > > With regards to using __atomic intrinsics, I'm planning on taking
> > > > > a similar approach to the functions duplicated in
> > > > > rte_ring_generic.h and
> > > > > rte_ring_c11_mem.h: one version that uses rte_atomic functions
> > > > > (and thus stricter memory ordering) and one that uses __atomic
> > > > > intrinsics (and thus can benefit from more relaxed memory ordering).
> > From a code point of view, I strongly prefer the atomic operations to be
> > visible
> > in the top level code, not hidden in subroutines. For correctness, it is
> > vital that
> > memory accesses are performed with the required ordering and that acquire
> > and
> > release matches up. Hiding e.g. load-acquire and store-release in
> > subroutines (in
> > a different file!) make this difficult. There have already been such bugs
> > found in
> > rte_ring.
> > 
> After working on the acq/rel ordering this weekend, I agree. This'll be
> easier/cleaner if we end up only using the C11 version.
Fabulous!

As I wrote in a response to Jerin, with a small cheat (LoadStore fence+store-
relaxed instead of store-release in the dequeue function where we only read
shared data in the critical section), C11 should provide the same ordering and
thus the same performance as the explicit barrier version. Benchmarking will
show.

> 
> > 
> > > 
> > > > 
> > > > What's the advantage of having two different implementations? What
> > > > is the disadvantage?
> > > > 
> > > > The existing ring buffer code originally had only the "legacy"
> > > > implementation
> > > > which was kept when the __atomic implementation was added. The
> > > > reason claimed was that some older compilers for x86 do not support
> > > > GCC __atomic builtins. But I thought there was consensus that new
> > > > functionality could have only __atomic implementations.
> > > > 
> > > When CONFIG_RTE_RING_USE_C11_MEM_MODEL was introduced, it was left
> > > disabled for thunderx[1] for performance reasons. Assuming that hasn't
> > > changed, the advantage to having two versions is to best support all of
> > > DPDK's
> > platforms.
> > > 
> > > The disadvantage is of course duplicated code and the additional
> > > maintenance burden.
> > The only way I see that a C11 memory model implementation can be slower
> > than using smp_wmb/rmb is if you need to order loads before a synchronizing
> > store and there are also outstanding stores which do not require ordering.
> > smp_rmb() handles this while store-release will also (unnecessarily) order
> > those
> > outstanding stores. This situation occurs e.g. in ring buffer dequeue
> > operations
> > where ring slots are read (and possibly written to thread-private memory)
> > before
> > the ring slots are release (e.g. using CAS-release or store-release).
> > 
> > I imagine that the LSU/cache subsystem on ThunderX/OCTEON-TX also have
> > something to do with this problem. If there are a large amounts of stores
> > pending in the load/store unit, store-release might have to wait for a long
> > time
> > before the synchronizing store can complete.
> > 
> > > 
> > > 
> > > That said, if the thunderx maintainers are ok with it, I'm certainly
> > > open to only doing the __atomic version. Note that even in the
> > > __atomic version, based on Honnapa's findings[2], using a DPDK-defined
> > > rte_atomic128_cmpset() (with additional arguments to support machines
> > > with weak consistency) appears to be a better option than
> > __atomic_compare_exchange_16.
> > __atomic_compare_exchange_16() is not guaranteed to be lock-free. It is not
> > lock-free on ARM/AArch64 and the support in GCC is formally broken (can't
> > use
> > cmpexchg16b to implement __atomic_load_16).
> > 
> > So yes, I think DPDK will have to define and implement the 128-bit atomic
> > compare and exchange operation (whatever it will be called). For
> > compatibility
> > with ARMv8.0, we can't require the "old" value returned by a failed compare-
> > exchange operation to be read atomically (LDXP does not guaranteed atomicity
> > by itself). But this is seldom a problem, many designs read the memory
> > location
> > using two separate 64-bit loads (so not atomic) anyway, it is a successful
> > atomic
> > compare exchange operation which provides atomicity.
> > 
> Ok. I agree, I don't expect that to be a problem. The 128-bit CAS patch I just
> submitted[1] (which was developed before reading this) will have to be
> changed.
> 
> [1] http://mails.dpdk.org/archives/dev/2019-January/124159.html
I will take a look and comment on this.

> 
> Thanks,
> Gage
> 
> </snip>
  

Patch

diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d215acecc..f3378dccd 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -45,9 +45,9 @@  EAL_REGISTER_TAILQ(rte_ring_tailq)
 
 /* return the size of memory occupied by a ring */
 ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
 {
-	ssize_t sz;
+	ssize_t sz, elt_sz;
 
 	/* count must be a power of 2 */
 	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
@@ -57,10 +57,23 @@  rte_ring_get_memsize(unsigned count)
 		return -EINVAL;
 	}
 
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	elt_sz = (flags & RING_F_NB) ? 2 * sizeof(void *) : sizeof(void *);
+
+	sz = sizeof(struct rte_ring) + count * elt_sz;
 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 	return sz;
 }
+BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
+MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
+					       unsigned int flags),
+		  rte_ring_get_memsize_v1905);
+
+ssize_t
+rte_ring_get_memsize_v20(unsigned int count)
+{
+	return rte_ring_get_memsize_v1905(count, 0);
+}
+VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
 
 int
 rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
@@ -82,8 +95,6 @@  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	if (ret < 0 || ret >= (int)sizeof(r->name))
 		return -ENAMETOOLONG;
 	r->flags = flags;
-	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
-	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
 
 	if (flags & RING_F_EXACT_SZ) {
 		r->size = rte_align32pow2(count + 1);
@@ -100,8 +111,30 @@  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 		r->mask = count - 1;
 		r->capacity = r->mask;
 	}
-	r->prod.head = r->cons.head = 0;
-	r->prod.tail = r->cons.tail = 0;
+
+	if (flags & RING_F_NB) {
+		uint64_t i;
+
+		r->prod_64.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+		r->cons_64.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+		r->prod_64.head = r->cons_64.head = 0;
+		r->prod_64.tail = r->cons_64.tail = 0;
+
+		for (i = 0; i < r->size; i++) {
+			struct nb_ring_entry *ring_ptr, *base;
+
+			base = ((struct nb_ring_entry *)&r[1]);
+
+			ring_ptr = &base[i & r->mask];
+
+			ring_ptr->cnt = i;
+		}
+	} else {
+		r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+		r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+		r->prod.head = r->cons.head = 0;
+		r->prod.tail = r->cons.tail = 0;
+	}
 
 	return 0;
 }
@@ -123,11 +156,19 @@  rte_ring_create(const char *name, unsigned count, int socket_id,
 
 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
 
+#if !defined(RTE_ARCH_X86_64)
+	if (flags & RING_F_NB) {
+		printf("RING_F_NB is only supported on x86-64 platforms\n");
+		rte_errno = EINVAL;
+		return NULL;
+	}
+#endif
+
 	/* for an exact size ring, round up from count to a power of two */
 	if (flags & RING_F_EXACT_SZ)
 		count = rte_align32pow2(count + 1);
 
-	ring_size = rte_ring_get_memsize(count);
+	ring_size = rte_ring_get_memsize(count, flags);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
 		return NULL;
@@ -227,10 +268,17 @@  rte_ring_dump(FILE *f, const struct rte_ring *r)
 	fprintf(f, "  flags=%x\n", r->flags);
 	fprintf(f, "  size=%"PRIu32"\n", r->size);
 	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
-	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
-	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
-	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
-	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	if (r->flags & RING_F_NB) {
+		fprintf(f, "  ct=%"PRIu64"\n", r->cons_64.tail);
+		fprintf(f, "  ch=%"PRIu64"\n", r->cons_64.head);
+		fprintf(f, "  pt=%"PRIu64"\n", r->prod_64.tail);
+		fprintf(f, "  ph=%"PRIu64"\n", r->prod_64.head);
+	} else {
+		fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
+		fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
+		fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
+		fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	}
 	fprintf(f, "  used=%u\n", rte_ring_count(r));
 	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
 }
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index b270a4746..08c9de6a6 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -134,6 +134,18 @@  struct rte_ring {
  */
 #define RING_F_EXACT_SZ 0x0004
 #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
+/**
+ * The ring uses non-blocking enqueue and dequeue functions. These functions
+ * do not have the "non-preemptive" constraint of a regular rte ring, and thus
+ * are suited for applications using preemptible pthreads. However, the
+ * non-blocking functions have worse average-case performance than their
+ * regular rte ring counterparts. When used as the handler for a mempool,
+ * per-thread caching can mitigate the performance difference by reducing the
+ * number (and contention) of ring accesses.
+ *
+ * This flag is only supported on x86_64 platforms.
+ */
+#define RING_F_NB 0x0008
 
 /* @internal defines for passing to the enqueue dequeue worker functions */
 #define __IS_SP 1
@@ -151,11 +163,15 @@  struct rte_ring {
  *
  * @param count
  *   The number of elements in the ring (must be a power of 2).
+ * @param flags
+ *   The flags the ring will be created with.
  * @return
  *   - The memory size needed for the ring on success.
  *   - -EINVAL if count is not a power of 2.
  */
-ssize_t rte_ring_get_memsize(unsigned count);
+ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
+ssize_t rte_ring_get_memsize_v20(unsigned int count);
+ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
 
 /**
  * Initialize a ring structure.
@@ -188,6 +204,10 @@  ssize_t rte_ring_get_memsize(unsigned count);
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ *      number, but up to half the ring space may be wasted.
+ *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
+ *      non-blocking variants of the dequeue and enqueue functions.
  * @return
  *   0 on success, or a negative value on error.
  */
@@ -223,12 +243,17 @@  int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ *      number, but up to half the ring space may be wasted.
+ *    - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
+ *      non-blocking variants of the dequeue and enqueue functions.
  * @return
  *   On success, the pointer to the new allocated ring. NULL on error with
  *    rte_errno set appropriately. Possible errno values include:
  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
  *    - E_RTE_SECONDARY - function was called from a secondary process instance
- *    - EINVAL - count provided is not a power of 2
+ *    - EINVAL - count provided is not a power of 2, or RING_F_NB is used on an
+ *      unsupported platform
  *    - ENOSPC - the maximum number of memzones has already been allocated
  *    - EEXIST - a memzone with the same name already exists
  *    - ENOMEM - no appropriate memory area found in which to create memzone
@@ -284,6 +309,50 @@  void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	} \
 } while (0)
 
+/* The actual enqueue of pointers on the ring.
+ * Used only by the single-producer non-blocking enqueue function, but
+ * outlined here for code readability.
+ */
+#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do { \
+	unsigned int i; \
+	const uint32_t size = (r)->size; \
+	size_t idx = prod_head & (r)->mask; \
+	size_t new_cnt = prod_head + size; \
+	struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
+			ring[idx].ptr = obj_table[i]; \
+			ring[idx].cnt = new_cnt + i;  \
+			ring[idx + 1].ptr = obj_table[i + 1]; \
+			ring[idx + 1].cnt = new_cnt + i + 1;  \
+			ring[idx + 2].ptr = obj_table[i + 2]; \
+			ring[idx + 2].cnt = new_cnt + i + 2;  \
+			ring[idx + 3].ptr = obj_table[i + 3]; \
+			ring[idx + 3].cnt = new_cnt + i + 3;  \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			ring[idx].cnt = new_cnt + i; \
+			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+		case 2: \
+			ring[idx].cnt = new_cnt + i; \
+			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+		case 1: \
+			ring[idx].cnt = new_cnt + i; \
+			ring[idx++].ptr = obj_table[i++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) { \
+			ring[idx].cnt = new_cnt + i;  \
+			ring[idx].ptr = obj_table[i]; \
+		} \
+		for (idx = 0; i < n; i++, idx++) {    \
+			ring[idx].cnt = new_cnt + i;  \
+			ring[idx].ptr = obj_table[i]; \
+		} \
+	} \
+} while (0)
+
 /* the actual copy of pointers on the ring to obj_table.
  * Placed here since identical code needed in both
  * single and multi consumer dequeue functions */
@@ -315,6 +384,39 @@  void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	} \
 } while (0)
 
+/* The actual copy of pointers on the ring to obj_table.
+ * Placed here since identical code needed in both
+ * single and multi consumer non-blocking dequeue functions.
+ */
+#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do { \
+	unsigned int i; \
+	size_t idx = cons_head & (r)->mask; \
+	const uint32_t size = (r)->size; \
+	struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
+			obj_table[i] = ring[idx].ptr; \
+			obj_table[i + 1] = ring[idx + 1].ptr; \
+			obj_table[i + 2] = ring[idx + 2].ptr; \
+			obj_table[i + 3] = ring[idx + 3].ptr; \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+		case 2: \
+			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+		case 1: \
+			obj_table[i++] = ring[idx++].ptr; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) \
+			obj_table[i] = ring[idx].ptr; \
+		for (idx = 0; i < n; i++, idx++) \
+			obj_table[i] = ring[idx].ptr; \
+	} \
+} while (0)
+
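(Editorial sketch: the two macros above are 4-way unrolled and split around the wrap point; semantically they reduce to the loops below, written against the same nb_ring_entry layout and the macros' local names r, ring, obj_table, prod_head/cons_head and n.)

	unsigned int i;

	/* Enqueue: store the object and stamp the slot with the counter the
	 * next producer to use this slot will expect (absolute index + size).
	 */
	for (i = 0; i < n; i++) {
		size_t slot = (prod_head + i) & r->mask;

		ring[slot].cnt = prod_head + r->size + i;
		ring[slot].ptr = obj_table[i];
	}

	/* Dequeue: copy the objects out; the counter is left in place and is
	 * only checked and overwritten by the next enqueue to the slot.
	 */
	for (i = 0; i < n; i++)
		obj_table[i] = ring[(cons_head + i) & r->mask].ptr;
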
 /* Between load and load. there might be cpu reorder in weak model
  * (powerpc/arm).
  * There are 2 choices for the users
@@ -331,6 +433,319 @@  void rte_ring_dump(FILE *f, const struct rte_ring *r);
 #endif
 #include "rte_ring_generic_64.h"
 
+/* @internal 128-bit structure used by the non-blocking ring */
+struct nb_ring_entry {
+	void *ptr; /**< Data pointer */
+	uint64_t cnt; /**< Modification counter */
+};
+
+/* The non-blocking ring algorithm is based on the original rte ring (derived
+ * from FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking
+ * concurrent queue.
+ */
+
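(Editorial worked example of the modification counter: take a ring of size 4 whose entries start out with cnt equal to their slot index, 0..3; that initial value is an assumption about the init code, which is not part of this hunk. An enqueue that reads producer tail 0 proceeds only if ring[0].cnt == 0, and its 128-bit CAS installs {obj, 0 + 4} = {obj, 4}. Slot 0 is written next when the 64-bit producer tail reaches 4, which is exactly the counter now stored there, so the "cnt == tail" check holds again. A stale producer still holding tail 0 instead finds cnt == 4 and retries rather than overwriting live data. The counter sequence for slot 0 is thus 0, 4, 8, 12, ...)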
+/**
+ * @internal
+ *   Enqueue several objects on the non-blocking ring (single-producer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+	uint32_t free_entries;
+	size_t head, next;
+
+	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
+					 &head, &next, &free_entries);
+	if (n == 0)
+		goto end;
+
+	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
+
+	r->prod_64.tail += n;
+
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+	return n;
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the non-blocking ring (multi-producer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
+	RTE_SET_USED(r);
+	RTE_SET_USED(obj_table);
+	RTE_SET_USED(n);
+	RTE_SET_USED(behavior);
+	RTE_SET_USED(free_space);
+#ifndef ALLOW_EXPERIMENTAL_API
+	printf("[%s()] RING_F_NB requires an experimental API."
+	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n",
+	       __func__);
+#endif
+	return 0;
+#endif
+#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
+	size_t head, next, tail;
+	uint32_t free_entries;
+	unsigned int i;
+
+	n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
+					 &head, &next, &free_entries);
+	if (n == 0)
+		goto end;
+
+	for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
+		struct nb_ring_entry old_value, new_value;
+		struct nb_ring_entry *ring_ptr;
+
+		/* Enqueue to the tail entry. If another thread wins the race,
+		 * retry with the new tail.
+		 */
+		tail = r->prod_64.tail;
+
+		ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
+
+		old_value = *ring_ptr;
+
+		/* If the tail entry's modification counter doesn't match the
+		 * producer tail index, it's already been updated.
+		 */
+		if (old_value.cnt != tail)
+			continue;
+
+		/* Prepare the new entry. The cnt field mitigates the ABA
+		 * problem on the ring write.
+		 */
+		new_value.ptr = obj_table[i];
+		new_value.cnt = tail + r->size;
+
+		if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
+					 (rte_int128_t *)&old_value,
+					 (rte_int128_t *)&new_value))
+			i++;
+
+		/* Every thread attempts the cmpset, so they don't have to wait
+		 * for the thread that successfully enqueued to the ring.
+		 * Using a 64-bit tail mitigates the ABA problem here.
+		 *
+		 * Built-in used to handle variable-sized tail index.
+		 */
+		__sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail + 1);
+	}
+
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+	return n;
+#endif
+}
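(Editorial sketch of the 128-bit compare-and-swap the loop above depends on. rte_atomic128_cmpset() is provided elsewhere in this series; an equivalent operation can be written with GCC's generic __atomic built-ins on a 16-byte, 16-byte-aligned struct, shown here only to clarify the semantics and not as the patch's implementation. On x86_64 this typically needs -mcx16 and may otherwise fall back to libatomic. The type and function names are hypothetical.)

	#include <stdbool.h>
	#include <stdint.h>

	struct entry128 {
		void *ptr;
		uint64_t cnt;
	} __attribute__((aligned(16)));

	/* Atomically replace *slot with 'want' iff it still equals 'expect'. */
	static inline bool
	entry_cas128(struct entry128 *slot, struct entry128 expect,
		     struct entry128 want)
	{
		return __atomic_compare_exchange(slot, &expect, &want,
						 0, /* strong CAS */
						 __ATOMIC_RELEASE,
						 __ATOMIC_RELAXED);
	}
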
+
+/**
+ * @internal Enqueue several objects on the non-blocking ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, enum rte_ring_queue_behavior behavior,
+			 unsigned int is_sp, unsigned int *free_space)
+{
+	if (is_sp)
+		return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
+						   behavior, free_space);
+	else
+		return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
+						   behavior, free_space);
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the non-blocking ring (single-consumer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	size_t head, next;
+	uint32_t entries;
+
+	n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
+					 &head, &next, &entries);
+	if (n == 0)
+		goto end;
+
+	DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
+
+	r->cons_64.tail += n;
+
+end:
+	if (available != NULL)
+		*available = entries - n;
+	return n;
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the non-blocking ring (multi-consumer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	size_t head, next;
+	uint32_t entries;
+
+	n = __rte_ring_move_cons_head_64(r, 0, n, behavior,
+					 &head, &next, &entries);
+	if (n == 0)
+		goto end;
+
+	while (1) {
+		size_t tail = r->cons_64.tail;
+
+		/* Dequeue from the cons tail onwards. If multiple threads read
+		 * the same pointers, the thread that successfully performs the
+		 * CAS will keep them and the other(s) will retry.
+		 */
+		DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);
+
+		next = tail + n;
+
+		/* Built-in used to handle variable-sized tail index. */
+		if (__sync_bool_compare_and_swap(&r->cons_64.tail, tail, next))
+			/* There is potential for the ABA problem here, but
+			 * that is mitigated by the large (64-bit) tail.
+			 */
+			break;
+	}
+
+end:
+	if (available != NULL)
+		*available = entries - n;
+	return n;
+}
+
+/**
+ * @internal Dequeue several objects from the non-blocking ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table,
+		 unsigned int n, enum rte_ring_queue_behavior behavior,
+		 unsigned int is_sc, unsigned int *available)
+{
+	if (is_sc)
+		return __rte_ring_do_nb_dequeue_sc(r, obj_table, n,
+						   behavior, available);
+	else
+		return __rte_ring_do_nb_dequeue_mc(r, obj_table, n,
+						   behavior, available);
+}
+
 /**
  * @internal Enqueue several objects on the ring
  *
@@ -438,8 +853,14 @@  static __rte_always_inline unsigned int
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MP, free_space);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_MP,
+						free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_MP,
+					     free_space);
 }
 
 /**
@@ -461,8 +882,14 @@  static __rte_always_inline unsigned int
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SP, free_space);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_SP,
+						free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_SP,
+					     free_space);
 }
 
 /**
@@ -488,8 +915,14 @@  static __rte_always_inline unsigned int
 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			r->prod.single, free_space);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED,
+						r->prod_64.single, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED,
+					     r->prod.single, free_space);
 }
 
 /**
@@ -572,8 +1005,14 @@  static __rte_always_inline unsigned int
 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MC, available);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_MC,
+						available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_MC,
+					     available);
 }
 
 /**
@@ -596,8 +1035,14 @@  static __rte_always_inline unsigned int
 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SC, available);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_SC,
+						available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_SC,
+					     available);
 }
 
 /**
@@ -623,8 +1068,14 @@  static __rte_always_inline unsigned int
 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
 		unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-				r->cons.single, available);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED,
+						r->cons_64.single, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED,
+					     r->cons.single, available);
 }
 
 /**
@@ -699,9 +1150,13 @@  rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 static inline unsigned
 rte_ring_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	uint32_t count = (prod_tail - cons_tail) & r->mask;
+	uint32_t count;
+
+	if (r->flags & RING_F_NB)
+		count = (r->prod_64.tail - r->cons_64.tail) & r->mask;
+	else
+		count = (r->prod.tail - r->cons.tail) & r->mask;
+
 	return (count > r->capacity) ? r->capacity : count;
 }
 
@@ -821,8 +1276,14 @@  static __rte_always_inline unsigned
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_MP, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_MP, free_space);
 }
 
 /**
@@ -844,8 +1305,14 @@  static __rte_always_inline unsigned
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_SP, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_SP, free_space);
 }
 
 /**
@@ -871,8 +1338,14 @@  static __rte_always_inline unsigned
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
-			r->prod.single, free_space);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						r->prod_64.single, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     r->prod.single, free_space);
 }
 
 /**
@@ -899,8 +1372,14 @@  static __rte_always_inline unsigned
 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_MC, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_MC, available);
 }
 
 /**
@@ -924,8 +1403,14 @@  static __rte_always_inline unsigned
 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_SC, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_SC, available);
 }
 
 /**
@@ -951,9 +1436,14 @@  static __rte_always_inline unsigned
 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n,
-				RTE_RING_QUEUE_VARIABLE,
-				r->cons.single, available);
+	if (r->flags & RING_F_NB)
+		return __rte_ring_do_nb_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						r->cons_64.single, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     r->cons.single, available);
 }
 
 #ifdef __cplusplus
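(Editorial usage note: since every inline wrapper above branches on RING_F_NB, application code does not change when a ring is switched to the non-blocking variant. A minimal sketch follows, assuming 'in' and 'out' were created with or without RING_F_NB; RTE_DIM() comes from rte_common.h and the function name is hypothetical.)

	#include <rte_common.h>
	#include <rte_ring.h>

	static void
	forward_objects(struct rte_ring *in, struct rte_ring *out)
	{
		void *objs[32];
		unsigned int n;

		/* Identical calls for blocking and non-blocking rings;
		 * the dispatch happens inside the inline wrappers.
		 */
		n = rte_ring_dequeue_burst(in, objs, RTE_DIM(objs), NULL);
		if (n > 0)
			rte_ring_enqueue_burst(out, objs, n, NULL);
		/* Objects that do not fit in 'out' are dropped in this
		 * simplified sketch.
		 */
	}
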
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index d935efd0d..8969467af 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -17,3 +17,10 @@  DPDK_2.2 {
 	rte_ring_free;
 
 } DPDK_2.0;
+
+DPDK_19.05 {
+	global:
+
+	rte_ring_get_memsize;
+
+} DPDK_2.2;
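
(Editorial note: the new version node above pairs with the _v20/_v1905 prototypes added to rte_ring.h. How rte_ring.c binds them is not shown in this excerpt; a plausible sketch using DPDK's rte_compat.h helpers, offered as an assumption rather than the patch's actual code, would be:)

	#include <rte_compat.h>
	#include <rte_ring.h>

	/* rte_ring_get_memsize_v1905() is assumed to be defined in rte_ring.c;
	 * bind it as the default for new binaries and for static linking.
	 */
	BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
	MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
						       unsigned int flags),
			  rte_ring_get_memsize_v1905);

	/* Assumed compatibility behaviour: old binaries get the one-argument
	 * variant, i.e. no flags and the regular (blocking) ring layout.
	 */
	ssize_t
	rte_ring_get_memsize_v20(unsigned int count)
	{
		return rte_ring_get_memsize_v1905(count, 0);
	}
	VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);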