[v2,3/4] app/test-eventdev: add Tx adapter support

Message ID 20180904141223.24216-3-pbhagavatula@caviumnetworks.com (mailing list archive)
State Superseded, archived
Delegated to: Jerin Jacob
Series [v2,1/4] app/test-eventdev: fix minor typos

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues

Commit Message

Pavan Nikhilesh Sept. 4, 2018, 2:12 p.m. UTC
  Convert existing Tx service based pipeline to Tx adapter based APIs and
simplify worker functions.

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
 app/test-eventdev/test_pipeline_atq.c    | 269 ++++++++++++-----------
 app/test-eventdev/test_pipeline_common.c | 206 +++++------------
 app/test-eventdev/test_pipeline_common.h |  62 +++---
 app/test-eventdev/test_pipeline_queue.c  | 241 ++++++++++----------
 4 files changed, 367 insertions(+), 411 deletions(-)
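
The conversion follows the standard Tx adapter bring-up sequence. As a
rough, untested sketch of what the patch below does per ethdev (the
adapter id mirrors the ethdev port id, as in the patch; error handling
elided):

    #include <rte_eventdev.h>
    #include <rte_event_eth_tx_adapter.h>

    static int
    tx_adapter_bringup(uint8_t dev_id, uint16_t eth_port,
            struct rte_event_port_conf *port_conf)
    {
        uint32_t cap = 0;

        rte_event_eth_tx_adapter_caps_get(dev_id, eth_port, &cap);

        rte_event_eth_tx_adapter_create(eth_port, dev_id, port_conf);
        /* -1 adds all Tx queues of the ethdev to the adapter. */
        rte_event_eth_tx_adapter_queue_add(eth_port, eth_port, -1);

        if (!(cap & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)) {
            /* No internal port: the adapter runs off a service
             * core and is fed through a SINGLE_LINK event queue
             * linked to its event port. */
            uint32_t service_id;

            rte_event_eth_tx_adapter_service_id_get(eth_port,
                    &service_id);
            /* ... map service_id onto a service lcore ... */
        }

        /* Started after rte_event_dev_start() in the patch. */
        return rte_event_eth_tx_adapter_start(eth_port);
    }

Workers on devices with the internal port capability then transmit
directly via rte_event_eth_tx_adapter_txq_set() plus
rte_event_eth_tx_adapter_enqueue() instead of forwarding to an extra
event queue.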
  

Comments

Rao, Nikhil Sept. 5, 2018, 6:54 a.m. UTC | #1
On 9/4/2018 7:42 PM, Pavan Nikhilesh wrote:
> Convert existing Tx service based pipeline to Tx adapter based APIs and
> simplify worker functions.
> 
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> ---
>   app/test-eventdev/test_pipeline_atq.c    | 269 ++++++++++++-----------
>   app/test-eventdev/test_pipeline_common.c | 206 +++++------------
>   app/test-eventdev/test_pipeline_common.h |  62 +++---
>   app/test-eventdev/test_pipeline_queue.c  | 241 ++++++++++----------
>   4 files changed, 367 insertions(+), 411 deletions(-)
>    
> diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
> index 832ab8b6e..ab407dbbb 100644
> --- a/app/test-eventdev/test_pipeline_common.c
> +++ b/app/test-eventdev/test_pipeline_common.c
> @@ -5,58 +5,6 @@
>   
> 
> @@ -215,7 +160,6 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
>   {
>   	uint16_t i;
>   	uint8_t nb_queues = 1;
> -	uint8_t mt_state = 0;
>   	struct test_pipeline *t = evt_test_priv(test);
>   	struct rte_eth_rxconf rx_conf;
>   	struct rte_eth_conf port_conf = {
> @@ -238,13 +182,21 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
>   		return -ENODEV;
>   	}
>   
> +	t->internal_port = 0;
>   	RTE_ETH_FOREACH_DEV(i) {
>   		struct rte_eth_dev_info dev_info;
>   		struct rte_eth_conf local_port_conf = port_conf;
> +		uint32_t caps = 0;
> +
> +		rte_event_eth_tx_adapter_caps_get(opt->dev_id, i, &caps);
> +		if ((caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)) {
> +			t->internal_port = 1;
> +		} else if (t->internal_port == 1) {
> +			evt_err("Eventdev can't use %d port", i);
> +			return -EINVAL;
> +		}
>   
Shouldn't this function also return -EINVAL for the case where 
internal_port = 0 for i = 0, and internal_port = 1 for i = 1?

Nikhil
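
For concreteness, tracing the posted check over a hypothetical two-port
setup with mixed capabilities:

    /* t->internal_port = 0;
     *
     * i = 0: no INTERNAL_PORT cap -> neither branch taken,
     *        t->internal_port stays 0
     * i = 1: INTERNAL_PORT cap    -> t->internal_port = 1
     *
     * The loop completes without returning -EINVAL even though the
     * ports are mixed; only the opposite ordering (cap on port 0,
     * none on port 1) reaches the "else if" and errors out.
     */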
  
Pavan Nikhilesh Sept. 5, 2018, 8:54 a.m. UTC | #2
On Wed, Sep 05, 2018 at 12:24:18PM +0530, Rao, Nikhil wrote:
> On 9/4/2018 7:42 PM, Pavan Nikhilesh wrote:
> > Convert existing Tx service based pipeline to Tx adapter based APIs and
> > simplify worker functions.
> >
> > Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> > ---
> >   app/test-eventdev/test_pipeline_atq.c    | 269 ++++++++++++-----------
> >   app/test-eventdev/test_pipeline_common.c | 206 +++++------------
> >   app/test-eventdev/test_pipeline_common.h |  62 +++---
> >   app/test-eventdev/test_pipeline_queue.c  | 241 ++++++++++----------
> >   4 files changed, 367 insertions(+), 411 deletions(-)
> >
> > diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
> > index 832ab8b6e..ab407dbbb 100644
> > --- a/app/test-eventdev/test_pipeline_common.c
> > +++ b/app/test-eventdev/test_pipeline_common.c
> > @@ -5,58 +5,6 @@
> >
> >
> > @@ -215,7 +160,6 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
> >   {
> >       uint16_t i;
> >       uint8_t nb_queues = 1;
> > -     uint8_t mt_state = 0;
> >       struct test_pipeline *t = evt_test_priv(test);
> >       struct rte_eth_rxconf rx_conf;
> >       struct rte_eth_conf port_conf = {
> > @@ -238,13 +182,21 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
> >               return -ENODEV;
> >       }
> >
> > +     t->internal_port = 0;
> >       RTE_ETH_FOREACH_DEV(i) {
> >               struct rte_eth_dev_info dev_info;
> >               struct rte_eth_conf local_port_conf = port_conf;
> > +             uint32_t caps = 0;
> > +
> > +             rte_event_eth_tx_adapter_caps_get(opt->dev_id, i, &caps);
> > +             if ((caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)) {
> > +                     t->internal_port = 1;
> > +             } else if (t->internal_port == 1) {
> > +                     evt_err("Eventdev can't use %d port", i);
> > +                     return -EINVAL;
> > +             }
> >
> Shouldn't this function also return -EINVAL for the case where
> internal_port = 0 for i = 0, and internal_port = 1 for i = 1?

I think it would be better to force all the ports to use the non-internal
port mode when we detect that one of the ports doesn't have the internal
port capability, rather than exiting. This was the behaviour previously,
and it leaves room to support both pipeline models in the future.
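
Roughly, that would replace the per-port error with a scan that
downgrades all ports on the first one found without the capability
(an untested sketch of the suggested behaviour, not the posted patch):

    t->internal_port = 1;
    RTE_ETH_FOREACH_DEV(i) {
        uint32_t caps = 0;

        rte_event_eth_tx_adapter_caps_get(opt->dev_id, i, &caps);
        if (!(caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)) {
            /* One port without the internal port capability
             * forces the SINGLE_LINK + Tx adapter service core
             * model for all ports. */
            t->internal_port = 0;
            break;
        }
    }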

>
> Nikhil
>

Thanks,
Pavan.
  
Rao, Nikhil Sept. 5, 2018, 9:37 a.m. UTC | #3
On 9/5/2018 2:24 PM, Pavan Nikhilesh wrote:
> On Wed, Sep 05, 2018 at 12:24:18PM +0530, Rao, Nikhil wrote:
>> On 9/4/2018 7:42 PM, Pavan Nikhilesh wrote:
>>> Convert existing Tx service based pipeline to Tx adapter based APIs and
>>> simplify worker functions.
>>>
>>> Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
>>> ---
>>>    app/test-eventdev/test_pipeline_atq.c    | 269 ++++++++++++-----------
>>>    app/test-eventdev/test_pipeline_common.c | 206 +++++------------
>>>    app/test-eventdev/test_pipeline_common.h |  62 +++---
>>>    app/test-eventdev/test_pipeline_queue.c  | 241 ++++++++++----------
>>>    4 files changed, 367 insertions(+), 411 deletions(-)
>>>
>>> diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
>>> index 832ab8b6e..ab407dbbb 100644
>>> --- a/app/test-eventdev/test_pipeline_common.c
>>> +++ b/app/test-eventdev/test_pipeline_common.c
>>> @@ -5,58 +5,6 @@
>>>
>>>
>>> @@ -215,7 +160,6 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
>>>    {
>>>        uint16_t i;
>>>        uint8_t nb_queues = 1;
>>> -     uint8_t mt_state = 0;
>>>        struct test_pipeline *t = evt_test_priv(test);
>>>        struct rte_eth_rxconf rx_conf;
>>>        struct rte_eth_conf port_conf = {
>>> @@ -238,13 +182,21 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
>>>                return -ENODEV;
>>>        }
>>>
>>> +     t->internal_port = 0;
>>>        RTE_ETH_FOREACH_DEV(i) {
>>>                struct rte_eth_dev_info dev_info;
>>>                struct rte_eth_conf local_port_conf = port_conf;
>>> +             uint32_t caps = 0;
>>> +
>>> +             rte_event_eth_tx_adapter_caps_get(opt->dev_id, i, &caps);
>>> +             if ((caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)) {
>>> +                     t->internal_port = 1;
>>> +             } else if (t->internal_port == 1) {
>>> +                     evt_err("Eventdev can't use %d port", i);
>>> +                     return -EINVAL;
>>> +             }
>>>
>> Shouldn't this function also return -EINVAL for the case where
>> internal_port = 0 for i = 0, and internal_port = 1 for i = 1?
> 
> I think it would be better to force all the ports to use the non-internal
> port mode when we detect that one of the ports doesn't have the internal
> port capability, rather than exiting. This was the behaviour previously,
> and it leaves room to support both pipeline models in the future.

Agreed.

Nikhil
  

Patch

diff --git a/app/test-eventdev/test_pipeline_atq.c b/app/test-eventdev/test_pipeline_atq.c
index f0b2f9015..01af298f3 100644
--- a/app/test-eventdev/test_pipeline_atq.c
+++ b/app/test-eventdev/test_pipeline_atq.c
@@ -15,7 +15,7 @@  pipeline_atq_nb_event_queues(struct evt_options *opt)
 	return rte_eth_dev_count_avail();
 }
 
-static int
+static __rte_noinline int
 pipeline_atq_worker_single_stage_tx(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
@@ -28,23 +28,18 @@  pipeline_atq_worker_single_stage_tx(void *arg)
 			continue;
 		}
 
-		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-			pipeline_tx_pkt(ev.mbuf);
-			w->processed_pkts++;
-			continue;
-		}
-		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-		pipeline_event_enqueue(dev, port, &ev);
+		pipeline_event_tx(dev, port, &ev);
+		w->processed_pkts++;
 	}
 
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_atq_worker_single_stage_fwd(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
-	const uint8_t tx_queue = t->tx_service.queue_id;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
@@ -54,16 +49,16 @@  pipeline_atq_worker_single_stage_fwd(void *arg)
 			continue;
 		}
 
-		w->processed_pkts++;
-		ev.queue_id = tx_queue;
+		ev.queue_id = tx_queue[ev.mbuf->port];
 		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
 		pipeline_event_enqueue(dev, port, &ev);
+		w->processed_pkts++;
 	}
 
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_atq_worker_single_stage_burst_tx(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
@@ -79,27 +74,21 @@  pipeline_atq_worker_single_stage_burst_tx(void *arg)
 
 		for (i = 0; i < nb_rx; i++) {
 			rte_prefetch0(ev[i + 1].mbuf);
-			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-
-				pipeline_tx_pkt(ev[i].mbuf);
-				ev[i].op = RTE_EVENT_OP_RELEASE;
-				w->processed_pkts++;
-			} else
-				pipeline_fwd_event(&ev[i],
-						RTE_SCHED_TYPE_ATOMIC);
+			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		pipeline_event_tx_burst(dev, port, ev, nb_rx);
+		w->processed_pkts += nb_rx;
 	}
 
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_atq_worker_single_stage_burst_fwd(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
-	const uint8_t tx_queue = t->tx_service.queue_id;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
@@ -112,23 +101,22 @@  pipeline_atq_worker_single_stage_burst_fwd(void *arg)
 
 		for (i = 0; i < nb_rx; i++) {
 			rte_prefetch0(ev[i + 1].mbuf);
-			ev[i].queue_id = tx_queue;
+			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
+			ev[i].queue_id = tx_queue[ev[i].mbuf->port];
 			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
-			w->processed_pkts++;
 		}
 
 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		w->processed_pkts += nb_rx;
 	}
 
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_atq_worker_multi_stage_tx(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
-	const uint8_t nb_stages = t->opt->nb_stages;
-
 
 	while (t->done == false) {
 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
@@ -141,29 +129,24 @@  pipeline_atq_worker_multi_stage_tx(void *arg)
 		cq_id = ev.sub_event_type % nb_stages;
 
 		if (cq_id == last_queue) {
-			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-
-				pipeline_tx_pkt(ev.mbuf);
-				w->processed_pkts++;
-				continue;
-			}
-			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-		} else {
-			ev.sub_event_type++;
-			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
+			pipeline_event_tx(dev, port, &ev);
+			w->processed_pkts++;
+			continue;
 		}
 
+		ev.sub_event_type++;
+		pipeline_fwd_event(&ev, sched_type_list[cq_id]);
 		pipeline_event_enqueue(dev, port, &ev);
 	}
+
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_atq_worker_multi_stage_fwd(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
-	const uint8_t nb_stages = t->opt->nb_stages;
-	const uint8_t tx_queue = t->tx_service.queue_id;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
@@ -176,9 +159,9 @@  pipeline_atq_worker_multi_stage_fwd(void *arg)
 		cq_id = ev.sub_event_type % nb_stages;
 
 		if (cq_id == last_queue) {
-			w->processed_pkts++;
-			ev.queue_id = tx_queue;
+			ev.queue_id = tx_queue[ev.mbuf->port];
 			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+			w->processed_pkts++;
 		} else {
 			ev.sub_event_type++;
 			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
@@ -186,14 +169,14 @@  pipeline_atq_worker_multi_stage_fwd(void *arg)
 
 		pipeline_event_enqueue(dev, port, &ev);
 	}
+
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_atq_worker_multi_stage_burst_tx(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
-	const uint8_t nb_stages = t->opt->nb_stages;
 
 	while (t->done == false) {
 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
@@ -209,34 +192,27 @@  pipeline_atq_worker_multi_stage_burst_tx(void *arg)
 			cq_id = ev[i].sub_event_type % nb_stages;
 
 			if (cq_id == last_queue) {
-				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-
-					pipeline_tx_pkt(ev[i].mbuf);
-					ev[i].op = RTE_EVENT_OP_RELEASE;
-					w->processed_pkts++;
-					continue;
-				}
-
-				pipeline_fwd_event(&ev[i],
-						RTE_SCHED_TYPE_ATOMIC);
-			} else {
-				ev[i].sub_event_type++;
-				pipeline_fwd_event(&ev[i],
-						sched_type_list[cq_id]);
+				pipeline_event_tx(dev, port, &ev[i]);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts++;
+				continue;
 			}
+
+			ev[i].sub_event_type++;
+			pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
 		}
 
 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 	}
+
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
-	const uint8_t nb_stages = t->opt->nb_stages;
-	const uint8_t tx_queue = t->tx_service.queue_id;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
@@ -253,7 +229,7 @@  pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
 
 			if (cq_id == last_queue) {
 				w->processed_pkts++;
-				ev[i].queue_id = tx_queue;
+				ev[i].queue_id = tx_queue[ev[i].mbuf->port];
 				pipeline_fwd_event(&ev[i],
 						RTE_SCHED_TYPE_ATOMIC);
 			} else {
@@ -265,6 +241,7 @@  pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
 
 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 	}
+
 	return 0;
 }
 
@@ -274,39 +251,36 @@  worker_wrapper(void *arg)
 	struct worker_data *w  = arg;
 	struct evt_options *opt = w->t->opt;
 	const bool burst = evt_has_burst_mode(w->dev_id);
-	const bool mt_safe = !w->t->mt_unsafe;
+	const bool internal_port = w->t->internal_port;
 	const uint8_t nb_stages = opt->nb_stages;
 	RTE_SET_USED(opt);
 
 	if (nb_stages == 1) {
-		if (!burst && mt_safe)
+		if (!burst && internal_port)
 			return pipeline_atq_worker_single_stage_tx(arg);
-		else if (!burst && !mt_safe)
+		else if (!burst && !internal_port)
 			return pipeline_atq_worker_single_stage_fwd(arg);
-		else if (burst && mt_safe)
+		else if (burst && internal_port)
 			return pipeline_atq_worker_single_stage_burst_tx(arg);
-		else if (burst && !mt_safe)
+		else if (burst && !internal_port)
 			return pipeline_atq_worker_single_stage_burst_fwd(arg);
 	} else {
-		if (!burst && mt_safe)
+		if (!burst && internal_port)
 			return pipeline_atq_worker_multi_stage_tx(arg);
-		else if (!burst && !mt_safe)
+		else if (!burst && !internal_port)
 			return pipeline_atq_worker_multi_stage_fwd(arg);
-		if (burst && mt_safe)
+		if (burst && internal_port)
 			return pipeline_atq_worker_multi_stage_burst_tx(arg);
-		else if (burst && !mt_safe)
+		else if (burst && !internal_port)
 			return pipeline_atq_worker_multi_stage_burst_fwd(arg);
 	}
+
 	rte_panic("invalid worker\n");
 }
 
 static int
 pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
 {
-	struct test_pipeline *t = evt_test_priv(test);
-
-	if (t->mt_unsafe)
-		rte_service_component_runstate_set(t->tx_service.service_id, 1);
 	return pipeline_launch_lcores(test, opt, worker_wrapper);
 }
 
@@ -317,34 +291,36 @@  pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 	int nb_ports;
 	int nb_queues;
 	uint8_t queue;
-	struct rte_event_dev_info info;
-	struct test_pipeline *t = evt_test_priv(test);
-	uint8_t tx_evqueue_id = 0;
+	uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS] = {0};
 	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
 	uint8_t nb_worker_queues = 0;
+	uint8_t tx_evport_id = 0;
+	uint16_t prod = 0;
+	struct rte_event_dev_info info;
+	struct test_pipeline *t = evt_test_priv(test);
 
 	nb_ports = evt_nr_active_lcores(opt->wlcores);
 	nb_queues = rte_eth_dev_count_avail();
 
-	/* One extra port and queueu for Tx service */
-	if (t->mt_unsafe) {
-		tx_evqueue_id = nb_queues;
-		nb_ports++;
-		nb_queues++;
+	/* One queue for Tx adapter per port */
+	if (!t->internal_port) {
+		RTE_ETH_FOREACH_DEV(prod) {
+			tx_evqueue_id[prod] = nb_queues;
+			nb_queues++;
+		}
 	}
 
-
 	rte_event_dev_info_get(opt->dev_id, &info);
 
 	const struct rte_event_dev_config config = {
-			.nb_event_queues = nb_queues,
-			.nb_event_ports = nb_ports,
-			.nb_events_limit  = info.max_num_events,
-			.nb_event_queue_flows = opt->nb_flows,
-			.nb_event_port_dequeue_depth =
-				info.max_event_port_dequeue_depth,
-			.nb_event_port_enqueue_depth =
-				info.max_event_port_enqueue_depth,
+		.nb_event_queues = nb_queues,
+		.nb_event_ports = nb_ports,
+		.nb_events_limit  = info.max_num_events,
+		.nb_event_queue_flows = opt->nb_flows,
+		.nb_event_port_dequeue_depth =
+			info.max_event_port_dequeue_depth,
+		.nb_event_port_enqueue_depth =
+			info.max_event_port_enqueue_depth,
 	};
 	ret = rte_event_dev_configure(opt->dev_id, &config);
 	if (ret) {
@@ -353,21 +329,23 @@  pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 	}
 
 	struct rte_event_queue_conf q_conf = {
-			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
-			.nb_atomic_flows = opt->nb_flows,
-			.nb_atomic_order_sequences = opt->nb_flows,
+		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+		.nb_atomic_flows = opt->nb_flows,
+		.nb_atomic_order_sequences = opt->nb_flows,
 	};
 	/* queue configurations */
 	for (queue = 0; queue < nb_queues; queue++) {
 		q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
 
-		if (t->mt_unsafe) {
-			if (queue == tx_evqueue_id) {
-				q_conf.event_queue_cfg =
-					RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
-			} else {
-				queue_arr[nb_worker_queues] = queue;
-				nb_worker_queues++;
+		if (!t->internal_port) {
+			RTE_ETH_FOREACH_DEV(prod) {
+				if (queue == tx_evqueue_id[prod]) {
+					q_conf.event_queue_cfg =
+						RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
+				} else {
+					queue_arr[nb_worker_queues] = queue;
+					nb_worker_queues++;
+				}
 			}
 		}
 
@@ -383,20 +361,15 @@  pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 
 	/* port configuration */
 	const struct rte_event_port_conf p_conf = {
-			.dequeue_depth = opt->wkr_deq_dep,
-			.enqueue_depth = info.max_event_port_dequeue_depth,
-			.new_event_threshold = info.max_num_events,
+		.dequeue_depth = opt->wkr_deq_dep,
+		.enqueue_depth = info.max_event_port_dequeue_depth,
+		.new_event_threshold = info.max_num_events,
 	};
 
-	if (t->mt_unsafe) {
+	if (!t->internal_port)
 		ret = pipeline_event_port_setup(test, opt, queue_arr,
 				nb_worker_queues, p_conf);
-		if (ret)
-			return ret;
-
-		ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
-				nb_ports - 1, p_conf);
-	} else
+	else
 		ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
 				p_conf);
 
@@ -408,30 +381,32 @@  pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 	 *
 	 * eth_dev_count = 2, nb_stages = 2, atq mode
 	 *
-	 * Multi thread safe :
+	 * eth0, eth1 have Internal port capability :
 	 *	queues = 2
 	 *	stride = 1
 	 *
 	 *	event queue pipelines:
-	 *	eth0 -> q0 ->tx
-	 *	eth1 -> q1 ->tx
+	 *	eth0 -> q0 ->Tx
+	 *	eth1 -> q1 ->Tx
 	 *
 	 *	q0, q1 are configured as ATQ so, all the different stages can
 	 *	be enqueued on the same queue.
 	 *
-	 * Multi thread unsafe :
-	 *	queues = 3
+	 * eth0, eth1 use Tx adapters service core :
+	 *	queues = 4
 	 *	stride = 1
 	 *
 	 *	event queue pipelines:
-	 *	eth0 -> q0
-	 *		  } (q3->tx) Tx service
-	 *	eth1 -> q1
+	 *	eth0 -> q0  -> q2 -> Tx
+	 *	eth1 -> q1  -> q3 -> Tx
 	 *
-	 *	q0,q1 are configured as stated above.
-	 *	q3 configured as SINGLE_LINK|ATOMIC.
+	 *	q0, q1 are configured as stated above.
+	 *	q2, q3 configured as SINGLE_LINK.
 	 */
 	ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
+	if (ret)
+		return ret;
+	ret = pipeline_event_tx_adapter_setup(opt, p_conf);
 	if (ret)
 		return ret;
 
@@ -445,12 +420,58 @@  pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 		}
 	}
 
+	/* Connect the tx_evqueue_id to the Tx adapter port */
+	if (!t->internal_port) {
+		RTE_ETH_FOREACH_DEV(prod) {
+			ret = rte_event_eth_tx_adapter_event_port_get(prod,
+					&tx_evport_id);
+			if (ret) {
+				evt_err("Unable to get Tx adapter[%d]", prod);
+				return ret;
+			}
+
+			if (rte_event_port_link(opt->dev_id, tx_evport_id,
+						&tx_evqueue_id[prod],
+						NULL, 1) != 1) {
+				evt_err("Unable to link Tx adptr[%d] evprt[%d]",
+						prod, tx_evport_id);
+				return ret;
+			}
+		}
+	}
+
+	RTE_ETH_FOREACH_DEV(prod) {
+		ret = rte_eth_dev_start(prod);
+		if (ret) {
+			evt_err("Ethernet dev [%d] failed to start."
+					" Using synthetic producer", prod);
+			return ret;
+		}
+	}
+
 	ret = rte_event_dev_start(opt->dev_id);
 	if (ret) {
 		evt_err("failed to start eventdev %d", opt->dev_id);
 		return ret;
 	}
 
+	RTE_ETH_FOREACH_DEV(prod) {
+		ret = rte_event_eth_rx_adapter_start(prod);
+		if (ret) {
+			evt_err("Rx adapter[%d] start failed", prod);
+			return ret;
+		}
+
+		ret = rte_event_eth_tx_adapter_start(prod);
+		if (ret) {
+			evt_err("Tx adapter[%d] start failed", prod);
+			return ret;
+		}
+	}
+
+	memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
+			RTE_MAX_ETHPORTS);
+
 	return 0;
 }
 
diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
index 832ab8b6e..ab407dbbb 100644
--- a/app/test-eventdev/test_pipeline_common.c
+++ b/app/test-eventdev/test_pipeline_common.c
@@ -5,58 +5,6 @@ 
 
 #include "test_pipeline_common.h"
 
-static int32_t
-pipeline_event_tx_burst_service_func(void *args)
-{
-
-	int i;
-	struct tx_service_data *tx = args;
-	const uint8_t dev = tx->dev_id;
-	const uint8_t port = tx->port_id;
-	struct rte_event ev[BURST_SIZE + 1];
-
-	uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
-
-	if (!nb_rx) {
-		for (i = 0; i < tx->nb_ethports; i++)
-			rte_eth_tx_buffer_flush(i, 0, tx->tx_buf[i]);
-		return 0;
-	}
-
-	for (i = 0; i < nb_rx; i++) {
-		struct rte_mbuf *m = ev[i].mbuf;
-		rte_eth_tx_buffer(m->port, 0, tx->tx_buf[m->port], m);
-	}
-	tx->processed_pkts += nb_rx;
-
-	return 0;
-}
-
-static int32_t
-pipeline_event_tx_service_func(void *args)
-{
-
-	int i;
-	struct tx_service_data *tx = args;
-	const uint8_t dev = tx->dev_id;
-	const uint8_t port = tx->port_id;
-	struct rte_event ev;
-
-	uint16_t nb_rx = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
-
-	if (!nb_rx) {
-		for (i = 0; i < tx->nb_ethports; i++)
-			rte_eth_tx_buffer_flush(i, 0, tx->tx_buf[i]);
-		return 0;
-	}
-
-	struct rte_mbuf *m = ev.mbuf;
-	rte_eth_tx_buffer(m->port, 0, tx->tx_buf[m->port], m);
-	tx->processed_pkts++;
-
-	return 0;
-}
-
 int
 pipeline_test_result(struct evt_test *test, struct evt_options *opt)
 {
@@ -97,11 +45,8 @@  processed_pkts(struct test_pipeline *t)
 	uint64_t total = 0;
 
 	rte_smp_rmb();
-	if (t->mt_unsafe)
-		total = t->tx_service.processed_pkts;
-	else
-		for (i = 0; i < t->nb_workers; i++)
-			total += t->worker[i].processed_pkts;
+	for (i = 0; i < t->nb_workers; i++)
+		total += t->worker[i].processed_pkts;
 
 	return total;
 }
@@ -215,7 +160,6 @@  pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
 {
 	uint16_t i;
 	uint8_t nb_queues = 1;
-	uint8_t mt_state = 0;
 	struct test_pipeline *t = evt_test_priv(test);
 	struct rte_eth_rxconf rx_conf;
 	struct rte_eth_conf port_conf = {
@@ -238,13 +182,21 @@  pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
 		return -ENODEV;
 	}
 
+	t->internal_port = 0;
 	RTE_ETH_FOREACH_DEV(i) {
 		struct rte_eth_dev_info dev_info;
 		struct rte_eth_conf local_port_conf = port_conf;
+		uint32_t caps = 0;
+
+		rte_event_eth_tx_adapter_caps_get(opt->dev_id, i, &caps);
+		if ((caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)) {
+			t->internal_port = 1;
+		} else if (t->internal_port == 1) {
+			evt_err("Eventdev can't use %d port", i);
+			return -EINVAL;
+		}
 
 		rte_eth_dev_info_get(i, &dev_info);
-		mt_state = !(dev_info.tx_offload_capa &
-				DEV_TX_OFFLOAD_MT_LOCKFREE);
 		rx_conf = dev_info.default_rxconf;
 		rx_conf.offloads = port_conf.rxmode.offloads;
 
@@ -279,11 +231,6 @@  pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
 			return -EINVAL;
 		}
 
-		t->mt_unsafe |= mt_state;
-		t->tx_service.tx_buf[i] =
-			rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(BURST_SIZE), 0);
-		if (t->tx_service.tx_buf[i] == NULL)
-			rte_panic("Unable to allocate Tx buffer memory.");
 		rte_eth_promiscuous_enable(i);
 	}
 
@@ -295,7 +242,6 @@  pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt,
 		uint8_t *queue_arr, uint8_t nb_queues,
 		const struct rte_event_port_conf p_conf)
 {
-	int i;
 	int ret;
 	uint8_t port;
 	struct test_pipeline *t = evt_test_priv(test);
@@ -316,23 +262,15 @@  pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt,
 			return ret;
 		}
 
-		if (queue_arr == NULL) {
-			if (rte_event_port_link(opt->dev_id, port, NULL, NULL,
-						0) != nb_queues)
-				goto link_fail;
-		} else {
-			for (i = 0; i < nb_queues; i++) {
-				if (rte_event_port_link(opt->dev_id, port,
-						&queue_arr[i], NULL, 1) != 1)
-					goto link_fail;
-			}
-		}
+		if (rte_event_port_link(opt->dev_id, port, queue_arr, NULL,
+					0) != nb_queues)
+			goto link_fail;
 	}
 
 	return 0;
 
 link_fail:
-	evt_err("failed to link all queues to port %d", port);
+	evt_err("failed to link queues to port %d", port);
 	return -EINVAL;
 }
 
@@ -385,79 +323,64 @@  pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
 			}
 		}
 
-		ret = rte_eth_dev_start(prod);
-		if (ret) {
-			evt_err("Ethernet dev [%d] failed to start."
-					" Using synthetic producer", prod);
-			return ret;
-		}
-
-		ret = rte_event_eth_rx_adapter_start(prod);
-		if (ret) {
-			evt_err("Rx adapter[%d] start failed", prod);
-			return ret;
-		}
-		evt_info("Port[%d] using Rx adapter[%d] started", prod, prod);
+		evt_info("Port[%d] using Rx adapter[%d] configured", prod,
+				prod);
 	}
 
 	return ret;
 }
 
 int
-pipeline_event_tx_service_setup(struct evt_test *test, struct evt_options *opt,
-		uint8_t tx_queue_id, uint8_t tx_port_id,
-		const struct rte_event_port_conf p_conf)
+pipeline_event_tx_adapter_setup(struct evt_options *opt,
+		struct rte_event_port_conf port_conf)
 {
-	int ret;
-	struct rte_service_spec serv;
-	struct test_pipeline *t = evt_test_priv(test);
-	struct tx_service_data *tx = &t->tx_service;
+	int ret = 0;
+	uint16_t consm = 0;
 
-	ret = rte_event_port_setup(opt->dev_id, tx_port_id, &p_conf);
-	if (ret) {
-		evt_err("failed to setup port %d", tx_port_id);
-		return ret;
-	}
+	RTE_ETH_FOREACH_DEV(consm) {
+		uint32_t cap;
 
-	if (rte_event_port_link(opt->dev_id, tx_port_id, &tx_queue_id,
-				NULL, 1) != 1) {
-		evt_err("failed to link queues to port %d", tx_port_id);
-		return -EINVAL;
-	}
+		ret = rte_event_eth_tx_adapter_caps_get(opt->dev_id,
+				consm, &cap);
+		if (ret) {
+			evt_err("failed to get event tx adapter[%d] caps",
+					consm);
+			return ret;
+		}
 
-	tx->dev_id = opt->dev_id;
-	tx->queue_id = tx_queue_id;
-	tx->port_id = tx_port_id;
-	tx->nb_ethports = rte_eth_dev_count_avail();
-	tx->t = t;
-
-	/* Register Tx service */
-	memset(&serv, 0, sizeof(struct rte_service_spec));
-	snprintf(serv.name, sizeof(serv.name), "Tx_service");
-
-	if (evt_has_burst_mode(opt->dev_id))
-		serv.callback = pipeline_event_tx_burst_service_func;
-	else
-		serv.callback = pipeline_event_tx_service_func;
-
-	serv.callback_userdata = (void *)tx;
-	ret = rte_service_component_register(&serv, &tx->service_id);
-	if (ret) {
-		evt_err("failed to register Tx service");
-		return ret;
-	}
+		ret = rte_event_eth_tx_adapter_create(consm, opt->dev_id,
+				&port_conf);
+		if (ret) {
+			evt_err("failed to create tx adapter[%d]", consm);
+			return ret;
+		}
 
-	ret = evt_service_setup(tx->service_id);
-	if (ret) {
-		evt_err("Failed to setup service core for Tx service\n");
-		return ret;
-	}
+		ret = rte_event_eth_tx_adapter_queue_add(consm, consm, -1);
+		if (ret) {
+			evt_err("failed to add tx queues to adapter[%d]",
+					consm);
+			return ret;
+		}
 
-	rte_service_runstate_set(tx->service_id, 1);
+		if (!(cap & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)) {
+			uint32_t service_id;
 
-	return 0;
-}
+			rte_event_eth_tx_adapter_service_id_get(consm,
+					&service_id);
+			ret = evt_service_setup(service_id);
+			if (ret) {
+				evt_err("Failed to setup service core"
+						" for Tx adapter\n");
+				return ret;
+			}
+		}
 
+		evt_info("Port[%d] using Tx adapter[%d] Configured", consm,
+				consm);
+	}
+
+	return ret;
+}
 
 void
 pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt)
@@ -465,16 +388,10 @@  pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt)
 	uint16_t i;
 	RTE_SET_USED(test);
 	RTE_SET_USED(opt);
-	struct test_pipeline *t = evt_test_priv(test);
-
-	if (t->mt_unsafe) {
-		rte_service_component_runstate_set(t->tx_service.service_id, 0);
-		rte_service_runstate_set(t->tx_service.service_id, 0);
-		rte_service_component_unregister(t->tx_service.service_id);
-	}
 
 	RTE_ETH_FOREACH_DEV(i) {
 		rte_event_eth_rx_adapter_stop(i);
+		rte_event_eth_tx_adapter_stop(i);
 		rte_eth_dev_stop(i);
 	}
 }
@@ -484,7 +401,6 @@  pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt)
 {
 	RTE_SET_USED(test);
 
-	rte_event_dev_stop(opt->dev_id);
 	rte_event_dev_close(opt->dev_id);
 }
 
diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
index 9cd6b905b..0440b9e29 100644
--- a/app/test-eventdev/test_pipeline_common.h
+++ b/app/test-eventdev/test_pipeline_common.h
@@ -14,6 +14,7 @@ 
 #include <rte_ethdev.h>
 #include <rte_eventdev.h>
 #include <rte_event_eth_rx_adapter.h>
+#include <rte_event_eth_tx_adapter.h>
 #include <rte_lcore.h>
 #include <rte_malloc.h>
 #include <rte_mempool.h>
@@ -35,30 +36,19 @@  struct worker_data {
 	struct test_pipeline *t;
 } __rte_cache_aligned;
 
-struct tx_service_data {
-	uint8_t dev_id;
-	uint8_t queue_id;
-	uint8_t port_id;
-	uint32_t service_id;
-	uint64_t processed_pkts;
-	uint16_t nb_ethports;
-	struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
-	struct test_pipeline *t;
-} __rte_cache_aligned;
-
 struct test_pipeline {
 	/* Don't change the offset of "done". Signal handler use this memory
 	 * to terminate all lcores work.
 	 */
 	int done;
 	uint8_t nb_workers;
-	uint8_t mt_unsafe;
+	uint8_t internal_port;
+	uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
 	enum evt_test_result result;
 	uint32_t nb_flows;
 	uint64_t outstand_pkts;
 	struct rte_mempool *pool;
 	struct worker_data worker[EVT_MAX_PORTS];
-	struct tx_service_data tx_service;
 	struct evt_options *opt;
 	uint8_t sched_type_list[EVT_MAX_STAGES] __rte_cache_aligned;
 } __rte_cache_aligned;
@@ -70,7 +60,7 @@  struct test_pipeline {
 	struct test_pipeline *t = w->t;   \
 	const uint8_t dev = w->dev_id;    \
 	const uint8_t port = w->port_id;  \
-	struct rte_event ev
+	struct rte_event ev __rte_cache_aligned
 
 #define PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT \
 	int i;                                  \
@@ -78,7 +68,7 @@  struct test_pipeline {
 	struct test_pipeline *t = w->t;         \
 	const uint8_t dev = w->dev_id;          \
 	const uint8_t port = w->port_id;        \
-	struct rte_event ev[BURST_SIZE + 1]
+	struct rte_event ev[BURST_SIZE + 1] __rte_cache_aligned
 
 #define PIPELINE_WORKER_MULTI_STAGE_INIT                         \
 	struct worker_data *w  = arg;                            \
@@ -88,10 +78,11 @@  struct test_pipeline {
 	const uint8_t port = w->port_id;                         \
 	const uint8_t last_queue = t->opt->nb_stages - 1;        \
 	uint8_t *const sched_type_list = &t->sched_type_list[0]; \
-	struct rte_event ev
+	const uint8_t nb_stages = t->opt->nb_stages + 1;	 \
+	struct rte_event ev __rte_cache_aligned
 
 #define PIPELINE_WORKER_MULTI_STAGE_BURST_INIT                   \
-	int i;                                  \
+	int i;                                                   \
 	struct worker_data *w  = arg;                            \
 	struct test_pipeline *t = w->t;                          \
 	uint8_t cq_id;                                           \
@@ -99,7 +90,8 @@  struct test_pipeline {
 	const uint8_t port = w->port_id;                         \
 	const uint8_t last_queue = t->opt->nb_stages - 1;        \
 	uint8_t *const sched_type_list = &t->sched_type_list[0]; \
-	struct rte_event ev[BURST_SIZE + 1]
+	const uint8_t nb_stages = t->opt->nb_stages + 1;	 \
+	struct rte_event ev[BURST_SIZE + 1] __rte_cache_aligned
 
 static __rte_always_inline void
 pipeline_fwd_event(struct rte_event *ev, uint8_t sched)
@@ -109,6 +101,28 @@  pipeline_fwd_event(struct rte_event *ev, uint8_t sched)
 	ev->sched_type = sched;
 }
 
+static __rte_always_inline void
+pipeline_event_tx(const uint8_t dev, const uint8_t port,
+		struct rte_event * const ev)
+{
+	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
+	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1))
+		rte_pause();
+}
+
+static __rte_always_inline void
+pipeline_event_tx_burst(const uint8_t dev, const uint8_t port,
+		struct rte_event *ev, const uint16_t nb_rx)
+{
+	uint16_t enq;
+
+	enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, nb_rx);
+	while (enq < nb_rx) {
+		enq += rte_event_eth_tx_adapter_enqueue(dev, port,
+				ev + enq, nb_rx - enq);
+	}
+}
+
 static __rte_always_inline void
 pipeline_event_enqueue(const uint8_t dev, const uint8_t port,
 		struct rte_event *ev)
@@ -130,13 +144,6 @@  pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port,
 	}
 }
 
-static __rte_always_inline void
-pipeline_tx_pkt(struct rte_mbuf *mbuf)
-{
-	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
-		rte_pause();
-}
-
 static inline int
 pipeline_nb_event_ports(struct evt_options *opt)
 {
@@ -149,9 +156,8 @@  int pipeline_test_setup(struct evt_test *test, struct evt_options *opt);
 int pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt);
 int pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
 		struct rte_event_port_conf prod_conf);
-int pipeline_event_tx_service_setup(struct evt_test *test,
-		struct evt_options *opt, uint8_t tx_queue_id,
-		uint8_t tx_port_id, const struct rte_event_port_conf p_conf);
+int pipeline_event_tx_adapter_setup(struct evt_options *opt,
+		struct rte_event_port_conf prod_conf);
 int pipeline_mempool_setup(struct evt_test *test, struct evt_options *opt);
 int pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt,
 		uint8_t *queue_arr, uint8_t nb_queues,
diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c
index 2e0d93d99..1156c2555 100644
--- a/app/test-eventdev/test_pipeline_queue.c
+++ b/app/test-eventdev/test_pipeline_queue.c
@@ -15,7 +15,7 @@  pipeline_queue_nb_event_queues(struct evt_options *opt)
 	return (eth_count * opt->nb_stages) + eth_count;
 }
 
-static int
+static __rte_noinline int
 pipeline_queue_worker_single_stage_tx(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
@@ -29,7 +29,7 @@  pipeline_queue_worker_single_stage_tx(void *arg)
 		}
 
 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-			pipeline_tx_pkt(ev.mbuf);
+			pipeline_event_tx(dev, port, &ev);
 			w->processed_pkts++;
 		} else {
 			ev.queue_id++;
@@ -41,11 +41,11 @@  pipeline_queue_worker_single_stage_tx(void *arg)
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_queue_worker_single_stage_fwd(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
-	const uint8_t tx_queue = t->tx_service.queue_id;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
@@ -55,7 +55,8 @@  pipeline_queue_worker_single_stage_fwd(void *arg)
 			continue;
 		}
 
-		ev.queue_id = tx_queue;
+		ev.queue_id = tx_queue[ev.mbuf->port];
+		rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
 		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
 		pipeline_event_enqueue(dev, port, &ev);
 		w->processed_pkts++;
@@ -64,7 +65,7 @@  pipeline_queue_worker_single_stage_fwd(void *arg)
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_queue_worker_single_stage_burst_tx(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
@@ -81,8 +82,7 @@  pipeline_queue_worker_single_stage_burst_tx(void *arg)
 		for (i = 0; i < nb_rx; i++) {
 			rte_prefetch0(ev[i + 1].mbuf);
 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-
-				pipeline_tx_pkt(ev[i].mbuf);
+				pipeline_event_tx(dev, port, &ev[i]);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				w->processed_pkts++;
 			} else {
@@ -98,11 +98,11 @@  pipeline_queue_worker_single_stage_burst_tx(void *arg)
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_queue_worker_single_stage_burst_fwd(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
-	const uint8_t tx_queue = t->tx_service.queue_id;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
@@ -115,23 +115,24 @@  pipeline_queue_worker_single_stage_burst_fwd(void *arg)
 
 		for (i = 0; i < nb_rx; i++) {
 			rte_prefetch0(ev[i + 1].mbuf);
-			ev[i].queue_id = tx_queue;
+			ev[i].queue_id = tx_queue[ev[i].mbuf->port];
+			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
 			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
-			w->processed_pkts++;
 		}
 
 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		w->processed_pkts += nb_rx;
 	}
 
 	return 0;
 }
 
 
-static int
+static __rte_noinline int
 pipeline_queue_worker_multi_stage_tx(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
-	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
@@ -143,31 +144,27 @@  pipeline_queue_worker_multi_stage_tx(void *arg)
 
 		cq_id = ev.queue_id % nb_stages;
 
-		if (cq_id >= last_queue) {
-			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-
-				pipeline_tx_pkt(ev.mbuf);
-				w->processed_pkts++;
-				continue;
-			}
-			ev.queue_id += (cq_id == last_queue) ? 1 : 0;
-			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-		} else {
-			ev.queue_id++;
-			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
+		if (ev.queue_id == tx_queue[ev.mbuf->port]) {
+			pipeline_event_tx(dev, port, &ev);
+			w->processed_pkts++;
+			continue;
 		}
 
+		ev.queue_id++;
+		pipeline_fwd_event(&ev, cq_id != last_queue ?
+				sched_type_list[cq_id] :
+				RTE_SCHED_TYPE_ATOMIC);
 		pipeline_event_enqueue(dev, port, &ev);
 	}
+
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_queue_worker_multi_stage_fwd(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
-	const uint8_t nb_stages = t->opt->nb_stages + 1;
-	const uint8_t tx_queue = t->tx_service.queue_id;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
@@ -180,7 +177,8 @@  pipeline_queue_worker_multi_stage_fwd(void *arg)
 		cq_id = ev.queue_id % nb_stages;
 
 		if (cq_id == last_queue) {
-			ev.queue_id = tx_queue;
+			ev.queue_id = tx_queue[ev.mbuf->port];
+			rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
 			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
 			w->processed_pkts++;
 		} else {
@@ -190,14 +188,15 @@  pipeline_queue_worker_multi_stage_fwd(void *arg)
 
 		pipeline_event_enqueue(dev, port, &ev);
 	}
+
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_queue_worker_multi_stage_burst_tx(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
-	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
@@ -212,37 +211,30 @@  pipeline_queue_worker_multi_stage_burst_tx(void *arg)
 			rte_prefetch0(ev[i + 1].mbuf);
 			cq_id = ev[i].queue_id % nb_stages;
 
-			if (cq_id >= last_queue) {
-				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-
-					pipeline_tx_pkt(ev[i].mbuf);
-					ev[i].op = RTE_EVENT_OP_RELEASE;
-					w->processed_pkts++;
-					continue;
-				}
-
-				ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
-				pipeline_fwd_event(&ev[i],
-						RTE_SCHED_TYPE_ATOMIC);
-			} else {
-				ev[i].queue_id++;
-				pipeline_fwd_event(&ev[i],
-						sched_type_list[cq_id]);
+			if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
+				pipeline_event_tx(dev, port, &ev[i]);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts++;
+				continue;
 			}
 
+			ev[i].queue_id++;
+			pipeline_fwd_event(&ev[i], cq_id != last_queue ?
+					sched_type_list[cq_id] :
+					RTE_SCHED_TYPE_ATOMIC);
 		}
 
 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 	}
+
 	return 0;
 }
 
-static int
+static __rte_noinline int
 pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
-	const uint8_t nb_stages = t->opt->nb_stages + 1;
-	const uint8_t tx_queue = t->tx_service.queue_id;
+	const uint8_t *tx_queue = t->tx_evqueue_id;
 
 	while (t->done == false) {
 		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
@@ -258,7 +250,8 @@  pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
 			cq_id = ev[i].queue_id % nb_stages;
 
 			if (cq_id == last_queue) {
-				ev[i].queue_id = tx_queue;
+				ev[i].queue_id = tx_queue[ev[i].mbuf->port];
+				rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
 				pipeline_fwd_event(&ev[i],
 						RTE_SCHED_TYPE_ATOMIC);
 				w->processed_pkts++;
@@ -271,6 +264,7 @@  pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
 
 		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 	}
+
 	return 0;
 }
 
@@ -280,28 +274,28 @@  worker_wrapper(void *arg)
 	struct worker_data *w  = arg;
 	struct evt_options *opt = w->t->opt;
 	const bool burst = evt_has_burst_mode(w->dev_id);
-	const bool mt_safe = !w->t->mt_unsafe;
+	const bool internal_port = w->t->internal_port;
 	const uint8_t nb_stages = opt->nb_stages;
 	RTE_SET_USED(opt);
 
 	if (nb_stages == 1) {
-		if (!burst && mt_safe)
+		if (!burst && internal_port)
 			return pipeline_queue_worker_single_stage_tx(arg);
-		else if (!burst && !mt_safe)
+		else if (!burst && !internal_port)
 			return pipeline_queue_worker_single_stage_fwd(arg);
-		else if (burst && mt_safe)
+		else if (burst && internal_port)
 			return pipeline_queue_worker_single_stage_burst_tx(arg);
-		else if (burst && !mt_safe)
+		else if (burst && !internal_port)
 			return pipeline_queue_worker_single_stage_burst_fwd(
 					arg);
 	} else {
-		if (!burst && mt_safe)
+		if (!burst && internal_port)
 			return pipeline_queue_worker_multi_stage_tx(arg);
-		else if (!burst && !mt_safe)
+		else if (!burst && !internal_port)
 			return pipeline_queue_worker_multi_stage_fwd(arg);
-		else if (burst && mt_safe)
+		else if (burst && internal_port)
 			return pipeline_queue_worker_multi_stage_burst_tx(arg);
-		else if (burst && !mt_safe)
+		else if (burst && !internal_port)
 			return pipeline_queue_worker_multi_stage_burst_fwd(arg);
 
 	}
@@ -311,10 +305,6 @@  worker_wrapper(void *arg)
 static int
 pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
 {
-	struct test_pipeline *t = evt_test_priv(test);
-
-	if (t->mt_unsafe)
-		rte_service_component_runstate_set(t->tx_service.service_id, 1);
 	return pipeline_launch_lcores(test, opt, worker_wrapper);
 }
 
@@ -326,25 +316,21 @@  pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 	int nb_queues;
 	int nb_stages = opt->nb_stages;
 	uint8_t queue;
-	struct rte_event_dev_info info;
-	struct test_pipeline *t = evt_test_priv(test);
-	uint8_t tx_evqueue_id = 0;
+	uint8_t tx_evport_id = 0;
+	uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS] = {0};
 	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
 	uint8_t nb_worker_queues = 0;
+	uint16_t prod = 0;
+	struct rte_event_dev_info info;
+	struct test_pipeline *t = evt_test_priv(test);
 
 	nb_ports = evt_nr_active_lcores(opt->wlcores);
 	nb_queues = rte_eth_dev_count_avail() * (nb_stages);
 
-	/* Extra port for Tx service. */
-	if (t->mt_unsafe) {
-		tx_evqueue_id = nb_queues;
-		nb_ports++;
-		nb_queues++;
-	} else
-		nb_queues += rte_eth_dev_count_avail();
+	/* One queue for Tx adapter per port */
+	nb_queues += rte_eth_dev_count_avail();
 
 	rte_event_dev_info_get(opt->dev_id, &info);
-
 	const struct rte_event_dev_config config = {
 			.nb_event_queues = nb_queues,
 			.nb_event_ports = nb_ports,
@@ -370,24 +356,19 @@  pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 	for (queue = 0; queue < nb_queues; queue++) {
 		uint8_t slot;
 
-		if (!t->mt_unsafe) {
-			slot = queue % (nb_stages + 1);
-			q_conf.schedule_type = slot == nb_stages ?
-				RTE_SCHED_TYPE_ATOMIC :
-				opt->sched_type_list[slot];
-		} else {
-			slot = queue % nb_stages;
-
-			if (queue == tx_evqueue_id) {
-				q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
+		q_conf.event_queue_cfg = 0;
+		slot = queue % (nb_stages + 1);
+		if (slot == nb_stages) {
+			q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
+			if (!t->internal_port) {
 				q_conf.event_queue_cfg =
 					RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
-			} else {
-				q_conf.schedule_type =
-					opt->sched_type_list[slot];
-				queue_arr[nb_worker_queues] = queue;
-				nb_worker_queues++;
 			}
+			tx_evqueue_id[prod++] = queue;
+		} else {
+			q_conf.schedule_type = opt->sched_type_list[slot];
+			queue_arr[nb_worker_queues] = queue;
+			nb_worker_queues++;
 		}
 
 		ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
@@ -407,19 +388,11 @@  pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 			.new_event_threshold = info.max_num_events,
 	};
 
-	/*
-	 * If tx is multi thread safe then allow workers to do Tx else use Tx
-	 * service to Tx packets.
-	 */
-	if (t->mt_unsafe) {
+	if (!t->internal_port) {
 		ret = pipeline_event_port_setup(test, opt, queue_arr,
 				nb_worker_queues, p_conf);
 		if (ret)
 			return ret;
-
-		ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
-				nb_ports - 1, p_conf);
-
 	} else
 		ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
 				p_conf);
@@ -431,7 +404,6 @@  pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 	 *
 	 * eth_dev_count = 2, nb_stages = 2.
 	 *
-	 * Multi thread safe :
 	 *	queues = 6
 	 *	stride = 3
 	 *
@@ -439,21 +411,14 @@  pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 	 *	eth0 -> q0 -> q1 -> (q2->tx)
 	 *	eth1 -> q3 -> q4 -> (q5->tx)
 	 *
-	 *	q2, q5 configured as ATOMIC
-	 *
-	 * Multi thread unsafe :
-	 *	queues = 5
-	 *	stride = 2
+	 *	q2, q5 configured as ATOMIC | SINGLE_LINK
 	 *
-	 *	event queue pipelines:
-	 *	eth0 -> q0 -> q1
-	 *			} (q4->tx) Tx service
-	 *	eth1 -> q2 -> q3
-	 *
-	 *	q4 configured as SINGLE_LINK|ATOMIC
 	 */
-	ret = pipeline_event_rx_adapter_setup(opt,
-			t->mt_unsafe ? nb_stages : nb_stages + 1, p_conf);
+	ret = pipeline_event_rx_adapter_setup(opt, nb_stages + 1, p_conf);
+	if (ret)
+		return ret;
+
+	ret = pipeline_event_tx_adapter_setup(opt, p_conf);
 	if (ret)
 		return ret;
 
@@ -467,12 +432,60 @@  pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 		}
 	}
 
+	/* Connect the tx_evqueue_id to the Tx adapter port */
+	if (!t->internal_port) {
+		RTE_ETH_FOREACH_DEV(prod) {
+			ret = rte_event_eth_tx_adapter_event_port_get(prod,
+					&tx_evport_id);
+			if (ret) {
+				evt_err("Unable to get Tx adptr[%d] evprt[%d]",
+						prod, tx_evport_id);
+				return ret;
+			}
+
+			if (rte_event_port_link(opt->dev_id, tx_evport_id,
+						&tx_evqueue_id[prod],
+						NULL, 1) != 1) {
+				evt_err("Unable to link Tx adptr[%d] evprt[%d]",
+						prod, tx_evport_id);
+				return ret;
+			}
+		}
+	}
+
+	RTE_ETH_FOREACH_DEV(prod) {
+		ret = rte_eth_dev_start(prod);
+		if (ret) {
+			evt_err("Ethernet dev [%d] failed to start."
+					" Using synthetic producer", prod);
+			return ret;
+		}
+
+	}
+
 	ret = rte_event_dev_start(opt->dev_id);
 	if (ret) {
 		evt_err("failed to start eventdev %d", opt->dev_id);
 		return ret;
 	}
 
+	RTE_ETH_FOREACH_DEV(prod) {
+		ret = rte_event_eth_rx_adapter_start(prod);
+		if (ret) {
+			evt_err("Rx adapter[%d] start failed", prod);
+			return ret;
+		}
+
+		ret = rte_event_eth_tx_adapter_start(prod);
+		if (ret) {
+			evt_err("Tx adapter[%d] start failed", prod);
+			return ret;
+		}
+	}
+
+	memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
+			RTE_MAX_ETHPORTS);
+
 	return 0;
 }