[dpdk-dev,6/7] examples/eventdev_pipeline_opdl: adding example

Message ID 1511522632-139652-7-git-send-email-liang.j.ma@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Jerin Jacob

Checks

Context              Check    Description
ci/Intel-compilation fail     Compilation issues
ci/checkpatch        success  coding style OK

Commit Message

Liang, Ma Nov. 24, 2017, 11:23 a.m. UTC
  From: Liang Ma <liang.j.ma@intel.com>

This patch adds a sample app to the examples/ directory, which can
be used as a reference application and for general testing.
The application requires two ethdev ports and expects traffic to be
flowing. The application must be run with the --vdev flag as
follows to indicate to EAL that a virtual eventdev device called
"event_opdl0" is available to be used:

    ./build/eventdev_pipeline_opdl_pmd --vdev=event_opdl0

The general flow of the traffic is as follows:

    Rx core -> Ordered Queue (4 workers) ->
    Ordered Queue (4 workers) -> Tx core
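
The application launches the Rx, worker and Tx loops on separate
lcores (one Rx, eight workers, one Tx, plus the master lcore), so the
EAL core list should cover at least eleven lcores; a SW-scheduled
eventdev needs one further lcore for the scheduler thread. For
example:

    ./build/eventdev_pipeline_opdl_pmd -l 0-10 --vdev=event_opdl0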

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
---
 examples/eventdev_pipeline_opdl_pmd/Makefile |  49 ++
 examples/eventdev_pipeline_opdl_pmd/main.c   | 766 +++++++++++++++++++++++++++
 2 files changed, 815 insertions(+)
 create mode 100644 examples/eventdev_pipeline_opdl_pmd/Makefile
 create mode 100644 examples/eventdev_pipeline_opdl_pmd/main.c

Patch

diff --git a/examples/eventdev_pipeline_opdl_pmd/Makefile b/examples/eventdev_pipeline_opdl_pmd/Makefile
new file mode 100644
index 0000000..63aea65
--- /dev/null
+++ b/examples/eventdev_pipeline_opdl_pmd/Makefile
@@ -0,0 +1,49 @@ 
+#   BSD LICENSE
+#
+#   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = eventdev_pipeline_opdl_pmd
+
+# all source are stored in SRCS-y
+SRCS-y := main.c
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/eventdev_pipeline_opdl_pmd/main.c b/examples/eventdev_pipeline_opdl_pmd/main.c
new file mode 100644
index 0000000..6b9e48a
--- /dev/null
+++ b/examples/eventdev_pipeline_opdl_pmd/main.c
@@ -0,0 +1,766 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <getopt.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <signal.h>
+#include <sched.h>
+#include <stdbool.h>
+
+#include <rte_eal.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_launch.h>
+#include <rte_malloc.h>
+#include <rte_random.h>
+#include <rte_cycles.h>
+#include <rte_ethdev.h>
+#include <rte_eventdev.h>
+#include <rte_service.h>
+
+#define BATCH_SIZE 32
+
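+/*
+ * Eventdev configuration: three ordered queues chained into a pipeline,
+ * served by ten ports (one Rx producer, eight workers, one Tx consumer).
+ */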
+static struct rte_event_dev_config config = {
+		.nb_event_queues = 3,
+		.nb_event_ports = 10,
+		.nb_events_limit  = 4096,
+		.nb_event_queue_flows = 1024,
+		.nb_event_port_dequeue_depth = 128,
+		.nb_event_port_enqueue_depth = 128,
+};
+
+static struct rte_event_port_conf wkr_p_conf = {
+		.dequeue_depth = 128,
+		.enqueue_depth = 128,
+		.new_event_threshold = 1024,
+};
+
+static struct rte_event_queue_conf wkr_q_conf = {
+		.event_queue_cfg = 0,
+		.schedule_type = RTE_SCHED_TYPE_ORDERED,
+		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+		.nb_atomic_flows = 1024,
+		.nb_atomic_order_sequences = 1024,
+};
+
+static struct rte_event_port_conf tx_p_conf = {
+		.dequeue_depth = 32,
+		.enqueue_depth = 32,
+		.new_event_threshold = 32,
+};
+
+static const struct rte_event_queue_conf tx_q_conf = {
+		.event_queue_cfg = 0,
+		.schedule_type = RTE_SCHED_TYPE_ORDERED,
+		.nb_atomic_flows = 1024,
+		.nb_atomic_order_sequences = 1024,
+};
+
+static struct rte_event_port_conf rx_p_conf = {
+		.dequeue_depth = 32,
+		.enqueue_depth = 32,
+		.new_event_threshold = 1024,
+};
+
+struct prod_data {
+	uint8_t dev_id;
+	uint8_t port_id;
+	int32_t qid;
+	unsigned int num_nic_ports;
+} __rte_cache_aligned;
+
+static struct prod_data producer_data;
+
+struct cons_data {
+	uint8_t dev_id;
+	uint8_t port_id;
+	struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
+} __rte_cache_aligned;
+
+static struct cons_data consumer_data;
+
+struct worker_data {
+	uint8_t dev_id;
+	uint8_t port_id;
+	uint8_t thread_id;
+};
+
+#define QUEUE_0_ID 0
+#define QUEUE_1_ID 1
+#define QUEUE_2_ID 2
+
+#define NUM_WORKERS 8
+static struct worker_data worker_data[NUM_WORKERS];
+
+volatile int done;
+
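+/*
+ * Output port maps: a packet received on ethdev port i is transmitted
+ * on port_map[i], pairing up however many ports are available.
+ */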
+static uint8_t port_map_4[4] = { 1, 2, 3, 0 };
+static uint8_t port_map_3[3] = { 1, 2, 0 };
+static uint8_t port_map_2[2] = { 1, 0 };
+static uint8_t port_map_1[1] = { 0 };
+static uint8_t *port_map;
+
+__thread long long packet_num;
+
+static void
+eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
+			void *userdata)
+{
+	int port_id = (uintptr_t) userdata;
+	unsigned int _sent = 0;
+
+	do {
+		/* Note: hard-coded TX queue */
+		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
+					  unsent - _sent);
+	} while (_sent != unsent);
+}
+
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = {
+			.mq_mode = ETH_MQ_RX_RSS,
+			.max_rx_pkt_len = ETHER_MAX_LEN
+		},
+		.rx_adv_conf = {
+			.rss_conf = {
+				.rss_hf = ETH_RSS_IP |
+					  ETH_RSS_TCP |
+					  ETH_RSS_UDP,
+			}
+		}
+	};
+	const uint16_t rx_rings = 1, tx_rings = 1;
+	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+	struct rte_eth_conf port_conf = port_conf_default;
+	int retval;
+	uint16_t q;
+
+	if (port >= rte_eth_dev_count())
+		return -1;
+
+	/* Configure the Ethernet device. */
+	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	/* Allocate and set up 1 RX queue per Ethernet port. */
+	for (q = 0; q < rx_rings; q++) {
+		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Allocate and set up 1 TX queue per Ethernet port. */
+	for (q = 0; q < tx_rings; q++) {
+		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+				rte_eth_dev_socket_id(port), NULL);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Start the Ethernet port. */
+	retval = rte_eth_dev_start(port);
+	if (retval < 0)
+		return retval;
+
+	/* Display the port MAC address. */
+	struct ether_addr addr;
+	rte_eth_macaddr_get(port, &addr);
+	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+			   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+			(unsigned int)port,
+			addr.addr_bytes[0], addr.addr_bytes[1],
+			addr.addr_bytes[2], addr.addr_bytes[3],
+			addr.addr_bytes[4], addr.addr_bytes[5]);
+
+	/* Enable RX in promiscuous mode for the Ethernet device. */
+	rte_eth_promiscuous_enable(port);
+
+	return 0;
+}
+
+static int
+init_ports(unsigned int num_ports)
+{
+	uint8_t portid;
+	unsigned int i;
+
+	struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
+			/* mbufs */ 16384 * num_ports,
+			/* cache_size */ 512,
+			/* priv_size*/ 0,
+			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+			rte_socket_id());
+
+	for (portid = 0; portid < num_ports; portid++)
+		if (port_init(portid, mp) != 0)
+			rte_exit(EXIT_FAILURE, "Cannot init port %" PRIu8 "\n",
+					portid);
+
+	for (i = 0; i < num_ports; i++) {
+		void *userdata = (void *)(uintptr_t) i;
+		consumer_data.tx_buf[i] =
+				rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
+
+		rte_eth_promiscuous_enable(i);
+		if (consumer_data.tx_buf[i] == NULL)
+			rte_panic("Out of memory\n");
+		rte_eth_tx_buffer_init(consumer_data.tx_buf[i], 32);
+		rte_eth_tx_buffer_set_err_callback(consumer_data.tx_buf[i],
+				eth_tx_buffer_retry,
+				userdata);
+	}
+
+	return 0;
+}
+
+static int
+rx_thread(void *arg)
+{
+	uint16_t nb_rx;
+
+	struct rte_mbuf *mbufs[BATCH_SIZE*3];
+	struct rte_event ev[BATCH_SIZE*3] = {0};
+	uint32_t i, j;
+	uint16_t nb_enqueue = 0;
+
+	int32_t qid = producer_data.qid;
+	uint8_t dev_id = producer_data.dev_id;
+	uint8_t port_id = producer_data.port_id;
+	uint32_t prio_idx = 0;
+	uint32_t num_ports = rte_eth_dev_count();
+	RTE_SET_USED(arg);
+
+	printf("Rx thread port_id %d started\n", port_id);
+
+	packet_num = 0;
+
+	while (!done) {
+
+		for (j = 0; j < num_ports; j++) {
+
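+			/* Inject the received packets into event queue 0
+			 * as NEW ordered events.
+			 */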
+			nb_rx = rte_eth_rx_burst(j, 0, mbufs, BATCH_SIZE);
+
+			for (i = 0; i < nb_rx; i++) {
+				ev[i].flow_id = 1;
+				ev[i].op = RTE_EVENT_OP_NEW;
+				ev[i].sched_type = RTE_SCHED_TYPE_ORDERED;
+				ev[i].queue_id = qid;
+				ev[i].event_type = RTE_EVENT_TYPE_ETHDEV;
+				ev[i].sub_event_type = 0;
+				ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
+				ev[i].mbuf = mbufs[i];
+				RTE_SET_USED(prio_idx);
+			}
+
+			nb_enqueue = rte_event_enqueue_burst(dev_id,
+					port_id, ev, nb_rx);
+
+			packet_num += nb_enqueue;
+
+			if (nb_enqueue != nb_rx) {
+				for (i = nb_enqueue; i < nb_rx; i++)
+					rte_pktmbuf_free(mbufs[i]);
+			}
+		}
+
+	}
+
+	printf("Rx done, total packet num is %lld\n", packet_num);
+
+	return 0;
+}
+
+static inline void
+work(struct rte_mbuf *m)
+{
+	m->port = port_map[m->port];
+}
+
+static int
+worker_thread(void *arg)
+{
+	unsigned int i;
+	struct rte_event events[BATCH_SIZE] = {0};
+	struct worker_data *w_data = (struct worker_data *)arg;
+	uint8_t dev_id = w_data->dev_id;
+	uint8_t port_id = w_data->port_id;
+	uint16_t nb_event_deq;
+
+	printf("Worker thread %d port_id %d started\n",
+			w_data->thread_id, port_id);
+
+	packet_num = 0;
+
+	while (!done) {
+		nb_event_deq = rte_event_dequeue_burst(dev_id,
+				port_id,
+				events,
+				RTE_DIM(events),
+				0);
+
+		if (nb_event_deq == 0) {
+			rte_pause();
+			continue;
+		}
+
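+		/* Forward each event to the next queue in the pipeline;
+		 * the second worker stage (threads 4-7) also remaps the
+		 * mbuf output port.
+		 */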
+		for (i = 0; i < nb_event_deq; i++) {
+			events[i].queue_id += 1;
+			events[i].op = RTE_EVENT_OP_FORWARD;
+			events[i].sched_type = RTE_SCHED_TYPE_ORDERED;
+			if (w_data->thread_id > 3)
+				work(events[i].mbuf);
+		}
+
+		packet_num += nb_event_deq;
+
+		rte_event_enqueue_burst(dev_id, port_id,
+				events, nb_event_deq);
+	}
+
+	return 0;
+}
+
+static int
+tx_thread(void *arg)
+{
+	int j;
+	struct rte_event events[BATCH_SIZE];
+	uint8_t dev_id = consumer_data.dev_id;
+	uint8_t eth_dev_count, port_id = consumer_data.port_id;
+
+	RTE_SET_USED(arg);
+
+	eth_dev_count = rte_eth_dev_count();
+
+	packet_num = 0;
+
+	while (!done) {
+
+		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+				events, RTE_DIM(events), 0);
+
+		packet_num += n;
+
+		int i;
+
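+		/* Nothing dequeued: flush any packets still buffered
+		 * for each ethdev port.
+		 */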
+		if (n == 0) {
+			for (j = 0; j < eth_dev_count; j++)
+				rte_eth_tx_buffer_flush(j,
+						0,
+						consumer_data.tx_buf[j]);
+			continue;
+		}
+
+		if (events[0].queue_id != QUEUE_2_ID)
+			printf("ERROR TX expected q_id:[%u], "
+					"but received:[%u]\n",
+					QUEUE_2_ID, events[0].queue_id);
+
+		for (i = 0; i < n; i++) {
+			uint8_t outport = events[i].mbuf->port;
+			rte_eth_tx_buffer(outport,
+					0,
+					consumer_data.tx_buf[outport],
+					events[i].mbuf);
+		}
+	}
+
+	return 0;
+}
+
+static int
+setup_eventdev(uint8_t dev_id)
+{
+	uint8_t i, num_wrk_port = NUM_WORKERS;
+	uint8_t ev_port = 0, queue0 = QUEUE_0_ID;
+	uint8_t queue1 = QUEUE_1_ID, queue2 = QUEUE_2_ID;
+
+	int ret, ndev = rte_event_dev_count();
+	if (ndev < 1) {
+		printf("%d: No Eventdev Devices Found\n", __LINE__);
+		return -1;
+	}
+
+	struct rte_event_dev_info dev_info;
+	ret = rte_event_dev_info_get(dev_id, &dev_info);
+	if (ret < 0) {
+		printf("%s:%d: Error getting device info\n", __FILE__, __LINE__);
+		return -1;
+	}
+
+	if (dev_info.max_event_port_dequeue_depth <
+			config.nb_event_port_dequeue_depth)
+		config.nb_event_port_dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	if (dev_info.max_event_port_enqueue_depth <
+			config.nb_event_port_enqueue_depth)
+		config.nb_event_port_enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;
+
+	ret = rte_event_dev_configure(dev_id, &config);
+	if (ret < 0) {
+		printf("%s:%d: Error configuring device\n", __FILE__, __LINE__);
+		return -1;
+	}
+
+	/* Create Queues */
+	if (rte_event_queue_setup(dev_id, queue0, &wkr_q_conf) < 0) {
+		printf("%d: error creating qid %d\n", __LINE__, queue0);
+		return -1;
+	}
+
+	if (rte_event_queue_setup(dev_id, queue1, &wkr_q_conf) < 0) {
+		printf("%d: error creating qid %d\n", __LINE__, queue1);
+		return -1;
+	}
+
+	if (rte_event_queue_setup(dev_id, queue2, &tx_q_conf) < 0) {
+		printf("%d: error creating qid %d\n", __LINE__, queue2);
+		return -1;
+	}
+
+	/* Check that port configs are valid for this device */
+
+	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+	if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+		tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+	if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+		tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+	if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+		rx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+	if (rx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+		rx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
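+	/* Port assignment: port 0 is the Rx producer, ports 1-8 the
+	 * workers, port 9 the Tx consumer.
+	 */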
+	/* Create rx_port */
+	if (rte_event_port_setup(dev_id, ev_port, &rx_p_conf) < 0) {
+		printf("Error setting up rx port\n");
+		return -1;
+	}
+	ev_port++;
+
+	/* Create worker ports */
+	for (i = 0; i < num_wrk_port; i++) {
+		if (rte_event_port_setup(dev_id, ev_port, &wkr_p_conf) < 0) {
+			printf("Error setting up worker port %d\n", i);
+			return -1;
+		}
+		ev_port++;
+	}
+
+	/* Create tx_port */
+	if (rte_event_port_setup(dev_id, ev_port, &tx_p_conf) < 0) {
+		printf("Error setting up tx port\n");
+		return -1;
+	}
+	ev_port++;
+
+	/* Link ports 1 - 4 to queue 0 */
+	for (i = 1; i < 5; i++) {
+		if (rte_event_port_link(dev_id,
+					i,
+					&queue0,
+					&wkr_q_conf.priority,
+					1)
+				!= 1) {
+			printf("%d: error creating link for port %d\n",
+					__LINE__, i);
+			return -1;
+		}
+	}
+	/* Link ports 5 - 8 to queue 1 */
+	for (i = 5; i < 9; i++) {
+		if (rte_event_port_link(dev_id,
+					i,
+					&queue1,
+					&wkr_q_conf.priority,
+					1)
+				!= 1) {
+			printf("%d: error creating link for port %d\n",
+					__LINE__, i);
+			return -1;
+		}
+	}
+	/* Link tx port (port 9) to queue 2 */
+	if (rte_event_port_link(dev_id,
+				i,
+				&queue2,
+				&tx_q_conf.priority,
+				1) != 1) {
+		printf("%d: error creating link for port %d\n", __LINE__, i);
+		return -1;
+	}
+
+	if (rte_event_dev_start(dev_id) < 0) {
+		printf("Error starting eventdev\n");
+		return -1;
+	}
+
+	rte_event_dev_dump(dev_id, stdout);
+
+	return 0;
+}
+
+static void
+print_statistics(FILE *f)
+{
+	int num_ports = 10; /* Hard-coded for this app */
+
+	for (int i = 0; i < num_ports; i++) {
+		int num_stats, num_stats_returned;
+
+		num_stats = rte_event_dev_xstats_names_get(0,
+				RTE_EVENT_DEV_XSTATS_PORT,
+				i,
+				NULL,
+				NULL,
+				0);
+		if (num_stats > 0) {
+
+			uint32_t ids[num_stats];
+			struct rte_event_dev_xstats_name names[num_stats];
+			uint64_t values[num_stats];
+
+			num_stats_returned = rte_event_dev_xstats_names_get(0,
+					RTE_EVENT_DEV_XSTATS_PORT,
+					i,
+					names,
+					ids,
+					num_stats);
+			if (num_stats == num_stats_returned) {
+				num_stats_returned = rte_event_dev_xstats_get(0,
+						RTE_EVENT_DEV_XSTATS_PORT,
+						i,
+						ids,
+						values,
+						num_stats);
+				if (num_stats == num_stats_returned) {
+					fprintf(f,
+						"Port : [%02u] Statistics\n",
+						i);
+					for (int j = 0; j < num_stats; j++) {
+						fprintf(f,
+							"\t%30s = %16" PRIu64 "\n",
+							names[j].name,
+							values[j]);
+					}
+				}
+			}
+		}
+	}
+}
+
+static void
+signal_handler(int signum)
+{
+	if (signum == SIGINT || signum == SIGTERM) {
+		printf("\n\nSignal %d received, preparing to exit...\n",
+				signum);
+		done = 1;
+	}
+}
+
+static int
+scheduler_thread(void *arg)
+{
+	uint8_t dev_id = *(uint8_t *)arg;
+	uint32_t evdev_service_id;
+
+	if (rte_event_dev_service_id_get(dev_id, &evdev_service_id))
+		return -1;
+
+	while (!done) {
+
+		/* run one iteration of the service on the calling core */
+		rte_service_run_iter_on_app_lcore(evdev_service_id, true);
+	}
+	return 0;
+}
+
+
+int
+main(int argc, char **argv)
+{
+	unsigned int num_ports;
+	uint8_t dev_id = 0;
+	int err, lcore_id, this_lcore;
+	int i, worker_nb = 0;
+	int rx_th = 0, sch_th = 0, tx_th = 0;
+
+	signal(SIGINT, signal_handler);
+	signal(SIGTERM, signal_handler);
+	signal(SIGTSTP, signal_handler);
+
+	err = rte_eal_init(argc, argv);
+	if (err < 0)
+		rte_panic("Invalid EAL arguments\n");
+
+	num_ports = rte_eth_dev_count();
+	if (num_ports == 4) {
+		port_map = port_map_4;
+	} else if (num_ports == 3) {
+		port_map = port_map_3;
+	} else if (num_ports == 2) {
+		port_map = port_map_2;
+	} else if (num_ports == 1) {
+		port_map = port_map_1;
+	} else {
+		rte_panic("Incorrect number of ethernet ports found:[%u]\n",
+				num_ports);
+	}
+
+	const unsigned int ndevs = rte_event_dev_count();
+	if (ndevs == 0)
+		rte_panic("No event devices found. Pass in a --vdev eventdev.\n");
+	if (ndevs > 1)
+		fprintf(stderr, "Warning: More than one eventdev, using idx 0\n");
+
+	err = setup_eventdev(dev_id);
+
+	if (err < 0) {
+		fprintf(stderr, "Error: setup_eventdev failed\n");
+		return -1;
+	}
+
+	struct rte_event_dev_info dev_info;
+	err = rte_event_dev_info_get(dev_id, &dev_info);
+
+	if (err < 0) {
+		printf("%s:%d: Error getting device info\n", __FILE__, __LINE__);
+		return -1;
+	}
+	producer_data.dev_id = dev_id;
+	producer_data.num_nic_ports = num_ports;
+	producer_data.qid = QUEUE_0_ID;
+	producer_data.port_id = 0;
+
+	for (i = 0; i < NUM_WORKERS; i++) {
+		worker_data[i].dev_id = dev_id;
+		worker_data[i].port_id = i+1;
+		worker_data[i].thread_id = i;
+	}
+
+	consumer_data.dev_id = dev_id;
+	consumer_data.port_id = 9;
+
+	init_ports(num_ports);
+
+	this_lcore = rte_lcore_id();
+
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+
+		if ((rx_th == 0) && (this_lcore != lcore_id)) {
+			err = rte_eal_remote_launch(rx_thread, NULL, lcore_id);
+			printf("Start rx thread\n");
+			if (err) {
+				printf("Failed to launch rx on core %d\n",
+						lcore_id);
+			} else {
+				rx_th = 1;
+				continue;
+			}
+		}
+
+		if ((worker_nb < NUM_WORKERS) && (this_lcore != lcore_id)) {
+			err = rte_eal_remote_launch(worker_thread,
+					&worker_data[worker_nb],
+					lcore_id);
+			if (err) {
+				printf("Failed to launch worker on core %d\n",
+						lcore_id);
+			} else {
+				worker_nb++;
+				continue;
+			}
+		}
+
+		if ((tx_th == 0) && (this_lcore != lcore_id)) {
+			printf("Start tx thread\n");
+			err = rte_eal_remote_launch(tx_thread, NULL, lcore_id);
+			if (err) {
+				printf("Failed to launch tx on core %d\n",
+						lcore_id);
+			} else {
+				tx_th = 1;
+				continue;
+			}
+		}
+
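+		/* The OPDL PMD schedules in enqueue/dequeue, so a
+		 * scheduler lcore is only needed for other, SW-scheduled
+		 * eventdev drivers.
+		 */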
+		if (strcmp(dev_info.driver_name, "event_opdl")) {
+			if ((sch_th == 0) && (this_lcore != lcore_id)) {
+				printf("Start SW scheduling thread\n");
+				err = rte_eal_remote_launch(scheduler_thread,
+						&dev_id,
+						lcore_id);
+				if (err) {
+					printf("Failed to launch scheduler on core %d\n",
+					       lcore_id);
+				} else {
+					sch_th = 1;
+					continue;
+				}
+			}
+		}
+	}
+
+	rte_eal_mp_wait_lcore();
+
+	print_statistics(stdout);
+
+	rte_event_dev_dump(0, stdout);
+
+	rte_event_dev_stop(dev_id);
+
+	rte_event_dev_close(dev_id);
+
+	return 0;
+}