[dpdk-dev,v2,1/8] event/opdl: add the opdl ring infrastructure library

Message ID 1513337189-137661-2-git-send-email-liang.j.ma@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Jerin Jacob
Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation success Compilation OK

Commit Message

Liang, Ma Dec. 15, 2017, 11:26 a.m. UTC
  OPDL ring is the core infrastructure of the OPDL PMD. The OPDL ring
library provides the core data structures and a set of helper functions.
The ring implements a single-ring, multi-port/stage pipelined packet
distribution mechanism. This mechanism has the following characteristics:

• No multiple-queue cost; latency is therefore significantly reduced.
• Fixed dependencies between queues/ports make it more suitable for
  complex, fixed pipelines of stateless packet processing (static
  pipelines).
• Has decentralized distribution (no scheduling core).
• Packets remain in order (no reorder core(s)).

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
---
 drivers/event/opdl/Makefile                       |   66 ++
 drivers/event/opdl/opdl_ring.c                    | 1232 +++++++++++++++++++++
 drivers/event/opdl/opdl_ring.h                    |  601 ++++++++++
 drivers/event/opdl/rte_pmd_evdev_opdl_version.map |    3 +
 4 files changed, 1902 insertions(+)
 create mode 100644 drivers/event/opdl/Makefile
 create mode 100644 drivers/event/opdl/opdl_ring.c
 create mode 100644 drivers/event/opdl/opdl_ring.h
 create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map
  

Comments

Neil Horman Dec. 15, 2017, 12:38 p.m. UTC | #1
On Fri, Dec 15, 2017 at 11:26:22AM +0000, Liang Ma wrote:
> OPDL ring is the core infrastructure of OPDL PMD. OPDL ring library
> provide the core data structure and core helper function set. The Ring
> implements a single ring multi-port/stage pipelined packet distribution
> mechanism. This mechanism has the following characteristics:
> 
> • No multiple queue cost, therefore, latency is significant reduced.
> • Fixed dependencies between queue/ports is more suitable for complex.
>   fixed pipelines of stateless packet processing (static pipeline).
> • Has decentralized distribution (no scheduling core).
> • Packets remain in order (no reorder core(s)).
> 
> Signed-off-by: Liang Ma <liang.j.ma@intel.com>
> Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
> ---
>  drivers/event/opdl/Makefile                       |   66 ++
>  drivers/event/opdl/opdl_ring.c                    | 1232 +++++++++++++++++++++
>  drivers/event/opdl/opdl_ring.h                    |  601 ++++++++++
>  drivers/event/opdl/rte_pmd_evdev_opdl_version.map |    3 +
>  4 files changed, 1902 insertions(+)
>  create mode 100644 drivers/event/opdl/Makefile
>  create mode 100644 drivers/event/opdl/opdl_ring.c
>  create mode 100644 drivers/event/opdl/opdl_ring.h
>  create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> 
><snip>

> +
> +#endif  /* _OPDL_H_ */
> diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> new file mode 100644
> index 0000000..5352e7e
> --- /dev/null
> +++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> @@ -0,0 +1,3 @@
> +DPDK_17.05 {
> +	local: *;
> +};
you need to enumerate the functions you want to globally export here, or this won't work
as a shared library. This also suggests you haven't tested this as a DSO yet;
please do so.
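
For example, something along these lines (illustrative only; the version node
and symbol names are placeholders, not taken from this patch):

	DPDK_18.02 {
		global:
			opdl_ring_create;
			opdl_ring_input;

		local: *;
	};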

Neil

> -- 
> 2.7.5
> 
  
Liang, Ma Dec. 15, 2017, 1:50 p.m. UTC | #2
On 15 Dec 07:38, Neil Horman wrote:
> On Fri, Dec 15, 2017 at 11:26:22AM +0000, Liang Ma wrote:
> > OPDL ring is the core infrastructure of OPDL PMD. OPDL ring library
> > provide the core data structure and core helper function set. The Ring
> > implements a single ring multi-port/stage pipelined packet distribution
> > mechanism. This mechanism has the following characteristics:
> > 
> > • No multiple queue cost, therefore, latency is significant reduced.
> > • Fixed dependencies between queue/ports is more suitable for complex.
> >   fixed pipelines of stateless packet processing (static pipeline).
> > • Has decentralized distribution (no scheduling core).
> > • Packets remain in order (no reorder core(s)).
> > 
> > Signed-off-by: Liang Ma <liang.j.ma@intel.com>
> > Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
> > ---
> >  drivers/event/opdl/Makefile                       |   66 ++
> >  drivers/event/opdl/opdl_ring.c                    | 1232 +++++++++++++++++++++
> >  drivers/event/opdl/opdl_ring.h                    |  601 ++++++++++
> >  drivers/event/opdl/rte_pmd_evdev_opdl_version.map |    3 +
> >  4 files changed, 1902 insertions(+)
> >  create mode 100644 drivers/event/opdl/Makefile
> >  create mode 100644 drivers/event/opdl/opdl_ring.c
> >  create mode 100644 drivers/event/opdl/opdl_ring.h
> >  create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> > 
> ><snip>
> 
> > +
> > +#endif  /* _OPDL_H_ */
> > diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> > new file mode 100644
> > index 0000000..5352e7e
> > --- /dev/null
> > +++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> > @@ -0,0 +1,3 @@
> > +DPDK_17.05 {
> > +	local: *;
> > +};
> you need to ennumerate the functions you want to globally export here, or this won't work
> as a shared library.  This also suggests you haven't tested this as a DSO yet,
> please do so.
> 
> Neil
FYI, the email subject indicates that this is part of the PMD;
there is no need to export any functions globally.
You can reference:
drivers/event/octeontx/rte_pmd_octeontx_ssovf_version.map
drivers/event/dpaa2/rte_pmd_dpaa2_event_version.map
drivers/event/sw/rte_pmd_sw_event_version.map

BTW: I do test with the shared library build.
> 
> > -- 
> > 2.7.5
> > 
  
Neil Horman Dec. 15, 2017, 9:23 p.m. UTC | #3
On Fri, Dec 15, 2017 at 01:50:41PM +0000, Ma, Liang wrote:
> On 15 Dec 07:38, Neil Horman wrote:
> > On Fri, Dec 15, 2017 at 11:26:22AM +0000, Liang Ma wrote:
> > > OPDL ring is the core infrastructure of OPDL PMD. OPDL ring library
> > > provide the core data structure and core helper function set. The Ring
> > > implements a single ring multi-port/stage pipelined packet distribution
> > > mechanism. This mechanism has the following characteristics:
> > > 
> > > • No multiple queue cost, therefore, latency is significant reduced.
> > > • Fixed dependencies between queue/ports is more suitable for complex.
> > >   fixed pipelines of stateless packet processing (static pipeline).
> > > • Has decentralized distribution (no scheduling core).
> > > • Packets remain in order (no reorder core(s)).
> > > 
> > > Signed-off-by: Liang Ma <liang.j.ma@intel.com>
> > > Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
> > > ---
> > >  drivers/event/opdl/Makefile                       |   66 ++
> > >  drivers/event/opdl/opdl_ring.c                    | 1232 +++++++++++++++++++++
> > >  drivers/event/opdl/opdl_ring.h                    |  601 ++++++++++
> > >  drivers/event/opdl/rte_pmd_evdev_opdl_version.map |    3 +
> > >  4 files changed, 1902 insertions(+)
> > >  create mode 100644 drivers/event/opdl/Makefile
> > >  create mode 100644 drivers/event/opdl/opdl_ring.c
> > >  create mode 100644 drivers/event/opdl/opdl_ring.h
> > >  create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> > > 
> > ><snip>
> > 
> > > +
> > > +#endif  /* _OPDL_H_ */
> > > diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> > > new file mode 100644
> > > index 0000000..5352e7e
> > > --- /dev/null
> > > +++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> > > @@ -0,0 +1,3 @@
> > > +DPDK_17.05 {
> > > +	local: *;
> > > +};
> > you need to ennumerate the functions you want to globally export here, or this won't work
> > as a shared library.  This also suggests you haven't tested this as a DSO yet,
> > please do so.
> > 
> > Neil
> FYI, the subject of email has indicated that's part of the PMD.
I apologize, I didn't realize it was its own pmd, rather than part of the other
pmds.  I missed the pathing and thought you were creating a separate library for
others to consume, rather than a PMD to be accessed via the ethdev api

> there is no need to export any function globally.
> you can reference 
> drivers/event/octeontx/rte_pmd_octeontx_ssovf_version.map
> /drivers/event/dpaa2/rte_pmd_dpaa2_event_version.map
> drivers/event/sw/rte_pmd_sw_event_version.map
> 
> BTW: I do test with shared library build. 
What did you test with? I ask because with GCC 7.2.1, the build throws
several warnings that are treated as errors and fail the build:

/home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c: In function ‘create_queues_and_rings’:
/home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c:570:17: error: ‘%s’ directive writing up to 63 bytes into a region of size 32 [-Werror=format-overflow=]
  sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
                 ^~
/home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c:570:2: note: ‘sprintf’ output between 3 and 75 bytes into a destination of size 32
  sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
  ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c:570:17: error: ‘%s’ directive writing up to 63 bytes into a region of size 32 [-Werror=format-overflow=]
  sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
                 ^~
/home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c:570:2: note: ‘sprintf’ output between 3 and 75 bytes into a destination of size 32
  sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
  ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
cc1: all warnings being treated as errors


It looks like you're trying to write a string to a 32-byte array, but the first
component of that string (service_name) can be as long as 63 bytes, so the
formatted output can be up to 75 bytes long. I'm guessing you want to use
snprintf there rather than sprintf, with a limit of RTE_MEMZONE_NAMESIZE minus
the length of LIB_NAME (to account for the characters added in
opdl_ring_create)?
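
Something like this sketch, assuming name is declared as
char name[RTE_MEMZONE_NAMESIZE]:

	/* snprintf truncates instead of overflowing: at most sizeof(name)
	 * bytes are written, including the NUL terminator.
	 */
	snprintf(name, sizeof(name), "%s_%u",
			device->service_name, device->nb_opdls);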

Neil

> > 
> > > -- 
> > > 2.7.5
> > > 
>
  
Jerin Jacob Dec. 16, 2017, 10:14 a.m. UTC | #4
-----Original Message-----
> Date: Fri, 15 Dec 2017 11:26:22 +0000
> From: Liang Ma <liang.j.ma@intel.com>
> To: jerin.jacob@caviumnetworks.com
> CC: dev@dpdk.org, harry.van.haaren@intel.com, bruce.richardson@intel.com,
>  deepak.k.jain@intel.com, john.geary@intel.com, peter.mccarthy@intel.com,
>  seanbh@gmail.com
> Subject: [PATCH v2 1/8] event/opdl:  add the opdl ring infrastructure
>  library
> X-Mailer: git-send-email 2.7.5
> 
> OPDL ring is the core infrastructure of OPDL PMD. OPDL ring library
> provide the core data structure and core helper function set. The Ring
> implements a single ring multi-port/stage pipelined packet distribution
> mechanism. This mechanism has the following characteristics:
> 
> • No multiple queue cost, therefore, latency is significant reduced.
> • Fixed dependencies between queue/ports is more suitable for complex.
>   fixed pipelines of stateless packet processing (static pipeline).
> • Has decentralized distribution (no scheduling core).
> • Packets remain in order (no reorder core(s)).
> 
> Signed-off-by: Liang Ma <liang.j.ma@intel.com>
> Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>

1) Invalid Signed-off-by format: there is a "," after the first name.
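It should read:

	Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>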


2) There are some compilation issues with the series:

/export/dpdk-next-eventdev/drivers/event/opdl/opdl_evdev_init.c: In function ‘create_queues_and_rings’:
/export/dpdk-next-eventdev/drivers/event/opdl/opdl_evdev_init.c:570:17: error: ‘%s’ directive writing up to 63 bytes into a region of size 32 [-Werror=format-overflow=]
  sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
                 ^~
/export/dpdk-next-eventdev/drivers/event/opdl/opdl_evdev_init.c:570:2: note: ‘sprintf’ output between 3 and 75 bytes into a destination of size 32
  sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
/export/dpdk-next-eventdev/drivers/event/opdl/opdl_evdev_init.c:570:17: error: ‘%s’ directive writing up to 63 bytes into a region of size 32 [-Werror=format-overflow=]
  sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
                 ^~
/export/dpdk-next-eventdev/drivers/event/opdl/opdl_evdev_init.c:570:2: note: ‘sprintf’ output between 3 and 75 bytes into a destination of size 32
  sprintf(name, "%s_%u", device->service_name, device->nb_opdls);

3) Please rebase to the next-eventdev tree; Gage already added a new capability flag.


> ---
> +
> +# library name
> +LIB = librte_pmd_opdl_event.a
> +
> +# build flags
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS)
> +# for older GCC versions, allow us to initialize an event using
> +# designated initializers.
> +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
> +ifeq ($(shell test $(GCC_VERSION) -le 50 && echo 1), 1)
> +CFLAGS += -Wno-missing-field-initializers
> +endif
> +endif
> +
> +LDLIBS += -lrte_eal -lrte_eventdev -lrte_kvargs -lrte_ring

Does it have an -lrte_ring dependency?

> +LDLIBS += -lrte_bus_vdev -lrte_mbuf -lrte_mempool
> +
> +# library version
> +LIBABIVER := 1
> +
> +# versioning export map
> +EXPORT_MAP := rte_pmd_evdev_opdl_version.map
> +
> +# library source files
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_init.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_ring.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_xstats.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_test.c

Each patch should be buildable. Add the files only when you actually add the
corresponding source code.


> +
> +# export include files
> +SYMLINK-y-include +=
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c
> new file mode 100644
> index 0000000..5120fbe
> --- /dev/null
> +++ b/drivers/event/opdl/opdl_ring.c
> @@ -0,0 +1,1232 @@
> +/*-
> + * <COPYRIGHT_TAG>

??

> + */
> +
> +#include <stdbool.h>
> +#include <stddef.h>
> +#include <stdint.h>
> +#include <stdio.h>
> +
> +#include <rte_branch_prediction.h>
> +#include <rte_debug.h>
> +#include <rte_lcore.h>
> +#include <rte_log.h>
> +#include <rte_malloc.h>
> +#include <rte_memcpy.h>
> +#include <rte_memory.h>
> +#include <rte_memzone.h>
> +#include <rte_eal_memconfig.h>
> +
> +#include "opdl_ring.h"
> +
> +#define LIB_NAME "opdl_ring"
> +
> +#define OPDL_NAME_SIZE 64
> +
> +#define RTE_LOGTYPE_OPDL RTE_LOGTYPE_USER8
> +#define log(level, fmt, ...) \
> +		RTE_LOG(level, OPDL, LIB_NAME": " fmt "\n", ##__VA_ARGS__)
> +
> +#ifdef OPDL_DEBUG
> +#define log_debug(...) log(DEBUG, __VA_ARGS__)
> +#else
> +#define log_debug(...)
> +#endif

For new PMDs, use dynamic logging.
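A minimal sketch of what that could look like (the logtype name
"pmd.event.opdl" follows the usual convention but is an assumption here):

	static int opdl_logtype;

	/* Register the dynamic logtype when the driver is loaded */
	static void __attribute__((constructor))
	opdl_init_log(void)
	{
		opdl_logtype = rte_log_register("pmd.event.opdl");
		if (opdl_logtype >= 0)
			rte_log_set_level(opdl_logtype, RTE_LOG_INFO);
	}

	#define log(level, fmt, ...) \
		rte_log(RTE_LOG_ ## level, opdl_logtype, \
				LIB_NAME ": " fmt "\n", ##__VA_ARGS__)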


> +
> +#define POWER_OF_2(n) ((n) && !((n) & ((n) - 1)))

I guess it is available as a standard DPDK macro in the common area. If not,
create a new one.
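For reference, rte_common.h already provides rte_is_power_of_2(); a sketch of
the replacement check (the error path is illustrative):

	if (!rte_is_power_of_2(num_slots))
		return NULL;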

> +
> +#define RTE_EVENT_MASK  (0xFFFF0000000FFFFFULL)

Please don't use the RTE_ prefix for PMD-specific macros.

> +
> +/* Types of dependency between stages */
> +enum dep_type {
> +	DEP_NONE = 0,  /* no dependency */
> +	DEP_DIRECT,  /* stage has direct dependency */
> +	DEP_INDIRECT,  /* in-direct dependency through other stage(s) */
> +	DEP_SELF,  /* stage dependency on itself, used to detect loops */
> +};
> +
> +/* Shared section of stage state.
> + * Care is needed when accessing and the layout is important, especially to
> + * limit the adjacent cache-line HW prefetcher from impacting performance.
> + */
> +struct shared_state {
> +	/* Last known minimum sequence number of dependencies, used for multi
> +	 * thread operation
> +	 */
> +	uint32_t available_seq;
> +	char _pad1[RTE_CACHE_LINE_SIZE * 3];
> +	uint32_t head;  /* Head sequence number (for multi thread operation) */
> +	char _pad2[RTE_CACHE_LINE_SIZE * 3];
> +	struct opdl_stage *stage;  /* back pointer */
> +	uint32_t tail;  /* Tail sequence number */
> +	char _pad3[RTE_CACHE_LINE_SIZE * 2];
> +} __rte_cache_aligned;
> +
> +/* A structure to keep track of "unfinished" claims. This is only used for
> + * stages that are threadsafe. Each lcore accesses its own instance of this
> + * structure to record the entries it has claimed. This allows one lcore to make
> + * multiple claims without being blocked by another. When disclaiming it moves
> + * forward the shared tail when the shared tail matches the tail value recorded
> + * here.
> + */
> +struct claim_manager {
> +	uint32_t num_to_disclaim;
> +	uint32_t num_claimed;
> +	uint32_t mgr_head;
> +	uint32_t mgr_tail;
> +	struct {
> +		uint32_t head;
> +		uint32_t tail;
> +	} claims[OPDL_DISCLAIMS_PER_LCORE];
> +} __rte_cache_aligned;
> +
> +/* Context for each stage of opdl_ring.
> + * Calculations on sequence numbers need to be done with other uint32_t values
> + * so that results are modulus 2^32, and not undefined.
> + */
> +struct opdl_stage {
> +	struct opdl_ring *t;  /* back pointer, set at init */
> +	uint32_t num_slots;  /* Number of slots for entries, set at init */
> +	uint32_t index;  /* ID for this stage, set at init */
> +	bool threadsafe;  /* Set to 1 if this stage supports threadsafe use */
> +	/* Last known min seq number of dependencies for used for single thread
> +	 * operation
> +	 */
> +	uint32_t available_seq;
> +	uint32_t head;  /* Current head for single-thread operation */
> +	uint32_t shadow_head;  /* Shadow head for single-thread operation */
> +	uint32_t nb_instance;  /* Number of instances */
> +	uint32_t instance_id;  /* ID of this stage instance */
> +	uint16_t num_claimed;  /* Number of slots claimed */
> +	uint16_t num_event;		/* Number of events */
> +	uint32_t seq;			/* sequence number  */
> +	uint32_t num_deps;  /* Number of direct dependencies */
> +	/* Keep track of all dependencies, used during init only */
> +	enum dep_type *dep_tracking;
> +	/* Direct dependencies of this stage */
> +	struct shared_state **deps;
> +	/* Other stages read this! */
> +	struct shared_state shared __rte_cache_aligned;
> +	/* For managing disclaims in multi-threaded processing stages */
> +	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
> +					       __rte_cache_aligned;
> +} __rte_cache_aligned;
> +
> +/* Context for opdl_ring */
> +struct opdl_ring {
> +	char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
> +	int socket;  /* NUMA socket that memory is allocated on */
> +	uint32_t num_slots;  /* Number of slots for entries */
> +	uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
> +	uint32_t slot_size;  /* Size of each slot in bytes */
> +	uint32_t num_stages;  /* Number of stages that have been added */
> +	uint32_t max_num_stages;  /* Max number of stages */
> +	/* Stages indexed by ID */
> +	struct opdl_stage *stages;
> +	/* Memory for storing slot data */
> +	uint8_t slots[0] __rte_cache_aligned;
> +};
> +
> +
> +/* Return input stage of a opdl_ring */
> +static inline struct opdl_stage *__attribute__((always_inline))

Change this to __rte_always_inline everywhere in the driver.
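That is, for example:

	static __rte_always_inline struct opdl_stage *
	input_stage(const struct opdl_ring *t)
	{
		return &t->stages[0];
	}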

> +input_stage(const struct opdl_ring *t)
> +{
> +	return &t->stages[0];
> +}
> +
> +}
> +
> +/* Move head atomically, returning number of entries available to process and
> + * the original value of head. For non-input stages, the claim is recorded
> + * so that the tail can be updated later by opdl_stage_disclaim().
> + */
> +static inline void __attribute__((always_inline))
> +move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
> +		uint32_t *old_head, bool block, bool claim_func)
> +{
> +	uint32_t orig_num_entries = *num_entries;
> +	uint32_t ret;
> +	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
> +
> +	/* Attempt to disclaim any outstanding claims */
> +	opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
> +			false);
> +
> +	*old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);

I guess the __atomic builtins were introduced in gcc 4.7.
Make sure the PMD does not build if __atomic_* is not available.
See how CONFIG_RTE_LIBRTE_THUNDERX_NICVF_PMD is handled in
mk/toolchain/icc/rte.toolchain-compat.mk.
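A sketch of such a guard, assuming the same pattern as the NICVF entry (the
exact file and version cut-off would need checking against the tree):

	# e.g. in mk/toolchain/gcc/rte.toolchain-compat.mk (assumed location)
	ifeq ($(shell test $(GCC_VERSION) -lt 47 && echo 1), 1)
	CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV=d
	endif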


> diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h
> new file mode 100644
> index 0000000..cc37bd1
> --- /dev/null
> +++ b/drivers/event/opdl/opdl_ring.h
> @@ -0,0 +1,601 @@
> +/*-
> + *
> + * <COPYRIGHT_TAG>

??

> + */
> +
> +#ifndef _OPDL_H_
> +#define _OPDL_H_
> +
> +/**
> + * @file
> + * The "opdl_ring" is a data structure that contains a fixed number of slots,
> + * with each slot having the same, but configurable, size. Entries are input
> + * into the opdl_ring by copying into available slots. Once in the opdl_ring,
> + * an entry is processed by a number of stages, with the ordering of stage
> + * processing controlled by making stages dependent on one or more other stages.
> + * An entry is not available for a stage to process until it has been processed
> + * by that stages dependencies. Entries are always made available for
> + * processing in the same order that they were input in to the opdl_ring.
> + * Inputting is considered as a stage that depends on all other stages,
> + * and is also a dependency of all stages.
> + *
> + * Inputting and processing in a stage can support multi-threading. Note that
> + * multi-thread processing can also be done by making stages co-operate e.g. two
> + * stages where one processes the even packets and the other processes odd
> + * packets.
> + *
> + * A opdl_ring can be used as the basis for pipeline based applications. Instead
> + * of each stage in a pipeline dequeueing from a ring, processing and enqueueing
> + * to another ring, it can process entries in-place on the ring. If stages do
> + * not depend on each other, they can run in parallel.
> + *
> + * The opdl_ring works with entries of configurable size, these could be
> + * pointers to mbufs, pointers to mbufs with application specific meta-data,
> + * tasks etc.
> + */
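To make the model above concrete, a minimal single-threaded usage sketch; it
uses only prototypes added by this patch, except opdl_stage_add(), which is an
assumption (its prototype is not part of the quoted hunk):

	struct rte_event in[32], out[32];
	uint32_t n, got;

	struct opdl_ring *r = opdl_ring_create("example", 1024,
			sizeof(struct rte_event), 4, rte_socket_id());
	/* hypothetical stage-creation call */
	struct opdl_stage *s = opdl_stage_add(r, false /* threadsafe */, false);

	n = opdl_ring_input(r, in, 32, false);               /* producer */
	got = opdl_stage_claim_copy(s, out, n, NULL, false); /* consumer */
	/* ... process out[0..got-1] ... */
	opdl_stage_disclaim_n(s, got, false);                /* release slots */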
> +
> +#include <stdbool.h>
> +#include <stdint.h>
> +#include <stdio.h>
> +
> +#include <rte_eventdev.h>
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#ifndef OPDL_DISCLAIMS_PER_LCORE

Move this configuration to the base config, or even better, expose it as a devarg.

> +/** Multi-threaded processing allows one thread to process multiple batches in a
> + * stage, while another thread is processing a single large batch. This number
> + * controls how many non-contiguous batches one stage can process before being
> + * blocked by the other stage.
> + */
> +#define OPDL_DISCLAIMS_PER_LCORE 8
> +#endif
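A sketch of the devargs route via the rte_kvargs API this driver already links
against (the "disclaims_per_lcore" key and the valid_args list are
assumptions). Since OPDL_DISCLAIMS_PER_LCORE sizes a static array, a runtime
devarg could only select a value up to the compile-time maximum:

	#include <stdlib.h>
	#include <rte_kvargs.h>

	static int
	set_disclaims(const char *key __rte_unused, const char *value,
			void *opaque)
	{
		/* parse the devarg value as an unsigned integer */
		*(uint32_t *)opaque = (uint32_t)strtoul(value, NULL, 10);
		return 0;
	}

	/* in the vdev probe path, with "params" holding the devargs string */
	struct rte_kvargs *kvlist = rte_kvargs_parse(params, valid_args);
	if (kvlist != NULL) {
		rte_kvargs_process(kvlist, "disclaims_per_lcore",
				set_disclaims, &disclaims);
		rte_kvargs_free(kvlist);
	}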
> +
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif  /* _OPDL_H_ */
> diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> new file mode 100644
> index 0000000..5352e7e
> --- /dev/null
> +++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> @@ -0,0 +1,3 @@
> +DPDK_17.05 {

DPDK_18.02

> +	local: *;
> +};
> -- 
> 2.7.5
> 
> --------------------------------------------------------------
> Intel Research and Development Ireland Limited
> Registered in Ireland
> Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
> Registered Number: 308263
> 
> 
> This e-mail and any attachments may contain confidential material for the sole
> use of the intended recipient(s). Any review or distribution by others is
> strictly prohibited. If you are not the intended recipient, please contact the
> sender and delete all copies.

Please remove such notices from public mailing lists.
  
Liang, Ma Dec. 18, 2017, 11:05 a.m. UTC | #5
On 15 Dec 16:23, Neil Horman wrote:
> On Fri, Dec 15, 2017 at 01:50:41PM +0000, Ma, Liang wrote:
> > On 15 Dec 07:38, Neil Horman wrote:
> > > On Fri, Dec 15, 2017 at 11:26:22AM +0000, Liang Ma wrote:
> > > > OPDL ring is the core infrastructure of OPDL PMD. OPDL ring library
> > > > provide the core data structure and core helper function set. The Ring
> > > > implements a single ring multi-port/stage pipelined packet distribution
> > > > mechanism. This mechanism has the following characteristics:
> > > > 
> > > > • No multiple queue cost, therefore, latency is significant reduced.
> > > > • Fixed dependencies between queue/ports is more suitable for complex.
> > > >   fixed pipelines of stateless packet processing (static pipeline).
> > > > • Has decentralized distribution (no scheduling core).
> > > > • Packets remain in order (no reorder core(s)).
> > > > 
> > > > Signed-off-by: Liang Ma <liang.j.ma@intel.com>
> > > > Signed-off-by: Peter, Mccarthy <peter.mccarthy@intel.com>
> > > > ---
> > > >  drivers/event/opdl/Makefile                       |   66 ++
> > > >  drivers/event/opdl/opdl_ring.c                    | 1232 +++++++++++++++++++++
> > > >  drivers/event/opdl/opdl_ring.h                    |  601 ++++++++++
> > > >  drivers/event/opdl/rte_pmd_evdev_opdl_version.map |    3 +
> > > >  4 files changed, 1902 insertions(+)
> > > >  create mode 100644 drivers/event/opdl/Makefile
> > > >  create mode 100644 drivers/event/opdl/opdl_ring.c
> > > >  create mode 100644 drivers/event/opdl/opdl_ring.h
> > > >  create mode 100644 drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> > > > 
> > > ><snip>
> > > 
> > > > +
> > > > +#endif  /* _OPDL_H_ */
> > > > diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> > > > new file mode 100644
> > > > index 0000000..5352e7e
> > > > --- /dev/null
> > > > +++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
> > > > @@ -0,0 +1,3 @@
> > > > +DPDK_17.05 {
> > > > +	local: *;
> > > > +};
> > > you need to ennumerate the functions you want to globally export here, or this won't work
> > > as a shared library.  This also suggests you haven't tested this as a DSO yet,
> > > please do so.
> > > 
> > > Neil
> > FYI, the subject of email has indicated that's part of the PMD.
> I apologize, I didn't realize it was its own pmd, rather than part of the other
> pmds.  I missed the pathing and thought you were creating a separate library for
> others to consume, rather than a PMD to be accessed via the ethdev api
> 
> > there is no need to export any function globally.
> > you can reference 
> > drivers/event/octeontx/rte_pmd_octeontx_ssovf_version.map
> > /drivers/event/dpaa2/rte_pmd_dpaa2_event_version.map
> > drivers/event/sw/rte_pmd_sw_event_version.map
> > 
> > BTW: I do test with shared library build. 
> What did you test with?  I ask because with gcc version 7.2.1, the build throws
> several warnings that error the build out:
I tested with GCC 6.3.1 and some older versions; no warnings were reported.
GCC 7.2.1 is not in our test list right now; I will investigate the issue.
> 
> /home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c: In function ‘create_queues_and_rings’:
> /home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c:570:17: error: ‘%s’ directive writing up to 63 bytes into a region of size 32 [-Werror=format-overflow=]
>   sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
>                  ^~
> /home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c:570:2: note: ‘sprintf’ output between 3 and 75 bytes into a destination of size 32
>   sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
>   ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> /home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c:570:17: error: ‘%s’ directive writing up to 63 bytes into a region of size 32 [-Werror=format-overflow=]
>   sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
>                  ^~
> /home/nhorman/git/dpdk/drivers/event/opdl/opdl_evdev_init.c:570:2: note: ‘sprintf’ output between 3 and 75 bytes into a destination of size 32
>   sprintf(name, "%s_%u", device->service_name, device->nb_opdls);
>   ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> cc1: all warnings being treated as errors
> 
> 
> It looks like you're trying to write a string to a 32 byte array, but the first
> component of that string (service_name) can be as long as 63 bytes, and the
> nb_opdls can be up to 75 bytes long. I'm guessing you want to use snprintf there
> rather than sprintf with a limit of RTE_MEMZONE_NAMESIZE - LIB_NAME (to account
> for the added characters in opdl_ring_create?
> 
> Neil
> 
> > > 
> > > > -- 
> > > > 2.7.5
> > > > 
> >
  

Patch

diff --git a/drivers/event/opdl/Makefile b/drivers/event/opdl/Makefile
new file mode 100644
index 0000000..2faf52d
--- /dev/null
+++ b/drivers/event/opdl/Makefile
@@ -0,0 +1,66 @@ 
+#   BSD LICENSE
+#
+#   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_pmd_opdl_event.a
+
+# build flags
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+# for older GCC versions, allow us to initialize an event using
+# designated initializers.
+ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
+ifeq ($(shell test $(GCC_VERSION) -le 50 && echo 1), 1)
+CFLAGS += -Wno-missing-field-initializers
+endif
+endif
+
+LDLIBS += -lrte_eal -lrte_eventdev -lrte_kvargs -lrte_ring
+LDLIBS += -lrte_bus_vdev -lrte_mbuf -lrte_mempool
+
+# library version
+LIBABIVER := 1
+
+# versioning export map
+EXPORT_MAP := rte_pmd_evdev_opdl_version.map
+
+# library source files
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_init.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_ring.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_evdev_xstats.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_OPDL_EVENTDEV) += opdl_test.c
+
+# export include files
+SYMLINK-y-include +=
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c
new file mode 100644
index 0000000..5120fbe
--- /dev/null
+++ b/drivers/event/opdl/opdl_ring.c
@@ -0,0 +1,1232 @@ 
+/*-
+ * <COPYRIGHT_TAG>
+ */
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include <rte_branch_prediction.h>
+#include <rte_debug.h>
+#include <rte_lcore.h>
+#include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_memcpy.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_eal_memconfig.h>
+
+#include "opdl_ring.h"
+
+#define LIB_NAME "opdl_ring"
+
+#define OPDL_NAME_SIZE 64
+
+#define RTE_LOGTYPE_OPDL RTE_LOGTYPE_USER8
+#define log(level, fmt, ...) \
+		RTE_LOG(level, OPDL, LIB_NAME": " fmt "\n", ##__VA_ARGS__)
+
+#ifdef OPDL_DEBUG
+#define log_debug(...) log(DEBUG, __VA_ARGS__)
+#else
+#define log_debug(...)
+#endif
+
+#define POWER_OF_2(n) ((n) && !((n) & ((n) - 1)))
+
+#define RTE_EVENT_MASK  (0xFFFF0000000FFFFFULL)
+
+/* Types of dependency between stages */
+enum dep_type {
+	DEP_NONE = 0,  /* no dependency */
+	DEP_DIRECT,  /* stage has direct dependency */
+	DEP_INDIRECT,  /* in-direct dependency through other stage(s) */
+	DEP_SELF,  /* stage dependency on itself, used to detect loops */
+};
+
+/* Shared section of stage state.
+ * Care is needed when accessing and the layout is important, especially to
+ * limit the adjacent cache-line HW prefetcher from impacting performance.
+ */
+struct shared_state {
+	/* Last known minimum sequence number of dependencies, used for multi
+	 * thread operation
+	 */
+	uint32_t available_seq;
+	char _pad1[RTE_CACHE_LINE_SIZE * 3];
+	uint32_t head;  /* Head sequence number (for multi thread operation) */
+	char _pad2[RTE_CACHE_LINE_SIZE * 3];
+	struct opdl_stage *stage;  /* back pointer */
+	uint32_t tail;  /* Tail sequence number */
+	char _pad3[RTE_CACHE_LINE_SIZE * 2];
+} __rte_cache_aligned;
+
+/* A structure to keep track of "unfinished" claims. This is only used for
+ * stages that are threadsafe. Each lcore accesses its own instance of this
+ * structure to record the entries it has claimed. This allows one lcore to make
+ * multiple claims without being blocked by another. When disclaiming it moves
+ * forward the shared tail when the shared tail matches the tail value recorded
+ * here.
+ */
+struct claim_manager {
+	uint32_t num_to_disclaim;
+	uint32_t num_claimed;
+	uint32_t mgr_head;
+	uint32_t mgr_tail;
+	struct {
+		uint32_t head;
+		uint32_t tail;
+	} claims[OPDL_DISCLAIMS_PER_LCORE];
+} __rte_cache_aligned;
+
+/* Context for each stage of opdl_ring.
+ * Calculations on sequence numbers need to be done with other uint32_t values
+ * so that results are modulus 2^32, and not undefined.
+ */
+struct opdl_stage {
+	struct opdl_ring *t;  /* back pointer, set at init */
+	uint32_t num_slots;  /* Number of slots for entries, set at init */
+	uint32_t index;  /* ID for this stage, set at init */
+	bool threadsafe;  /* Set to 1 if this stage supports threadsafe use */
+	/* Last known min seq number of dependencies for used for single thread
+	 * operation
+	 */
+	uint32_t available_seq;
+	uint32_t head;  /* Current head for single-thread operation */
+	uint32_t shadow_head;  /* Shadow head for single-thread operation */
+	uint32_t nb_instance;  /* Number of instances */
+	uint32_t instance_id;  /* ID of this stage instance */
+	uint16_t num_claimed;  /* Number of slots claimed */
+	uint16_t num_event;		/* Number of events */
+	uint32_t seq;			/* sequence number  */
+	uint32_t num_deps;  /* Number of direct dependencies */
+	/* Keep track of all dependencies, used during init only */
+	enum dep_type *dep_tracking;
+	/* Direct dependencies of this stage */
+	struct shared_state **deps;
+	/* Other stages read this! */
+	struct shared_state shared __rte_cache_aligned;
+	/* For managing disclaims in multi-threaded processing stages */
+	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
+					       __rte_cache_aligned;
+} __rte_cache_aligned;
+
+/* Context for opdl_ring */
+struct opdl_ring {
+	char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
+	int socket;  /* NUMA socket that memory is allocated on */
+	uint32_t num_slots;  /* Number of slots for entries */
+	uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
+	uint32_t slot_size;  /* Size of each slot in bytes */
+	uint32_t num_stages;  /* Number of stages that have been added */
+	uint32_t max_num_stages;  /* Max number of stages */
+	/* Stages indexed by ID */
+	struct opdl_stage *stages;
+	/* Memory for storing slot data */
+	uint8_t slots[0] __rte_cache_aligned;
+};
+
+
+/* Return input stage of a opdl_ring */
+static inline struct opdl_stage *__attribute__((always_inline))
+input_stage(const struct opdl_ring *t)
+{
+	return &t->stages[0];
+}
+
+/* Check if a stage is the input stage */
+static inline bool __attribute__((always_inline))
+is_input_stage(const struct opdl_stage *s)
+{
+	return s->index == 0;
+}
+
+/* Get slot pointer from sequence number */
+static inline void *__attribute__((always_inline))
+get_slot(const struct opdl_ring *t, uint32_t n)
+{
+	return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
+}
+
+/* Find how many entries are available for processing */
+static inline uint32_t __attribute__((always_inline))
+available(const struct opdl_stage *s)
+{
+	if (s->threadsafe == true) {
+		uint32_t n = __atomic_load_n(&s->shared.available_seq,
+				__ATOMIC_ACQUIRE) -
+				__atomic_load_n(&s->shared.head,
+				__ATOMIC_ACQUIRE);
+
+		/* Return 0 if available_seq needs to be updated */
+		return (n <= s->num_slots) ? n : 0;
+	}
+
+	/* Single threaded */
+	return s->available_seq - s->head;
+}
+
+/* Read sequence number of dependencies and find minimum */
+static inline void __attribute__((always_inline))
+update_available_seq(struct opdl_stage *s)
+{
+	uint32_t i;
+	uint32_t this_tail = s->shared.tail;
+	uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
+	/* Input stage sequence numbers are greater than the sequence numbers of
+	 * its dependencies so an offset of t->num_slots is needed when
+	 * calculating available slots and also the condition which is used to
+	 * determine the dependencies minimum sequence number must be reverted.
+	 */
+	uint32_t wrap;
+
+	if (is_input_stage(s)) {
+		wrap = s->num_slots;
+		for (i = 1; i < s->num_deps; i++) {
+			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
+					__ATOMIC_ACQUIRE);
+			if ((this_tail - seq) > (this_tail - min_seq))
+				min_seq = seq;
+		}
+	} else {
+		wrap = 0;
+		for (i = 1; i < s->num_deps; i++) {
+			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
+					__ATOMIC_ACQUIRE);
+			if ((seq - this_tail) < (min_seq - this_tail))
+				min_seq = seq;
+		}
+	}
+
+	if (s->threadsafe == false)
+		s->available_seq = min_seq + wrap;
+	else
+		__atomic_store_n(&s->shared.available_seq, min_seq + wrap,
+				__ATOMIC_RELEASE);
+}
+
+/* Wait until the number of available slots reaches number requested */
+static inline void __attribute__((always_inline))
+wait_for_available(struct opdl_stage *s, uint32_t n)
+{
+	while (available(s) < n) {
+		rte_pause();
+		update_available_seq(s);
+	}
+}
+
+/* Return number of slots to process based on number requested and mode */
+static inline uint32_t __attribute__((always_inline))
+num_to_process(struct opdl_stage *s, uint32_t n, bool block)
+{
+	/* Don't read tail sequences of dependencies if not needed */
+	if (available(s) >= n)
+		return n;
+
+	update_available_seq(s);
+
+	if (block == false) {
+		uint32_t avail = available(s);
+
+		if (avail == 0) {
+			rte_pause();
+			return 0;
+		}
+		return (avail <= n) ? avail : n;
+	}
+
+	if (unlikely(n > s->num_slots)) {
+		log(ERR, "%u entries is more than max (%u)", n, s->num_slots);
+		return 0;  /* Avoid infinite loop */
+	}
+	/* blocking */
+	wait_for_available(s, n);
+	return n;
+}
+
+/* Copy entries in to slots with wrap-around */
+static inline void __attribute__((always_inline))
+copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
+		uint32_t num_entries)
+{
+	uint32_t slot_size = t->slot_size;
+	uint32_t slot_index = start & t->mask;
+
+	if (slot_index + num_entries <= t->num_slots) {
+		rte_memcpy(get_slot(t, start), entries,
+				num_entries * slot_size);
+	} else {
+		uint32_t split = t->num_slots - slot_index;
+
+		rte_memcpy(get_slot(t, start), entries, split * slot_size);
+		rte_memcpy(get_slot(t, 0),
+				RTE_PTR_ADD(entries, split * slot_size),
+				(num_entries - split) * slot_size);
+	}
+}
+
+/* Copy entries out from slots with wrap-around */
+static inline void __attribute__((always_inline))
+copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
+		uint32_t num_entries)
+{
+	uint32_t slot_size = t->slot_size;
+	uint32_t slot_index = start & t->mask;
+
+	if (slot_index + num_entries <= t->num_slots) {
+		rte_memcpy(entries, get_slot(t, start),
+				num_entries * slot_size);
+	} else {
+		uint32_t split = t->num_slots - slot_index;
+
+		rte_memcpy(entries, get_slot(t, start), split * slot_size);
+		rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
+				get_slot(t, 0),
+				(num_entries - split) * slot_size);
+	}
+}
+
+/* Input function optimised for single thread */
+static inline uint32_t __attribute__((always_inline))
+opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
+		uint32_t num_entries, bool block)
+{
+	struct opdl_stage *s = input_stage(t);
+	uint32_t head = s->head;
+
+	num_entries = num_to_process(s, num_entries, block);
+	if (num_entries == 0)
+		return 0;
+
+	copy_entries_in(t, head, entries, num_entries);
+
+	s->head += num_entries;
+	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+
+	return num_entries;
+}
+
+/* Convert head and tail of claim_manager into valid index */
+static inline uint32_t __attribute__((always_inline))
+claim_mgr_index(uint32_t n)
+{
+	return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
+}
+
+/* Check if there are available slots in claim_manager */
+static inline bool __attribute__((always_inline))
+claim_mgr_available(struct claim_manager *mgr)
+{
+	return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
+			true : false;
+}
+
+/* Record a new claim. Only use after first checking an entry is available */
+static inline void __attribute__((always_inline))
+claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
+{
+	if ((mgr->mgr_head != mgr->mgr_tail) &&
+			(mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
+			tail)) {
+		/* Combine with previous claim */
+		mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
+	} else {
+		mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
+		mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
+		mgr->mgr_head++;
+	}
+
+	mgr->num_claimed += (head - tail);
+}
+
+/* Read the oldest recorded claim */
+static inline bool __attribute__((always_inline))
+claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
+{
+	if (mgr->mgr_head == mgr->mgr_tail)
+		return false;
+
+	*head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
+	*tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
+	return true;
+}
+
+/* Remove the oldest recorded claim. Only use after first reading the entry */
+static inline void __attribute__((always_inline))
+claim_mgr_remove(struct claim_manager *mgr)
+{
+	mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
+			mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
+	mgr->mgr_tail++;
+}
+
+/* Update tail in the oldest claim. Only use after first reading the entry */
+static inline void __attribute__((always_inline))
+claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
+{
+	mgr->num_claimed -= num_entries;
+	mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
+}
+
+static inline void __attribute__((always_inline))
+opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
+		uint32_t num_entries, bool block)
+{
+	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
+	uint32_t head;
+	uint32_t tail;
+
+	while (num_entries) {
+		bool ret = claim_mgr_read(disclaims, &tail, &head);
+
+		if (ret == false)
+			break;  /* nothing is claimed */
+		/* There should be no race condition here. If shared.tail
+		 * matches, no other core can update it until this one does.
+		 */
+		if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
+				tail) {
+			if (num_entries >= (head - tail)) {
+				claim_mgr_remove(disclaims);
+				__atomic_store_n(&s->shared.tail, head,
+						__ATOMIC_RELEASE);
+				num_entries -= (head - tail);
+			} else {
+				claim_mgr_move_tail(disclaims, num_entries);
+				__atomic_store_n(&s->shared.tail,
+						num_entries + tail,
+						__ATOMIC_RELEASE);
+				num_entries = 0;
+			}
+		} else if (block == false)
+			break;  /* blocked by other thread */
+		/* Keep going until num_entries are disclaimed. */
+		rte_pause();
+	}
+
+	disclaims->num_to_disclaim = num_entries;
+}
+
+/* Move head atomically, returning number of entries available to process and
+ * the original value of head. For non-input stages, the claim is recorded
+ * so that the tail can be updated later by opdl_stage_disclaim().
+ */
+static inline void __attribute__((always_inline))
+move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
+		uint32_t *old_head, bool block, bool claim_func)
+{
+	uint32_t orig_num_entries = *num_entries;
+	uint32_t ret;
+	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
+
+	/* Attempt to disclaim any outstanding claims */
+	opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
+			false);
+
+	*old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
+	while (true) {
+		bool success;
+		/* If called by opdl_ring_input(), claim does not need to be
+		 * recorded, as there will be no disclaim.
+		 */
+		if (claim_func) {
+			/* Check that the claim can be recorded */
+			ret = claim_mgr_available(disclaims);
+			if (ret == false) {
+				/* exit out if claim can't be recorded */
+				*num_entries = 0;
+				return;
+			}
+		}
+
+		*num_entries = num_to_process(s, orig_num_entries, block);
+		if (*num_entries == 0)
+			return;
+
+		success = __atomic_compare_exchange_n(&s->shared.head, old_head,
+				*old_head + *num_entries,
+				true,  /* may fail spuriously */
+				__ATOMIC_RELEASE,  /* memory order on success */
+				__ATOMIC_ACQUIRE);  /* memory order on fail */
+		if (likely(success))
+			break;
+		rte_pause();
+	}
+
+	if (claim_func)
+		/* Store the claim record */
+		claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
+}
+
+/* Input function that supports multiple threads */
+static inline uint32_t __attribute__((always_inline))
+opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
+		uint32_t num_entries, bool block)
+{
+	struct opdl_stage *s = input_stage(t);
+	uint32_t old_head;
+
+	move_head_atomically(s, &num_entries, &old_head, block, false);
+	if (num_entries == 0)
+		return 0;
+
+	copy_entries_in(t, old_head, entries, num_entries);
+
+	/* If another thread started inputting before this one, but hasn't
+	 * finished, we need to wait for it to complete to update the tail.
+	 */
+	while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
+			old_head))
+		rte_pause();
+
+	__atomic_store_n(&s->shared.tail, old_head + num_entries,
+			__ATOMIC_RELEASE);
+
+	return num_entries;
+}
+
+static inline uint32_t __attribute__((always_inline))
+opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
+		uint8_t this_lcore)
+{
+	return ((nb_p_lcores <= 1) ? 0 :
+			(nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
+			nb_p_lcores);
+}
+
+/* Claim slots to process, optimised for single-thread operation */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
+{
+	uint32_t i = 0, j = 0,  offset;
+	void *get_slots;
+	struct rte_event *ev;
+	RTE_SET_USED(seq);
+	struct opdl_ring *t = s->t;
+	uint8_t *entries_offset = (uint8_t *)entries;
+
+	if (!atomic) {
+
+		offset = opdl_first_entry_id(s->seq, s->nb_instance,
+				s->instance_id);
+
+		num_entries = s->nb_instance * num_entries;
+
+		num_entries = num_to_process(s, num_entries, block);
+
+		for (; offset < num_entries; offset += s->nb_instance) {
+			get_slots = get_slot(t, s->head + offset);
+			memcpy(entries_offset, get_slots, t->slot_size);
+			entries_offset += t->slot_size;
+			i++;
+		}
+	} else {
+		num_entries = num_to_process(s, num_entries, block);
+
+		for (j = 0; j < num_entries; j++) {
+			ev = (struct rte_event *)get_slot(t, s->head+j);
+			if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+				memcpy(entries_offset, ev, t->slot_size);
+				entries_offset += t->slot_size;
+				i++;
+			}
+		}
+	}
+	s->shadow_head = s->head;
+	s->head += num_entries;
+	s->num_claimed = num_entries;
+	s->num_event = i;
+
+	/* automatically disclaim entries if number of rte_events is zero */
+	if (unlikely(i == 0))
+		opdl_stage_disclaim(s, 0, false);
+
+	return i;
+}
+
+/* Thread-safe version of function to claim slots for processing */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	uint32_t old_head;
+	struct opdl_ring *t = s->t;
+	uint32_t i = 0, offset;
+	uint8_t *entries_offset = (uint8_t *)entries;
+
+	offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
+	num_entries = offset + (s->nb_instance * num_entries);
+
+	move_head_atomically(s, &num_entries, &old_head, block, true);
+
+	for (; offset < num_entries; offset += s->nb_instance) {
+		memcpy(entries_offset, get_slot(t, s->head + offset),
+			t->slot_size);
+		entries_offset += t->slot_size;
+		i++;
+	}
+	if (seq != NULL)
+		*seq = old_head;
+
+	return i;
+}
+
+/* Claim and copy slot pointers, optimised for single-thread operation */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	num_entries = num_to_process(s, num_entries, block);
+	if (num_entries == 0)
+		return 0;
+	copy_entries_out(s->t, s->head, entries, num_entries);
+	if (seq != NULL)
+		*seq = s->head;
+	s->head += num_entries;
+	return num_entries;
+}
+
+/* Thread-safe version of function to claim and copy pointers to slots */
+static inline uint32_t __attribute__((always_inline))
+opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	uint32_t old_head;
+
+	move_head_atomically(s, &num_entries, &old_head, block, true);
+	if (num_entries == 0)
+		return 0;
+	copy_entries_out(s->t, old_head, entries, num_entries);
+	if (seq != NULL)
+		*seq = old_head;
+	return num_entries;
+}
+
+static inline void __attribute__((always_inline))
+opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
+		uint32_t num_entries)
+{
+	uint32_t old_tail = s->shared.tail;
+
+	if (unlikely(num_entries > (s->head - old_tail))) {
+		log(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
+				num_entries, s->head - old_tail);
+		num_entries = s->head - old_tail;
+	}
+	__atomic_store_n(&s->shared.tail, num_entries + old_tail,
+			__ATOMIC_RELEASE);
+}
+
+uint32_t
+opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
+		bool block)
+{
+	if (input_stage(t)->threadsafe == false)
+		return opdl_ring_input_singlethread(t, entries, num_entries,
+				block);
+	else
+		return opdl_ring_input_multithread(t, entries, num_entries,
+				block);
+}
+
+uint32_t
+opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
+		const void *entries, uint32_t num_entries, bool block)
+{
+	uint32_t head = s->head;
+
+	num_entries = num_to_process(s, num_entries, block);
+
+	if (num_entries == 0)
+		return 0;
+
+	copy_entries_in(t, head, entries, num_entries);
+
+	s->head += num_entries;
+	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+
+	return num_entries;
+
+}
+
+uint32_t
+opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
+		void *entries, uint32_t num_entries, bool block)
+{
+	uint32_t head = s->head;
+
+	num_entries = num_to_process(s, num_entries, block);
+	if (num_entries == 0)
+		return 0;
+
+	copy_entries_out(t, head, entries, num_entries);
+
+	s->head += num_entries;
+	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+
+	return num_entries;
+}
+
+uint32_t
+opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
+{
+	/* return (num_to_process(s, num_entries, false)); */
+
+	if (available(s) >= num_entries)
+		return num_entries;
+
+	update_available_seq(s);
+
+	uint32_t avail = available(s);
+
+	if (avail == 0) {
+		rte_pause();
+		return 0;
+	}
+	return (avail <= num_entries) ? avail : num_entries;
+}
+
+uint32_t
+opdl_stage_claim(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
+{
+	if (s->threadsafe == false)
+		return opdl_stage_claim_singlethread(s, entries, num_entries,
+				seq, block, atomic);
+	else
+		return opdl_stage_claim_multithread(s, entries, num_entries,
+				seq, block);
+}
+
+uint32_t
+opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block)
+{
+	if (s->threadsafe == false)
+		return opdl_stage_claim_copy_singlethread(s, entries,
+				num_entries, seq, block);
+	else
+		return opdl_stage_claim_copy_multithread(s, entries,
+				num_entries, seq, block);
+}
+
+void
+opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
+		bool block)
+{
+
+	if (s->threadsafe == false) {
+		opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
+	} else {
+		struct claim_manager *disclaims =
+			&s->pending_disclaims[rte_lcore_id()];
+
+		if (unlikely(num_entries > s->num_slots)) {
+			log(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
+					num_entries, disclaims->num_claimed);
+			num_entries = disclaims->num_claimed;
+		}
+
+		num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
+				disclaims->num_claimed);
+		opdl_stage_disclaim_multithread_n(s, num_entries, block);
+	}
+}
+
+int
+opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
+{
+	if (num_entries != s->num_event) {
+		rte_errno = -EINVAL;
+		return 0;
+	}
+	if (s->threadsafe == false) {
+		__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
+		s->seq += s->num_claimed;
+		s->shadow_head = s->head;
+		s->num_claimed = 0;
+	} else {
+		struct claim_manager *disclaims =
+				&s->pending_disclaims[rte_lcore_id()];
+		opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
+				block);
+	}
+	return num_entries;
+}
+
+uint32_t
+opdl_ring_available(struct opdl_ring *t)
+{
+	return opdl_stage_available(&t->stages[0]);
+}
+
+uint32_t
+opdl_stage_available(struct opdl_stage *s)
+{
+	update_available_seq(s);
+	return available(s);
+}
+
+void
+opdl_ring_flush(struct opdl_ring *t)
+{
+	struct opdl_stage *s = input_stage(t);
+
+	wait_for_available(s, s->num_slots);
+}
+
+/******************** Non performance sensitive functions ********************/
+
+/* Initial setup of a new stage's context */
+static int
+init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
+		bool is_input)
+{
+	uint32_t available = (is_input) ? t->num_slots : 0;
+
+	s->t = t;
+	s->num_slots = t->num_slots;
+	s->index = t->num_stages;
+	s->threadsafe = threadsafe;
+	s->shared.stage = s;
+
+	/* Alloc memory for deps */
+	s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
+			t->max_num_stages * sizeof(enum dep_type),
+			0, t->socket);
+	if (s->dep_tracking == NULL)
+		return -ENOMEM;
+
+	s->deps = rte_zmalloc_socket(LIB_NAME,
+			t->max_num_stages * sizeof(struct shared_state *),
+			0, t->socket);
+	if (s->deps == NULL) {
+		rte_free(s->dep_tracking);
+		return -ENOMEM;
+	}
+
+	s->dep_tracking[s->index] = DEP_SELF;
+
+	if (threadsafe == true)
+		s->shared.available_seq = available;
+	else
+		s->available_seq = available;
+
+	return 0;
+}
+
+/* Add direct or indirect dependencies between stages */
+static int
+add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
+		enum dep_type type)
+{
+	struct opdl_ring *t = dependent->t;
+	uint32_t i;
+
+	/* Add new direct dependency */
+	if ((type == DEP_DIRECT) &&
+			(dependent->dep_tracking[dependency->index] ==
+					DEP_NONE)) {
+		log_debug("%s:%u direct dependency on %u",
+				t->name, dependent->index, dependency->index);
+		dependent->dep_tracking[dependency->index] = DEP_DIRECT;
+	}
+
+	/* Add new indirect dependency or change direct to indirect */
+	if ((type == DEP_INDIRECT) &&
+			((dependent->dep_tracking[dependency->index] ==
+			DEP_NONE) ||
+			(dependent->dep_tracking[dependency->index] ==
+			DEP_DIRECT))) {
+		log_debug("%s:%u indirect dependency on %u",
+				t->name, dependent->index, dependency->index);
+		dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
+	}
+
+	/* Shouldn't happen... */
+	if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
+			(dependent != input_stage(t))) {
+		log(ERR, "Loop in dependency graph %s:%u",
+				t->name, dependent->index);
+		return -EINVAL;
+	}
+
+	/* Keep going to dependencies of the dependency, until input stage */
+	if (dependency != input_stage(t))
+		for (i = 0; i < dependency->num_deps; i++) {
+			int ret = add_dep(dependent, dependency->deps[i]->stage,
+					DEP_INDIRECT);
+
+			if (ret < 0)
+				return ret;
+		}
+
+	/* Make list of sequence numbers for direct dependencies only */
+	if (type == DEP_DIRECT)
+		for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
+			if (dependent->dep_tracking[i] == DEP_DIRECT) {
+				if ((i == 0) && (dependent->num_deps > 1))
+					rte_panic("%s:%u depends on more than input",
+							t->name,
+							dependent->index);
+				dependent->deps[dependent->num_deps++] =
+						&t->stages[i].shared;
+			}
+
+	return 0;
+}
+
+struct opdl_ring *
+opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
+		uint32_t max_num_stages, int socket)
+{
+	struct opdl_ring *t;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	int mz_flags = 0;
+	struct opdl_stage *st = NULL;
+	const struct rte_memzone *mz = NULL;
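+	/* The ring header and its slot array share one cache-line rounded
+	 * allocation; the slots start immediately after the opdl_ring struct.
+	 */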
+	size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
+			(num_slots * slot_size));
+
+	/* Compile time checking */
+	RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
+			0);
+	RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
+			RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
+			RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON(!POWER_OF_2(OPDL_DISCLAIMS_PER_LCORE));
+
+	/* Parameter checking */
+	if (name == NULL) {
+		log(ERR, "name param is NULL");
+		return NULL;
+	}
+	if (!rte_is_power_of_2(num_slots)) {
+		log(ERR, "num_slots (%u) for %s is not power of 2",
+				num_slots, name);
+		return NULL;
+	}
+
+	/* Alloc memory for stages */
+	st = rte_zmalloc_socket(LIB_NAME,
+		max_num_stages * sizeof(struct opdl_stage),
+		RTE_CACHE_LINE_SIZE, socket);
+	if (st == NULL)
+		goto exit_fail;
+
+	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
+
+	/* Alloc memory for memzone */
+	mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
+	if (mz == NULL)
+		goto exit_fail;
+
+	t = mz->addr;
+
+	/* Initialise opdl_ring queue */
+	memset(t, 0, sizeof(*t));
+	snprintf(t->name, sizeof(t->name), "%s", name);
+	t->socket = socket;
+	t->num_slots = num_slots;
+	t->mask = num_slots - 1;
+	t->slot_size = slot_size;
+	t->max_num_stages = max_num_stages;
+	t->stages = st;
+
+	log_debug("Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
+			t->name, t, num_slots, socket, slot_size);
+
+	return t;
+
+exit_fail:
+	log(ERR, "Cannot reserve memory");
+	rte_free(st);
+	rte_memzone_free(mz);
+
+	return NULL;
+}
+
+void *
+opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
+{
+	return get_slot(t, index);
+}
+
+bool
+opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+		uint32_t index, bool atomic)
+{
+	uint32_t i = 0, j = 0, offset;
+	struct opdl_ring *t = s->t;
+	struct rte_event *ev_orig = NULL;
+	bool ev_updated = false;
+	uint64_t ev_temp = 0;
+
+	if (index >= s->num_event) {
+		log(ERR, "index %u out of range", index);
+		return ev_updated;
+	}
+
+	ev_temp = ev->event & RTE_EVENT_MASK;
+
+	if (!atomic) {
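+		/* Ordered/parallel stage: claimed slots are distributed
+		 * round-robin across instances, so the index-th event for
+		 * this instance is at first_entry_id + index * nb_instance.
+		 */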
+		offset = opdl_first_entry_id(s->seq, s->nb_instance,
+				s->instance_id);
+		offset += index * s->nb_instance;
+		ev_orig = get_slot(t, s->shadow_head + offset);
+		if ((ev_orig->event & RTE_EVENT_MASK) != ev_temp) {
+			ev_orig->event = ev->event;
+			ev_updated = true;
+		}
+		if (ev_orig->u64 != ev->u64) {
+			ev_orig->u64 = ev->u64;
+			ev_updated = true;
+		}
+
+	} else {
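+		/* Atomic stage: walk the claimed slots and update the
+		 * index-th event whose flow_id maps to this instance.
+		 */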
+		for (i = 0; i < s->num_claimed; i++) {
+			ev_orig = (struct rte_event *)
+					get_slot(t, s->shadow_head + i);
+			if ((ev_orig->flow_id % s->nb_instance) ==
+					s->instance_id) {
+				if (j == index) {
+					if ((ev_orig->event & RTE_EVENT_MASK) !=
+							ev_temp) {
+						ev_orig->event = ev->event;
+						ev_updated = true;
+					}
+					if (ev_orig->u64 != ev->u64) {
+						ev_orig->u64 = ev->u64;
+						ev_updated = true;
+					}
+
+					break;
+				}
+				j++;
+			}
+		}
+	}
+
+	return ev_updated;
+}
+
+int
+opdl_ring_get_socket(const struct opdl_ring *t)
+{
+	return t->socket;
+}
+
+uint32_t
+opdl_ring_get_num_slots(const struct opdl_ring *t)
+{
+	return t->num_slots;
+}
+
+const char *
+opdl_ring_get_name(const struct opdl_ring *t)
+{
+	return t->name;
+}
+
+/* Check dependency list is valid for a given opdl_ring */
+static int
+check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
+		uint32_t num_deps)
+{
+	unsigned int i;
+
+	for (i = 0; i < num_deps; ++i) {
+		if (!deps[i]) {
+			log(ERR, "deps[%u] is NULL", i);
+			return -EINVAL;
+		}
+		if (t != deps[i]->t) {
+			log(ERR, "deps[%u] is in opdl_ring %s, not %s",
+					i, deps[i]->t->name, t->name);
+			return -EINVAL;
+		}
+	}
+	if (num_deps > t->num_stages) {
+		log(ERR, "num_deps (%u) > number stages (%u)",
+				num_deps, t->num_stages);
+		return -EINVAL;
+	}
+	return 0;
+}
+
+struct opdl_stage *
+opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
+{
+	struct opdl_stage *s;
+
+	/* Parameter checking */
+	if (!t) {
+		log(ERR, "opdl_ring is NULL");
+		return NULL;
+	}
+	if (t->num_stages == t->max_num_stages) {
+		log(ERR, "%s has max number of stages (%u)",
+				t->name, t->max_num_stages);
+		return NULL;
+	}
+
+	s = &t->stages[t->num_stages];
+
+	if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
+		log(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
+				&s->shared, t->name);
+
+	if (init_stage(t, s, threadsafe, is_input) < 0) {
+		log(ERR, "Cannot reserve memory");
+		return NULL;
+	}
+	t->num_stages++;
+
+	return s;
+}
+
+int
+opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
+		uint32_t nb_instance, uint32_t instance_id,
+		struct opdl_stage *deps[],
+		uint32_t num_deps)
+{
+	uint32_t i;
+	int ret = 0;
+
+	if ((num_deps > 0) && (!deps)) {
+		log(ERR, "%s stage has NULL dependencies", t->name);
+		return -1;
+	}
+	ret = check_deps(t, deps, num_deps);
+	if (ret < 0)
+		return ret;
+
+	for (i = 0; i < num_deps; i++) {
+		ret = add_dep(s, deps[i], DEP_DIRECT);
+		if (ret < 0)
+			return ret;
+	}
+
+	s->nb_instance = nb_instance;
+	s->instance_id = instance_id;
+
+	return ret;
+}
+
+struct opdl_stage *
+opdl_ring_get_input_stage(const struct opdl_ring *t)
+{
+	return input_stage(t);
+}
+
+int
+opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
+		uint32_t num_deps)
+{
+	unsigned int i;
+	int ret;
+
+	if ((num_deps == 0) || (!deps)) {
+		log(ERR, "cannot set NULL dependencies");
+		return -EINVAL;
+	}
+
+	ret = check_deps(s->t, deps, num_deps);
+	if (ret < 0)
+		return ret;
+
+	/* Update deps */
+	for (i = 0; i < num_deps; i++)
+		s->deps[i] = &deps[i]->shared;
+	s->num_deps = num_deps;
+
+	return 0;
+}
+
+struct opdl_ring *
+opdl_stage_get_opdl_ring(const struct opdl_stage *s)
+{
+	return s->t;
+}
+
+void
+opdl_ring_dump(const struct opdl_ring *t, FILE *f)
+{
+	uint32_t i;
+
+	if (t == NULL) {
+		fprintf(f, "NULL OPDL!\n");
+		return;
+	}
+	fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
+			t->name, t->num_slots, t->mask, t->slot_size,
+			t->num_stages, t->socket);
+	for (i = 0; i < t->num_stages; i++) {
+		uint32_t j;
+		const struct opdl_stage *s = &t->stages[i];
+
+		fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
+				t->name, i, (s->threadsafe) ? "true" : "false",
+				(s->threadsafe) ? s->shared.head : s->head,
+				(s->threadsafe) ? s->shared.available_seq :
+				s->available_seq,
+				s->shared.tail, (s->num_deps > 0) ?
+				s->deps[0]->stage->index : 0);
+		for (j = 1; j < s->num_deps; j++)
+			fprintf(f, ",%u", s->deps[j]->stage->index);
+		fprintf(f, "\n");
+	}
+	fflush(f);
+}
+
+void
+opdl_ring_free(struct opdl_ring *t)
+{
+	uint32_t i;
+	const struct rte_memzone *mz;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+
+	if (t == NULL) {
+		log_debug("Freeing NULL OPDL Ring!");
+		return;
+	}
+
+	log_debug("Freeing %s opdl_ring at %p", t->name, t);
+
+	for (i = 0; i < t->num_stages; ++i) {
+		rte_free(t->stages[i].deps);
+		rte_free(t->stages[i].dep_tracking);
+	}
+
+	rte_free(t->stages);
+
+	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
+	mz = rte_memzone_lookup(mz_name);
+	if (rte_memzone_free(mz) != 0)
+		log(ERR, "Cannot free memzone for %s", t->name);
+}
+
+/* Search for an opdl_ring by its name */
+struct opdl_ring *
+opdl_ring_lookup(const char *name)
+{
+	const struct rte_memzone *mz;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+
+	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
+
+	mz = rte_memzone_lookup(mz_name);
+	if (mz == NULL)
+		return NULL;
+
+	return mz->addr;
+}
+
+void
+opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
+{
+	s->threadsafe = threadsafe;
+}
diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h
new file mode 100644
index 0000000..cc37bd1
--- /dev/null
+++ b/drivers/event/opdl/opdl_ring.h
@@ -0,0 +1,601 @@ 
+/*-
+ *
+ * <COPYRIGHT_TAG>
+ */
+
+#ifndef _OPDL_H_
+#define _OPDL_H_
+
+/**
+ * @file
+ * The "opdl_ring" is a data structure that contains a fixed number of slots,
+ * with each slot having the same, but configurable, size. Entries are input
+ * into the opdl_ring by copying into available slots. Once in the opdl_ring,
+ * an entry is processed by a number of stages, with the ordering of stage
+ * processing controlled by making stages dependent on one or more other stages.
+ * An entry is not available for a stage to process until it has been processed
+ * by that stage's dependencies. Entries are always made available for
+ * processing in the same order that they were input into the opdl_ring.
+ * Inputting is considered as a stage that depends on all other stages,
+ * and is also a dependency of all stages.
+ *
+ * Inputting and processing in a stage can support multi-threading. Note that
+ * multi-thread processing can also be done by making stages co-operate, e.g.
+ * two stages where one processes the even packets and the other processes the
+ * odd packets.
+ *
+ * An opdl_ring can be used as the basis for pipeline-based applications. Instead
+ * of each stage in a pipeline dequeueing from a ring, processing and enqueueing
+ * to another ring, it can process entries in-place on the ring. If stages do
+ * not depend on each other, they can run in parallel.
+ *
+ * The opdl_ring works with entries of configurable size; these could be
+ * pointers to mbufs, pointers to mbufs with application specific meta-data,
+ * tasks etc.
+ */
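+
+/*
+ * Illustrative usage (a sketch only; "evs", "claimed" and BURST are
+ * hypothetical application names) of a single-threaded two-stage pipeline:
+ *
+ *	struct rte_event evs[BURST], claimed[BURST];
+ *	struct opdl_ring *r = opdl_ring_create("pipe", 1024,
+ *			sizeof(struct rte_event), 2, SOCKET_ID_ANY);
+ *	struct opdl_stage *in = opdl_stage_add(r, false, true);
+ *	struct opdl_stage *work = opdl_stage_add(r, false, false);
+ *	struct opdl_stage *deps[] = { in };
+ *	uint32_t n;
+ *
+ *	opdl_stage_set_deps(work, deps, 1);
+ *
+ *	Producer thread: input a burst of events.
+ *		opdl_ring_input(r, evs, BURST, false);
+ *
+ *	Worker thread: claim, process in place, then disclaim.
+ *		n = opdl_stage_claim(work, claimed, BURST, NULL, false, false);
+ *		... process claimed[0 .. n - 1] ...
+ *		opdl_stage_disclaim(work, n, false);
+ *
+ *	opdl_ring_flush(r);
+ *	opdl_ring_free(r);
+ */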
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include <rte_eventdev.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifndef OPDL_DISCLAIMS_PER_LCORE
+/** Multi-threaded processing allows one thread to process multiple batches in a
+ * stage, while another thread is processing a single large batch. This number
+ * controls how many non-contiguous batches one stage can process before being
+ * blocked by the other stage.
+ */
+#define OPDL_DISCLAIMS_PER_LCORE 8
+#endif
+
+/** Opaque handle to an opdl_ring instance */
+struct opdl_ring;
+
+/** Opaque handle to a single stage in an opdl_ring */
+struct opdl_stage;
+
+/**
+ * Create a new instance of an opdl_ring.
+ *
+ * @param name
+ *   String containing the name to give the new opdl_ring instance.
+ * @param num_slots
+ *   How many slots the opdl_ring contains. Must be a power of 2.
+ * @param slot_size
+ *   How many bytes in each slot.
+ * @param max_num_stages
+ *   Maximum number of stages.
+ * @param socket
+ *   The NUMA socket (or SOCKET_ID_ANY) to allocate the memory used for this
+ *   opdl_ring instance.
+ *
+ * @return
+ *   A pointer to a new opdl_ring instance, or NULL on error.
+ */
+struct opdl_ring *
+opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
+		uint32_t max_num_stages, int socket);
+
+/**
+ * Get pointer to individual slot in an opdl_ring.
+ *
+ * @param t
+ *   The opdl_ring.
+ * @param index
+ *   Index of slot. If greater than the number of slots it will be masked to
+ *   lie within the correct range.
+ *
+ * @return
+ *   A pointer to that slot.
+ */
+void *
+opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index);
+
+/**
+ * Get NUMA socket used by an opdl_ring.
+ *
+ * @param t
+ *   The opdl_ring.
+ *
+ * @return
+ *   NUMA socket.
+ */
+int
+opdl_ring_get_socket(const struct opdl_ring *t);
+
+/**
+ * Get number of slots in an opdl_ring.
+ *
+ * @param t
+ *   The opdl_ring.
+ *
+ * @return
+ *   Number of slots.
+ */
+uint32_t
+opdl_ring_get_num_slots(const struct opdl_ring *t);
+
+/**
+ * Get name of an opdl_ring.
+ *
+ * @param t
+ *   The opdl_ring.
+ *
+ * @return
+ *   Name string.
+ */
+const char *
+opdl_ring_get_name(const struct opdl_ring *t);
+
+/**
+ * Adds a new processing stage to a specified opdl_ring instance. Adding a stage
+ * while there are entries in the opdl_ring being processed will cause undefined
+ * behaviour.
+ *
+ * @param t
+ *   The opdl_ring to add the stage to.
+ * @param threadsafe
+ *   Whether to support multiple threads processing this stage or not.
+ *   Enabling this may have a negative impact on performance if only one thread
+ *   will be processing this stage.
+ * @param is_input
+ *   Indication to initialise the stage with all slots available or none.
+ *
+ * @return
+ *   A pointer to the new stage, or NULL on error.
+ */
+struct opdl_stage *
+opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input);
+
+/**
+ * Returns the input stage of an opdl_ring to be used by other API functions.
+ *
+ * @param t
+ *   The opdl_ring.
+ *
+ * @return
+ *   A pointer to the input stage.
+ */
+struct opdl_stage *
+opdl_ring_get_input_stage(const struct opdl_ring *t);
+
+/**
+ * Sets the dependencies for a stage (clears all the previous deps!). Changing
+ * dependencies while there are entries in the opdl_ring being processed will
+ * cause undefined behaviour.
+ *
+ * @param s
+ *   The stage to set the dependencies for.
+ * @param deps
+ *   An array of pointers to other stages that this stage will depend on. The
+ *   other stages must be part of the same opdl_ring!
+ * @param num_deps
+ *   The size of the deps array. This must be > 0.
+ *
+ * @return
+ *   0 on success, a negative value on error.
+ */
+int
+opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
+		uint32_t num_deps);
+
+/**
+ * Returns the opdl_ring that a stage belongs to.
+ *
+ * @param s
+ *   The stage
+ *
+ * @return
+ *   A pointer to the opdl_ring that the stage belongs to.
+ */
+struct opdl_ring *
+opdl_stage_get_opdl_ring(const struct opdl_stage *s);
+
+/**
+ * Inputs a new batch of entries into the opdl_ring. This function is only
+ * threadsafe (with the same opdl_ring parameter) if the input stage was
+ * created with threadsafe set to true. For performance reasons, this
+ * function does not check input parameters.
+ *
+ * @param t
+ *   The opdl_ring to input entries in to.
+ * @param entries
+ *   An array of entries that will be copied in to the opdl_ring.
+ * @param num_entries
+ *   The size of the entries array.
+ * @param block
+ *   If this is true, the function blocks until enough slots are available to
+ *   input all the requested entries. If false, then the function inputs as
+ *   many entries as currently possible.
+ *
+ * @return
+ *   The number of entries successfully input.
+ */
+uint32_t
+opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
+		bool block);
+
+/**
+ * Inputs a new batch of entries into an opdl_ring stage. This function is
+ * only threadsafe (with the same stage parameter) if the stage was created
+ * with threadsafe set to true. For performance reasons, this function does
+ * not check input parameters.
+ *
+ * @param t
+ *   The opdl_ring to input entries into.
+ * @param s
+ *   The stage to copy entries to.
+ * @param entries
+ *   An array of entries that will be copied into the opdl_ring.
+ * @param num_entries
+ *   The size of the entries array.
+ * @param block
+ *   If this is true, the function blocks until enough slots are available to
+ *   input all the requested entries. If false, then the function inputs as
+ *   many entries as currently possible.
+ *
+ * @return
+ *   The number of entries successfully input.
+ */
+uint32_t
+opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
+			const void *entries, uint32_t num_entries, bool block);
+
+/**
+ * Copies a batch of entries from an opdl_ring stage. This function is only
+ * threadsafe (with the same stage parameter) if the stage was created with
+ * threadsafe set to true. For performance reasons, this function does not
+ * check input parameters.
+ *
+ * @param t
+ *   The opdl_ring to copy entries from.
+ * @param s
+ *   The stage to copy entries from.
+ * @param entries
+ *   An array that the entries will be copied into from the opdl_ring.
+ * @param num_entries
+ *   The size of the entries array.
+ * @param block
+ *   If this is true, the function blocks until enough entries are available
+ *   to copy all the requested entries. If false, then the function copies as
+ *   many entries as currently possible.
+ *
+ * @return
+ *   The number of entries successfully copied.
+ */
+uint32_t
+opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
+		void *entries, uint32_t num_entries, bool block);
+
+/**
+ * Before processing a batch of entries, a stage must first claim them to get
+ * access. This function is threadsafe (using the same opdl_stage parameter)
+ * if the stage was created with threadsafe set to true, otherwise it is only
+ * threadsafe with a different opdl_stage per thread. For performance
+ * reasons, this function does not check input parameters.
+ *
+ * @param s
+ *   The opdl_ring stage to claim entries in.
+ * @param entries
+ *   An array of pointers to entries that will be filled in by this function.
+ * @param num_entries
+ *   The number of entries to attempt to claim for processing (and the size of
+ *   the entries array).
+ * @param seq
+ *   If not NULL, this is set to the value of the internal stage sequence number
+ *   associated with the first entry returned.
+ * @param block
+ *   If this is true, the function blocks until num_entries slots are available
+ *   to process. If false, then the function claims as many entries as
+ *   currently possible.
+ *
+ * @param atomic
+ *   If this is true, the function returns events according to their flow ID
+ *   (atomic queue semantics).
+ *
+ * @return
+ *   The number of pointers to entries filled in to the entries array.
+ */
+uint32_t
+opdl_stage_claim(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block, bool atomic);
+
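+/**
+ * Add direct dependencies to a stage and set the stage's instance
+ * information (number of instances and the caller's instance id).
+ *
+ * @param t
+ *   The opdl_ring that the stage belongs to.
+ * @param s
+ *   The stage to add the dependencies to.
+ * @param nb_instance
+ *   The number of instances (threads) that share this stage.
+ * @param instance_id
+ *   The instance id of the caller within this stage.
+ * @param deps
+ *   An array of pointers to other stages that this stage depends on. May be
+ *   NULL if num_deps is 0.
+ * @param num_deps
+ *   The size of the deps array.
+ *
+ * @return
+ *   0 on success, a negative value on error.
+ */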
+int
+opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
+		uint32_t nb_instance, uint32_t instance_id,
+		struct opdl_stage *deps[], uint32_t num_deps);
+
+/**
+ * A function to check how many entries are ready to be claimed.
+ *
+ * @param entries
+ *   An array of pointers to entries.
+ * @param num_entries
+ *   Number of entries in the array.
+ * @param arg
+ *   An opaque pointer to data passed to the claim function.
+ * @param block
+ *   When set to true, the function should wait until num_entries are ready to
+ *   be processed. Otherwise it should return immediately.
+ *
+ * @return
+ *   Number of entries ready to be claimed.
+ */
+typedef uint32_t (opdl_ring_check_entries_t)(void *entries[],
+		uint32_t num_entries, void *arg, bool block);
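+
+/*
+ * Example check function (illustrative only; "struct my_task" and its
+ * "done" flag are hypothetical application data). It reports the length of
+ * the contiguous prefix of ready entries:
+ *
+ *	static uint32_t
+ *	my_check(void *entries[], uint32_t num_entries, void *arg, bool block)
+ *	{
+ *		uint32_t i;
+ *
+ *		RTE_SET_USED(arg);
+ *		RTE_SET_USED(block);
+ *		for (i = 0; i < num_entries; i++)
+ *			if (!((struct my_task *)entries[i])->done)
+ *				break;
+ *		return i;
+ *	}
+ */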
+
+/**
+ * Before processing a batch of entries, a stage must first claim them to get
+ * access. Each entry is checked by the passed check() function and depending
+ * on block value, it waits until num_entries are ready or returns immediately.
+ * This function is only threadsafe with a different opdl_stage per thread.
+ *
+ * @param s
+ *   The opdl_ring stage to claim entries in.
+ * @param entries
+ *   An array of pointers to entries that will be filled in by this function.
+ * @param num_entries
+ *   The number of entries to attempt to claim for processing (and the size of
+ *   the entries array).
+ * @param seq
+ *   If not NULL, this is set to the value of the internal stage sequence number
+ *   associated with the first entry returned.
+ * @param block
+ *   If this is true, the function blocks until num_entries ready slots are
+ *   available to process. If false, then the function claims as many ready
+ *   entries as currently possible.
+ * @param check
+ *   Pointer to a function called to check entries.
+ * @param arg
+ *   Opaque data passed to check() function.
+ *
+ * @return
+ *   The number of pointers to ready entries filled in to the entries array.
+ */
+uint32_t
+opdl_stage_claim_check(struct opdl_stage *s, void **entries,
+		uint32_t num_entries, uint32_t *seq, bool block,
+		opdl_ring_check_entries_t *check, void *arg);
+
+/**
+ * Before processing a batch of entries, a stage must first claim them to get
+ * access. This function is threadsafe (using the same opdl_stage parameter)
+ * if the stage was created with threadsafe set to true, otherwise it is only
+ * threadsafe with a different opdl_stage per thread.
+ *
+ * The difference between this function and opdl_stage_claim() is that this
+ * function copies the entries from the opdl_ring. Note that any changes made to
+ * the copied entries will not be reflected back into the entries in the
+ * opdl_ring, so this function probably only makes sense if the entries are
+ * pointers to other data. For performance reasons, this function does not check
+ * input parameters.
+ *
+ * @param s
+ *   The opdl_ring stage to claim entries in.
+ * @param entries
+ *   An array of entries that will be filled in by this function.
+ * @param num_entries
+ *   The number of entries to attempt to claim for processing (and the size of
+ *   the entries array).
+ * @param seq
+ *   If not NULL, this is set to the value of the internal stage sequence number
+ *   associated with the first entry returned.
+ * @param block
+ *   If this is true, the function blocks until num_entries slots are available
+ *   to process. If false, then the function claims as many entries as
+ *   currently possible.
+ *
+ * @return
+ *   The number of entries copied in to the entries array.
+ */
+uint32_t
+opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
+		uint32_t num_entries, uint32_t *seq, bool block);
+
+/**
+ * This function must be called when a stage has finished its processing of
+ * entries, to make them available to any dependent stages. All entries that are
+ * claimed by the calling thread in the stage will be disclaimed. It is possible
+ * to claim multiple batches before disclaiming. For performance reasons, this
+ * function does not check input parameters.
+ *
+ * @param s
+ *   The opdl_ring stage in which to disclaim all claimed entries.
+ *
+ * @param num_entries
+ *   The number of entries to disclaim. This must match the number of
+ *   entries currently claimed by the stage, otherwise the call fails with
+ *   rte_errno set to EINVAL.
+ *
+ * @param block
+ *   Entries are always made available to a stage in the same order that they
+ *   were input in the stage. If a stage is multithread safe, this may mean that
+ *   full disclaiming of a batch of entries can not be considered complete until
+ *   all earlier threads in the stage have disclaimed. If this parameter is true
+ *   then the function blocks until all entries are fully disclaimed, otherwise
+ *   it disclaims as many as currently possible, with non fully disclaimed
+ *   batches stored until the next call to a claim or disclaim function for this
+ *   stage on this thread.
+ *
+ *   If a thread is not going to process any more entries in this stage, it
+ *   *must* first call this function with this parameter set to true to ensure
+ *   it does not block the entire opdl_ring.
+ *
+ *   In a single threaded stage, this parameter has no effect.
+ *
+ * @return
+ *   num_entries on success, 0 on error (with rte_errno set to EINVAL when
+ *   num_entries does not match the number of claimed entries).
+ */
+int
+opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries,
+		bool block);
+
+/**
+ * This function can be called when a stage has finished its processing of
+ * entries, to make them available to any dependent stages. The difference
+ * between this function and opdl_stage_disclaim() is that here only a
+ * portion of entries are disclaimed, not all of them. For performance reasons,
+ * this function does not check input parameters.
+ *
+ * @param s
+ *   The opdl_ring stage in which to disclaim entries.
+ *
+ * @param num_entries
+ *   The number of entries to disclaim.
+ *
+ * @param block
+ *   Entries are always made available to a stage in the same order that they
+ *   were input in the stage. If a stage is multithread safe, this may mean that
+ *   full disclaiming of a batch of entries can not be considered complete until
+ *   all earlier threads in the stage have disclaimed. If this parameter is true
+ *   then the function blocks until the specified number of entries has been
+ *   disclaimed (or there are no more entries to disclaim). Otherwise it
+ *   disclaims as many claims as currently possible and an attempt to disclaim
+ *   them is made the next time a claim or disclaim function for this stage on
+ *   this thread is called.
+ *
+ *   In a single threaded stage, this parameter has no effect.
+ */
+void
+opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
+		bool block);
+
+/**
+ * Check how many entries can be input.
+ *
+ * @param t
+ *   The opdl_ring instance to check.
+ *
+ * @return
+ *   The number of new entries currently allowed to be input.
+ */
+uint32_t
+opdl_ring_available(struct opdl_ring *t);
+
+/**
+ * Check how many entries can be processed in a stage.
+ *
+ * @param s
+ *   The stage to check.
+ *
+ * @return
+ *   The number of entries currently available to be processed in this stage.
+ */
+uint32_t
+opdl_stage_available(struct opdl_stage *s);
+
+/**
+ * Check how many entries are available to be processed.
+ *
+ * NOTE: this does not change any claim state within the stage.
+ *
+ * @param s
+ *   The stage to check.
+ *
+ * @param num_entries
+ *   The number of entries to check for availability.
+ *
+ * @return
+ *   The number of entries currently available to be processed in this stage.
+ */
+uint32_t
+opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
+
+/**
+ * Create an empty stage instance and return the pointer.
+ *
+ * @param t
+ *   Pointer to the opdl_ring.
+ *
+ * @param threadsafe
+ *   Whether the stage should support multiple threads.
+ * @return
+ *   Pointer to the new stage instance.
+ */
+struct opdl_stage *
+opdl_stage_create(struct opdl_ring *t, bool threadsafe);
+
+/**
+ * Prints information on an opdl_ring instance and all its stages.
+ *
+ * @param t
+ *   The opdl_ring to print info on.
+ * @param f
+ *   Where to print the info.
+ */
+void
+opdl_ring_dump(const struct opdl_ring *t, FILE *f);
+
+/**
+ * Blocks until all entries in an opdl_ring have been processed by all stages.
+ *
+ * @param t
+ *   The opdl_ring instance to flush.
+ */
+void
+opdl_ring_flush(struct opdl_ring *t);
+
+/**
+ * Deallocates all resources used by an opdl_ring instance.
+ *
+ * @param t
+ *   The opdl_ring instance to free.
+ */
+void
+opdl_ring_free(struct opdl_ring *t);
+
+/**
+ * Search for an opdl_ring by its name.
+ *
+ * @param name
+ *   The name of the opdl_ring.
+ * @return
+ *   The pointer to the opdl_ring matching the name, or NULL if not found.
+ *
+ */
+struct opdl_ring *
+opdl_ring_lookup(const char *name);
+
+/**
+ * Set the threadsafe attribute of an opdl_stage.
+ *
+ * @param s
+ *   The opdl_stage.
+ * @param threadsafe
+ *   Threadsafe value.
+ */
+void
+opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
+
+
+/**
+ * Compare the event descriptor with the original version in the ring.
+ * If any key field of the event descriptor has been changed by the
+ * application, update the slot in the ring, otherwise do nothing.
+ * The key fields are flow_id, priority, mbuf and impl_opaque.
+ *
+ * @param s
+ *   The opdl_stage.
+ * @param ev
+ *   Pointer to the event descriptor.
+ * @param index
+ *   Index of the event descriptor.
+ * @param atomic
+ *   Queue type associated with the stage.
+ * @return
+ *   True if any key field of the event differed from the stored record
+ *   (i.e. the slot was updated).
+ */
+bool
+opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+		uint32_t index, bool atomic);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif  /* _OPDL_H_ */
diff --git a/drivers/event/opdl/rte_pmd_evdev_opdl_version.map b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
new file mode 100644
index 0000000..5352e7e
--- /dev/null
+++ b/drivers/event/opdl/rte_pmd_evdev_opdl_version.map
@@ -0,0 +1,3 @@ 
+DPDK_17.05 {
+	local: *;
+};