[dpdk-dev,v3,09/12] app/eventdev: add pipeline queue worker functions

Message ID 20180110145144.28403-9-pbhagavatula@caviumnetworks.com (mailing list archive)
State Superseded, archived
Delegated to: Jerin Jacob

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues

Commit Message

Pavan Nikhilesh Jan. 10, 2018, 2:51 p.m. UTC
  Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
 app/test-eventdev/test_pipeline_common.h |  80 +++++++
 app/test-eventdev/test_pipeline_queue.c  | 367 ++++++++++++++++++++++++++++++-
 2 files changed, 446 insertions(+), 1 deletion(-)
  

Comments

Van Haaren, Harry Jan. 10, 2018, 4:45 p.m. UTC | #1
> From: Pavan Nikhilesh [mailto:pbhagavatula@caviumnetworks.com]
> Sent: Wednesday, January 10, 2018 2:52 PM
> To: jerin.jacob@caviumnetworks.com; santosh.shukla@caviumnetworks.com; Van
> Haaren, Harry <harry.van.haaren@intel.com>; Eads, Gage
> <gage.eads@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com; Ma,
> Liang J <liang.j.ma@intel.com>
> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> Subject: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline queue worker
> functions
> 

<snip>


> +static __rte_always_inline void
> +pipeline_tx_pkt_safe(struct rte_mbuf *mbuf)
> +{
> +	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
> +		rte_pause();
> +}

re safe, see comment below

> +
> +static __rte_always_inline void
> +pipeline_tx_pkt_unsafe(struct rte_mbuf *mbuf, struct test_pipeline *t)
> +{
> +	rte_spinlock_t *lk = &t->tx_lk[mbuf->port];
> +
> +	rte_spinlock_lock(lk);
> +	pipeline_tx_pkt_safe(mbuf);
> +	rte_spinlock_unlock(lk);
> +}

IIRC, the "safe" version of a function usually has extra locks/protection, while the "normal" version has better performance but less error checking.

Here, it is the "unsafe" function that does the extra locking. Looking at it from the HW point of view that makes sense, but I think it's inverted relative to most existing code...

Happy to be proved wrong here .. ?

<snip>

> +static int
> +pipeline_queue_worker_single_stage_safe(void *arg)
> +{
> +	struct worker_data *w  = arg;
> +	struct test_pipeline *t = w->t;
> +	const uint8_t dev = w->dev_id;
> +	const uint8_t port = w->port_id;
> +	struct rte_event ev;
> +
> +	while (t->done == false) {
> +		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> +
> +		if (!event) {
> +			rte_pause();
> +			continue;
> +		}
> +
> +		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
> +			pipeline_tx_pkt_safe(ev.mbuf);

I guess that means the worker functions in which they're used are inverted in name too.

<snip>
  
Van Haaren, Harry Jan. 10, 2018, 4:53 p.m. UTC | #2
Replying to self...

> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Van Haaren, Harry
> Sent: Wednesday, January 10, 2018 4:45 PM
> To: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>;
> jerin.jacob@caviumnetworks.com; santosh.shukla@caviumnetworks.com; Eads,
> Gage <gage.eads@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com; Ma,
> Liang J <liang.j.ma@intel.com>
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline queue
> worker functions
> 
> > From: Pavan Nikhilesh [mailto:pbhagavatula@caviumnetworks.com]
> > Sent: Wednesday, January 10, 2018 2:52 PM
> > To: jerin.jacob@caviumnetworks.com; santosh.shukla@caviumnetworks.com; Van
> > Haaren, Harry <harry.van.haaren@intel.com>; Eads, Gage
> > <gage.eads@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com; Ma,
> > Liang J <liang.j.ma@intel.com>
> > Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> > Subject: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline queue
> worker
> > functions
> >
> 
> <snip>
> 
> 
> > +static __rte_always_inline void
> > +pipeline_tx_pkt_safe(struct rte_mbuf *mbuf)
> > +{
> > +	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
> > +		rte_pause();
> > +}
> 
> re safe, see comment below
> 
> > +
> > +static __rte_always_inline void
> > +pipeline_tx_pkt_unsafe(struct rte_mbuf *mbuf, struct test_pipeline *t)
> > +{
> > +	rte_spinlock_t *lk = &t->tx_lk[mbuf->port];
> > +
> > +	rte_spinlock_lock(lk);
> > +	pipeline_tx_pkt_safe(mbuf);
> > +	rte_spinlock_unlock(lk);
> > +}
> 
> IIRC usually the "Safe" version of a function has extra locks/protection,
> while the "normal" version has better performance, but less-error-checking.
> 
> Here, the "unsafe" function does the extra locking. If looking from the HW
> POV, that makes sense, but I think its inverted from most existing code...
> 
> Happy to be proved wrong here .. ?
> 
> <snip>


Thinking a little more about this, also in light of patch 11/12 of this series.

The code here has "safe" and "unsafe" versions of TX. The "unsafe" version adds a spinlock that is locked/unlocked around the actual TX action.

I don't understand why this is necessary. DPDK's general stance on data-path locking is that DPDK functions do not provide locks, and that the application must implement thread synchronization itself if it is required.

In this case app/eventdev can be considered an application, but I don't like the idea of a sample application shipping code that duplicates core functionality in safe/unsafe variants.

Hope I'm making some sense here..


> 
> > +static int
> > +pipeline_queue_worker_single_stage_safe(void *arg)
> > +{
> > +	struct worker_data *w  = arg;
> > +	struct test_pipeline *t = w->t;
> > +	const uint8_t dev = w->dev_id;
> > +	const uint8_t port = w->port_id;
> > +	struct rte_event ev;
> > +
> > +	while (t->done == false) {
> > +		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> > +
> > +		if (!event) {
> > +			rte_pause();
> > +			continue;
> > +		}
> > +
> > +		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
> > +			pipeline_tx_pkt_safe(ev.mbuf);
> 
> I guess that means that the functions where they're used are inverted in
> name too.
> 
> <snip>
  
Pavan Nikhilesh Jan. 10, 2018, 8:17 p.m. UTC | #3
On Wed, Jan 10, 2018 at 04:53:53PM +0000, Van Haaren, Harry wrote:
> Replying to self...
>
> > -----Original Message-----
> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Van Haaren, Harry
> > Sent: Wednesday, January 10, 2018 4:45 PM
> > To: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>;
> > jerin.jacob@caviumnetworks.com; santosh.shukla@caviumnetworks.com; Eads,
> > Gage <gage.eads@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com; Ma,
> > Liang J <liang.j.ma@intel.com>
> > Cc: dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline queue
> > worker functions
> >
> > > From: Pavan Nikhilesh [mailto:pbhagavatula@caviumnetworks.com]
> > > Sent: Wednesday, January 10, 2018 2:52 PM
> > > To: jerin.jacob@caviumnetworks.com; santosh.shukla@caviumnetworks.com; Van
> > > Haaren, Harry <harry.van.haaren@intel.com>; Eads, Gage
> > > <gage.eads@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com; Ma,
> > > Liang J <liang.j.ma@intel.com>
> > > Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> > > Subject: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline queue
> > worker
> > > functions
> > >
> >
> > <snip>
> >
> >
> > > +static __rte_always_inline void
> > > +pipeline_tx_pkt_safe(struct rte_mbuf *mbuf)
> > > +{
> > > +	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
> > > +		rte_pause();
> > > +}
> >
> > re safe, see comment below
> >
> > > +
> > > +static __rte_always_inline void
> > > +pipeline_tx_pkt_unsafe(struct rte_mbuf *mbuf, struct test_pipeline *t)
> > > +{
> > > +	rte_spinlock_t *lk = &t->tx_lk[mbuf->port];
> > > +
> > > +	rte_spinlock_lock(lk);
> > > +	pipeline_tx_pkt_safe(mbuf);
> > > +	rte_spinlock_unlock(lk);
> > > +}
> >
> > IIRC usually the "Safe" version of a function has extra locks/protection,
> > while the "normal" version has better performance, but less-error-checking.
> >
> > Here, the "unsafe" function does the extra locking. If looking from the HW
> > POV, that makes sense, but I think its inverted from most existing code...
> >
> > Happy to be proved wrong here .. ?
> >
> > <snip>
>
>
> Thinking a little more about this, also in light of patch 11/12 of this series.
>
> The code here has a "safe" and "unsafe" version of TX. This involves adding a spinlock inside the code, which is being locked/unlocked before doing the actual TX action.
>
> I don't understand why this is necessary? DPDK's general stance on locking for data-path is DPDK functions do not provide locks, and that application level must implement thread-synchronization if it is required.
>
> In this case, the app/eventdev can be considered an App, but I don't like the idea of providing a sample application and code that duplicates core functionality with safe/unsafe versions..
>

Some PMDs (e.g. net/octeontx) have the capability to do multi-thread safe Tx, where no thread synchronization is required. This is exposed via the offload flag 'DEV_TX_OFFLOAD_MT_LOCKFREE'.

So, the _safe Tx functions are selected when the above offload capability is present; when the capability is absent, the _unsafe Tx functions are selected instead, i.e. Tx synchronized via spinlocks keyed on the egress port id.

Patch 5/12 has the below check to see whether the connected ethernet device(s) have the capability to do thread-safe Tx:

+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		struct rte_eth_dev_info dev_info;
....
+		rte_eth_dev_info_get(i, &dev_info);
+		mt_state = !(dev_info.tx_offload_capa &
+				DEV_TX_OFFLOAD_MT_LOCKFREE);
....
+		t->mt_unsafe |= mt_state;
+	}

Based on the value of t->mt_unsafe, the appropriate worker function is selected.
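
For reference, a minimal self-contained sketch of that detection logic (illustrative only, not the exact patch 5/12 code; pipeline_tx_requires_locks is a hypothetical helper name):

#include <stdbool.h>
#include <rte_ethdev.h>

/* Returns true if any connected ethdev lacks lock-free MT Tx,
 * i.e. the application must serialize Tx itself (t->mt_unsafe).
 */
static bool
pipeline_tx_requires_locks(void)
{
	uint16_t i;
	bool mt_unsafe = false;

	for (i = 0; i < rte_eth_dev_count(); i++) {
		struct rte_eth_dev_info dev_info;

		rte_eth_dev_info_get(i, &dev_info);
		mt_unsafe |= !(dev_info.tx_offload_capa &
				DEV_TX_OFFLOAD_MT_LOCKFREE);
	}
	return mt_unsafe;
}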

> Hope I'm making some sense here..
>
Hope this clears things up.

Cheers,
Pavan.

>
> >
> > > +static int
> > > +pipeline_queue_worker_single_stage_safe(void *arg)
> > > +{
> > > +	struct worker_data *w  = arg;
> > > +	struct test_pipeline *t = w->t;
> > > +	const uint8_t dev = w->dev_id;
> > > +	const uint8_t port = w->port_id;
> > > +	struct rte_event ev;
> > > +
> > > +	while (t->done == false) {
> > > +		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
> > > +
> > > +		if (!event) {
> > > +			rte_pause();
> > > +			continue;
> > > +		}
> > > +
> > > +		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
> > > +			pipeline_tx_pkt_safe(ev.mbuf);
> >
> > I guess that means that the functions where they're used are inverted in
> > name too.
> >
> > <snip>
  
Van Haaren, Harry Jan. 11, 2018, 12:17 p.m. UTC | #4
> From: Pavan Nikhilesh [mailto:pbhagavatula@caviumnetworks.com]
> Sent: Wednesday, January 10, 2018 8:17 PM
> To: Van Haaren, Harry <harry.van.haaren@intel.com>;
> jerin.jacob@caviumnetworks.com; santosh.shukla@caviumnetworks.com; Eads,
> Gage <gage.eads@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com; Ma,
> Liang J <liang.j.ma@intel.com>
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline queue
> worker functions
> 
> On Wed, Jan 10, 2018 at 04:53:53PM +0000, Van Haaren, Harry wrote:
> > Replying to self...
> >
> > > -----Original Message-----
> > > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Van Haaren, Harry
> > > Sent: Wednesday, January 10, 2018 4:45 PM
> > > To: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>;
> > > jerin.jacob@caviumnetworks.com; santosh.shukla@caviumnetworks.com; Eads,
> > > Gage <gage.eads@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com;
> Ma,
> > > Liang J <liang.j.ma@intel.com>
> > > Cc: dev@dpdk.org
> > > Subject: Re: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline
> queue
> > > worker functions
> > >
> > > > From: Pavan Nikhilesh [mailto:pbhagavatula@caviumnetworks.com]
> > > > Sent: Wednesday, January 10, 2018 2:52 PM
> > > > To: jerin.jacob@caviumnetworks.com; santosh.shukla@caviumnetworks.com;
> Van
> > > > Haaren, Harry <harry.van.haaren@intel.com>; Eads, Gage
> > > > <gage.eads@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com;
> Ma,
> > > > Liang J <liang.j.ma@intel.com>
> > > > Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> > > > Subject: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline queue
> > > worker
> > > > functions
> > > >
> > >
> > > <snip>
> > >
> > >
> > > > +static __rte_always_inline void
> > > > +pipeline_tx_pkt_safe(struct rte_mbuf *mbuf)
> > > > +{
> > > > +	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
> > > > +		rte_pause();
> > > > +}
> > >
> > > re safe, see comment below
> > >
> > > > +
> > > > +static __rte_always_inline void
> > > > +pipeline_tx_pkt_unsafe(struct rte_mbuf *mbuf, struct test_pipeline
> *t)
> > > > +{
> > > > +	rte_spinlock_t *lk = &t->tx_lk[mbuf->port];
> > > > +
> > > > +	rte_spinlock_lock(lk);
> > > > +	pipeline_tx_pkt_safe(mbuf);
> > > > +	rte_spinlock_unlock(lk);
> > > > +}
> > >
> > > IIRC usually the "Safe" version of a function has extra
> locks/protection,
> > > while the "normal" version has better performance, but less-error-
> checking.
> > >
> > > Here, the "unsafe" function does the extra locking. If looking from the
> HW
> > > POV, that makes sense, but I think its inverted from most existing
> code...
> > >
> > > Happy to be proved wrong here .. ?
> > >
> > > <snip>
> >
> >
> > Thinking a little more about this, also in light of patch 11/12 of this
> series.
> >
> > The code here has a "safe" and "unsafe" version of TX. This involves
> adding a spinlock inside the code, which is being locked/unlocked before
> doing the actual TX action.
> >
> > I don't understand why this is necessary? DPDK's general stance on locking
> for data-path is DPDK functions do not provide locks, and that application
> level must implement thread-synchronization if it is required.
> >
> > In this case, the app/eventdev can be considered an App, but I don't like
> the idea of providing a sample application and code that duplicates core
> functionality with safe/unsafe versions..
> >
> 
> Some PMD's (net/octeontx) have capability to do multi-thread safe Tx where
> no
> thread-synchronization is required. This is exposed via the offload flag
> 'DEV_TX_OFFLOAD_MT_LOCKFREE'.

Yes understood.


> So, the _safe Tx functions are selected based on the above offload
> capability
> and when the capability is absent _unsafe Tx functions are selected i.e.
> synchronized Tx via spin locks based on the Egress port id.


This part changes the current behavior of the sample app.

Currently there is a (SINGLE_LINK | ATOMIC) stage at the end of the pipeline, which performs this "many-to-one" action, allowing a single core to dequeue all TX traffic and perform the TX operation in a lock-free manner.

Changing this to a locking mechanism is going to hurt performance on platforms that do not support TX_OFFLOAD_MT_LOCKFREE.

In my opinion, the correct fix is to alter the overall pipeline and always use lockless TX. Examples below:

NO TX_OFFLOAD_MT_LOCKFREE:

   Eth RX adapter -> stage 1 -> stage 2...(N-1) -> stage N -> stage TX (Atomic | SINGLE_LINK) -> eth TX


WITH TX_OFFLOAD_MT_LOCKFREE:

   Eth RX adapter -> stage 1 -> stage 2...(N-1) -> stage N -> eth TX MT Capable


By configuring the pipeline based on the DEV_TX_OFFLOAD_MT_LOCKFREE capability flag, and adding the SINGLE_LINK stage at the end if required, we can support both models without resorting to locked TX functions.

I think this will lead to a cleaner and more performant solution.
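
To make the proposal concrete, here is a rough sketch of the queue setup I have in mind (setup_tx_path is a hypothetical helper; the flow counts and the choice of queue id are arbitrary):

#include <stdbool.h>
#include <rte_eventdev.h>

/* If any ethdev lacks DEV_TX_OFFLOAD_MT_LOCKFREE, append one extra
 * (ATOMIC | SINGLE_LINK) event queue that a dedicated TX core or
 * service drains; otherwise workers Tx directly from the last stage.
 */
static int
setup_tx_path(uint8_t dev_id, uint8_t nb_worker_queues, bool mt_unsafe)
{
	struct rte_event_queue_conf conf = {
		.nb_atomic_flows = 1024,
		.nb_atomic_order_sequences = 1024,
		.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
	};

	if (!mt_unsafe)
		return 0;	/* last stage calls rte_eth_tx_burst() directly */

	/* The TX queue gets the next id after the worker stage queues. */
	return rte_event_queue_setup(dev_id, nb_worker_queues, &conf);
}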

<snip>
  
Pavan Nikhilesh Jan. 11, 2018, 1:52 p.m. UTC | #5
On Thu, Jan 11, 2018 at 12:17:38PM +0000, Van Haaren, Harry wrote:
 > >
 <snip>
> > > Thinking a little more about this, also in light of patch 11/12 of this
> > series.
> > >
> > > The code here has a "safe" and "unsafe" version of TX. This involves
> > adding a spinlock inside the code, which is being locked/unlocked before
> > doing the actual TX action.
> > >
> > > I don't understand why this is necessary? DPDK's general stance on locking
> > for data-path is DPDK functions do not provide locks, and that application
> > level must implement thread-synchronization if it is required.
> > >
> > > In this case, the app/eventdev can be considered an App, but I don't like
> > the idea of providing a sample application and code that duplicates core
> > functionality with safe/unsafe versions..
> > >
> >
> > Some PMD's (net/octeontx) have capability to do multi-thread safe Tx where
> > no
> > thread-synchronization is required. This is exposed via the offload flag
> > 'DEV_TX_OFFLOAD_MT_LOCKFREE'.
>
> Yes understood.
>
>
> > So, the _safe Tx functions are selected based on the above offload
> > capability
> > and when the capability is absent _unsafe Tx functions are selected i.e.
> > synchronized Tx via spin locks based on the Egress port id.
>
>
> This part changes the current behavior of the sample app.
>
> Currently there is a (SINGLE_LINK | ATOMIC) stage at the end of the pipeline, which performs this "many-to-one" action, allowing a single core to dequeue all TX traffic, and perform the TX operation in a lock-free manner.
>
> Changing this to a locking mechanism is going to hurt performance on platforms that do not support TX_OFFLOAD_MT_LOCKFREE.
>
> In my opinion, the correct fix is to alter the overall pipeline, and always use lockless TX. Examples below;
>
> NO TX_OFFLOAD_MT_LOCKFREE:
>
>    Eth RX adapter -> stage 1 -> stage 2...(N-1) -> stage N -> stage TX (Atomic | SINGLE_LINK) -> eth TX

Agreed. When we detect that Tx is not lock-free, the workers would just forward the events to an (Atomic | SINGLE_LINK) event queue, which would be dequeued by a service (in the mt_unsafe case) that does the Tx lock-free.
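
Something like this on the worker side (a sketch only, mirroring pipeline_fwd_event from this patch; tx_queue_id is an assumed parameter):

#include <rte_eventdev.h>

/* Last worker stage: instead of spinlock + rte_eth_tx_burst(), hand
 * the event over to the final (ATOMIC | SINGLE_LINK) TX queue.
 */
static inline void
worker_fwd_to_tx_queue(struct rte_event *ev, uint8_t tx_queue_id)
{
	ev->queue_id = tx_queue_id;
	ev->op = RTE_EVENT_OP_FORWARD;
	ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
	ev->event_type = RTE_EVENT_TYPE_CPU;
}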

>
>
> WITH TX_OFFLOAD_MT_LOCKFREE:
>
>    Eth RX adapter -> stage 1 -> stage 2...(N-1) -> stage N -> eth TX MT Capable

The current lockfree pipeline would remain the same.
>
>
> By configuring the pipeline based on MT_OFFLOAD_LOCKFREE capability flag, and adding the SINGLE_LINK at the end if required, we can support both models without resorting to locked TX functions.
>
> I think this will lead to a cleaner and more performant solution.
>

Thoughts?

Pavan.

> <snip>
  
Van Haaren, Harry Jan. 11, 2018, 3:47 p.m. UTC | #6
> From: Pavan Nikhilesh [mailto:pbhagavatula@caviumnetworks.com]
> Sent: Thursday, January 11, 2018 1:52 PM
> To: Van Haaren, Harry <harry.van.haaren@intel.com>;
> jerin.jacob@caviumnetworks.com; santosh.shukla@caviumnetworks.com; Eads,
> Gage <gage.eads@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com; Ma,
> Liang J <liang.j.ma@intel.com>
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v3 09/12] app/eventdev: add pipeline queue
> worker functions
> 
> On Thu, Jan 11, 2018 at 12:17:38PM +0000, Van Haaren, Harry wrote:
>  > >
>  <snip>
> > > > Thinking a little more about this, also in light of patch 11/12 of
> this
> > > series.
> > > >
> > > > The code here has a "safe" and "unsafe" version of TX. This involves
> > > adding a spinlock inside the code, which is being locked/unlocked before
> > > doing the actual TX action.
> > > >
> > > > I don't understand why this is necessary? DPDK's general stance on
> locking
> > > for data-path is DPDK functions do not provide locks, and that
> application
> > > level must implement thread-synchronization if it is required.
> > > >
> > > > In this case, the app/eventdev can be considered an App, but I don't
> like
> > > the idea of providing a sample application and code that duplicates core
> > > functionality with safe/unsafe versions..
> > > >
> > >
> > > Some PMD's (net/octeontx) have capability to do multi-thread safe Tx
> where
> > > no
> > > thread-synchronization is required. This is exposed via the offload flag
> > > 'DEV_TX_OFFLOAD_MT_LOCKFREE'.
> >
> > Yes understood.
> >
> >
> > > So, the _safe Tx functions are selected based on the above offload
> > > capability
> > > and when the capability is absent _unsafe Tx functions are selected i.e.
> > > synchronized Tx via spin locks based on the Egress port id.
> >
> >
> > This part changes the current behavior of the sample app.
> >
> > Currently there is a (SINGLE_LINK | ATOMIC) stage at the end of the
> pipeline, which performs this "many-to-one" action, allowing a single core
> to dequeue all TX traffic, and perform the TX operation in a lock-free
> manner.
> >
> > Changing this to a locking mechanism is going to hurt performance on
> platforms that do not support TX_OFFLOAD_MT_LOCKFREE.
> >
> > In my opinion, the correct fix is to alter the overall pipeline, and
> always use lockless TX. Examples below;
> >
> > NO TX_OFFLOAD_MT_LOCKFREE:
> >
> >    Eth RX adapter -> stage 1 -> stage 2...(N-1) -> stage N -> stage TX
> (Atomic | SINGLE_LINK) -> eth TX
> 
> Agreed, when we detect that tx is not lockfree the workers would just
> forward
> the events to  (Atomic | SINGLE_LINK) event queue which would be dequeued by
> a
> service(mt_unsafe) and Tx them lockfree.
> 
> >
> >
> > WITH TX_OFFLOAD_MT_LOCKFREE:
> >
> >    Eth RX adapter -> stage 1 -> stage 2...(N-1) -> stage N -> eth TX MT
> Capable
> 
> The current lockfree pipeline would remain the same.
> >
> >
> > By configuring the pipeline based on MT_OFFLOAD_LOCKFREE capability flag,
> and adding the SINGLE_LINK at the end if required, we can support both
> models without resorting to locked TX functions.
> >
> > I think this will lead to a cleaner and more performant solution.
> >
> 
> Thoughts?

A quick summary of the issue here, and then an overview of my understanding of the proposed solution.


=== Issue ===
Ethdev hardware can advertise the DEV_TX_OFFLOAD_MT_LOCKFREE flag, which, when set, means that multiple CPU threads can safely TX on a single ethdev queue concurrently (i.e. without locking). Not all hardware supports this, so applications must be able to gracefully handle hardware where this capability is not provided.


=== Solution ===
In eventdev pipelines with the MT_LOCKFREE capability, the CPU running the last "worker" stage can also perform the ethdev TX operation.

In eventdev pipelines without the MT_LOCKFREE capability, we use a (Single Link | Atomic) stage to "fan in" the traffic to a single point, and a TX service to abstract away the difference in CPU core requirements.



The above solution avoids placing locks in the datapath by modifying the pipeline design, and the difference in CPU requirements is abstracted by only registering the TX service if required.

Note that the TX service doesn't need infrastructure like the RX adapter's, as it is much simpler (dequeue from an eventdev port, TX on the ethdev port).
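
As a rough illustration of such a service callback (a sketch assuming one event port is dedicated to the TX queue; tx_service_args and pipeline_tx_service_func are hypothetical names, and registration via rte_service_component_register() is not shown):

#include <stdint.h>
#include <rte_eventdev.h>
#include <rte_ethdev.h>
#include <rte_pause.h>

struct tx_service_args {
	uint8_t dev_id;		/* event device id */
	uint8_t port_id;	/* event port linked to the TX queue */
};

/* Dequeue one event from the single-link TX queue and transmit the
 * mbuf on its ethdev port; no lock needed, only this service runs it.
 */
static int32_t
pipeline_tx_service_func(void *arg)
{
	struct tx_service_args *s = arg;
	struct rte_event ev;

	if (!rte_event_dequeue_burst(s->dev_id, s->port_id, &ev, 1, 0))
		return 0;

	while (rte_eth_tx_burst(ev.mbuf->port, 0, &ev.mbuf, 1) != 1)
		rte_pause();

	return 0;
}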


@Pavan, I believe this is the same solution as yours - just making sure we're aligned!


Cheers, -Harry
  

Patch

diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
index 481cb133b..b50e20adc 100644
--- a/app/test-eventdev/test_pipeline_common.h
+++ b/app/test-eventdev/test_pipeline_common.h
@@ -51,6 +51,86 @@  struct test_pipeline {
 	uint8_t sched_type_list[EVT_MAX_STAGES] __rte_cache_aligned;
 } __rte_cache_aligned;
 
+#define BURST_SIZE 16
+
+static __rte_always_inline void
+pipeline_fwd_event(struct rte_event *ev, uint8_t sched)
+{
+	ev->event_type = RTE_EVENT_TYPE_CPU;
+	ev->op = RTE_EVENT_OP_FORWARD;
+	ev->sched_type = sched;
+}
+
+static __rte_always_inline void
+pipeline_event_enqueue(const uint8_t dev, const uint8_t port,
+		struct rte_event *ev)
+{
+	while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+		rte_pause();
+}
+
+static __rte_always_inline void
+pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+		struct rte_event *ev, const uint16_t nb_rx)
+{
+	uint16_t enq;
+
+	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+	while (enq < nb_rx) {
+		enq += rte_event_enqueue_burst(dev, port,
+						ev + enq, nb_rx - enq);
+	}
+}
+
+static __rte_always_inline void
+pipeline_tx_pkt_safe(struct rte_mbuf *mbuf)
+{
+	while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+		rte_pause();
+}
+
+static __rte_always_inline void
+pipeline_tx_pkt_unsafe(struct rte_mbuf *mbuf, struct test_pipeline *t)
+{
+	rte_spinlock_t *lk = &t->tx_lk[mbuf->port];
+
+	rte_spinlock_lock(lk);
+	pipeline_tx_pkt_safe(mbuf);
+	rte_spinlock_unlock(lk);
+}
+
+static __rte_always_inline void
+pipeline_tx_unsafe_burst(struct rte_mbuf *mbuf, struct test_pipeline *t)
+{
+	uint16_t port = mbuf->port;
+	rte_spinlock_t *lk = &t->tx_lk[port];
+
+	rte_spinlock_lock(lk);
+	rte_eth_tx_buffer(port, 0, t->tx_buf[port], mbuf);
+	rte_spinlock_unlock(lk);
+}
+
+static __rte_always_inline void
+pipeline_tx_flush(struct test_pipeline *t, const uint8_t nb_ports)
+{
+	int i;
+	rte_spinlock_t *lk;
+
+	for (i = 0; i < nb_ports; i++) {
+		lk = &t->tx_lk[i];
+
+		rte_spinlock_lock(lk);
+		rte_eth_tx_buffer_flush(i, 0, t->tx_buf[i]);
+		rte_spinlock_unlock(lk);
+	}
+}
+
+static inline int
+pipeline_nb_event_ports(struct evt_options *opt)
+{
+	return evt_nr_active_lcores(opt->wlcores);
+}
+
 int pipeline_test_result(struct evt_test *test, struct evt_options *opt);
 int pipeline_opt_check(struct evt_options *opt, uint64_t nb_queues);
 int pipeline_test_setup(struct evt_test *test, struct evt_options *opt);
diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c
index 4b50e7b54..bc3f3dc18 100644
--- a/app/test-eventdev/test_pipeline_queue.c
+++ b/app/test-eventdev/test_pipeline_queue.c
@@ -15,10 +15,375 @@  pipeline_queue_nb_event_queues(struct evt_options *opt)
 	return (eth_count * opt->nb_stages) + eth_count;
 }
 
+static int
+pipeline_queue_worker_single_stage_safe(void *arg)
+{
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	struct rte_event ev;
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+			pipeline_tx_pkt_safe(ev.mbuf);
+			w->processed_pkts++;
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+			pipeline_event_enqueue(dev, port, &ev);
+		}
+	}
+
+	return 0;
+}
+
+static int
+pipeline_queue_worker_single_stage_unsafe(void *arg)
+{
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	struct rte_event ev;
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+			pipeline_tx_pkt_unsafe(ev.mbuf, t);
+			w->processed_pkts++;
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+			pipeline_event_enqueue(dev, port, &ev);
+		}
+	}
+
+	return 0;
+}
+
+static int
+pipeline_queue_worker_single_stage_burst_safe(void *arg)
+{
+	int i;
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	struct rte_event ev[BURST_SIZE + 1];
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_pkt_safe(ev[i].mbuf);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts++;
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
+static int
+pipeline_queue_worker_single_stage_burst_unsafe(void *arg)
+{
+	int i;
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	struct rte_event ev[BURST_SIZE + 1];
+	const uint16_t nb_ports = rte_eth_dev_count();
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			pipeline_tx_flush(t, nb_ports);
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_unsafe_burst(ev[i].mbuf, t);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
+				w->processed_pkts++;
+			} else {
+
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+
+	return 0;
+}
+
+
+static int
+pipeline_queue_worker_multi_stage_safe(void *arg)
+{
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	const uint8_t last_queue = t->opt->nb_stages - 1;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	uint8_t *const sched_type_list = &t->sched_type_list[0];
+	uint8_t cq_id;
+	struct rte_event ev;
+
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.queue_id % nb_stages;
+
+		if (cq_id >= last_queue) {
+			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_pkt_safe(ev.mbuf);
+				w->processed_pkts++;
+				continue;
+			}
+			ev.queue_id += (cq_id == last_queue) ? 1 : 0;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
+		}
+
+		pipeline_event_enqueue(dev, port, &ev);
+	}
+	return 0;
+}
+
+static int
+pipeline_queue_worker_multi_stage_unsafe(void *arg)
+{
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	const uint8_t last_queue = t->opt->nb_stages - 1;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	uint8_t *const sched_type_list = &t->sched_type_list[0];
+	uint8_t cq_id;
+	struct rte_event ev;
+
+
+	while (t->done == false) {
+		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+
+		if (!event) {
+			rte_pause();
+			continue;
+		}
+
+		cq_id = ev.queue_id % nb_stages;
+
+		if (cq_id >= last_queue) {
+			if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+				pipeline_tx_pkt_unsafe(ev.mbuf, t);
+				w->processed_pkts++;
+				continue;
+			}
+			ev.queue_id += (cq_id == last_queue) ? 1 : 0;
+			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+		} else {
+			ev.queue_id++;
+			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
+		}
+
+		pipeline_event_enqueue(dev, port, &ev);
+	}
+	return 0;
+}
+
+static int
+pipeline_queue_worker_multi_stage_burst_safe(void *arg)
+{
+	int i;
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	uint8_t *const sched_type_list = &t->sched_type_list[0];
+	const uint8_t last_queue = t->opt->nb_stages - 1;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	uint8_t cq_id;
+	struct rte_event ev[BURST_SIZE + 1];
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			cq_id = ev[i].queue_id % nb_stages;
+
+			if (cq_id >= last_queue) {
+				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+					pipeline_tx_pkt_safe(ev[i].mbuf);
+					ev[i].op = RTE_EVENT_OP_RELEASE;
+					w->processed_pkts++;
+					continue;
+				}
+
+				ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						sched_type_list[cq_id]);
+			}
+
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+	return 0;
+}
+
+static int
+pipeline_queue_worker_multi_stage_burst_unsafe(void *arg)
+{
+	int i;
+	struct worker_data *w  = arg;
+	struct test_pipeline *t = w->t;
+	const uint8_t dev = w->dev_id;
+	const uint8_t port = w->port_id;
+	uint8_t *const sched_type_list = &t->sched_type_list[0];
+	const uint8_t last_queue = t->opt->nb_stages - 1;
+	const uint8_t nb_stages = t->opt->nb_stages + 1;
+	uint8_t cq_id;
+	struct rte_event ev[BURST_SIZE + 1];
+	const uint16_t nb_ports = rte_eth_dev_count();
+
+	while (t->done == false) {
+		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+				BURST_SIZE, 0);
+
+		if (!nb_rx) {
+			pipeline_tx_flush(t, nb_ports);
+			rte_pause();
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			rte_prefetch0(ev[i + 1].mbuf);
+			cq_id = ev[i].queue_id % nb_stages;
+
+			if (cq_id >= last_queue) {
+				if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+					pipeline_tx_unsafe_burst(ev[i].mbuf, t);
+					ev[i].op = RTE_EVENT_OP_RELEASE;
+					w->processed_pkts++;
+					continue;
+				}
+
+				ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
+				pipeline_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			} else {
+				ev[i].queue_id++;
+				pipeline_fwd_event(&ev[i],
+						sched_type_list[cq_id]);
+			}
+		}
+
+		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+	}
+	return 0;
+}
+
 static int
 worker_wrapper(void *arg)
 {
-	RTE_SET_USED(arg);
+	struct worker_data *w  = arg;
+	struct evt_options *opt = w->t->opt;
+	const bool burst = evt_has_burst_mode(w->dev_id);
+	const bool mt_safe = !w->t->mt_unsafe;
+	const uint8_t nb_stages = opt->nb_stages;
+	RTE_SET_USED(opt);
+
+	/* allow compiler to optimize */
+	if (nb_stages == 1) {
+		if (!burst && mt_safe)
+			return pipeline_queue_worker_single_stage_safe(arg);
+		else if (!burst && !mt_safe)
+			return pipeline_queue_worker_single_stage_unsafe(
+					arg);
+		else if (burst && mt_safe)
+			return pipeline_queue_worker_single_stage_burst_safe(
+					arg);
+		else if (burst && !mt_safe)
+			return pipeline_queue_worker_single_stage_burst_unsafe(
+					arg);
+	} else {
+		if (!burst && mt_safe)
+			return pipeline_queue_worker_multi_stage_safe(arg);
+		else if (!burst && !mt_safe)
+			return pipeline_queue_worker_multi_stage_unsafe(arg);
+		if (burst && mt_safe)
+			return pipeline_queue_worker_multi_stage_burst_safe(
+					arg);
+		else if (burst && !mt_safe)
+			return pipeline_queue_worker_multi_stage_burst_unsafe(
+					arg);
+
+	}
 	rte_panic("invalid worker\n");
 }