[dpdk-dev,08/13] examples/eventdev: add burst for thread safe pipeline

Message ID 20171207203705.25020-9-pbhagavatula@caviumnetworks.com (mailing list archive)
State Superseded, archived
Delegated to: Jerin Jacob

Checks

Context               Check     Description
ci/checkpatch         success   coding style OK
ci/Intel-compilation  success   Compilation OK

Commit Message

Pavan Nikhilesh Dec. 7, 2017, 8:37 p.m. UTC
  Add a burst mode worker pipeline for use when Tx is multi-thread safe.
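
  The burst worker dequeues up to BATCH_SIZE events at a time, processes
  each stage in place, and enqueues the whole batch back, retrying the
  enqueue because rte_event_enqueue_burst() may accept fewer events than
  requested when the port is back-pressured. A minimal standalone sketch
  of the pattern (the names enqueue_all/burst_worker and the bare-bones
  setup are illustrative assumptions, not part of this patch):

  /*
   * Sketch only: the burst dequeue/process/enqueue loop reduced to its
   * essentials. Assumes an already configured event device and worker
   * port.
   */
  #include <stdbool.h>
  #include <rte_eventdev.h>
  #include <rte_pause.h>

  #define BATCH_SIZE 32

  static void
  enqueue_all(uint8_t dev, uint8_t port, struct rte_event *ev, uint16_t nb_rx)
  {
  	uint16_t enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);

  	/* The enqueue may accept only part of the burst; retry with the
  	 * remainder until everything has been handed to the device.
  	 */
  	while (enq < nb_rx)
  		enq += rte_event_enqueue_burst(dev, port, ev + enq,
  				nb_rx - enq);
  }

  static int
  burst_worker(uint8_t dev, uint8_t port, volatile const bool *done)
  {
  	struct rte_event ev[BATCH_SIZE];

  	while (!*done) {
  		/* timeout_ticks == 0: poll without blocking */
  		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
  				ev, BATCH_SIZE, 0);

  		if (nb_rx == 0) {
  			rte_pause();
  			continue;
  		}
  		/* per-event stage processing goes here */
  		enqueue_all(dev, port, ev, nb_rx);
  	}

  	return 0;
  }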

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
 .../eventdev_pipeline_sw_pmd/pipeline_worker_tx.c  | 78 +++++++++++++++++++++-
 1 file changed, 76 insertions(+), 2 deletions(-)

Patch

diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
index 31b7d8936..a824f1f49 100644
--- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
+++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
@@ -48,6 +48,19 @@  worker_event_enqueue(const uint8_t dev, const uint8_t port,
 		rte_pause();
 }
 
+static __rte_always_inline void
+worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+		struct rte_event *ev, const uint16_t nb_rx)
+{
+	uint16_t enq;
+
+	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+	while (enq < nb_rx) { /* enqueue may accept fewer; retry remainder */
+		enq += rte_event_enqueue_burst(dev, port,
+						ev + enq, nb_rx - enq);
+	}
+}
+
 static __rte_always_inline void
 worker_tx_pkt(struct rte_mbuf *mbuf)
 {
@@ -106,6 +119,65 @@  worker_do_tx(void *arg)
 	return 0;
 }
 
+static int
+worker_do_tx_burst(void *arg)
+{
+	struct rte_event ev[BATCH_SIZE];
+
+	struct worker_data *data = (struct worker_data *)arg;
+	uint8_t dev = data->dev_id;
+	uint8_t port = data->port_id;
+	uint8_t lst_qid = cdata.num_stages - 1;
+	size_t fwd = 0, received = 0, tx = 0;
+
+	while (!fdata->done) {
+		uint16_t i;
+		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
+				ev, BATCH_SIZE, 0);
+
+		if (nb_rx == 0) {
+			rte_pause();
+			continue;
+		}
+		received += nb_rx;
+
+		for (i = 0; i < nb_rx; i++) {
+			const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;
+
+			if (cq_id >= lst_qid) { /* event is at the last pipeline stage */
+				if (ev[i].sched_type ==
+						RTE_SCHED_TYPE_ATOMIC) {
+					worker_tx_pkt(ev[i].mbuf);
+					tx++;
+					ev[i].op = RTE_EVENT_OP_RELEASE;
+					continue;
+				}
+				ev[i].queue_id = (cq_id == lst_qid) ?
+					cdata.next_qid[ev[i].queue_id] :
+					ev[i].queue_id;
+
+				worker_fwd_event(&ev[i],
+						RTE_SCHED_TYPE_ATOMIC);
+			} else {
+				ev[i].queue_id = cdata.next_qid[
+					ev[i].queue_id];
+				worker_fwd_event(&ev[i],
+						cdata.queue_type);
+			}
+			work(ev[i].mbuf);
+		}
+		worker_event_enqueue_burst(dev, port, ev, nb_rx);
+
+		fwd += nb_rx;
+	}
+
+	if (!cdata.quiet)
+		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+				rte_lcore_id(), received, fwd, tx);
+
+	return 0;
+}
+
 static int
 setup_eventdev_w(struct prod_data *prod_data,
 		struct cons_data *cons_data,
@@ -422,8 +494,10 @@  opt_check(void)
 void
 set_worker_tx_setup_data(struct setup_data *caps, bool burst)
 {
-	RTE_SET_USED(burst);
-	caps->worker_loop = worker_do_tx;
+	if (burst)
+		caps->worker_loop = worker_do_tx_burst;
+	else
+		caps->worker_loop = worker_do_tx;
 
 	caps->opt_check = opt_check;
 	caps->consumer_loop = NULL;
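
The worker_loop selected here is the function the example launches on
each worker lcore; it matches lcore_function_t (int (*)(void *)), so it
can be passed directly to rte_eal_remote_launch(). A rough usage sketch,
with dev_id, port_id and lcore_id assumed to come from the surrounding
setup code rather than shown here:

/* Assumed usage: run the selected loop on one worker lcore. */
struct worker_data w = {
	.dev_id = dev_id,	/* event device id from setup */
	.port_id = port_id,	/* this worker's event port */
};

rte_eal_remote_launch(caps->worker_loop, &w, lcore_id);

/* ... main lcore does its own work, then reaps the workers ... */
rte_eal_mp_wait_lcore();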