[dpdk-dev,2/7] event/opdl: add the opdl pmd header and init helper function

Message ID 1511522632-139652-3-git-send-email-liang.j.ma@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Jerin Jacob

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation success Compilation OK

Commit Message

Liang, Ma Nov. 24, 2017, 11:23 a.m. UTC
  From: Liang Ma <liang.j.ma@intel.com>

opdl_evdev.h includes the main data structures of the opdl device
and all the function prototypes that need to be exposed to support
the eventdev API.

opdl_evdev_init.c implements all the initialization helper functions.

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
---
 drivers/event/opdl/opdl_evdev.h      | 353 +++++++++++++
 drivers/event/opdl/opdl_evdev_init.c | 945 +++++++++++++++++++++++++++++++++++
 2 files changed, 1298 insertions(+)
 create mode 100644 drivers/event/opdl/opdl_evdev.h
 create mode 100644 drivers/event/opdl/opdl_evdev_init.c
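For reviewers, a minimal sketch of how the helpers declared in opdl_evdev.h
chain together. The call order below is inferred from the data dependencies in
opdl_evdev_init.c (q_map_ex_to_in is filled by create_queues_and_rings() before
assign_internal_queue_ids() reads it); the actual start path is not part of this
patch, and the function name here is hypothetical:

/* Sketch only: chain the init helpers, stopping at the first failure */
static int
opdl_setup_sketch(struct rte_eventdev *dev)
{
	int err;

	err = create_queues_and_rings(dev); /* builds q_map_ex_to_in */
	if (!err)
		err = assign_internal_queue_ids(dev);
	if (!err)
		err = initialise_queue_zero_ports(dev);
	if (!err)
		err = initialise_all_other_ports(dev);
	if (!err)
		err = check_queues_linked(dev);
	if (!err)
		err = build_all_dependencies(dev);
	if (!err)
		err = opdl_add_event_handlers(dev);
	return err;
}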
  

Patch

diff --git a/drivers/event/opdl/opdl_evdev.h b/drivers/event/opdl/opdl_evdev.h
new file mode 100644
index 0000000..e2657de
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev.h
@@ -0,0 +1,353 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _OPDL_EVDEV_H_
+#define _OPDL_EVDEV_H_
+
+#include <rte_eventdev.h>
+#include <rte_eventdev_pmd_vdev.h>
+#include <rte_atomic.h>
+#include "opdl_ring.h"
+
+#define OPDL_QID_NUM_FIDS 1024
+#define OPDL_IQS_MAX 1
+#define OPDL_Q_PRIORITY_MAX 1
+#define OPDL_PORTS_MAX 64
+#define MAX_OPDL_CONS_Q_DEPTH 128
+/* total number of events in flight across the whole device */
+#define OPDL_INFLIGHT_EVENTS_TOTAL 4096
+/* only one fragment per event; fragmentation is not yet supported */
+#define OPDL_FRAGMENTS_MAX 1
+
+/* report dequeue burst sizes in buckets */
+#define OPDL_DEQ_STAT_BUCKET_SHIFT 2
+/* how many packets pulled from port by sched */
+#define SCHED_DEQUEUE_BURST_SIZE 32
+
+/* size of our history list */
+#define OPDL_PORT_HIST_LIST (MAX_OPDL_PROD_Q_DEPTH)
+
+/* how many data points are used for average stats */
+#define NUM_SAMPLES 64
+
+#define EVENTDEV_NAME_OPDL_PMD event_opdl
+#define OPDL_PMD_NAME RTE_STR(event_opdl)
+#define OPDL_PMD_NAME_MAX 64
+
+#define OPDL_INVALID_QID 255
+
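+/* PMD-internal scheduling type, one beyond the public RTE_SCHED_TYPE_*
+ * values; used for SINGLE_LINK (direct) queues
+ */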
+#define OPDL_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
+
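+/* e.g. MAX_OPDL_CONS_Q_DEPTH = 128 with a shift of 2 gives 32 poll
+ * buckets, each covering a dequeue burst-size range of 4
+ */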
+#define OPDL_NUM_POLL_BUCKETS  \
+	(MAX_OPDL_CONS_Q_DEPTH >> OPDL_DEQ_STAT_BUCKET_SHIFT)
+
+enum {
+	QE_FLAG_VALID_SHIFT = 0,
+	QE_FLAG_COMPLETE_SHIFT,
+	QE_FLAG_NOT_EOP_SHIFT,
+	_QE_FLAG_COUNT
+};
+
+enum port_type {
+	OPDL_INVALID_PORT = 0,
+	OPDL_REGULAR_PORT = 1,
+	OPDL_PURE_RX_PORT,
+	OPDL_PURE_TX_PORT,
+	OPDL_ASYNC_PORT
+};
+
+enum queue_type {
+	OPDL_Q_TYPE_INVALID = 0,
+	OPDL_Q_TYPE_SINGLE_LINK = 1,
+	OPDL_Q_TYPE_ATOMIC,
+	OPDL_Q_TYPE_ORDERED
+};
+
+enum queue_pos {
+	OPDL_Q_POS_START = 0,
+	OPDL_Q_POS_MIDDLE,
+	OPDL_Q_POS_END
+};
+
+#define QE_FLAG_VALID    (1 << QE_FLAG_VALID_SHIFT)    /* for NEW FWD, FRAG */
+#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP  */
+#define QE_FLAG_NOT_EOP  (1 << QE_FLAG_NOT_EOP_SHIFT)  /* set for FRAG only  */
+
+static const uint8_t opdl_qe_flag_map[] = {
+	QE_FLAG_VALID /* NEW Event */,
+	QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */,
+	QE_FLAG_COMPLETE /* RELEASE Event */,
+
+	/* Values which can be used for future support for partial
+	 * events, i.e. where one event comes back to the scheduler
+	 * as multiple which need to be tracked together
+	 */
+	QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP,
+};
+
+#define OPDL_LOG_INFO(fmt, args...) \
+	RTE_LOG(INFO, EVENTDEV, "[%s] line %u: " fmt "\n", \
+			OPDL_PMD_NAME, \
+			__LINE__, ## args)
+
+#ifdef RTE_LIBRTE_PMD_EVDEV_OPDL_DEBUG
+#define OPDL_LOG_DBG(fmt, args...) \
+	RTE_LOG(DEBUG, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
+			OPDL_PMD_NAME, \
+			__func__, __LINE__, ## args)
+#else
+#define OPDL_LOG_DBG(fmt, args...)
+#endif
+
+#define OPDL_LOG_ERR(fmt, args...) \
+	RTE_LOG(ERR, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
+			OPDL_PMD_NAME, \
+			__func__, __LINE__, ## args)
+
+enum port_xstat_name {
+	claim_pkts_requested = 0,
+	claim_pkts_granted,
+	claim_non_empty,
+	claim_empty,
+	total_cycles,
+	max_num_port_xstat
+};
+
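+/* e.g. 64 ports * 5 per-port xstats = 320 entries */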
+#define OPDL_MAX_PORT_XSTAT_NUM (OPDL_PORTS_MAX * max_num_port_xstat)
+
+struct opdl_port;
+
+typedef uint16_t (*opdl_enq_operation)(struct opdl_port *port,
+		const struct rte_event ev[],
+		uint16_t num);
+
+typedef uint16_t (*opdl_deq_operation)(struct opdl_port *port,
+		struct rte_event ev[],
+		uint16_t num);
+
+struct opdl_evdev;
+
+struct opdl_stage_meta_data {
+	uint32_t num_claimed;	/* number of entries claimed by this stage */
+	uint32_t burst_sz;	/* Port claim burst size */
+};
+
+struct opdl_port {
+
+	/* back pointer */
+	struct opdl_evdev *opdl;
+
+	/* enq handler & stage instance */
+	opdl_enq_operation enq;
+	struct opdl_stage *enq_stage_inst;
+
+	/* deq handler & stage instance */
+	opdl_deq_operation deq;
+	struct opdl_stage *deq_stage_inst;
+
+	/* port id has correctly been set */
+	uint8_t configured;
+
+	/* set when the port is initialized */
+	uint8_t initialized;
+
+	/* A numeric ID for the port */
+	uint8_t id;
+
+	/* Space for claimed entries */
+	struct rte_event *entries[MAX_OPDL_CONS_Q_DEPTH];
+
+	/* RX/REGULAR/TX/ASYNC - determined by position in queue */
+	enum port_type p_type;
+
+	/* set if this port's claim type is atomic */
+	bool atomic_claim;
+
+	/* Queue linked to this port - internal queue id */
+	uint8_t queue_id;
+
+	/* Queue linked to this port - external queue id */
+	uint8_t external_qid;
+
+	/* Next queue linked to this port - external queue id */
+	uint8_t next_external_qid;
+
+	/* number of instances of this stage */
+	uint32_t num_instance;
+
+	/* instance ID of this stage */
+	uint32_t instance_id;
+
+	/* track packets in and out of this port */
+	uint64_t port_stat[max_num_port_xstat];
+	uint64_t start_cycles;
+};
+
+struct opdl_queue_meta_data {
+	uint8_t         ext_id;
+	enum queue_type type;
+	int8_t          setup;
+};
+
+struct opdl_xstats_entry {
+	struct rte_event_dev_xstats_name stat;
+	unsigned int id;
+	uint64_t *value;
+};
+
+struct opdl_queue {
+
+	/* Turbine this queue is associated with */
+	uint32_t turbine_id;
+
+	/* type and position have correctly been set */
+	uint8_t configured;
+
+	/* port number and associated ports have been associated */
+	uint8_t initialized;
+
+	/* type of this queue (ATOMIC, ORDERED, SINGLE_LINK) */
+	enum queue_type q_type;
+
+	/* position of queue (START, MIDDLE, END) */
+	enum queue_pos q_pos;
+
+	/* external queue id. It is mapped to the queue position */
+	uint8_t external_qid;
+
+	struct opdl_port *ports[OPDL_PORTS_MAX];
+	uint32_t nb_ports;
+
+	/* priority, reserved for future */
+	uint8_t priority;
+};
+
+
+#define OPDL_TUR_PER_DEV 12
+
+/* PMD needs an extra queue per turbine */
+#define OPDL_MAX_QUEUES (RTE_EVENT_MAX_QUEUES_PER_DEV - OPDL_TUR_PER_DEV)
+
+
+struct opdl_evdev {
+	struct rte_eventdev_data *data;
+
+	uint8_t started;
+
+	/* Max number of ports and queues*/
+	uint32_t max_port_nb;
+	uint32_t max_queue_nb;
+
+	/* slots in the turbine */
+	uint32_t nb_events_limit;
+
+	/*
+	 * Array holding all turbines for this device
+	 */
+	struct opdl_ring *turbine[OPDL_TUR_PER_DEV];
+	uint32_t nb_turbines;
+
+	struct opdl_queue_meta_data q_md[OPDL_MAX_QUEUES];
+	uint32_t nb_q_md;
+
+	/* Internal queues - one per logical queue */
+	struct opdl_queue
+		queue[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;
+
+	uint32_t nb_queues;
+
+	struct opdl_stage_meta_data s_md[OPDL_PORTS_MAX];
+
+	/* Contains all ports - load balanced and directed */
+	struct opdl_port ports[OPDL_PORTS_MAX] __rte_cache_aligned;
+	uint32_t nb_ports;
+
+	uint8_t q_map_ex_to_in[OPDL_INVALID_QID];
+
+	/* Stats */
+	struct opdl_xstats_entry port_xstat[OPDL_MAX_PORT_XSTAT_NUM];
+
+	char service_name[OPDL_PMD_NAME_MAX];
+	int socket;
+	int do_validation;
+};
+
+
+static inline struct opdl_evdev *
+opdl_pmd_priv(const struct rte_eventdev *eventdev)
+{
+	return eventdev->data->dev_private;
+}
+
+static inline const struct opdl_evdev *
+opdl_pmd_priv_const(const struct rte_eventdev *eventdev)
+{
+	return eventdev->data->dev_private;
+}
+
+uint16_t opdl_event_enqueue(void *port, const struct rte_event *ev);
+uint16_t opdl_event_enqueue_burst(void *port, const struct rte_event ev[],
+		uint16_t num);
+
+uint16_t opdl_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t opdl_event_dequeue_burst(void *port, struct rte_event *ev,
+		uint16_t num, uint64_t wait);
+void opdl_event_schedule(struct rte_eventdev *dev);
+
+void opdl_xstats_init(struct rte_eventdev *dev);
+int opdl_xstats_uninit(struct rte_eventdev *dev);
+int opdl_xstats_get_names(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		struct rte_event_dev_xstats_name *xstats_names,
+		unsigned int *ids, unsigned int size);
+int opdl_xstats_get(const struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+		const unsigned int ids[], uint64_t values[], unsigned int n);
+uint64_t opdl_xstats_get_by_name(const struct rte_eventdev *dev,
+		const char *name, unsigned int *id);
+int opdl_xstats_reset(struct rte_eventdev *dev,
+		enum rte_event_dev_xstats_mode mode,
+		int16_t queue_port_id,
+		const uint32_t ids[],
+		uint32_t nb_ids);
+
+int opdl_add_event_handlers(struct rte_eventdev *dev);
+int build_all_dependencies(struct rte_eventdev *dev);
+int check_queues_linked(struct rte_eventdev *dev);
+int create_queues_and_rings(struct rte_eventdev *dev);
+int initialise_all_other_ports(struct rte_eventdev *dev);
+int initialise_queue_zero_ports(struct rte_eventdev *dev);
+int assign_internal_queue_ids(struct rte_eventdev *dev);
+void destroy_queues_and_rings(struct rte_eventdev *dev);
+
+
+#endif /* _OPDL_EVDEV_H_ */
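
A note on the dispatch model in the header above: each opdl_port carries
enq/deq function pointers (opdl_enq_operation/opdl_deq_operation) that
opdl_add_event_handlers() binds once per port type, so the fast path never
branches on port type per burst. A sketch of how a burst wrapper could
dispatch through them (illustrative only; the real opdl_event_enqueue_burst()
is declared here but not implemented in this patch):

/* Sketch: dispatch an enqueue burst via the handler bound to the
 * port; assumes handlers were bound by opdl_add_event_handlers().
 */
static uint16_t
enqueue_burst_sketch(void *port, const struct rte_event ev[],
		uint16_t num)
{
	struct opdl_port *p = port;

	if (!p->initialized)
		return 0; /* handlers not yet bound */

	return p->enq(p, ev, num);
}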
diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c
new file mode 100644
index 0000000..2699cb2
--- /dev/null
+++ b/drivers/event/opdl/opdl_evdev_init.c
@@ -0,0 +1,945 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <inttypes.h>
+#include <string.h>
+
+#include <rte_bus_vdev.h>
+#include <rte_memzone.h>
+#include <rte_kvargs.h>
+#include <rte_ring.h>
+#include <rte_errno.h>
+#include <rte_cycles.h>
+
+#include "opdl_evdev.h"
+#include "opdl_ring.h"
+
+
+static inline uint32_t __attribute__((always_inline))
+enqueue_check(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num,
+		uint16_t num_events)
+{
+	uint16_t i;
+
+	if (p->opdl->do_validation) {
+
+		for (i = 0; i < num; i++) {
+			if (ev[i].queue_id != p->next_external_qid) {
+				OPDL_LOG_ERR("ERROR - port:[%u] - event wants"
+						" to enq to q_id[%u],"
+						" but should be [%u]\n",
+						p->id,
+						ev[i].queue_id,
+						p->next_external_qid);
+				rte_errno = -EINVAL;
+				return 0;
+			}
+		}
+
+		/* Stats */
+		if (p->p_type == OPDL_PURE_RX_PORT ||
+				p->p_type == OPDL_ASYNC_PORT) {
+			if (num_events) {
+				p->port_stat[claim_pkts_requested] += num;
+				p->port_stat[claim_pkts_granted] += num_events;
+				p->port_stat[claim_non_empty]++;
+				p->start_cycles = rte_rdtsc();
+			} else {
+				p->port_stat[claim_empty]++;
+				p->start_cycles = 0;
+			}
+		} else {
+			if (p->start_cycles) {
+				uint64_t end_cycles = rte_rdtsc();
+				p->port_stat[total_cycles] +=
+					end_cycles - p->start_cycles;
+			}
+		}
+	} else {
+		if (num > 0 &&
+				ev[0].queue_id != p->next_external_qid) {
+			rte_errno = -EINVAL;
+			return 0;
+		}
+	}
+
+	return num;
+}
+
+static inline void __attribute__((always_inline))
+update_on_dequeue(struct opdl_port *p,
+		struct rte_event ev[],
+		uint16_t num,
+		uint16_t num_events)
+{
+	if (p->opdl->do_validation) {
+		uint16_t i;
+		for (i = 0; i < num; i++)
+			ev[i].queue_id =
+				p->opdl->queue[p->queue_id].external_qid;
+
+		/* Stats */
+		if (num_events) {
+			p->port_stat[claim_pkts_requested] += num;
+			p->port_stat[claim_pkts_granted] += num_events;
+			p->port_stat[claim_non_empty]++;
+			p->start_cycles = rte_rdtsc();
+		} else {
+			p->port_stat[claim_empty]++;
+			p->start_cycles = 0;
+		}
+	} else {
+		if (num > 0)
+			ev[0].queue_id =
+				p->opdl->queue[p->queue_id].external_qid;
+	}
+}
+
+
+/*
+ * Error RX enqueue:
+ *
+ * Used for ports that cannot enqueue (e.g. pure TX ports); always
+ * fails and sets rte_errno.
+ */
+
+static uint16_t
+opdl_rx_error_enqueue(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num)
+{
+	RTE_SET_USED(p);
+	RTE_SET_USED(ev);
+	RTE_SET_USED(num);
+
+	rte_errno = -ENOSPC;
+
+	return 0;
+}
+
+/*
+ * RX enqueue:
+ *
+ * This function handles enqueue for a single input stage_inst with
+ *	thread safety disabled or enabled, e.g. one thread using a
+ *	stage_inst or multiple threads sharing a stage_inst.
+ */
+
+static uint16_t
+opdl_rx_enqueue(struct opdl_port *p,
+		const struct rte_event ev[],
+		uint16_t num)
+{
+	uint16_t enqueued = 0;
+
+	enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
+				   ev,
+				   num,
+				   false);
+	if (!enqueue_check(p, ev, num, enqueued))
+		return 0;
+
+
+	if (enqueued < num)
+		rte_errno = -ENOSPC;
+
+	return enqueued;
+}
+
+/*
+ * Error TX handler:
+ *
+ * Used for ports that cannot dequeue (e.g. pure RX ports); always
+ * fails and sets rte_errno.
+ */
+
+static uint16_t
+opdl_tx_error_dequeue(struct opdl_port *p,
+		struct rte_event ev[],
+		uint16_t num)
+{
+	RTE_SET_USED(p);
+	RTE_SET_USED(ev);
+	RTE_SET_USED(num);
+
+	rte_errno = -ENOSPC;
+
+	return 0;
+}
+
+/*
+ * TX single threaded claim
+ *
+ * This function handles dequeue for a single worker stage_inst with
+ *	thread safety disabled, e.g. one thread using a stage_inst.
+ */
+
+static uint16_t
+opdl_tx_dequeue_single_thread(struct opdl_port *p,
+			struct rte_event ev[],
+			uint16_t num)
+{
+	uint16_t returned;
+
+	struct opdl_ring  *ring;
+
+	ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
+
+	returned = opdl_ring_copy_to_burst(ring,
+					   p->deq_stage_inst,
+					   ev,
+					   num,
+					   false);
+
+	update_on_dequeue(p, ev, num, returned);
+
+	return returned;
+}
+
+/*
+ * TX multi threaded claim
+ *
+ * This function handles dequeue for multiple worker stage_insts with
+ *	thread safety disabled, e.g. multiple worker threads, each with
+ *	its own stage_inst.
+ */
+
+static uint16_t
+opdl_tx_dequeue_multi_inst(struct opdl_port *p,
+			struct rte_event ev[],
+			uint16_t num)
+{
+	uint32_t num_events = 0;
+
+	num_events = opdl_stage_claim(p->deq_stage_inst,
+				    (void *)ev,
+				    num,
+				    NULL,
+				    false,
+				    false);
+
+	update_on_dequeue(p, ev, num, num_events);
+
+	return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
+}
+
+
+/*
+ * Worker thread claim
+ *
+ */
+
+static uint16_t
+opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
+{
+	uint32_t num_events = 0;
+
+	if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
+		OPDL_LOG_ERR(""
+				"Attempt to dequeue num of events larger than port (%d) max\n",
+				p->id);
+		rte_errno = -EINVAL;
+		return 0;
+	}
+
+
+	num_events = opdl_stage_claim(p->deq_stage_inst,
+			(void *)ev,
+			num,
+			NULL,
+			false,
+			p->atomic_claim);
+
+
+	update_on_dequeue(p, ev, num, num_events);
+
+	return num_events;
+}
+
+/*
+ * Worker thread disclaim
+ */
+
+static uint16_t
+opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
+{
+	uint16_t enqueued = 0;
+
+	enqueued = opdl_stage_disclaim(p->enq_stage_inst,
+				       num,
+				       false);
+
+	return enqueue_check(p, ev, num, enqueued);
+}
+
+static inline struct opdl_stage *__attribute__((always_inline))
+stage_for_port(struct opdl_queue *q, unsigned int i)
+{
+	if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
+		return q->ports[i]->enq_stage_inst;
+	else
+		return q->ports[i]->deq_stage_inst;
+}
+
+static int opdl_add_deps(struct opdl_evdev *device,
+			 int q_id,
+			 int deps_q_id)
+{
+	unsigned int i, j;
+	int status;
+	struct opdl_ring  *ring;
+	struct opdl_queue *queue = &device->queue[q_id];
+	struct opdl_queue *queue_deps = &device->queue[deps_q_id];
+	struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
+
+	/* sanity check that all stages are for same turbine */
+	for (i = 0; i < queue->nb_ports; i++) {
+		struct opdl_ring *r =
+			opdl_stage_get_opdl_ring(stage_for_port(queue, i));
+		for (j = 0; j < queue_deps->nb_ports; j++) {
+			struct opdl_ring *rj =
+				opdl_stage_get_opdl_ring(
+						stage_for_port(queue_deps, j));
+			if (r != rj) {
+				OPDL_LOG_ERR("Stages and dependents"
+						" are not for same turbine");
+				for (uint32_t k = 0;
+						k < device->nb_turbines; k++) {
+					opdl_ring_dump(device->turbine[k],
+							stdout);
+				}
+				return -EINVAL;
+			}
+		}
+	}
+
+	/* Gather all stage instances in deps */
+	for (i = 0; i < queue_deps->nb_ports; i++)
+		dep_stages[i] = stage_for_port(queue_deps, i);
+
+
+	/* Add all deps for each port->stage_inst in this queue */
+	for (i = 0; i < queue->nb_ports; i++) {
+
+		ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
+
+		status = opdl_stage_deps_add(ring,
+				stage_for_port(queue, i),
+				queue->ports[i]->num_instance,
+				queue->ports[i]->instance_id,
+				dep_stages,
+				queue_deps->nb_ports);
+		if (status < 0)
+			return -EINVAL;
+	}
+
+	return 0;
+}
+
+int
+opdl_add_event_handlers(struct rte_eventdev *dev)
+{
+	int err = 0;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	unsigned int i;
+
+	for (i = 0; i < device->max_port_nb; i++) {
+
+		struct opdl_port *port = &device->ports[i];
+
+		if (port->configured) {
+			if (port->p_type == OPDL_PURE_RX_PORT) {
+				port->enq = opdl_rx_enqueue;
+				port->deq = opdl_tx_error_dequeue;
+
+			} else if (port->p_type == OPDL_PURE_TX_PORT) {
+
+				port->enq = opdl_rx_error_enqueue;
+
+				if (port->num_instance == 1)
+					port->deq =
+						opdl_tx_dequeue_single_thread;
+				else
+					port->deq = opdl_tx_dequeue_multi_inst;
+
+			} else if (port->p_type == OPDL_REGULAR_PORT) {
+
+				port->enq = opdl_disclaim;
+				port->deq = opdl_claim;
+
+			} else if (port->p_type == OPDL_ASYNC_PORT) {
+
+				port->enq = opdl_rx_enqueue;
+
+				/* Always single instance */
+				port->deq = opdl_tx_dequeue_single_thread;
+			} else {
+				OPDL_LOG_ERR("port:[%u] has invalid port type - ",
+						port->id);
+				err = -EINVAL;
+				break;
+			}
+			port->initialized = 1;
+		}
+	}
+
+	if (!err)
+		fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
+	return err;
+}
+
+int
+build_all_dependencies(struct rte_eventdev *dev)
+{
+
+	int err = 0;
+	unsigned int i;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	uint8_t start_qid = 0;
+
+	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
+		struct opdl_queue *queue = &device->queue[i];
+		if (!queue->initialized)
+			break;
+
+		if (queue->q_pos == OPDL_Q_POS_START) {
+			start_qid = i;
+			continue;
+		}
+
+		if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
+			err = opdl_add_deps(device, i, i-1);
+			if (err < 0) {
+				OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED",
+						queue->external_qid);
+				break;
+			}
+		}
+
+		if (queue->q_pos == OPDL_Q_POS_END) {
+			/* Add this dependency */
+			err = opdl_add_deps(device, i, i-1);
+			if (err < 0) {
+				OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED",
+						queue->external_qid);
+				break;
+			}
+			/* Add dependency for rx on tx */
+			err = opdl_add_deps(device, start_qid, i);
+			if (err < 0) {
+				OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED",
+						queue->external_qid);
+				break;
+			}
+		}
+	}
+
+	if (!err)
+		fprintf(stdout, "Success - dependencies built\n");
+
+	return err;
+}
+
+int
+check_queues_linked(struct rte_eventdev *dev)
+{
+
+	int err = 0;
+	unsigned int i;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+	uint32_t nb_iq = 0;
+
+	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
+		struct opdl_queue *queue = &device->queue[i];
+
+		if (!queue->initialized)
+			break;
+
+		if (queue->external_qid == OPDL_INVALID_QID)
+			nb_iq++;
+
+		if (queue->nb_ports == 0) {
+			OPDL_LOG_ERR("queue:[%u] has no associated ports",
+					i);
+			err = -EINVAL;
+			break;
+		}
+	}
+	if (!err) {
+		if ((i - nb_iq) != device->max_queue_nb) {
+			OPDL_LOG_ERR("%u queues counted but should be %u",
+					i - nb_iq,
+					device->max_queue_nb);
+			err = -1;
+		} else {
+			fprintf(stdout, "Success - %u queues (ex:%u + in:%u) validated\n",
+					i,
+					device->max_queue_nb,
+					nb_iq);
+		}
+
+	}
+	return err;
+}
+
+void
+destroy_queues_and_rings(struct rte_eventdev *dev)
+{
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_turbines; i++) {
+		if (device->turbine[i])
+			opdl_ring_free(device->turbine[i]);
+	}
+
+	memset(&device->queue,
+			0,
+			sizeof(struct opdl_queue)
+			* RTE_EVENT_MAX_QUEUES_PER_DEV);
+}
+
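+/* index of the most recently created turbine */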
+#define TURBINE_ID(d) ((d)->nb_turbines - 1)
+
+static inline void
+initialise_queue(struct opdl_evdev *device,
+		enum queue_pos pos,
+		int32_t i)
+{
+	struct opdl_queue *queue = &device->queue[device->nb_queues];
+
+	if (i == -1) {
+		queue->q_type = OPDL_Q_TYPE_ORDERED;
+		queue->external_qid = OPDL_INVALID_QID;
+	} else {
+		queue->q_type = device->q_md[i].type;
+		queue->external_qid = device->q_md[i].ext_id;
+		/* Add ex->in for queues setup */
+		device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
+	}
+	queue->turbine_id = TURBINE_ID(device);
+	queue->q_pos = pos;
+	queue->nb_ports = 0;
+	queue->configured = 1;
+
+	device->nb_queues++;
+}
+
+
+static inline int
+create_turbine(struct opdl_evdev *device)
+{
+	int err = 0;
+
+	char name[RTE_MEMZONE_NAMESIZE];
+
+	snprintf(name, sizeof(name), "%s_%u",
+			device->service_name, device->nb_turbines);
+
+	device->turbine[device->nb_turbines] =
+		opdl_ring_create(name,
+				device->nb_events_limit,
+				sizeof(struct rte_event),
+				device->max_port_nb * 2,
+				device->socket);
+
+	if (!device->turbine[device->nb_turbines]) {
+		OPDL_LOG_ERR("opdl ring %u creation - FAILED",
+				device->nb_turbines);
+		err = -EINVAL;
+	} else {
+		device->nb_turbines++;
+	}
+	return err;
+}
+
+static inline int
+create_link_turbine(struct opdl_evdev *device, uint32_t index)
+{
+
+	int err = 0;
+
+	if (device->q_md[index + 1].type !=
+			OPDL_Q_TYPE_SINGLE_LINK) {
+
+		/* async queue with regular
+		 * queue following it
+		 */
+
+		/* create a new turbine */
+		err = create_turbine(device);
+		if (!err) {
+			/* create an initial
+			 * dummy queue for new turbine
+			 */
+			initialise_queue(device,
+					OPDL_Q_POS_START,
+					-1);
+		} else {
+			err = -EINVAL;
+		}
+	} else {
+		OPDL_LOG_ERR("queue %u, 2"
+				" SINGLE_LINK queues, not allowed",
+				index);
+		err = -EINVAL;
+	}
+
+	return err;
+}
+
+int
+create_queues_and_rings(struct rte_eventdev *dev)
+{
+	int err = 0;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	device->nb_queues = 0;
+
+	if (device->nb_ports != device->max_port_nb) {
+		OPDL_LOG_ERR("Number ports setup:%u NOT EQUAL to max port"
+				" number:%u for this device",
+				device->nb_ports,
+				device->max_port_nb);
+		err = -1;
+	}
+
+	if (!err) {
+		/* We will have at least one turbine so create it now */
+		err = create_turbine(device);
+	}
+
+	if (!err) {
+
+		/* Create 1st "dummy" queue */
+		initialise_queue(device,
+				 OPDL_Q_POS_START,
+				 -1);
+
+		for (uint32_t i = 0; i < device->nb_q_md; i++) {
+
+			/* Check that this queue's meta data was set up */
+			if (!device->q_md[i].setup) {
+
+				OPDL_LOG_ERR("queue meta data slot %u"
+						" not setup - FAILING",
+						i);
+				err = -EINVAL;
+				break;
+			} else if (device->q_md[i].type !=
+					OPDL_Q_TYPE_SINGLE_LINK) {
+
+				if (!device->q_md[i + 1].setup) {
+					/* Create a simple ORDERED/ATOMIC
+					 * queue at the end
+					 */
+					initialise_queue(device,
+							OPDL_Q_POS_END,
+							i);
+
+				} else {
+					/* Create a simple ORDERED/ATOMIC
+					 * queue in the middle
+					 */
+					initialise_queue(device,
+							OPDL_Q_POS_MIDDLE,
+							i);
+				}
+			} else if (device->q_md[i].type ==
+					OPDL_Q_TYPE_SINGLE_LINK) {
+
+				/* create last queue for this turbine */
+				initialise_queue(device,
+						OPDL_Q_POS_END,
+						i);
+
+				err = create_link_turbine(device, i);
+
+				if (err)
+					break;
+
+
+			}
+		}
+	}
+	if (err)
+		destroy_queues_and_rings(dev);
+	else
+		fprintf(stdout, "Success - Created %u queues and %u turbines\n",
+				device->nb_queues,
+				device->nb_turbines);
+
+	return err;
+}
+
+
+int
+initialise_all_other_ports(struct rte_eventdev *dev)
+{
+	int err = 0;
+	struct opdl_stage *stage_inst = NULL;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		struct opdl_queue *queue = &device->queue[port->queue_id];
+
+		if (port->queue_id == 0) {
+			continue;
+		} else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
+
+			if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
+
+				/* Regular port with claim/disclaim */
+				stage_inst = opdl_stage_add(
+					device->turbine[queue->turbine_id],
+						false,
+						false);
+				port->deq_stage_inst = stage_inst;
+				port->enq_stage_inst = stage_inst;
+
+				if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
+					port->atomic_claim = true;
+				else
+					port->atomic_claim = false;
+
+				port->p_type =  OPDL_REGULAR_PORT;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			} else if (queue->q_pos == OPDL_Q_POS_END) {
+
+				/* tx port  */
+				stage_inst = opdl_stage_add(
+					device->turbine[queue->turbine_id],
+						false,
+						false);
+				port->deq_stage_inst = stage_inst;
+				port->enq_stage_inst = NULL;
+				port->p_type = OPDL_PURE_TX_PORT;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			} else {
+
+				OPDL_LOG_ERR("port %u:, linked incorrectly"
+						" to a q_pos START/INVALID %u",
+						port->id,
+						queue->q_pos);
+				err = -EINVAL;
+				break;
+			}
+
+		} else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
+
+			port->p_type = OPDL_ASYNC_PORT;
+
+			/* -- tx -- */
+			stage_inst = opdl_stage_add(
+				device->turbine[queue->turbine_id],
+					false,
+					false); /* First stage */
+			port->deq_stage_inst = stage_inst;
+
+			/* Add the port to the queue array of ports */
+			queue->ports[queue->nb_ports] = port;
+			port->instance_id = queue->nb_ports;
+			queue->nb_ports++;
+
+			if (queue->nb_ports > 1) {
+				OPDL_LOG_ERR("queue %u:, setup as SINGLE_LINK"
+					" but has more than one port linked",
+						queue->external_qid);
+				err = -EINVAL;
+				break;
+			}
+
+			/* -- single instance rx for next turbine -- */
+			uint8_t next_qid =
+				device->q_map_ex_to_in[queue->external_qid] + 1;
+			if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
+					device->queue[next_qid].configured) {
+
+				/* Remap the queue */
+				queue = &device->queue[next_qid];
+
+				stage_inst = opdl_stage_add(
+					device->turbine[queue->turbine_id],
+						false,
+						true);
+				port->enq_stage_inst = stage_inst;
+
+				/* Add the port to the queue array of ports */
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+				if (queue->nb_ports > 1) {
+					OPDL_LOG_ERR("dummy queue %u: for "
+						"port %u, "
+						"SINGLE_LINK but has more "
+						"than one port linked",
+							next_qid,
+							port->id);
+					err = -EINVAL;
+					break;
+				}
+				/* Set this queue to initialized as it is never
+				 * referenced by any ports
+				 */
+				queue->initialized = 1;
+			}
+		}
+	}
+
+	/* Now that all ports are initialised we need to
+	 * set up the last bit of stage meta data
+	 */
+	if (!err) {
+		for (uint32_t i = 0; i < device->nb_ports; i++) {
+			struct opdl_port *port = &device->ports[i];
+			struct opdl_queue *queue =
+				&device->queue[port->queue_id];
+
+			if (port->configured &&
+					(port->queue_id != OPDL_INVALID_QID)) {
+				if (queue->nb_ports == 0) {
+					OPDL_LOG_ERR("queue:[%u] has no ports"
+							" linked to it",
+							port->queue_id);
+					err = -EINVAL;
+					break;
+				}
+
+				port->num_instance = queue->nb_ports;
+				port->initialized = 1;
+				queue->initialized = 1;
+			} else {
+				OPDL_LOG_ERR("Port:[%u] not configured  invalid"
+						" queue configuration",
+						port->id);
+				err = -EINVAL;
+				break;
+			}
+		}
+	}
+
+	if (!err) {
+		fprintf(stdout,
+				"Success - %u port(s) initialized\n",
+				device->nb_ports);
+	}
+	return err;
+}
+
+int
+initialise_queue_zero_ports(struct rte_eventdev *dev)
+{
+	int err = 0;
+	uint8_t mt_rx = 0;
+	struct opdl_stage *stage_inst = NULL;
+	struct opdl_queue *queue = NULL;
+
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	/* Assign queue zero and figure out how many Q0 ports we have */
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		if (port->queue_id == OPDL_INVALID_QID) {
+			port->queue_id = 0;
+			port->external_qid = OPDL_INVALID_QID;
+			port->p_type = OPDL_PURE_RX_PORT;
+			mt_rx++;
+		}
+	}
+
+	/* Create the stage */
+	stage_inst = opdl_stage_add(device->turbine[0],
+			(mt_rx > 1 ? true : false),
+			true);
+	if (stage_inst) {
+
+		/* Assign the new created input stage to all relevant ports */
+		for (uint32_t i = 0; i < device->nb_ports; i++) {
+			struct opdl_port *port = &device->ports[i];
+			if (port->queue_id == 0) {
+				queue = &device->queue[port->queue_id];
+				port->enq_stage_inst = stage_inst;
+				port->deq_stage_inst = NULL;
+				port->configured = 1;
+				port->initialized = 1;
+
+				queue->ports[queue->nb_ports] = port;
+				port->instance_id = queue->nb_ports;
+				queue->nb_ports++;
+			}
+		}
+	} else {
+		err = -1;
+	}
+
+	if (!err) {
+		fprintf(stdout, "Success - (%u) \"Queue 0\" port(s) "
+				"initialized\n",
+				queue->nb_ports);
+	}
+	return err;
+}
+
+int
+assign_internal_queue_ids(struct rte_eventdev *dev)
+{
+	int err = 0;
+	struct opdl_evdev *device = opdl_pmd_priv(dev);
+
+	for (uint32_t i = 0; i < device->nb_ports; i++) {
+		struct opdl_port *port = &device->ports[i];
+		if (port->external_qid != OPDL_INVALID_QID) {
+			port->queue_id =
+				device->q_map_ex_to_in[port->external_qid];
+
+			/* Now do the external_qid of the next queue */
+			struct opdl_queue *queue =
+				&device->queue[port->queue_id];
+			if (queue->q_pos == OPDL_Q_POS_END)
+				port->next_external_qid =
+				device->queue[port->queue_id + 2].external_qid;
+			else
+				port->next_external_qid =
+				device->queue[port->queue_id + 1].external_qid;
+		}
+	}
+	return err;
+}
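
To make the queue id mapping concrete, here is a hypothetical layout for a
device with one SINGLE_LINK queue between two ordered queues, as
create_queues_and_rings() would lay it out (a dummy START queue heads each
turbine):

	internal qid:  0        1         2        3        4
	q_pos:         START    MIDDLE    END      START    END
	external qid:  -        0         1        -        2
	turbine:       0        0         0        1        1

q_map_ex_to_in[] then maps 0->1, 1->2, 2->4, so a port linked to external
queue 0 gets queue_id 1 and next_external_qid 1 (queue[2].external_qid),
while a port on an END queue uses the "+ 2" offset in
assign_internal_queue_ids() to skip the dummy START queue of the following
turbine.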