[RFC] lfring: lock-free ring buffer

Message ID 1548678513-14348-1-git-send-email-ola.liljedahl@arm.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series [RFC] lfring: lock-free ring buffer |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation success Compilation OK

Commit Message

Ola Liljedahl Jan. 28, 2019, 12:28 p.m. UTC
  Lock-free MP/MC ring buffer with SP/SC options.
The ring buffer metadata only maintains one set of head and tail
pointers. Tail is updated on enqueue, head is updated on dequeue.
The ring slots between head and tail always contains valid (unconsumed)
slots.
Each ring slot consists of a struct of data pointer ("ptr") and ring
index ("idx"). Slot.idx is only updated on enqueue with the new ring
index. The slot index is used to ensure that slots are written
sequentially (important for FIFO ordering). It is also used to
synchronise multiple producers.

MP enqueue scans the ring from the tail until head+size, looking for an
empty slot as indicated by slot.idx equaling the ring index of
the previous lap (index - ring size). The empty slot is written (.ptr =
data, .idx = index) using CAS. If CAS fails, some other thread wrote
the slot and the thread continues with the next slot.

When all elements have been enqueued or if the ring buffer is full, the
thread updates the ring buffer tail pointer (unless this has not already
been updated by some other thread). Thus this thread will help other
threads that have written ring slots but not yet updated the tail pointer.

If a slot with an index different from current or previous lap is found,
it is assumed that the thread has fallen behind (e.g. been preempted)
and the local tail pointer is (conditionally) updated from the ring buffer
metadata in order to quickly catch up.

MP dequeue reads (up to) N slots from the head index and attempts to
commit the operation using CAS on the head pointer.

Enqueue N elements: N+1 CAS operations (but the last CAS operation might
not be necessary during contention as producers will help each other)
Dequeue N elements: 1 CAS operation

As the lock-free ring supports both MT-safe (MP/MC) and single-threaded
(SP/SC) operations, there is a question whether the modes should be
chosen only when creating a ring buffer or if the application can mix
MT-safe and single-threaded enqueue (or dequeue) calls. To follow the
pattern set by rte_ring, specific functions for MP and SP enqueue and
MC and SC dequeue are made available. The wisdom of this ability can
be questioned. The lock-free design allows threads to be blocked for
long periods of time without interfering with the operation of the ring
buffer but mixing (lock-free) MT-safe and single-threaded calls requires
that there are on such threads that wake up at the wrong moment (when a
single-threaded operation is in progress).

128-bit lock-free atomic compare exchange operations for AArch64
(ARMv8.0) and x86-64 are provided in lockfree16.h. I expect these
functions to find a different home and possibly a different API.
Note that a 'frail' version atomic_compare_exchange is implemented,
this means that in the case of failure, the old value returned is
not guaranteed to be atomically read (as this is not possible on ARMv8.0
without also writing to the location). Atomically read values are
often not necessary, it is a successful compare and exchange operation
which provides atomicity.

Signed-off-by: Ola Liljedahl <ola.liljedahl@arm.com>
---
 config/common_base                       |   5 +
 lib/Makefile                             |   2 +
 lib/librte_lfring/Makefile               |  22 ++
 lib/librte_lfring/lockfree16.h           | 144 ++++++++
 lib/librte_lfring/meson.build            |   6 +
 lib/librte_lfring/rte_lfring.c           | 261 ++++++++++++++
 lib/librte_lfring/rte_lfring.h           | 567 +++++++++++++++++++++++++++++++
 lib/librte_lfring/rte_lfring_version.map |  19 ++
 8 files changed, 1026 insertions(+)
 create mode 100644 lib/librte_lfring/Makefile
 create mode 100644 lib/librte_lfring/lockfree16.h
 create mode 100644 lib/librte_lfring/meson.build
 create mode 100644 lib/librte_lfring/rte_lfring.c
 create mode 100644 lib/librte_lfring/rte_lfring.h
 create mode 100644 lib/librte_lfring/rte_lfring_version.map
  

Comments

Eads, Gage Jan. 28, 2019, 9:04 p.m. UTC | #1
Hey Ola,

> -----Original Message-----
> From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> Sent: Monday, January 28, 2019 6:29 AM
> To: dev@dpdk.org; Eads, Gage <gage.eads@intel.com>; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; Richardson, Bruce
> <bruce.richardson@intel.com>
> Cc: nd <nd@arm.com>; Ola Liljedahl <Ola.Liljedahl@arm.com>
> Subject: [RFC] lfring: lock-free ring buffer
> 
> Lock-free MP/MC ring buffer with SP/SC options.
> The ring buffer metadata only maintains one set of head and tail pointers. Tail is
> updated on enqueue, head is updated on dequeue.
> The ring slots between head and tail always contains valid (unconsumed) slots.
> Each ring slot consists of a struct of data pointer ("ptr") and ring index ("idx").
> Slot.idx is only updated on enqueue with the new ring index. The slot index is
> used to ensure that slots are written sequentially (important for FIFO ordering).
> It is also used to synchronise multiple producers.
> 
> MP enqueue scans the ring from the tail until head+size, looking for an empty
> slot as indicated by slot.idx equaling the ring index of the previous lap (index -
> ring size). The empty slot is written (.ptr = data, .idx = index) using CAS. If CAS
> fails, some other thread wrote the slot and the thread continues with the next
> slot.
> 
> When all elements have been enqueued or if the ring buffer is full, the thread
> updates the ring buffer tail pointer (unless this has not already been updated by
> some other thread). Thus this thread will help other threads that have written
> ring slots but not yet updated the tail pointer.
> 
> If a slot with an index different from current or previous lap is found, it is
> assumed that the thread has fallen behind (e.g. been preempted) and the local
> tail pointer is (conditionally) updated from the ring buffer metadata in order to
> quickly catch up.
> 
> MP dequeue reads (up to) N slots from the head index and attempts to commit
> the operation using CAS on the head pointer.
> 
> Enqueue N elements: N+1 CAS operations (but the last CAS operation might not
> be necessary during contention as producers will help each other) Dequeue N
> elements: 1 CAS operation
> 
> As the lock-free ring supports both MT-safe (MP/MC) and single-threaded
> (SP/SC) operations, there is a question whether the modes should be chosen
> only when creating a ring buffer or if the application can mix MT-safe and single-
> threaded enqueue (or dequeue) calls. To follow the pattern set by rte_ring,
> specific functions for MP and SP enqueue and MC and SC dequeue are made
> available. The wisdom of this ability can be questioned. The lock-free design
> allows threads to be blocked for long periods of time without interfering with
> the operation of the ring buffer but mixing (lock-free) MT-safe and single-
> threaded calls requires that there are on such threads that wake up at the wrong
> moment (when a single-threaded operation is in progress).
> 

I see this as a user error. The rte_ring documentation is clear that ring_sp_enqueue and ring_sc_dequeue functions are not MT-safe, and (if I'm reading this correctly) this scenario involves having 2+ threads executing an operation in parallel.

> 128-bit lock-free atomic compare exchange operations for AArch64
> (ARMv8.0) and x86-64 are provided in lockfree16.h. I expect these functions to
> find a different home and possibly a different API.
> Note that a 'frail' version atomic_compare_exchange is implemented, this
> means that in the case of failure, the old value returned is not guaranteed to be
> atomically read (as this is not possible on ARMv8.0 without also writing to the
> location). Atomically read values are often not necessary, it is a successful
> compare and exchange operation which provides atomicity.
> 
> Signed-off-by: Ola Liljedahl <ola.liljedahl@arm.com>

A few high-level thoughts:
1. The enqueue and dequeue functions are not compatible with the rte_ring's bulk semantics, where either all pointers or none are operated on. This is necessary to implement the ring API of course, but in a broader sense I think it's necessary for usability in general. For example, how would a user (or the ring mempool handler) distinguish between partial dequeues that occur because the ring is empty vs. those that occur due to a race? dequeue_bulk_mc could return 0 even though the ring is full, if its first CAS fails and the head has advanced beyond its local tail variable. The user could retry dequeue, of course, but how many times until they feel confident that the ring is truly empty and give up? I don't think we can avoid reserving ring slots prior to enqueueing/dequeueing.

2. On the plus side, the enqueue function design that allows it to use 1/2 the atomics of mine appears to be independent of reserving ring slots, and should transfer over fairly cleanly. I'm a little concerned about the performance variability it introduces (i.e. if the thread gets into "catch up" mode), particularly for larger rings, since real-time software values predictability. What if the reload criteria was instead something like:

#define ENQ_RETRY_LIMIT 32 //arbitrary

if (old.e.idx != tail - size) {
    if (++fail_cnt < ENQ_RETRY_LIMIT) {
        tail++;
    } else {
        fail_cnt = 0;
        tail = rte_lfring_reload(...); 
    }
    goto restart;
}
fail_cnt = 0;

3. Using a zero-length array to mark the start of the ring is a nice approach -- I'll incorporate that into the patchset.

At an algorithm level, I don't see any other differences. Implementation-wise, we'll need to nail the memory ordering flags to best support weak consistency machines, as you pointed out elsewhere.

Out of curiosity, is p64_lfring based on the nb ring patchset? I noticed it used a different algorithm until pretty recently.

Thanks,
Gage

</snip>
  
Ola Liljedahl Jan. 28, 2019, 10:26 p.m. UTC | #2
On Mon, 2019-01-28 at 21:04 +0000, Eads, Gage wrote:
> Hey Ola,
> 
> > 
> > -----Original Message-----
> > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > Sent: Monday, January 28, 2019 6:29 AM
> > To: dev@dpdk.org; Eads, Gage <gage.eads@intel.com>; Honnappa Nagarahalli
> > <Honnappa.Nagarahalli@arm.com>; Richardson, Bruce
> > <bruce.richardson@intel.com>
> > Cc: nd <nd@arm.com>; Ola Liljedahl <Ola.Liljedahl@arm.com>
> > Subject: [RFC] lfring: lock-free ring buffer
> > 
> > Lock-free MP/MC ring buffer with SP/SC options.
> > The ring buffer metadata only maintains one set of head and tail pointers.
> > Tail is
> > updated on enqueue, head is updated on dequeue.
> > The ring slots between head and tail always contains valid (unconsumed)
> > slots.
> > Each ring slot consists of a struct of data pointer ("ptr") and ring index
> > ("idx").
> > Slot.idx is only updated on enqueue with the new ring index. The slot index
> > is
> > used to ensure that slots are written sequentially (important for FIFO
> > ordering).
> > It is also used to synchronise multiple producers.
> > 
> > MP enqueue scans the ring from the tail until head+size, looking for an
> > empty
> > slot as indicated by slot.idx equaling the ring index of the previous lap
> > (index -
> > ring size). The empty slot is written (.ptr = data, .idx = index) using CAS.
> > If CAS
> > fails, some other thread wrote the slot and the thread continues with the
> > next
> > slot.
> > 
> > When all elements have been enqueued or if the ring buffer is full, the
> > thread
> > updates the ring buffer tail pointer (unless this has not already been
> > updated by
> > some other thread). Thus this thread will help other threads that have
> > written
> > ring slots but not yet updated the tail pointer.
> > 
> > If a slot with an index different from current or previous lap is found, it
> > is
> > assumed that the thread has fallen behind (e.g. been preempted) and the
> > local
> > tail pointer is (conditionally) updated from the ring buffer metadata in
> > order to
> > quickly catch up.
> > 
> > MP dequeue reads (up to) N slots from the head index and attempts to commit
> > the operation using CAS on the head pointer.
> > 
> > Enqueue N elements: N+1 CAS operations (but the last CAS operation might not
> > be necessary during contention as producers will help each other) Dequeue N
> > elements: 1 CAS operation
> > 
> > As the lock-free ring supports both MT-safe (MP/MC) and single-threaded
> > (SP/SC) operations, there is a question whether the modes should be chosen
> > only when creating a ring buffer or if the application can mix MT-safe and
> > single-
> > threaded enqueue (or dequeue) calls. To follow the pattern set by rte_ring,
> > specific functions for MP and SP enqueue and MC and SC dequeue are made
> > available. The wisdom of this ability can be questioned. The lock-free
> > design
> > allows threads to be blocked for long periods of time without interfering
> > with
> > the operation of the ring buffer but mixing (lock-free) MT-safe and single-
> > threaded calls requires that there are on such threads that wake up at the
> > wrong
> > moment (when a single-threaded operation is in progress).
> > 
> I see this as a user error. The rte_ring documentation is clear that
> ring_sp_enqueue and ring_sc_dequeue functions are not MT-safe, and (if I'm
> reading this correctly) this scenario involves having 2+ threads executing an
> operation in parallel.
So the individual MP/SP/MC/SC functions should also be directly available to the
user? This is what I have implemented here (but not in my original PROGRESS64
implementation where MP/SP/MC/SC mode is fixed at creation). But I don't like
it.

> 
> > 
> > 128-bit lock-free atomic compare exchange operations for AArch64
> > (ARMv8.0) and x86-64 are provided in lockfree16.h. I expect these functions
> > to
> > find a different home and possibly a different API.
> > Note that a 'frail' version atomic_compare_exchange is implemented, this
> > means that in the case of failure, the old value returned is not guaranteed
> > to be
> > atomically read (as this is not possible on ARMv8.0 without also writing to
> > the
> > location). Atomically read values are often not necessary, it is a
> > successful
> > compare and exchange operation which provides atomicity.
> > 
> > Signed-off-by: Ola Liljedahl <ola.liljedahl@arm.com>
> A few high-level thoughts:
> 1. The enqueue and dequeue functions are not compatible with the rte_ring's
> bulk semantics,
I wasn't considering 100% compatibility with other modules that already use
rte_ring, the code rather reflects the requirements of my own use cases. If we
want this, then we need to provide the same bulk enqueue/dequeue behaviour as
rte_ring.

>  where either all pointers or none are operated on. This is necessary to
> implement the ring API of course, but in a broader sense I think it's
> necessary for usability in general. For example, how would a user (or the ring
> mempool handler) distinguish between partial dequeues that occur because the
> ring is empty vs. those that occur due to a race? dequeue_bulk_mc could return
> 0 even though the ring is full, if its first CAS fails and the head has
> advanced beyond its local tail variable.
Tail could be re-read on a CAS(&head) failure. This is how I originally coded
ring buffer enqueue and dequeue functions. But CAS(&head) failure doesn't by
itself mean that *tail* has changed. So I hoisted the read of tail before the
CAS-loop which made the code slightly faster since we avoid unnecessarily
reading a location which might have been modified and thus could cause a cache
miss. Maybe this microptimisation isn't always a good idea (it could lead to
spurious queue full/empty failures). Another solution is to only re-read tail if
we see the queue as empty (or we can't acquire as many slots as the user
requested).

        uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
        uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
        do {
                actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
                if (unlikely(actual <= 0)) {
			/* Re-load tail only when queue looks empty */
        		tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
	     
          actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
			if
(unlikely(actual <= 0))
	                        return 0;
		}
                rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
        } while (!__atomic_compare_exchange_n(&r->head,
                                              &head,
                                              head + actual,
                                              /*weak*/false,
                                              __ATOMIC_RELEASE,
                                              __ATOMIC_RELAXED));
>  The user could retry dequeue, of course, but how many times until they feel
> confident that the ring is truly empty and give up? I don't think we can avoid
> reserving ring slots prior to enqueueing/dequeueing.
If tail is (re-) read inside the CAS-loop, there should be no spurious queue
empty failures and no need to repeatedly call the dequeue function "just to be
sure". The bulk dequeue/enqueue behaviour is separate.

> 
> 2. On the plus side, the enqueue function design that allows it to use 1/2 the
> atomics of mine appears to be independent of reserving ring slots, and should
> transfer over fairly cleanly. I'm a little concerned about the performance
> variability it introduces (i.e. if the thread gets into "catch up" mode), 
If a thread has to catch up, it means it is using a stale (head/tail) index
(more than one ring lap behaind). Better to try to load a fresh value (if one is
available) than to iterate over the ring until it has caught up. So I think this
is the better/faster design.

Catch up mode is not triggered by finding an occupied slot for the current lap
(that was just written by some other thread). Or at least this is the idea. If
we find a freshly written slot, we just move to the next slot.

> particularly for larger rings, since real-time software values predictability.
> What if the reload criteria was instead something like:
> 
> #define ENQ_RETRY_LIMIT 32 //arbitrary
> 
> if (old.e.idx != tail - size) {
>     if (++fail_cnt < ENQ_RETRY_LIMIT) {
>         tail++;
>     } else {
>         fail_cnt = 0;
>         tail = rte_lfring_reload(...); 
>     }
>     goto restart;
> }
> fail_cnt = 0;
There are three cases (slot index must be between q->tail and q->head + q-
>size):
slot.idx == tail - size: slot is free, try to write it
slot.idx == tail: slot has just been written (by other thread), skip to next
slot (tail++)
none of the above: thread is behind (at least one lap), re-load tail from q-
>tail

I think using the retry count actually delays catching up to a fresh position.

> 
> 3. Using a zero-length array to mark the start of the ring is a nice approach
> -- I'll incorporate that into the patchset.
> 
> At an algorithm level, I don't see any other differences. Implementation-wise, 
> we'll need to nail the memory ordering flags to best support weak consistency
> machines, as you pointed out elsewhere.
There is no pre-acquisition of slots in enqueue and dequeue. That separate step
makes lock-freedom impossible (I think).

With a merged implementation, I think the NON-BLOCKING/LOCK-FREE mode most have
separate implementations also for SP/SC. In non-blocking/lock-free mode (whether
MP/MC or SP/SC), only one set of head/tail pointers is used. In blocking mode,
two sets of head/tail are used. There isn't a set of SP/SC code that supports
both non-blocking/lock-free and blocking behaviours.

> 
> Out of curiosity, is p64_lfring based on the nb ring patchset? I noticed it
> used a different algorithm until pretty recently.
I created the original lfring design (which isn't that different I think) back
in November last and it seemed to work but I didn't think it could guarantee
FIFO order as elements were enqueued and dequeued wherever a thread found a
suitable slot and in my use cases, FIFO order is an important characteristic
(need to maintain ingress-to-egress packet order). Already then I was
considering adding the index to the ring slots to enforce FIFO enqueue/dequeue
order, I knew this would be possible on ARMv7a (which has 2x32 bit CAS using
exclusives) and ARMv8/AArch64 (which has 2x64-bit CAS using exclusives). I had
seen this design idea on the Internet before (e.g. here https://www.youtube.com/
watch?v=HP2InVqgBFM, I actually stopped viewing this presentation half-way
because I wanted to iron out the details myself), it's not a novel idea (sorry).
However I didn't have time to do this change before Christmas holidays and just
came back to work two weeks ago. So I would rather call this simultaneous
"invention" (or publication). Difficult to prove this of course...

> 
> Thanks,
> Gage
> 
> </snip>
  
Eads, Gage Jan. 30, 2019, 5:17 a.m. UTC | #3
> -----Original Message-----
> From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> Sent: Monday, January 28, 2019 4:26 PM
> To: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Richardson,
> Bruce <bruce.richardson@intel.com>; Eads, Gage <gage.eads@intel.com>;
> dev@dpdk.org
> Cc: nd <nd@arm.com>
> Subject: Re: [RFC] lfring: lock-free ring buffer
> 
> On Mon, 2019-01-28 at 21:04 +0000, Eads, Gage wrote:
> > Hey Ola,
> >
> > >
> > > -----Original Message-----
> > > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > > Sent: Monday, January 28, 2019 6:29 AM
> > > To: dev@dpdk.org; Eads, Gage <gage.eads@intel.com>; Honnappa
> > > Nagarahalli <Honnappa.Nagarahalli@arm.com>; Richardson, Bruce
> > > <bruce.richardson@intel.com>
> > > Cc: nd <nd@arm.com>; Ola Liljedahl <Ola.Liljedahl@arm.com>
> > > Subject: [RFC] lfring: lock-free ring buffer
> > >
> > > Lock-free MP/MC ring buffer with SP/SC options.
> > > The ring buffer metadata only maintains one set of head and tail pointers.
> > > Tail is
> > > updated on enqueue, head is updated on dequeue.
> > > The ring slots between head and tail always contains valid
> > > (unconsumed) slots.
> > > Each ring slot consists of a struct of data pointer ("ptr") and ring
> > > index ("idx").
> > > Slot.idx is only updated on enqueue with the new ring index. The
> > > slot index is used to ensure that slots are written sequentially
> > > (important for FIFO ordering).
> > > It is also used to synchronise multiple producers.
> > >
> > > MP enqueue scans the ring from the tail until head+size, looking for
> > > an empty slot as indicated by slot.idx equaling the ring index of
> > > the previous lap (index - ring size). The empty slot is written
> > > (.ptr = data, .idx = index) using CAS.
> > > If CAS
> > > fails, some other thread wrote the slot and the thread continues
> > > with the next slot.
> > >
> > > When all elements have been enqueued or if the ring buffer is full,
> > > the thread updates the ring buffer tail pointer (unless this has not
> > > already been updated by some other thread). Thus this thread will
> > > help other threads that have written ring slots but not yet updated
> > > the tail pointer.
> > >
> > > If a slot with an index different from current or previous lap is
> > > found, it is assumed that the thread has fallen behind (e.g. been
> > > preempted) and the local tail pointer is (conditionally) updated
> > > from the ring buffer metadata in order to quickly catch up.
> > >
> > > MP dequeue reads (up to) N slots from the head index and attempts to
> > > commit the operation using CAS on the head pointer.
> > >
> > > Enqueue N elements: N+1 CAS operations (but the last CAS operation
> > > might not be necessary during contention as producers will help each
> > > other) Dequeue N
> > > elements: 1 CAS operation
> > >
> > > As the lock-free ring supports both MT-safe (MP/MC) and
> > > single-threaded
> > > (SP/SC) operations, there is a question whether the modes should be
> > > chosen only when creating a ring buffer or if the application can
> > > mix MT-safe and
> > > single-
> > > threaded enqueue (or dequeue) calls. To follow the pattern set by
> > > rte_ring, specific functions for MP and SP enqueue and MC and SC
> > > dequeue are made available. The wisdom of this ability can be
> > > questioned. The lock-free design allows threads to be blocked for
> > > long periods of time without interfering with the operation of the
> > > ring buffer but mixing (lock-free) MT-safe and single- threaded
> > > calls requires that there are on such threads that wake up at the
> > > wrong moment (when a single-threaded operation is in progress).
> > >
> > I see this as a user error. The rte_ring documentation is clear that
> > ring_sp_enqueue and ring_sc_dequeue functions are not MT-safe, and (if
> > I'm reading this correctly) this scenario involves having 2+ threads
> > executing an operation in parallel.
> So the individual MP/SP/MC/SC functions should also be directly available to the
> user? This is what I have implemented here (but not in my original PROGRESS64
> implementation where MP/SP/MC/SC mode is fixed at creation). But I don't like
> it.
> 
> >
> > >
> > > 128-bit lock-free atomic compare exchange operations for AArch64
> > > (ARMv8.0) and x86-64 are provided in lockfree16.h. I expect these
> > > functions to find a different home and possibly a different API.
> > > Note that a 'frail' version atomic_compare_exchange is implemented,
> > > this means that in the case of failure, the old value returned is
> > > not guaranteed to be atomically read (as this is not possible on
> > > ARMv8.0 without also writing to the location). Atomically read
> > > values are often not necessary, it is a successful compare and
> > > exchange operation which provides atomicity.
> > >
> > > Signed-off-by: Ola Liljedahl <ola.liljedahl@arm.com>
> > A few high-level thoughts:
> > 1. The enqueue and dequeue functions are not compatible with the
> > rte_ring's bulk semantics,
> I wasn't considering 100% compatibility with other modules that already use
> rte_ring, the code rather reflects the requirements of my own use cases. If we
> want this, then we need to provide the same bulk enqueue/dequeue behaviour
> as rte_ring.
> 
> >  where either all pointers or none are operated on. This is necessary
> > to implement the ring API of course, but in a broader sense I think
> > it's necessary for usability in general. For example, how would a user
> > (or the ring mempool handler) distinguish between partial dequeues
> > that occur because the ring is empty vs. those that occur due to a
> > race? dequeue_bulk_mc could return
> > 0 even though the ring is full, if its first CAS fails and the head
> > has advanced beyond its local tail variable.
> Tail could be re-read on a CAS(&head) failure. This is how I originally coded ring
> buffer enqueue and dequeue functions. But CAS(&head) failure doesn't by itself
> mean that *tail* has changed. So I hoisted the read of tail before the CAS-loop
> which made the code slightly faster since we avoid unnecessarily reading a
> location which might have been modified and thus could cause a cache miss.
> Maybe this microptimisation isn't always a good idea (it could lead to spurious
> queue full/empty failures). Another solution is to only re-read tail if we see the
> queue as empty (or we can't acquire as many slots as the user requested).
> 
>         uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
>         uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
>         do {
>                 actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
>                 if (unlikely(actual <= 0)) {
> 			/* Re-load tail only when queue looks empty */
>         		tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
> 
>           actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
> 			if
> (unlikely(actual <= 0))
> 	                        return 0;
> 		}
>                 rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
>         } while (!__atomic_compare_exchange_n(&r->head,
>                                               &head,
>                                               head + actual,
>                                               /*weak*/false,
>                                               __ATOMIC_RELEASE,
>                                               __ATOMIC_RELAXED));
> >  The user could retry dequeue, of course, but how many times until
> > they feel confident that the ring is truly empty and give up? I don't
> > think we can avoid reserving ring slots prior to enqueueing/dequeueing.
> If tail is (re-) read inside the CAS-loop, there should be no spurious queue empty
> failures and no need to repeatedly call the dequeue function "just to be sure".
> The bulk dequeue/enqueue behaviour is separate.
> 
> >
> > 2. On the plus side, the enqueue function design that allows it to use
> > 1/2 the atomics of mine appears to be independent of reserving ring
> > slots, and should transfer over fairly cleanly. I'm a little concerned
> > about the performance variability it introduces (i.e. if the thread
> > gets into "catch up" mode),
> If a thread has to catch up, it means it is using a stale (head/tail) index (more
> than one ring lap behaind). Better to try to load a fresh value (if one is
> available) than to iterate over the ring until it has caught up. So I think this is the
> better/faster design.
> 
> Catch up mode is not triggered by finding an occupied slot for the current lap
> (that was just written by some other thread). Or at least this is the idea. If we
> find a freshly written slot, we just move to the next slot.
> 
> > particularly for larger rings, since real-time software values predictability.
> > What if the reload criteria was instead something like:
> >
> > #define ENQ_RETRY_LIMIT 32 //arbitrary
> >
> > if (old.e.idx != tail - size) {
> >     if (++fail_cnt < ENQ_RETRY_LIMIT) {
> >         tail++;
> >     } else {
> >         fail_cnt = 0;
> >         tail = rte_lfring_reload(...);
> >     }
> >     goto restart;
> > }
> > fail_cnt = 0;
> There are three cases (slot index must be between q->tail and q->head + q-
> >size):
> slot.idx == tail - size: slot is free, try to write it slot.idx == tail: slot has just been
> written (by other thread), skip to next slot (tail++) none of the above: thread is
> behind (at least one lap), re-load tail from q-
> >tail
> 
> I think using the retry count actually delays catching up to a fresh position.
> 

Miscommunication here -- by "catch up", I meant the case where the thread is behind but by less than one lap (the second case you describe). In the worst case, the thread would have to read N-1 (N = ring size) entries before reaching the next available entry, and N could easily be in the thousands. That's the performance variability I was referring to, and why I suggested capping the failed slot reads at 32. Maintaining a local fail_cnt variable is a small price to pay (relative to a CAS) to prevent a ring enqueue latency spike.

But you're right that we should still catch the 1+ lap behind case, so the reload criteria could be:

#define ENQ_RETRY_LIMIT 32 //arbitrary

if (old.e.idx != tail - size) {
    if (++fail_cnt < ENQ_RETRY_LIMIT && old.e.idx == tail) {
        tail++;
    } else {
        fail_cnt = 0;
        tail = rte_lfring_reload(...);
    }
    goto restart;
}
fail_cnt = 0;

> >
> > 3. Using a zero-length array to mark the start of the ring is a nice
> > approach
> > -- I'll incorporate that into the patchset.
> >
> > At an algorithm level, I don't see any other differences.
> > Implementation-wise, we'll need to nail the memory ordering flags to
> > best support weak consistency machines, as you pointed out elsewhere.
> There is no pre-acquisition of slots in enqueue and dequeue. That separate step
> makes lock-freedom impossible (I think).
> 

Can you elaborate? I don't currently see any other way to support rte_ring bulk semantics, which is necessary for creating a non-blocking mempool handler, so we should clear up any doubt.

In the NB ring patchset each thread reserves a number of slots before performing the enq/deq, but doesn't reserve *specific* slots (unlike rte_ring). This reservation is atomic, so that we never over-subscribe valid ring entries (for dequeue) or unused ring entries (for enqueue). This guarantees that the enq/deq operation will eventually complete, regardless of the behavior of other threads. This is why the enqueue loop doesn't check if space is available and the dequeue loop doesn't check if any more valid entries remain.

This sort of reservation should be compatible with lfring, but requires changes (e.g. two sets of head/tail pointers).

> With a merged implementation, I think the NON-BLOCKING/LOCK-FREE mode
> most have separate implementations also for SP/SC. In non-blocking/lock-free
> mode (whether MP/MC or SP/SC), only one set of head/tail pointers is used. In
> blocking mode, two sets of head/tail are used. There isn't a set of SP/SC code
> that supports both non-blocking/lock-free and blocking behaviours.
> 

Sure, I don't see a problem with that. The NB ring patchset has separate NB SP/SC code from the blocking SP/SC code as well.

> >
> > Out of curiosity, is p64_lfring based on the nb ring patchset? I
> > noticed it used a different algorithm until pretty recently.
> I created the original lfring design (which isn't that different I think) back in
> November last and it seemed to work but I didn't think it could guarantee FIFO
> order as elements were enqueued and dequeued wherever a thread found a
> suitable slot and in my use cases, FIFO order is an important characteristic (need
> to maintain ingress-to-egress packet order). Already then I was considering
> adding the index to the ring slots to enforce FIFO enqueue/dequeue order, I
> knew this would be possible on ARMv7a (which has 2x32 bit CAS using
> exclusives) and ARMv8/AArch64 (which has 2x64-bit CAS using exclusives). I had
> seen this design idea on the Internet before (e.g.
> here https://www.youtube.com/ watch?v=HP2InVqgBFM, I actually stopped
> viewing this presentation half-way because I wanted to iron out the details
> myself), it's not a novel idea (sorry).
> However I didn't have time to do this change before Christmas holidays and just
> came back to work two weeks ago. So I would rather call this simultaneous
> "invention" (or publication). Difficult to prove this of course...
> 
> >
> > Thanks,
> > Gage
> >
> > </snip>
> --
> Ola Liljedahl, Networking System Architect, Arm Phone +46706866373, Skype
> ola.liljedahl
  
Eads, Gage Jan. 30, 2019, 5:56 a.m. UTC | #4
> On Jan 29, 2019, at 11:17 PM, Eads, Gage <gage.eads@intel.com> wrote:
> 
> 
> 
>> -----Original Message-----
>> From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
>> Sent: Monday, January 28, 2019 4:26 PM
>> To: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Richardson,
>> Bruce <bruce.richardson@intel.com>; Eads, Gage <gage.eads@intel.com>;
>> dev@dpdk.org
>> Cc: nd <nd@arm.com>
>> Subject: Re: [RFC] lfring: lock-free ring buffer
>> 
>>> On Mon, 2019-01-28 at 21:04 +0000, Eads, Gage wrote:
>>> Hey Ola,
>>> 
>>>> 
>>>> -----Original Message-----
>>>> From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
>>>> Sent: Monday, January 28, 2019 6:29 AM
>>>> To: dev@dpdk.org; Eads, Gage <gage.eads@intel.com>; Honnappa
>>>> Nagarahalli <Honnappa.Nagarahalli@arm.com>; Richardson, Bruce
>>>> <bruce.richardson@intel.com>
>>>> Cc: nd <nd@arm.com>; Ola Liljedahl <Ola.Liljedahl@arm.com>
>>>> Subject: [RFC] lfring: lock-free ring buffer
>>>> 
>>>> Lock-free MP/MC ring buffer with SP/SC options.
>>>> The ring buffer metadata only maintains one set of head and tail pointers.
>>>> Tail is
>>>> updated on enqueue, head is updated on dequeue.
>>>> The ring slots between head and tail always contains valid
>>>> (unconsumed) slots.
>>>> Each ring slot consists of a struct of data pointer ("ptr") and ring
>>>> index ("idx").
>>>> Slot.idx is only updated on enqueue with the new ring index. The
>>>> slot index is used to ensure that slots are written sequentially
>>>> (important for FIFO ordering).
>>>> It is also used to synchronise multiple producers.
>>>> 
>>>> MP enqueue scans the ring from the tail until head+size, looking for
>>>> an empty slot as indicated by slot.idx equaling the ring index of
>>>> the previous lap (index - ring size). The empty slot is written
>>>> (.ptr = data, .idx = index) using CAS.
>>>> If CAS
>>>> fails, some other thread wrote the slot and the thread continues
>>>> with the next slot.
>>>> 
>>>> When all elements have been enqueued or if the ring buffer is full,
>>>> the thread updates the ring buffer tail pointer (unless this has not
>>>> already been updated by some other thread). Thus this thread will
>>>> help other threads that have written ring slots but not yet updated
>>>> the tail pointer.
>>>> 
>>>> If a slot with an index different from current or previous lap is
>>>> found, it is assumed that the thread has fallen behind (e.g. been
>>>> preempted) and the local tail pointer is (conditionally) updated
>>>> from the ring buffer metadata in order to quickly catch up.
>>>> 
>>>> MP dequeue reads (up to) N slots from the head index and attempts to
>>>> commit the operation using CAS on the head pointer.
>>>> 
>>>> Enqueue N elements: N+1 CAS operations (but the last CAS operation
>>>> might not be necessary during contention as producers will help each
>>>> other) Dequeue N
>>>> elements: 1 CAS operation
>>>> 
>>>> As the lock-free ring supports both MT-safe (MP/MC) and
>>>> single-threaded
>>>> (SP/SC) operations, there is a question whether the modes should be
>>>> chosen only when creating a ring buffer or if the application can
>>>> mix MT-safe and
>>>> single-
>>>> threaded enqueue (or dequeue) calls. To follow the pattern set by
>>>> rte_ring, specific functions for MP and SP enqueue and MC and SC
>>>> dequeue are made available. The wisdom of this ability can be
>>>> questioned. The lock-free design allows threads to be blocked for
>>>> long periods of time without interfering with the operation of the
>>>> ring buffer but mixing (lock-free) MT-safe and single- threaded
>>>> calls requires that there are on such threads that wake up at the
>>>> wrong moment (when a single-threaded operation is in progress).
>>>> 
>>> I see this as a user error. The rte_ring documentation is clear that
>>> ring_sp_enqueue and ring_sc_dequeue functions are not MT-safe, and (if
>>> I'm reading this correctly) this scenario involves having 2+ threads
>>> executing an operation in parallel.
>> So the individual MP/SP/MC/SC functions should also be directly available to the
>> user? This is what I have implemented here (but not in my original PROGRESS64
>> implementation where MP/SP/MC/SC mode is fixed at creation). But I don't like
>> it.
>> 
>>> 
>>>> 
>>>> 128-bit lock-free atomic compare exchange operations for AArch64
>>>> (ARMv8.0) and x86-64 are provided in lockfree16.h. I expect these
>>>> functions to find a different home and possibly a different API.
>>>> Note that a 'frail' version atomic_compare_exchange is implemented,
>>>> this means that in the case of failure, the old value returned is
>>>> not guaranteed to be atomically read (as this is not possible on
>>>> ARMv8.0 without also writing to the location). Atomically read
>>>> values are often not necessary, it is a successful compare and
>>>> exchange operation which provides atomicity.
>>>> 
>>>> Signed-off-by: Ola Liljedahl <ola.liljedahl@arm.com>
>>> A few high-level thoughts:
>>> 1. The enqueue and dequeue functions are not compatible with the
>>> rte_ring's bulk semantics,
>> I wasn't considering 100% compatibility with other modules that already use
>> rte_ring, the code rather reflects the requirements of my own use cases. If we
>> want this, then we need to provide the same bulk enqueue/dequeue behaviour
>> as rte_ring.
>> 
>>> where either all pointers or none are operated on. This is necessary
>>> to implement the ring API of course, but in a broader sense I think
>>> it's necessary for usability in general. For example, how would a user
>>> (or the ring mempool handler) distinguish between partial dequeues
>>> that occur because the ring is empty vs. those that occur due to a
>>> race? dequeue_bulk_mc could return
>>> 0 even though the ring is full, if its first CAS fails and the head
>>> has advanced beyond its local tail variable.
>> Tail could be re-read on a CAS(&head) failure. This is how I originally coded ring
>> buffer enqueue and dequeue functions. But CAS(&head) failure doesn't by itself
>> mean that *tail* has changed. So I hoisted the read of tail before the CAS-loop
>> which made the code slightly faster since we avoid unnecessarily reading a
>> location which might have been modified and thus could cause a cache miss.
>> Maybe this microptimisation isn't always a good idea (it could lead to spurious
>> queue full/empty failures). Another solution is to only re-read tail if we see the
>> queue as empty (or we can't acquire as many slots as the user requested).
>> 
>>         uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
>>         uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
>>         do {
>>                 actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
>>                 if (unlikely(actual <= 0)) {
>>            /* Re-load tail only when queue looks empty */
>>                tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
>> 
>>           actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
>>            if
>> (unlikely(actual <= 0))
>>                            return 0;
>>        }
>>                 rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
>>         } while (!__atomic_compare_exchange_n(&r->head,
>>                                               &head,
>>                                               head + actual,
>>                                               /*weak*/false,
>>                                               __ATOMIC_RELEASE,
>>                                               __ATOMIC_RELAXED));
>>> The user could retry dequeue, of course, but how many times until
>>> they feel confident that the ring is truly empty and give up? I don't
>>> think we can avoid reserving ring slots prior to enqueueing/dequeueing.
>> If tail is (re-) read inside the CAS-loop, there should be no spurious queue empty
>> failures and no need to repeatedly call the dequeue function "just to be sure".
>> The bulk dequeue/enqueue behaviour is separate.
>> 
>>> 
>>> 2. On the plus side, the enqueue function design that allows it to use
>>> 1/2 the atomics of mine appears to be independent of reserving ring
>>> slots, and should transfer over fairly cleanly. I'm a little concerned
>>> about the performance variability it introduces (i.e. if the thread
>>> gets into "catch up" mode),
>> If a thread has to catch up, it means it is using a stale (head/tail) index (more
>> than one ring lap behaind). Better to try to load a fresh value (if one is
>> available) than to iterate over the ring until it has caught up. So I think this is the
>> better/faster design.
>> 
>> Catch up mode is not triggered by finding an occupied slot for the current lap
>> (that was just written by some other thread). Or at least this is the idea. If we
>> find a freshly written slot, we just move to the next slot.
>> 
>>> particularly for larger rings, since real-time software values predictability.
>>> What if the reload criteria was instead something like:
>>> 
>>> #define ENQ_RETRY_LIMIT 32 //arbitrary
>>> 
>>> if (old.e.idx != tail - size) {
>>>     if (++fail_cnt < ENQ_RETRY_LIMIT) {
>>>         tail++;
>>>     } else {
>>>         fail_cnt = 0;
>>>         tail = rte_lfring_reload(...);
>>>     }
>>>     goto restart;
>>> }
>>> fail_cnt = 0;
>> There are three cases (slot index must be between q->tail and q->head + q-
>>> size):
>> slot.idx == tail - size: slot is free, try to write it slot.idx == tail: slot has just been
>> written (by other thread), skip to next slot (tail++) none of the above: thread is
>> behind (at least one lap), re-load tail from q-
>>> tail
>> 
>> I think using the retry count actually delays catching up to a fresh position.
>> 
> 
> Miscommunication here -- by "catch up", I meant the case where the thread is behind but by less than one lap (the second case you describe). In the worst case, the thread would have to read N-1 (N = ring size) entries before reaching the next available entry, and N could easily be in the thousands. That's the performance variability I was referring to, and why I suggested capping the failed slot reads at 32. Maintaining a local fail_cnt variable is a small price to pay (relative to a CAS) to prevent a ring enqueue latency spike.
> 
> But you're right that we should still catch the 1+ lap behind case, so the reload criteria could be:
> 
> #define ENQ_RETRY_LIMIT 32 //arbitrary
> 
> if (old.e.idx != tail - size) {
>    if (++fail_cnt < ENQ_RETRY_LIMIT && old.e.idx == tail) {
>        tail++;
>    } else {
>        fail_cnt = 0;
>        tail = rte_lfring_reload(...);
>    }
>    goto restart;
> }
> fail_cnt = 0;
> 

Scratch that. This proposal breaks lock-freedom.

>>> 
>>> 3. Using a zero-length array to mark the start of the ring is a nice
>>> approach
>>> -- I'll incorporate that into the patchset.
>>> 
>>> At an algorithm level, I don't see any other differences.
>>> Implementation-wise, we'll need to nail the memory ordering flags to
>>> best support weak consistency machines, as you pointed out elsewhere.
>> There is no pre-acquisition of slots in enqueue and dequeue. That separate step
>> makes lock-freedom impossible (I think).
>> 
> 
> Can you elaborate? I don't currently see any other way to support rte_ring bulk semantics, which is necessary for creating a non-blocking mempool handler, so we should clear up any doubt.
> 
> In the NB ring patchset each thread reserves a number of slots before performing the enq/deq, but doesn't reserve *specific* slots (unlike rte_ring). This reservation is atomic, so that we never over-subscribe valid ring entries (for dequeue) or unused ring entries (for enqueue). This guarantees that the enq/deq operation will eventually complete, regardless of the behavior of other threads. This is why the enqueue loop doesn't check if space is available and the dequeue loop doesn't check if any more valid entries remain.
> 
> This sort of reservation should be compatible with lfring, but requires changes (e.g. two sets of head/tail pointers).
> 
>> With a merged implementation, I think the NON-BLOCKING/LOCK-FREE mode
>> most have separate implementations also for SP/SC. In non-blocking/lock-free
>> mode (whether MP/MC or SP/SC), only one set of head/tail pointers is used. In
>> blocking mode, two sets of head/tail are used. There isn't a set of SP/SC code
>> that supports both non-blocking/lock-free and blocking behaviours.
>> 
> 
> Sure, I don't see a problem with that. The NB ring patchset has separate NB SP/SC code from the blocking SP/SC code as well.
> 
>>> 
>>> Out of curiosity, is p64_lfring based on the nb ring patchset? I
>>> noticed it used a different algorithm until pretty recently.
>> I created the original lfring design (which isn't that different I think) back in
>> November last and it seemed to work but I didn't think it could guarantee FIFO
>> order as elements were enqueued and dequeued wherever a thread found a
>> suitable slot and in my use cases, FIFO order is an important characteristic (need
>> to maintain ingress-to-egress packet order). Already then I was considering
>> adding the index to the ring slots to enforce FIFO enqueue/dequeue order, I
>> knew this would be possible on ARMv7a (which has 2x32 bit CAS using
>> exclusives) and ARMv8/AArch64 (which has 2x64-bit CAS using exclusives). I had
>> seen this design idea on the Internet before (e.g.
>> here https://www.youtube.com/ watch?v=HP2InVqgBFM, I actually stopped
>> viewing this presentation half-way because I wanted to iron out the details
>> myself), it's not a novel idea (sorry).
>> However I didn't have time to do this change before Christmas holidays and just
>> came back to work two weeks ago. So I would rather call this simultaneous
>> "invention" (or publication). Difficult to prove this of course...
>> 
>>> 
>>> Thanks,
>>> Gage
>>> 
>>> </snip>
>> --
>> Ola Liljedahl, Networking System Architect, Arm Phone +46706866373, Skype
>> ola.liljedahl
>
  
Ola Liljedahl Jan. 30, 2019, 11:36 a.m. UTC | #5
On Wed, 2019-01-30 at 05:17 +0000, Eads, Gage wrote:
<SNIP>

> > > 2. On the plus side, the enqueue function design that allows it to use
> > > 1/2 the atomics of mine appears to be independent of reserving ring
> > > slots, and should transfer over fairly cleanly. I'm a little concerned
> > > about the performance variability it introduces (i.e. if the thread
> > > gets into "catch up" mode),
> > If a thread has to catch up, it means it is using a stale (head/tail) index
> > (more
> > than one ring lap behaind). Better to try to load a fresh value (if one is
> > available) than to iterate over the ring until it has caught up. So I think
> > this is the
> > better/faster design.
> > 
> > Catch up mode is not triggered by finding an occupied slot for the current
> > lap
> > (that was just written by some other thread). Or at least this is the idea.
> > If we
> > find a freshly written slot, we just move to the next slot.
> > 
> > > 
> > > particularly for larger rings, since real-time software values
> > > predictability.
> > > What if the reload criteria was instead something like:
> > > 
> > > #define ENQ_RETRY_LIMIT 32 //arbitrary
> > > 
> > > if (old.e.idx != tail - size) {
> > >     if (++fail_cnt < ENQ_RETRY_LIMIT) {
> > >         tail++;
> > >     } else {
> > >         fail_cnt = 0;
> > >         tail = rte_lfring_reload(...);
> > >     }
> > >     goto restart;
> > > }
> > > fail_cnt = 0;
> > There are three cases (slot index must be between q->tail and q->head + q-
> > > 
> > > size):
> > slot.idx == tail - size: slot is free, try to write it slot.idx == tail:
> > slot has just been
> > written (by other thread), skip to next slot (tail++) none of the above:
> > thread is
> > behind (at least one lap), re-load tail from q-
> > > 
> > > tail
> > I think using the retry count actually delays catching up to a fresh
> > position.
> > 
> Miscommunication here -- by "catch up", I meant the case where the thread is
> behind but by less than one lap (the second case you describe). In the worst
> case, the thread would have to read N-1 (N = ring size) entries before
> reaching the next available entry, and N could easily be in the thousands.
> That's the performance variability I was referring to, and why I suggested
> capping the failed slot reads at 32. Maintaining a local fail_cnt variable is
> a small price to pay (relative to a CAS) to prevent a ring enqueue latency
> spike.
OK, I misunderstood. We can reload the private tail from the shared ring buffer
tail earlier. You could actually do this on every failed CAS but I think that
would be overkill. Your design with a retry limit is better, we need to find out
what is a suitable value for the retry limit.

> 
> But you're right that we should still catch the 1+ lap behind case, so the
> reload criteria could be:
> 
> #define ENQ_RETRY_LIMIT 32 //arbitrary
> 
> if (old.e.idx != tail - size) {
>     if (++fail_cnt < ENQ_RETRY_LIMIT && old.e.idx == tail) {
>         tail++;
>     } else {
>         fail_cnt = 0;
>         tail = rte_lfring_reload(...);
>     }
>     goto restart;
> }
> fail_cnt = 0;
Agree.

> 
> > 
> > > 
> > > 
> > > 3. Using a zero-length array to mark the start of the ring is a nice
> > > approach
> > > -- I'll incorporate that into the patchset.
> > > 
> > > At an algorithm level, I don't see any other differences.
> > > Implementation-wise, we'll need to nail the memory ordering flags to
> > > best support weak consistency machines, as you pointed out elsewhere.
> > There is no pre-acquisition of slots in enqueue and dequeue. That separate
> > step
> > makes lock-freedom impossible (I think).
> > 
> Can you elaborate?
I think lock-freedom is impossible with the initial acquisition of elements.
This acquisition creates a side effect that cannot be undone or helped by other
threads.

You can still implement a "non-blocking" ring buffer (like your original design) which hides any delay in threads that access the ring buffer but isn't properly lock-free (which could be considered unnecessary in a DPDK environment, threads may get delayed but shouldn't die or block forever).

>  I don't currently see any other way to support rte_ring bulk semantics, which
> is necessary for creating a non-blocking mempool handler, so we should clear
> up any doubt.
OK, I wasn't aware of this strict dependency on bulk enqueue and dequeue. Bulk
dequeue (mempool allocates buffers from the ring) should be easy to support
though. Bulk enqueue (mempool frees buffers to the ring) will work as long as
the ring is large enough to hold all free buffers so it can never become full
(need to reload head/tail pointers at the appropriate places to avoid spurious
full/empty status). I assume this is the case, initially all free buffers will
be stored in the ring?

> 
> In the NB ring patchset each thread reserves a number of slots before
> performing the enq/deq, but doesn't reserve *specific* slots (unlike
> rte_ring). This reservation is atomic, so that we never over-subscribe valid
> ring entries (for dequeue) or unused ring entries (for enqueue). This
> guarantees that the enq/deq operation will eventually complete, regardless of
> the behavior of other threads. This is why the enqueue loop doesn't check if
> space is available and the dequeue loop doesn't check if any more valid
> entries remain.
I initially thought these acquisitions were just for compatibility with rte_ring
(update the same metadata so that the user can mix MP/MC and SP/SC calls) but
realise they are there for the bulk enqueue/dequeue. But doing this acquisition
means each enqueue or dequeue will update two metadata variables so creates more
contention and less scalability. I think it would be good if we could provide
the bulk behaviour *without* this initial acquisition and only update one
metadata location per enqueue/dequeue operation.

> 
> This sort of reservation should be compatible with lfring, but requires
> changes (e.g. two sets of head/tail pointers).
> 
<SNIP>
> 
-- 
Ola Liljedahl, Networking System Architect, Arm
Phone +46706866373, Skype ola.liljedahl
  
Eads, Gage Feb. 1, 2019, 3:40 p.m. UTC | #6
> -----Original Message-----
> From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> Sent: Wednesday, January 30, 2019 5:36 AM
> To: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Richardson,
> Bruce <bruce.richardson@intel.com>; Eads, Gage <gage.eads@intel.com>;
> dev@dpdk.org
> Cc: nd <nd@arm.com>
> Subject: Re: [RFC] lfring: lock-free ring buffer
> 
> On Wed, 2019-01-30 at 05:17 +0000, Eads, Gage wrote:
> <SNIP>
> >
> > > > 2. On the plus side, the enqueue function design that allows it to
> > > > use
> > > > 1/2 the atomics of mine appears to be independent of reserving
> > > > ring slots, and should transfer over fairly cleanly. I'm a little
> > > > concerned about the performance variability it introduces (i.e. if
> > > > the thread gets into "catch up" mode),
> > > If a thread has to catch up, it means it is using a stale
> > > (head/tail) index (more than one ring lap behaind). Better to try to
> > > load a fresh value (if one is
> > > available) than to iterate over the ring until it has caught up. So
> > > I think this is the better/faster design.
> > >
> > > Catch up mode is not triggered by finding an occupied slot for the
> > > current lap (that was just written by some other thread). Or at
> > > least this is the idea.
> > > If we
> > > find a freshly written slot, we just move to the next slot.
> > >
> > > >
> > > > particularly for larger rings, since real-time software values
> > > > predictability.
> > > > What if the reload criteria was instead something like:
> > > >
> > > > #define ENQ_RETRY_LIMIT 32 //arbitrary
> > > >
> > > > if (old.e.idx != tail - size) {
> > > >     if (++fail_cnt < ENQ_RETRY_LIMIT) {
> > > >         tail++;
> > > >     } else {
> > > >         fail_cnt = 0;
> > > >         tail = rte_lfring_reload(...);
> > > >     }
> > > >     goto restart;
> > > > }
> > > > fail_cnt = 0;
> > > There are three cases (slot index must be between q->tail and
> > > q->head + q-
> > > >
> > > > size):
> > > slot.idx == tail - size: slot is free, try to write it slot.idx == tail:
> > > slot has just been
> > > written (by other thread), skip to next slot (tail++) none of the above:
> > > thread is
> > > behind (at least one lap), re-load tail from q-
> > > >
> > > > tail
> > > I think using the retry count actually delays catching up to a fresh
> > > position.
> > >
> > Miscommunication here -- by "catch up", I meant the case where the
> > thread is behind but by less than one lap (the second case you
> > describe). In the worst case, the thread would have to read N-1 (N =
> > ring size) entries before reaching the next available entry, and N could easily
> be in the thousands.
> > That's the performance variability I was referring to, and why I
> > suggested capping the failed slot reads at 32. Maintaining a local
> > fail_cnt variable is a small price to pay (relative to a CAS) to
> > prevent a ring enqueue latency spike.
> OK, I misunderstood. We can reload the private tail from the shared ring buffer
> tail earlier. You could actually do this on every failed CAS but I think that would
> be overkill. Your design with a retry limit is better, we need to find out what is a
> suitable value for the retry limit.
> 

Too late for lfring v2 unfortunately, but this idea will actually break lock-freedom. For example, say the tail index is 33 slots behind the next available slot. An enqueueing thread would get stuck retrying slots from tail index -> tail index + 32 until another thread updates the tail index. 

> >
> > But you're right that we should still catch the 1+ lap behind case, so
> > the reload criteria could be:
> >
> > #define ENQ_RETRY_LIMIT 32 //arbitrary
> >
> > if (old.e.idx != tail - size) {
> >     if (++fail_cnt < ENQ_RETRY_LIMIT && old.e.idx == tail) {
> >         tail++;
> >     } else {
> >         fail_cnt = 0;
> >         tail = rte_lfring_reload(...);
> >     }
> >     goto restart;
> > }
> > fail_cnt = 0;
> Agree.
> 
> >
> > >
> > > >
> > > >
> > > > 3. Using a zero-length array to mark the start of the ring is a
> > > > nice approach
> > > > -- I'll incorporate that into the patchset.
> > > >
> > > > At an algorithm level, I don't see any other differences.
> > > > Implementation-wise, we'll need to nail the memory ordering flags
> > > > to best support weak consistency machines, as you pointed out elsewhere.
> > > There is no pre-acquisition of slots in enqueue and dequeue. That
> > > separate step makes lock-freedom impossible (I think).
> > >
> > Can you elaborate?
> I think lock-freedom is impossible with the initial acquisition of elements.
> This acquisition creates a side effect that cannot be undone or helped by other
> threads.
> 
> You can still implement a "non-blocking" ring buffer (like your original design)
> which hides any delay in threads that access the ring buffer but isn't properly
> lock-free (which could be considered unnecessary in a DPDK environment,
> threads may get delayed but shouldn't die or block forever).
> 

I think I see what you're saying. For example If a dequeueing thread reserves N elements and then is preempted, it's effectively reduced the number of available elements by N during that period.

Practically speaking, I don't think this is a problem. Not just because threads shouldn't die or block forever, but if a thread can be preempted after reserving N ring slots (but before enqueueing to or dequeueing from them), the net effect is those N buffers are taken out of the system. This isn't really different than if it that thread was preempted *outside* of the ring functions while holding N buffers -- the application should be designed to be resilient to that.

> >  I don't currently see any other way to support rte_ring bulk
> > semantics, which is necessary for creating a non-blocking mempool
> > handler, so we should clear up any doubt.
> OK, I wasn't aware of this strict dependency on bulk enqueue and dequeue. Bulk
> dequeue (mempool allocates buffers from the ring) should be easy to support
> though. Bulk enqueue (mempool frees buffers to the ring) will work as long as
> the ring is large enough to hold all free buffers so it can never become full (need
> to reload head/tail pointers at the appropriate places to avoid spurious
> full/empty status). I assume this is the case, initially all free buffers will be stored
> in the ring?
> 
> >
> > In the NB ring patchset each thread reserves a number of slots before
> > performing the enq/deq, but doesn't reserve *specific* slots (unlike
> > rte_ring). This reservation is atomic, so that we never over-subscribe
> > valid ring entries (for dequeue) or unused ring entries (for enqueue).
> > This guarantees that the enq/deq operation will eventually complete,
> > regardless of the behavior of other threads. This is why the enqueue
> > loop doesn't check if space is available and the dequeue loop doesn't
> > check if any more valid entries remain.
> I initially thought these acquisitions were just for compatibility with rte_ring
> (update the same metadata so that the user can mix MP/MC and SP/SC calls) but
> realise they are there for the bulk enqueue/dequeue. But doing this acquisition
> means each enqueue or dequeue will update two metadata variables so creates
> more contention and less scalability. I think it would be good if we could provide
> the bulk behaviour *without* this initial acquisition and only update one
> metadata location per enqueue/dequeue operation.
> 

Agreed. I see lfring v2 has avoided this on the dequeue side, but I think it's unavoidable on the enqueue side -- since we can't write ring entries and CAS the tail pointer in one big atomic operation. AFAICT, slot reservations are unavoidable there to achieve bulk semantics.

> >
> > This sort of reservation should be compatible with lfring, but
> > requires changes (e.g. two sets of head/tail pointers).
> >
> <SNIP>
> >
> --
> Ola Liljedahl, Networking System Architect, Arm Phone +46706866373, Skype
> ola.liljedahl
  
Ola Liljedahl Feb. 1, 2019, 6:19 p.m. UTC | #7
On Fri, 2019-02-01 at 15:40 +0000, Eads, Gage wrote:
> 
> > 
> > -----Original Message-----
> > From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> > Sent: Wednesday, January 30, 2019 5:36 AM
> > To: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Richardson,
> > Bruce <bruce.richardson@intel.com>; Eads, Gage <gage.eads@intel.com>;
> > dev@dpdk.org
> > Cc: nd <nd@arm.com>
> > Subject: Re: [RFC] lfring: lock-free ring buffer
> > 
> > On Wed, 2019-01-30 at 05:17 +0000, Eads, Gage wrote:
> > <SNIP>
> > > 
> > > 
> > > > 
> > > > > 
> > > > > 2. On the plus side, the enqueue function design that allows it to
> > > > > use
> > > > > 1/2 the atomics of mine appears to be independent of reserving
> > > > > ring slots, and should transfer over fairly cleanly. I'm a little
> > > > > concerned about the performance variability it introduces (i.e. if
> > > > > the thread gets into "catch up" mode),
> > > > If a thread has to catch up, it means it is using a stale
> > > > (head/tail) index (more than one ring lap behaind). Better to try to
> > > > load a fresh value (if one is
> > > > available) than to iterate over the ring until it has caught up. So
> > > > I think this is the better/faster design.
> > > > 
> > > > Catch up mode is not triggered by finding an occupied slot for the
> > > > current lap (that was just written by some other thread). Or at
> > > > least this is the idea.
> > > > If we
> > > > find a freshly written slot, we just move to the next slot.
> > > > 
> > > > > 
> > > > > 
> > > > > particularly for larger rings, since real-time software values
> > > > > predictability.
> > > > > What if the reload criteria was instead something like:
> > > > > 
> > > > > #define ENQ_RETRY_LIMIT 32 //arbitrary
> > > > > 
> > > > > if (old.e.idx != tail - size) {
> > > > >     if (++fail_cnt < ENQ_RETRY_LIMIT) {
> > > > >         tail++;
> > > > >     } else {
> > > > >         fail_cnt = 0;
> > > > >         tail = rte_lfring_reload(...);
> > > > >     }
> > > > >     goto restart;
> > > > > }
> > > > > fail_cnt = 0;
> > > > There are three cases (slot index must be between q->tail and
> > > > q->head + q-
> > > > > 
> > > > > 
> > > > > size):
> > > > slot.idx == tail - size: slot is free, try to write it slot.idx == tail:
> > > > slot has just been
> > > > written (by other thread), skip to next slot (tail++) none of the above:
> > > > thread is
> > > > behind (at least one lap), re-load tail from q-
> > > > > 
> > > > > 
> > > > > tail
> > > > I think using the retry count actually delays catching up to a fresh
> > > > position.
> > > > 
> > > Miscommunication here -- by "catch up", I meant the case where the
> > > thread is behind but by less than one lap (the second case you
> > > describe). In the worst case, the thread would have to read N-1 (N =
> > > ring size) entries before reaching the next available entry, and N could
> > > easily
> > be in the thousands.
> > > 
> > > That's the performance variability I was referring to, and why I
> > > suggested capping the failed slot reads at 32. Maintaining a local
> > > fail_cnt variable is a small price to pay (relative to a CAS) to
> > > prevent a ring enqueue latency spike.
> > OK, I misunderstood. We can reload the private tail from the shared ring
> > buffer
> > tail earlier. You could actually do this on every failed CAS but I think
> > that would
> > be overkill. Your design with a retry limit is better, we need to find out
> > what is a
> > suitable value for the retry limit.
> > 
> Too late for lfring v2 unfortunately, but this idea will actually break lock-
> freedom. For example, say the tail index is 33 slots behind the next available
> slot. An enqueueing thread would get stuck retrying slots from tail index ->
> tail index + 32 until another thread updates the tail index. 
I don't think this is true.

If we hit the retry limit, we do: tail = _rte_lfring_reload(&r->tail, tail);
where we obtain a fresh copy of the shared ring->tail value but only use this if
it is "larger" (using serial number arithmetic) than our private tail variable.
If ring->tail hasn't been updated yet, we continue to use our private tail but
increment it to the next slot. We never go backwards.

static __rte_always_inline uint64_t
_rte_lfring_reload(const uint64_t *loc, uint64_t idx)
{
        uint64_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
        if (_rte_lfring_before(idx, fresh)) {
                /* fresh is after idx, use this instead */
                idx = fresh;
        } else {
                /* Continue with next slot */
                idx++;
        }
        return idx;
}
> 
> > 
> > > 
> > > 
> > > But you're right that we should still catch the 1+ lap behind case, so
> > > the reload criteria could be:
> > > 
> > > #define ENQ_RETRY_LIMIT 32 //arbitrary
> > > 
> > > if (old.e.idx != tail - size) {
> > >     if (++fail_cnt < ENQ_RETRY_LIMIT && old.e.idx == tail) {
> > >         tail++;
> > >     } else {
> > >         fail_cnt = 0;
> > >         tail = rte_lfring_reload(...);
> > >     }
> > >     goto restart;
> > > }
> > > fail_cnt = 0;
> > Agree.
> > 
> > > 
> > > 
> > > > 
> > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 3. Using a zero-length array to mark the start of the ring is a
> > > > > nice approach
> > > > > -- I'll incorporate that into the patchset.
> > > > > 
> > > > > At an algorithm level, I don't see any other differences.
> > > > > Implementation-wise, we'll need to nail the memory ordering flags
> > > > > to best support weak consistency machines, as you pointed out
> > > > > elsewhere.
> > > > There is no pre-acquisition of slots in enqueue and dequeue. That
> > > > separate step makes lock-freedom impossible (I think).
> > > > 
> > > Can you elaborate?
> > I think lock-freedom is impossible with the initial acquisition of elements.
> > This acquisition creates a side effect that cannot be undone or helped by
> > other
> > threads.
> > 
> > You can still implement a "non-blocking" ring buffer (like your original
> > design)
> > which hides any delay in threads that access the ring buffer but isn't
> > properly
> > lock-free (which could be considered unnecessary in a DPDK environment,
> > threads may get delayed but shouldn't die or block forever).
> > 
> I think I see what you're saying. For example If a dequeueing thread reserves
> N elements and then is preempted, it's effectively reduced the number of
> available elements by N during that period.
> 
> Practically speaking, I don't think this is a problem. Not just because
> threads shouldn't die or block forever, but if a thread can be preempted after
> reserving N ring slots (but before enqueueing to or dequeueing from them), the
> net effect is those N buffers are taken out of the system. This isn't really
> different than if it that thread was preempted *outside* of the ring functions
> while holding N buffers -- the application should be designed to be resilient
> to that.
Yes, practically speaking, this behaviour is unlikely to pose a problem for DPDK
applications. But the update of another piece of metadata will decrease
scalability (unless each of prod/cons head/tail is in a separate cache line but
that's a lot of mostly empty cache lines).

If we only need the non-blocking behavior in DPDK (to handle variable latency
and unexpected delays from threads that access the ring buffer or any data
structure that uses the ring buffer), there are other ways to achieve that that
will be more efficient than a lock-free ring like this or your nblk ring which
need to perform a CAS per element and then some extra CAS operations.

> 
> > 
> > > 
> > >  I don't currently see any other way to support rte_ring bulk
> > > semantics, which is necessary for creating a non-blocking mempool
> > > handler, so we should clear up any doubt.
> > OK, I wasn't aware of this strict dependency on bulk enqueue and dequeue.
> > Bulk
> > dequeue (mempool allocates buffers from the ring) should be easy to support
> > though. Bulk enqueue (mempool frees buffers to the ring) will work as long
> > as
> > the ring is large enough to hold all free buffers so it can never become
> > full (need
> > to reload head/tail pointers at the appropriate places to avoid spurious
> > full/empty status). I assume this is the case, initially all free buffers
> > will be stored
> > in the ring?
> > 
> > > 
> > > 
> > > In the NB ring patchset each thread reserves a number of slots before
> > > performing the enq/deq, but doesn't reserve *specific* slots (unlike
> > > rte_ring). This reservation is atomic, so that we never over-subscribe
> > > valid ring entries (for dequeue) or unused ring entries (for enqueue).
> > > This guarantees that the enq/deq operation will eventually complete,
> > > regardless of the behavior of other threads. This is why the enqueue
> > > loop doesn't check if space is available and the dequeue loop doesn't
> > > check if any more valid entries remain.
> > I initially thought these acquisitions were just for compatibility with
> > rte_ring
> > (update the same metadata so that the user can mix MP/MC and SP/SC calls)
> > but
> > realise they are there for the bulk enqueue/dequeue. But doing this
> > acquisition
> > means each enqueue or dequeue will update two metadata variables so creates
> > more contention and less scalability. I think it would be good if we could
> > provide
> > the bulk behaviour *without* this initial acquisition and only update one
> > metadata location per enqueue/dequeue operation.
> > 
> Agreed. I see lfring v2 has avoided this on the dequeue side, but I think it's
> unavoidable on the enqueue side -- since we can't write ring entries and CAS
> the tail pointer in one big atomic operation. AFAICT, slot reservations are
> unavoidable there to achieve bulk semantics.
For enqueue, yes unless you trust the ring to be large enough to be able to hold
all elements that might be enqueued. Isn't this the case for a mempool? All
buffers will originally be on the ring?

> 
> > 
> > > 
> > > 
> > > This sort of reservation should be compatible with lfring, but
> > > requires changes (e.g. two sets of head/tail pointers).
> > > 
> > <SNIP>
> > > 
> > > 
> > --
> > Ola Liljedahl, Networking System Architect, Arm Phone +46706866373, Skype
> > ola.liljedahl
  

Patch

diff --git a/config/common_base b/config/common_base
index 7c6da51..375f9c3 100644
--- a/config/common_base
+++ b/config/common_base
@@ -719,6 +719,11 @@  CONFIG_RTE_LIBRTE_PMD_IFPGA_RAWDEV=y
 CONFIG_RTE_LIBRTE_RING=y
 
 #
+# Compile librte_lfring
+#
+CONFIG_RTE_LIBRTE_LFRING=y
+
+#
 # Compile librte_mempool
 #
 CONFIG_RTE_LIBRTE_MEMPOOL=y
diff --git a/lib/Makefile b/lib/Makefile
index d6239d2..cd46339 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -12,6 +12,8 @@  DIRS-$(CONFIG_RTE_LIBRTE_PCI) += librte_pci
 DEPDIRS-librte_pci := librte_eal
 DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_ring
 DEPDIRS-librte_ring := librte_eal
+DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_lfring
+DEPDIRS-librte_lfring := librte_eal
 DIRS-$(CONFIG_RTE_LIBRTE_MEMPOOL) += librte_mempool
 DEPDIRS-librte_mempool := librte_eal librte_ring
 DIRS-$(CONFIG_RTE_LIBRTE_MBUF) += librte_mbuf
diff --git a/lib/librte_lfring/Makefile b/lib/librte_lfring/Makefile
new file mode 100644
index 0000000..c04d2e9
--- /dev/null
+++ b/lib/librte_lfring/Makefile
@@ -0,0 +1,22 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_lfring.a
+
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+LDLIBS += -lrte_eal
+
+EXPORT_MAP := rte_lfring_version.map
+
+LIBABIVER := 1
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_lfring.c
+
+# install includes
+SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_lfring.h lockfree16.h
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_lfring/lockfree16.h b/lib/librte_lfring/lockfree16.h
new file mode 100644
index 0000000..2550a35
--- /dev/null
+++ b/lib/librte_lfring/lockfree16.h
@@ -0,0 +1,144 @@ 
+//Copyright (c) 2018-2019, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier:        BSD-3-Clause
+
+#ifndef _LOCKFREE16_H
+#define _LOCKFREE16_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#define HAS_ACQ(mo) ((mo) != __ATOMIC_RELAXED && (mo) != __ATOMIC_RELEASE)
+#define HAS_RLS(mo) ((mo) == __ATOMIC_RELEASE || (mo) == __ATOMIC_ACQ_REL || (mo) == __ATOMIC_SEQ_CST)
+
+#define MO_LOAD(mo) (HAS_ACQ((mo)) ? __ATOMIC_ACQUIRE : __ATOMIC_RELAXED)
+#define MO_STORE(mo) (HAS_RLS((mo)) ? __ATOMIC_RELEASE : __ATOMIC_RELAXED)
+
+#if defined __aarch64__
+
+static inline __int128 ldx128(const __int128 *var, int mm)
+{
+	__int128 old;
+	if (mm == __ATOMIC_ACQUIRE)
+		__asm volatile("ldaxp %0, %H0, [%1]"
+				: "=&r" (old)
+				: "r" (var)
+				: "memory");
+	else if (mm == __ATOMIC_RELAXED)
+		__asm volatile("ldxp %0, %H0, [%1]"
+				: "=&r" (old)
+				: "r" (var)
+				: );
+	else
+		__builtin_trap();
+	return old;
+}
+
+//Return 0 on success, 1 on failure
+static inline uint32_t stx128(__int128 *var, __int128 neu, int mm)
+{
+	uint32_t ret;
+	if (mm == __ATOMIC_RELEASE)
+		__asm volatile("stlxp %w0, %1, %H1, [%2]"
+				: "=&r" (ret)
+				: "r" (neu), "r" (var)
+				: "memory");
+	else if (mm == __ATOMIC_RELAXED)
+		__asm volatile("stxp %w0, %1, %H1, [%2]"
+				: "=&r" (ret)
+				: "r" (neu), "r" (var)
+				: );
+	else
+		__builtin_trap();
+	return ret;
+}
+
+//Weak compare-exchange and non-atomic load on failure
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+				   __int128 *exp,
+				   register __int128 neu,
+				   int mo_success,
+				   int mo_failure)
+{
+	(void)mo_failure;//Ignore memory ordering for failure, memory order for
+	//success must be stronger or equal
+	int ldx_mo = MO_LOAD(mo_success);
+	int stx_mo = MO_STORE(mo_success);
+	register __int128 expected = *exp;
+	__asm __volatile("" ::: "memory");
+	//Atomicity of LDXP is not guaranteed
+	register __int128 old = ldx128(var, ldx_mo);
+	if (old == expected && !stx128(var, neu, stx_mo)) {
+		//Right value and STX succeeded
+		__asm __volatile("" ::: "memory");
+		return 1;
+	}
+	__asm __volatile("" ::: "memory");
+	//Wrong value or STX failed
+	*exp = old;//Old possibly torn value (OK for 'frail' flavour)
+	return 0;//Failure, *exp updated
+}
+
+#elif defined __x86_64__
+
+union u128
+{
+	struct {
+		uint64_t lo, hi;
+	} s;
+	__int128 ui;
+};
+
+static inline bool
+cmpxchg16b(__int128 *src, union u128 *cmp, union u128 with)
+{
+	bool result;
+	__asm__ __volatile__
+		("lock cmpxchg16b %1\n\tsetz %0"
+		 : "=q" (result), "+m" (*src), "+d" (cmp->s.hi), "+a" (cmp->s.lo)
+		 : "c" (with.s.hi), "b" (with.s.lo)
+		 : "cc", "memory"
+		);
+	return result;
+}
+
+static inline bool
+lockfree_compare_exchange_16(register __int128 *var,
+			     __int128 *exp,
+			     register __int128 neu,
+			     bool weak,
+			     int mo_success,
+			     int mo_failure)
+{
+	(void)weak;
+	(void)mo_success;
+	(void)mo_failure;
+	union u128 cmp, with;
+	cmp.ui = *exp;
+	with.ui = neu;
+	if (cmpxchg16b(var, &cmp, with)) {
+		return true;
+	} else {
+		*exp = cmp.ui;
+		return false;
+	}
+}
+
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+				   __int128 *exp,
+				   register __int128 neu,
+				   int mo_s,
+				   int mo_f)
+{
+	return lockfree_compare_exchange_16(var, exp, neu, true, mo_s, mo_f);
+}
+
+#else
+
+#error Unsupported architecture
+
+#endif
+
+#endif
diff --git a/lib/librte_lfring/meson.build b/lib/librte_lfring/meson.build
new file mode 100644
index 0000000..c8b90cb
--- /dev/null
+++ b/lib/librte_lfring/meson.build
@@ -0,0 +1,6 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2017 Intel Corporation
+
+version = 1
+sources = files('rte_lfring.c')
+headers = files('rte_lfring.h','lockfree16.h')
diff --git a/lib/librte_lfring/rte_lfring.c b/lib/librte_lfring/rte_lfring.c
new file mode 100644
index 0000000..2e3f1c8
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring.c
@@ -0,0 +1,261 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2015 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_malloc.h>
+#include <rte_launch.h>
+#include <rte_eal.h>
+#include <rte_eal_memconfig.h>
+#include <rte_atomic.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_spinlock.h>
+
+#include "rte_lfring.h"
+
+TAILQ_HEAD(rte_lfring_list, rte_tailq_entry);
+
+static struct rte_tailq_elem rte_lfring_tailq = {
+	.name = RTE_TAILQ_LFRING_NAME,
+};
+EAL_REGISTER_TAILQ(rte_lfring_tailq)
+
+/* true if x is a power of 2 */
+#define POWEROF2(x) ((((x)-1) & (x)) == 0)
+
+/* return the size of memory occupied by a ring */
+ssize_t
+rte_lfring_get_memsize(unsigned count)
+{
+	ssize_t sz;
+
+	/* count must be a power of 2 */
+	if ((!POWEROF2(count)) || (count > 0x80000000U )) {
+		RTE_LOG(ERR, RING,
+			"Requested size is invalid, must be power of 2, and "
+			"do not exceed the size limit %u\n", 0x80000000U);
+		return -EINVAL;
+	}
+
+	sz = sizeof(struct rte_lfring) + count * sizeof(struct rte_lfr_element);
+	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+	return sz;
+}
+
+int
+rte_lfring_init(struct rte_lfring *r, const char *name, unsigned count,
+		unsigned flags)
+{
+	int ret;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(struct rte_lfring) &
+			  RTE_CACHE_LINE_MASK) != 0);
+
+	/* init the ring structure */
+	memset(r, 0, sizeof(*r));
+	ret = snprintf(r->name, sizeof(r->name), "%s", name);
+	if (ret < 0 || ret >= (int)sizeof(r->name))
+		return -ENAMETOOLONG;
+	r->flags = flags;
+
+	r->size = rte_align32pow2(count);
+	r->mask = r->size - 1;
+	r->head = 0;
+	r->tail = 0;
+	for (uint64_t i = 0; i < r->size; i++)
+	{
+	    r->ring[i].ptr = NULL;
+	    r->ring[i].idx = i - r->size;
+	}
+
+	return 0;
+}
+
+/* create the ring */
+struct rte_lfring *
+rte_lfring_create(const char *name, unsigned count, int socket_id,
+		  unsigned flags)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	struct rte_lfring *r;
+	struct rte_tailq_entry *te;
+	const struct rte_memzone *mz;
+	ssize_t ring_size;
+	int mz_flags = 0;
+	struct rte_lfring_list* ring_list = NULL;
+	const unsigned int requested_count = count;
+	int ret;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	count = rte_align32pow2(count);
+
+	ring_size = rte_lfring_get_memsize(count);
+	if (ring_size < 0) {
+		rte_errno = ring_size;
+		return NULL;
+	}
+
+	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
+		RTE_LFRING_MZ_PREFIX, name);
+	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
+		rte_errno = ENAMETOOLONG;
+		return NULL;
+	}
+
+	te = rte_zmalloc("LFRING_TAILQ_ENTRY", sizeof(*te), 0);
+	if (te == NULL) {
+		RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* reserve a memory zone for this ring. If we can't get rte_config or
+	 * we are secondary process, the memzone_reserve function will set
+	 * rte_errno for us appropriately - hence no check in this this function */
+	mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id,
+					 mz_flags, __alignof__(*r));
+	if (mz != NULL) {
+		r = mz->addr;
+		/* no need to check return value here, we already checked the
+		 * arguments above */
+		rte_lfring_init(r, name, requested_count, flags);
+
+		te->data = (void *) r;
+		r->memzone = mz;
+
+		TAILQ_INSERT_TAIL(ring_list, te, next);
+	} else {
+		r = NULL;
+		RTE_LOG(ERR, RING, "Cannot reserve memory\n");
+		rte_free(te);
+	}
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	return r;
+}
+
+/* free the ring */
+void
+rte_lfring_free(struct rte_lfring *r)
+{
+	struct rte_lfring_list *ring_list = NULL;
+	struct rte_tailq_entry *te;
+
+	if (r == NULL)
+		return;
+
+	/*
+	 * Ring was not created with rte_lfring_create,
+	 * therefore, there is no memzone to free.
+	 */
+	if (r->memzone == NULL) {
+		RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_lfring_create()");
+		return;
+	}
+
+	if (rte_memzone_free(r->memzone) != 0) {
+		RTE_LOG(ERR, RING, "Cannot free memory\n");
+		return;
+	}
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* find out tailq entry */
+	TAILQ_FOREACH(te, ring_list, next) {
+		if (te->data == (void *) r)
+			break;
+	}
+
+	if (te == NULL) {
+		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+		return;
+	}
+
+	TAILQ_REMOVE(ring_list, te, next);
+
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	rte_free(te);
+}
+
+/* dump the status of the ring on the console */
+void
+rte_lfring_dump(FILE *f, const struct rte_lfring *r)
+{
+	fprintf(f, "ring <%s>@%p\n", r->name, r);
+	fprintf(f, "  flags=%x\n", r->flags);
+	fprintf(f, "  size=%"PRIu64"\n", r->size);
+	fprintf(f, "  tail=%"PRIu64"\n", r->tail);
+	fprintf(f, "  head=%"PRIu64"\n", r->head);
+	fprintf(f, "  used=%u\n", rte_lfring_count(r));
+	fprintf(f, "  avail=%u\n", rte_lfring_free_count(r));
+}
+
+/* dump the status of all rings on the console */
+void
+rte_lfring_list_dump(FILE *f)
+{
+	const struct rte_tailq_entry *te;
+	struct rte_lfring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		rte_lfring_dump(f, (struct rte_lfring *) te->data);
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+}
+
+/* search a ring from its name */
+struct rte_lfring *
+rte_lfring_lookup(const char *name)
+{
+	struct rte_tailq_entry *te;
+	struct rte_lfring *r = NULL;
+	struct rte_lfring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		r = (struct rte_lfring *) te->data;
+		if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)
+			break;
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	if (te == NULL) {
+		rte_errno = ENOENT;
+		return NULL;
+	}
+
+	return r;
+}
diff --git a/lib/librte_lfring/rte_lfring.h b/lib/librte_lfring/rte_lfring.h
new file mode 100644
index 0000000..f33bad6
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring.h
@@ -0,0 +1,567 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#ifndef _RTE_LFRING_H_
+#define _RTE_LFRING_H_
+
+/**
+ * @file
+ * RTE Lfring
+ *
+ * The Lfring Manager is a fixed-size queue, implemented as a table of
+ * (pointers + counters).
+ *
+ * - FIFO (First In First Out)
+ * - Maximum size is fixed; the pointers are stored in a table.
+ * - Lock-free implementation.
+ * - Multi- or single-consumer dequeue.
+ * - Multi- or single-producer enqueue.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "lockfree16.h"
+
+#define RTE_TAILQ_LFRING_NAME "RTE_LFRING"
+
+#define RTE_LFRING_MZ_PREFIX "RG_"
+/**< The maximum length of an lfring name. */
+#define RTE_LFRING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+			   sizeof(RTE_LFRING_MZ_PREFIX) + 1)
+
+struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+
+struct rte_lfr_element
+{
+    void *ptr;
+    uintptr_t idx;
+};
+
+/**
+ * An RTE lfring structure.
+ *
+ */
+struct rte_lfring {
+	char name[RTE_LFRING_NAMESIZE] __rte_cache_aligned; /**< Name of the lfring. */
+	const struct rte_memzone *memzone;
+			/**< Memzone, if any, containing the rte_lfring */
+
+	char pad0 __rte_cache_aligned; /**< empty cache line */
+
+	/** Modified by producer. */
+	uint64_t tail __rte_cache_aligned;
+	uint64_t size;	   /**< Size of ring. */
+	uint64_t mask;	   /**< Mask (size-1) of lfring. */
+	uint32_t flags;	   /**< Flags supplied at creation. */
+	char pad1 __rte_cache_aligned; /**< empty cache line */
+
+	/** Modified by consumer. */
+	uint64_t head __rte_cache_aligned;
+	char pad2 __rte_cache_aligned; /**< empty cache line */
+
+	struct rte_lfr_element ring[] __rte_cache_aligned;
+};
+
+#define LFRING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
+#define LFRING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+
+/**
+ * Calculate the memory size needed for a lfring
+ *
+ * This function returns the number of bytes needed for a lfring, given
+ * the number of elements in it. This value is the sum of the size of
+ * the structure rte_lfring and the size of the memory needed by the
+ * objects pointers. The value is aligned to a cache line size.
+ *
+ * @param count
+ *   The number of elements in the lfring (must be a power of 2).
+ * @return
+ *   - The memory size needed for the lfring on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+ssize_t rte_lfring_get_memsize(unsigned count);
+
+/**
+ * Initialize a lfring structure.
+ *
+ * Initialize a lfring structure in memory pointed by "r". The size of the
+ * memory area must be large enough to store the lfring structure and the
+ * object table. It is advised to use rte_lfring_get_memsize() to get the
+ * appropriate size.
+ *
+ * The lfring size is set to *count*, which must be a power of two. Water
+ * marking is disabled by default. The real usable lfring size is
+ * *count-1* instead of *count* to differentiate a free lfring from an
+ * empty lfring.
+ *
+ * The lfring is not added in RTE_TAILQ_LFRING global list. Indeed, the
+ * memory given by the caller may not be shareable among dpdk
+ * processes.
+ *
+ * @param r
+ *   The pointer to the lfring structure followed by the objects table.
+ * @param name
+ *   The name of the lfring.
+ * @param count
+ *   The number of elements in the lfring (must be a power of 2).
+ * @param flags
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int rte_lfring_init(struct rte_lfring *r, const char *name, unsigned count,
+		    unsigned flags);
+
+/**
+ * Create a new lfring named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_lfring_init() to initialize an empty lfring.
+ *
+ * The new lfring size is set to *count*, which must be a power of
+ * two. Water marking is disabled by default. The real usable lfring size
+ * is *count-1* instead of *count* to differentiate a free lfring from an
+ * empty lfring.
+ *
+ * The lfring is added in RTE_TAILQ_LFRING list.
+ *
+ * @param name
+ *   The name of the lfring.
+ * @param count
+ *   The size of the lfring (must be a power of 2).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   An OR of the following:
+ *    - LFRING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_lfring_enqueue()`` or ``rte_lfring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - LFRING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_lfring_dequeue()`` or ``rte_lfring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   On success, the pointer to the new allocated lfring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+struct rte_lfring *rte_lfring_create(const char *name, unsigned count,
+				     int socket_id, unsigned flags);
+/**
+ * De-allocate all memory used by the lfring.
+ *
+ * @param r
+ *   Lfring to free
+ */
+void rte_lfring_free(struct rte_lfring *r);
+
+/**
+ * Dump the status of the lfring to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param r
+ *   A pointer to the lfring structure.
+ */
+void rte_lfring_dump(FILE *f, const struct rte_lfring *r);
+
+/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */
+static __rte_always_inline bool
+rte_lfring_before(uint64_t a, uint64_t b)
+{
+    return (int64_t)(a - b) < 0;
+}
+
+static __rte_always_inline uint64_t
+rte_lfring_update(uint64_t *loc, uint64_t neu)
+{
+	uint64_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
+	do {
+		if (rte_lfring_before(neu, old)) {
+			/* neu < old */
+			return old;
+		}
+		/* Else neu > old, need to update *loc */
+	} while (!__atomic_compare_exchange_n(loc,
+					      &old,
+					      neu,
+					      /*weak=*/true,
+					      __ATOMIC_RELEASE,
+					      __ATOMIC_RELAXED));
+	return neu;
+}
+
+static __rte_always_inline uint64_t
+rte_lfring_reload(const uint64_t *loc, uint64_t idx)
+{
+	uint64_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
+	if (rte_lfring_before(idx, fresh)) {
+		/* fresh is after idx, use this instead */
+		idx = fresh;
+	} else {
+		/* Continue with next slot */
+		idx++;
+	}
+	return idx;
+}
+
+static __rte_always_inline void
+rte_lfring_enq_elems(void *const *elems,
+		     struct rte_lfr_element *ring,
+		     uint64_t mask,
+		     uint64_t idx,
+		     uint64_t num)
+{
+	uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+	struct rte_lfr_element *ring0 = &ring[idx & mask];
+	for (uint64_t i = 0; i < seg0; i++) {
+		ring0[i].ptr = *elems++;
+		ring0[i].idx = idx++;
+	}
+	if (num > seg0) {
+		uint64_t seg1 = num - seg0;
+		for (uint64_t i = 0; i < seg1; i++) {
+			ring[i].ptr = *elems++;
+			ring[i].idx = idx++;
+		}
+	}
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_enqueue_bulk_sp(struct rte_lfring *r,
+			   void * const *elems,
+			   unsigned int nelems,
+			   unsigned int *free_space)
+{
+	/* Single-producer */
+	uint64_t mask = r->mask;
+	uint64_t size = mask + 1;
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_ACQUIRE);
+	int64_t actual = RTE_MIN((int64_t)(head + size - tail), (int64_t)nelems);
+	if (actual <= 0)
+		return 0;
+	rte_lfring_enq_elems(elems, r->ring, mask, tail, actual);
+	__atomic_store_n(&r->tail, tail + actual, __ATOMIC_RELEASE);
+	if (free_space != NULL)
+		*free_space = head + size - (tail + actual);
+	return actual;
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_enqueue_bulk_mp(struct rte_lfring *r,
+			   void * const *elems,
+			   unsigned int nelems,
+			   unsigned int *free_space)
+{
+	/* Lock-free multi-producer */
+	uint64_t mask = r->mask;
+	uint64_t size = mask + 1;
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t actual = 0;
+restart:
+	while (actual < nelems &&
+	       rte_lfring_before(tail,
+			__atomic_load_n(&r->head, __ATOMIC_ACQUIRE)) + size) {
+		union {
+			struct rte_lfr_element e;
+			__int128 ui;
+		} old, neu;
+		struct rte_lfr_element *slot = &r->ring[tail & mask];
+		old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
+		old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
+		do {
+			if (old.e.idx != tail - size) {
+				if (old.e.idx != tail) {
+					/* We are far behind,
+					   restart with fresh index */
+					tail = rte_lfring_reload(&r->tail, tail);
+					goto restart;
+				} else {
+					/* Slot already enqueued,
+					   try next slot */
+					tail++;
+					goto restart;
+				}
+			}
+			/* Found slot that was used one lap back,
+			   try to enqueue next element */
+			neu.e.ptr = elems[actual];
+			neu.e.idx = tail;/* Set idx on enqueue */
+		} while (!lockfree_compare_exchange_16_frail((__int128 *)slot,
+							     &old.ui,
+							     neu.ui,
+							     __ATOMIC_RELEASE,
+							     __ATOMIC_RELAXED));
+		/* Enqueue succeeded */
+		actual++;
+		/* Continue with next slot */
+		tail++;
+	}
+	tail = rte_lfring_update(&r->tail, tail);
+	if (free_space != NULL)
+		*free_space = __atomic_load_n(&r->head, __ATOMIC_RELAXED) + size - tail;
+	return actual;
+}
+
+/**
+ * Enqueue several objects on an lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to enqueue on the lfring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the lfring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, between 0 and n (inclusive)
+ */
+static __rte_always_inline unsigned int
+rte_lfring_enqueue_bulk(struct rte_lfring *r,
+			void * const *elems,
+			unsigned int nelems,
+			unsigned int *free_space)
+{
+	if (r->flags & LFRING_F_SP_ENQ)
+		return rte_lfring_enqueue_bulk_sp(r, elems, nelems, free_space);
+	else
+		return rte_lfring_enqueue_bulk_mp(r, elems, nelems, free_space);
+}
+
+static __rte_always_inline void
+rte_lfring_deq_elems(void **elems,
+		     const struct rte_lfr_element *ring,
+		     uint64_t mask,
+		     uint64_t idx,
+		     uint64_t num)
+{
+	uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+	const struct rte_lfr_element *ring0 = &ring[idx & mask];
+	for (uint64_t i = 0; i < seg0; i++)
+		*elems++ = ring0[i].ptr;
+	if (num > seg0) {
+		uint64_t seg1 = num - seg0;
+		for (uint64_t i = 0; i < seg1; i++)
+			*elems++ = ring[i].ptr;
+	}
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_dequeue_bulk_sc(struct rte_lfring *r,
+			   void **elems,
+			   unsigned int nelems,
+			   unsigned int *available)
+{
+	/* Single-consumer */
+	int64_t actual;
+	uint64_t mask = r->mask;
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+	actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+	if (unlikely(actual <= 0))
+		return 0;
+	rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+	__atomic_store_n(&r->head, head + actual, __ATOMIC_RELEASE);
+	if (available != NULL)
+		*available = tail - (head + actual);
+	return (unsigned int)actual;
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_dequeue_bulk_mc(struct rte_lfring *r,
+			   void **elems,
+			   unsigned int nelems,
+			   unsigned int *available)
+{
+	/* Lock-free multi-consumer */
+	int64_t actual;
+	uint64_t mask = r->mask;
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+	do {
+		actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+		if (unlikely(actual <= 0))
+			return 0;
+		rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+	} while (!__atomic_compare_exchange_n(&r->head,
+					      &head,
+					      head + actual,
+					      /*weak*/false,
+					      __ATOMIC_RELEASE,
+					      __ATOMIC_RELAXED));
+	if (available != NULL)
+		*available = tail - (head + actual);
+	return (unsigned int)actual;
+}
+
+/**
+ * Dequeue several objects from an lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to dequeue from the lfring to the obj_table.
+ * @param available
+ *   Returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   The number of objects dequeued, from 0 to n (inclusive)
+ */
+static __rte_always_inline unsigned int
+rte_lfring_dequeue_bulk(struct rte_lfring *r,
+			void **elems,
+			unsigned int nelems,
+			unsigned int *available)
+{
+	if (r->flags & LFRING_F_SC_DEQ)
+		return rte_lfring_dequeue_bulk_sc(r, elems, nelems, available);
+	else
+		return rte_lfring_dequeue_bulk_mc(r, elems, nelems, available);
+}
+
+/**
+ * Return the number of entries in a lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The number of entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_count(const struct rte_lfring *r)
+{
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t avail = tail - head;
+	return (int64_t)avail > 0 ? avail : 0;
+}
+
+/**
+ * Return the number of free entries in a lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The number of free entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_free_count(const struct rte_lfring *r)
+{
+	return r->size - rte_lfring_count(r);
+}
+
+/**
+ * Test if a lfring is full.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   - 1: The lfring is full.
+ *   - 0: The lfring is not full.
+ */
+static inline int
+rte_lfring_full(const struct rte_lfring *r)
+{
+	return rte_lfring_free_count(r) == 0;
+}
+
+/**
+ * Test if a lfring is empty.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   - 1: The lfring is empty.
+ *   - 0: The lfring is not empty.
+ */
+static inline int
+rte_lfring_empty(const struct rte_lfring *r)
+{
+	return rte_lfring_count(r) == 0;
+}
+
+/**
+ * Return the size of the lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The size of the data store used by the lfring.
+ *   NOTE: this is not the same as the usable space in the lfring. To query that
+ *   use ``rte_lfring_get_capacity()``.
+ */
+static inline unsigned int
+rte_lfring_get_size(const struct rte_lfring *r)
+{
+	return r->size;
+}
+
+/**
+ * Return the number of elements which can be stored in the lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The usable size of the lfring.
+ */
+static inline unsigned int
+rte_lfring_get_capacity(const struct rte_lfring *r)
+{
+	return r->size;
+}
+
+/**
+ * Dump the status of all lfrings on the console
+ *
+ * @param f
+ *   A pointer to a file for output
+ */
+void rte_lfring_list_dump(FILE *f);
+
+/**
+ * Search a lfring from its name
+ *
+ * @param name
+ *   The name of the lfring.
+ * @return
+ *   The pointer to the lfring matching the name, or NULL if not found,
+ *   with rte_errno set appropriately. Possible rte_errno values include:
+ *    - ENOENT - required entry not available to return.
+ */
+struct rte_lfring *rte_lfring_lookup(const char *name);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_LFRING_H_ */
diff --git a/lib/librte_lfring/rte_lfring_version.map b/lib/librte_lfring/rte_lfring_version.map
new file mode 100644
index 0000000..d935efd
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring_version.map
@@ -0,0 +1,19 @@ 
+DPDK_2.0 {
+	global:
+
+	rte_ring_create;
+	rte_ring_dump;
+	rte_ring_get_memsize;
+	rte_ring_init;
+	rte_ring_list_dump;
+	rte_ring_lookup;
+
+	local: *;
+};
+
+DPDK_2.2 {
+	global:
+
+	rte_ring_free;
+
+} DPDK_2.0;