[dpdk-dev,08/13] examples/eventdev: add burst for thread safe pipeline
Checks
Commit Message
Add burst mode worker pipeline when Tx is multi thread safe.
Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
.../eventdev_pipeline_sw_pmd/pipeline_worker_tx.c | 78 +++++++++++++++++++++-
1 file changed, 76 insertions(+), 2 deletions(-)
@@ -48,6 +48,19 @@ worker_event_enqueue(const uint8_t dev, const uint8_t port,
rte_pause();
}
+static __rte_always_inline void
+worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+ struct rte_event *ev, const uint16_t nb_rx)
+{
+ uint16_t enq;
+
+ enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+ while (enq < nb_rx) {
+ enq += rte_event_enqueue_burst(dev, port,
+ ev + enq, nb_rx - enq);
+ }
+}
+
static __rte_always_inline void
worker_tx_pkt(struct rte_mbuf *mbuf)
{
@@ -106,6 +119,65 @@ worker_do_tx(void *arg)
return 0;
}
+static int
+worker_do_tx_burst(void *arg)
+{
+ struct rte_event ev[BATCH_SIZE];
+
+ struct worker_data *data = (struct worker_data *)arg;
+ uint8_t dev = data->dev_id;
+ uint8_t port = data->port_id;
+ uint8_t lst_qid = cdata.num_stages - 1;
+ size_t fwd = 0, received = 0, tx = 0;
+
+ while (!fdata->done) {
+ uint16_t i;
+ const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
+ ev, BATCH_SIZE, 0);
+
+ if (nb_rx == 0) {
+ rte_pause();
+ continue;
+ }
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+ const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;
+
+ if (cq_id >= lst_qid) {
+ if (ev[i].sched_type ==
+ RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev[i].mbuf);
+ tx++;
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+ continue;
+ }
+ ev[i].queue_id = (cq_id == lst_qid) ?
+ cdata.next_qid[ev[i].queue_id] :
+ ev[i].queue_id;
+
+ worker_fwd_event(&ev[i],
+ RTE_SCHED_TYPE_ATOMIC);
+ } else {
+ ev[i].queue_id = cdata.next_qid[
+ ev[i].queue_id];
+ worker_fwd_event(&ev[i],
+ cdata.queue_type);
+ }
+ work(ev[i].mbuf);
+ }
+ worker_event_enqueue_burst(dev, port, ev, nb_rx);
+
+ fwd += nb_rx;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+
+ return 0;
+}
+
static int
setup_eventdev_w(struct prod_data *prod_data,
struct cons_data *cons_data,
@@ -422,8 +494,10 @@ opt_check(void)
void
set_worker_tx_setup_data(struct setup_data *caps, bool burst)
{
- RTE_SET_USED(burst);
- caps->worker_loop = worker_do_tx;
+ if (burst)
+ caps->worker_loop = worker_do_tx_burst;
+ if (!burst)
+ caps->worker_loop = worker_do_tx;
caps->opt_check = opt_check;
caps->consumer_loop = NULL;