[dpdk-dev,v2,12/15] examples/eventdev: add atq single stage pipeline worker
Checks
Commit Message
Add optimized eventdev pipeline when ethdev supports thread safe Tx,
number of configured stages is one and all type queue option is enabled.
Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
.../eventdev_pipeline_sw_pmd/pipeline_worker_tx.c | 104 ++++++++++++++++++++-
1 file changed, 101 insertions(+), 3 deletions(-)
@@ -81,6 +81,41 @@ worker_do_tx_single(void *arg)
return 0;
}
+static int
+worker_do_tx_single_atq(void *arg)
+{
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0;
+ struct rte_event ev;
+
+ while (!fdata->done) {
+
+ if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+ rte_pause();
+ continue;
+ }
+
+ received++;
+
+ if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev.mbuf);
+ tx++;
+ continue;
+ }
+ work();
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
static int
worker_do_tx_single_burst(void *arg)
{
@@ -127,6 +162,50 @@ worker_do_tx_single_burst(void *arg)
return 0;
}
+static int
+worker_do_tx_single_burst_atq(void *arg)
+{
+ struct rte_event ev[BATCH_SIZE + 1];
+
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0;
+
+ while (!fdata->done) {
+ uint16_t i;
+ uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+ BATCH_SIZE, 0);
+
+ if (!nb_rx) {
+ rte_pause();
+ continue;
+ }
+
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+ rte_prefetch0(ev[i + 1].mbuf);
+ if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+ worker_tx_pkt(ev[i].mbuf);
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+ tx++;
+ } else
+ worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+ work();
+ }
+
+ worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_rx;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
/* Multi stage Pipeline Workers */
static int
@@ -683,6 +762,24 @@ worker_tx_opt_check(void)
}
}
+static worker_loop
+get_worker_loop_single_burst(uint8_t atq)
+{
+ if (atq)
+ return worker_do_tx_single_burst_atq;
+
+ return worker_do_tx_single_burst;
+}
+
+static worker_loop
+get_worker_loop_single_non_burst(uint8_t atq)
+{
+ if (atq)
+ return worker_do_tx_single_atq;
+
+ return worker_do_tx_single;
+}
+
static worker_loop
get_worker_loop_burst(uint8_t atq)
{
@@ -704,10 +801,12 @@ get_worker_loop_non_burst(uint8_t atq)
static worker_loop
get_worker_single_stage(bool burst)
{
+ uint8_t atq = cdata.all_type_queues ? 1 : 0;
+
if (burst)
- return worker_do_tx_single_burst;
+ return get_worker_loop_single_burst(atq);
- return worker_do_tx_single;
+ return get_worker_loop_single_non_burst(atq);
}
static worker_loop
@@ -736,5 +835,4 @@ set_worker_tx_setup_data(struct setup_data *caps, bool burst)
caps->scheduler = schedule_devices;
caps->evdev_setup = setup_eventdev_worker_tx;
caps->adptr_setup = init_rx_adapter;
-
}