[dpdk-dev,11/13] examples/eventdev: add atq single stage pipeline worker
Checks
Commit Message
Add optimized eventdev pipeline when ethdev supports thread safe Tx,
number of configured stages is one and all type queue option is enabled.
Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
.../eventdev_pipeline_sw_pmd/pipeline_worker_tx.c | 88 +++++++++++++++++++++-
1 file changed, 86 insertions(+), 2 deletions(-)
Comments
> From: Pavan Nikhilesh [mailto:pbhagavatula@caviumnetworks.com]
> Sent: Thursday, December 7, 2017 8:37 PM
> To: Eads, Gage <gage.eads@intel.com>; jerin.jacobkollanukkaran@cavium.com;
> Van Haaren, Harry <harry.van.haaren@intel.com>; Rao, Nikhil
> <nikhil.rao@intel.com>; hemant.agrawal@nxp.com; Ma, Liang J
> <liang.j.ma@intel.com>
> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> Subject: [PATCH 11/13] examples/eventdev: add atq single stage pipeline
> worker
>
> Add optimized eventdev pipeline when ethdev supports thread safe Tx,
> number of configured stages is one and all type queue option is enabled.
>
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
> ---
> .../eventdev_pipeline_sw_pmd/pipeline_worker_tx.c | 88
> +++++++++++++++++++++-
> 1 file changed, 86 insertions(+), 2 deletions(-)
> if (cdata.num_stages == 1) {
> - if (burst)
> + if (burst && atq)
> + caps->worker_loop = worker_do_tx_single_burst_atq;
> + if (burst && !atq)
> caps->worker_loop = worker_do_tx_single_burst;
> - if (!burst)
> + if (!burst && atq)
> + caps->worker_loop = worker_do_tx_single_atq;
> + if (!burst && !atq)
> caps->worker_loop = worker_do_tx_single;
> } else {
> if (burst && atq)
As per previous notes, this doesn't scale.
@@ -106,6 +106,41 @@ worker_do_tx_single(void *arg)
return 0;
}
+static int
+worker_do_tx_single_atq(void *arg)
+{
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0;
+ struct rte_event ev;
+
+ while (!fdata->done) {
+
+ if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+ rte_pause();
+ continue;
+ }
+
+ received++;
+
+ if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev.mbuf);
+ tx++;
+ continue;
+ }
+ work(ev.mbuf);
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
static int
worker_do_tx_single_burst(void *arg)
{
@@ -153,6 +188,51 @@ worker_do_tx_single_burst(void *arg)
return 0;
}
+static int
+worker_do_tx_single_burst_atq(void *arg)
+{
+ struct rte_event ev[BATCH_SIZE + 1];
+
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0;
+
+ while (!fdata->done) {
+ uint16_t i;
+ uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+ BATCH_SIZE, 0);
+
+ if (!nb_rx) {
+ rte_pause();
+ continue;
+ }
+
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+ rte_prefetch0(ev[i + 1].mbuf);
+ if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+ worker_tx_pkt(ev[i].mbuf);
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+ tx++;
+ } else
+ worker_fwd_event(&ev[i],
+ RTE_SCHED_TYPE_ATOMIC);
+ work(ev[i].mbuf);
+ }
+
+ worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_rx;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
/* Multi stage Pipeline Workers */
static int
@@ -697,9 +777,13 @@ set_worker_tx_setup_data(struct setup_data *caps, bool burst)
uint8_t atq = cdata.all_type_queues ? 1 : 0;
if (cdata.num_stages == 1) {
- if (burst)
+ if (burst && atq)
+ caps->worker_loop = worker_do_tx_single_burst_atq;
+ if (burst && !atq)
caps->worker_loop = worker_do_tx_single_burst;
- if (!burst)
+ if (!burst && atq)
+ caps->worker_loop = worker_do_tx_single_atq;
+ if (!burst && !atq)
caps->worker_loop = worker_do_tx_single;
} else {
if (burst && atq)