[dpdk-dev] [PATCH 7/7 v2] net/dpaa: further push mode optimizations

Nipun Gupta nipun.gupta at nxp.com
Tue Jan 23 13:27:07 CET 2018


This patch adds support for batch processing of multiple packets
on the Rx side.

Signed-off-by: Nipun Gupta <nipun.gupta at nxp.com>
Signed-off-by: Hemant Agrawal <hemant.agrawal at nxp.com>
---
 drivers/bus/dpaa/base/qbman/qman.c  | 89 ++++++++++++++++++-------------------
 drivers/bus/dpaa/include/fsl_qman.h | 10 +++++
 drivers/net/dpaa/dpaa_ethdev.c      |  9 +++-
 drivers/net/dpaa/dpaa_rxtx.c        | 81 +++++++++++++++++++++++++++++----
 drivers/net/dpaa/dpaa_rxtx.h        |  9 ++--
 5 files changed, 137 insertions(+), 61 deletions(-)

diff --git a/drivers/bus/dpaa/base/qbman/qman.c b/drivers/bus/dpaa/base/qbman/qman.c
index 4d8bdae..2b97671 100644
--- a/drivers/bus/dpaa/base/qbman/qman.c
+++ b/drivers/bus/dpaa/base/qbman/qman.c
@@ -1055,64 +1055,63 @@ unsigned int qman_portal_poll_rx(unsigned int poll_limit,
 				 void **bufs,
 				 struct qman_portal *p)
 {
-	const struct qm_dqrr_entry *dq;
-	struct qman_fq *fq;
-	enum qman_cb_dqrr_result res;
-	unsigned int limit = 0;
-#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
-	struct qm_dqrr_entry *shadow;
-#endif
-	unsigned int rx_number = 0;
+	struct qm_portal *portal = &p->p;
+	register struct qm_dqrr *dqrr = &portal->dqrr;
+	struct qm_dqrr_entry *dq[QM_DQRR_SIZE], *shadow[QM_DQRR_SIZE];
+	struct qman_fq *fq[QM_DQRR_SIZE];
+	unsigned int limit = 0, rx_number = 0;
+	uint32_t consume = 0;
 
 	do {
 		qm_dqrr_pvb_update(&p->p);
-		dq = qm_dqrr_current(&p->p);
-		if (unlikely(!dq))
+		if (!dqrr->fill)
 			break;
+
+		dq[rx_number] = dqrr->cursor;
+		dqrr->cursor = DQRR_CARRYCLEAR(dqrr->cursor + 1);
+		/* Prefetch the next DQRR entry */
+		rte_prefetch0(dqrr->cursor);
+
 #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
-	/* If running on an LE system the fields of the
-	 * dequeue entry must be swapper.  Because the
-	 * QMan HW will ignore writes the DQRR entry is
-	 * copied and the index stored within the copy
-	 */
-		shadow = &p->shadow_dqrr[DQRR_PTR2IDX(dq)];
-		*shadow = *dq;
-		dq = shadow;
-		shadow->fqid = be32_to_cpu(shadow->fqid);
-		shadow->contextB = be32_to_cpu(shadow->contextB);
-		shadow->seqnum = be16_to_cpu(shadow->seqnum);
-		hw_fd_to_cpu(&shadow->fd);
+		/* If running on an LE system the fields of the
+		 * dequeue entry must be swapped.  Because the
+		 * QMan HW will ignore writes the DQRR entry is
+		 * copied and the index stored within the copy
+		 */
+		shadow[rx_number] =
+			&p->shadow_dqrr[DQRR_PTR2IDX(dq[rx_number])];
+		shadow[rx_number]->fd.opaque_addr =
+			dq[rx_number]->fd.opaque_addr;
+		shadow[rx_number]->fd.addr =
+			be40_to_cpu(dq[rx_number]->fd.addr);
+		shadow[rx_number]->fd.opaque =
+			be32_to_cpu(dq[rx_number]->fd.opaque);
+#else
+		shadow = dq;
 #endif
 
 		/* SDQCR: context_b points to the FQ */
 #ifdef CONFIG_FSL_QMAN_FQ_LOOKUP
-		fq = get_fq_table_entry(dq->contextB);
+		fq[rx_number] = qman_fq_lookup_table[be32_to_cpu(
+						dq[rx_number]->contextB)];
 #else
-		fq = (void *)(uintptr_t)dq->contextB;
+		fq[rx_number] = (void *)(uintptr_t)be32_to_cpu(dq->contextB);
 #endif
-		/* Now let the callback do its stuff */
-		res = fq->cb.dqrr_dpdk_cb(NULL, p, fq, dq, &bufs[rx_number]);
+		fq[rx_number]->cb.dqrr_prepare(shadow[rx_number],
+						 &bufs[rx_number]);
+
+		consume |= (1 << (31 - DQRR_PTR2IDX(shadow[rx_number])));
 		rx_number++;
-		/* Interpret 'dq' from a driver perspective. */
-		/*
-		 * Parking isn't possible unless HELDACTIVE was set. NB,
-		 * FORCEELIGIBLE implies HELDACTIVE, so we only need to
-		 * check for HELDACTIVE to cover both.
-		 */
-		DPAA_ASSERT((dq->stat & QM_DQRR_STAT_FQ_HELDACTIVE) ||
-			    (res != qman_cb_dqrr_park));
-		qm_dqrr_cdc_consume_1ptr(&p->p, dq, res == qman_cb_dqrr_park);
-		/* Move forward */
-		qm_dqrr_next(&p->p);
-		/*
-		 * Entry processed and consumed, increment our counter.  The
-		 * callback can request that we exit after consuming the
-		 * entry, and we also exit if we reach our processing limit,
-		 * so loop back only if neither of these conditions is met.
-		 */
-	} while (likely(++limit < poll_limit));
+		--dqrr->fill;
+	} while (++limit < poll_limit);
 
-	return limit;
+	if (rx_number)
+		fq[0]->cb.dqrr_dpdk_pull_cb(fq, shadow, bufs, rx_number);
+
+	/* Consume all the DQRR entries together */
+	qm_out(DQRR_DCAP, (1 << 8) | consume);
+
+	return rx_number;
 }
 
 u32 qman_portal_dequeue(struct rte_event ev[], unsigned int poll_limit,
diff --git a/drivers/bus/dpaa/include/fsl_qman.h b/drivers/bus/dpaa/include/fsl_qman.h
index 99e46e1..e9793f3 100644
--- a/drivers/bus/dpaa/include/fsl_qman.h
+++ b/drivers/bus/dpaa/include/fsl_qman.h
@@ -1131,6 +1131,14 @@ typedef enum qman_cb_dqrr_result (*qman_dpdk_cb_dqrr)(void *event,
 					const struct qm_dqrr_entry *dqrr,
 					void **bd);
 
+/* This callback type is used when handling buffers in dpdk pull mode */
+typedef void (*qman_dpdk_pull_cb_dqrr)(struct qman_fq **fq,
+					struct qm_dqrr_entry **dqrr,
+					void **bufs,
+					int num_bufs);
+
+typedef void (*qman_dpdk_cb_prepare)(struct qm_dqrr_entry *dq, void **bufs);
+
 /*
  * This callback type is used when handling ERNs, FQRNs and FQRLs via MR. They
  * are always consumed after the callback returns.
@@ -1191,8 +1199,10 @@ enum qman_fq_state {
 struct qman_fq_cb {
 	union { /* for dequeued frames */
 		qman_dpdk_cb_dqrr dqrr_dpdk_cb;
+		qman_dpdk_pull_cb_dqrr dqrr_dpdk_pull_cb;
 		qman_cb_dqrr dqrr;
 	};
+	qman_dpdk_cb_prepare dqrr_prepare;
 	qman_cb_mr ern;		/* for s/w ERNs */
 	qman_cb_mr fqs;		/* frame-queue state changes*/
 };
diff --git a/drivers/net/dpaa/dpaa_ethdev.c b/drivers/net/dpaa/dpaa_ethdev.c
index b60ed3b..97679eb 100644
--- a/drivers/net/dpaa/dpaa_ethdev.c
+++ b/drivers/net/dpaa/dpaa_ethdev.c
@@ -503,7 +503,11 @@ int dpaa_eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx,
 				   QM_FQCTRL_CTXASTASHING |
 				   QM_FQCTRL_PREFERINCACHE;
 		opts.fqd.context_a.stashing.exclusive = 0;
-		opts.fqd.context_a.stashing.annotation_cl =
+		/* In multicore scenario stashing becomes a bottleneck on LS1046.
+		 * So do not enable stashing in this case
+		 */
+		if (dpaa_svr_family != SVR_LS1046A_FAMILY)
+			opts.fqd.context_a.stashing.annotation_cl =
 						DPAA_IF_RX_ANNOTATION_STASH;
 		opts.fqd.context_a.stashing.data_cl = DPAA_IF_RX_DATA_STASH;
 		opts.fqd.context_a.stashing.context_cl =
@@ -526,7 +530,8 @@ int dpaa_eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx,
 		if (ret)
 			DPAA_PMD_ERR("Channel/Queue association failed. fqid %d"
 				     " ret: %d", rxq->fqid, ret);
-		rxq->cb.dqrr_dpdk_cb = dpaa_rx_cb;
+		rxq->cb.dqrr_dpdk_pull_cb = dpaa_rx_cb;
+		rxq->cb.dqrr_prepare = dpaa_rx_cb_prepare;
 		rxq->is_static = true;
 	}
 	dev->data->rx_queues[queue_idx] = rxq;
diff --git a/drivers/net/dpaa/dpaa_rxtx.c b/drivers/net/dpaa/dpaa_rxtx.c
index f969ccf..fc8144f 100644
--- a/drivers/net/dpaa/dpaa_rxtx.c
+++ b/drivers/net/dpaa/dpaa_rxtx.c
@@ -399,17 +399,80 @@ struct rte_mbuf *
 	return mbuf;
 }
 
-enum qman_cb_dqrr_result dpaa_rx_cb(void *event __always_unused,
-				    struct qman_portal *qm __always_unused,
-				    struct qman_fq *fq,
-				    const struct qm_dqrr_entry *dqrr,
-				    void **bufs)
+void
+dpaa_rx_cb(struct qman_fq **fq, struct qm_dqrr_entry **dqrr,
+	   void **bufs, int num_bufs)
 {
-	const struct qm_fd *fd = &dqrr->fd;
+	struct rte_mbuf *mbuf;
+	struct dpaa_bp_info *bp_info;
+	const struct qm_fd *fd;
+	void *ptr;
+	struct dpaa_if *dpaa_intf;
+	uint16_t offset, i;
+	uint32_t length;
+	uint8_t format;
+
+	if (dpaa_svr_family != SVR_LS1046A_FAMILY) {
+		bp_info = DPAA_BPID_TO_POOL_INFO(dqrr[0]->fd.bpid);
+		ptr = rte_dpaa_mem_ptov(qm_fd_addr(&dqrr[0]->fd));
+		rte_prefetch0((void *)((uint8_t *)ptr + DEFAULT_RX_ICEOF));
+		bufs[0] = (struct rte_mbuf *)((char *)ptr -
+				bp_info->meta_data_size);
+	}
 
-	*bufs = dpaa_eth_fd_to_mbuf(fd,
-			((struct dpaa_if *)fq->dpaa_intf)->ifid);
-	return qman_cb_dqrr_consume;
+	for (i = 0; i < num_bufs; i++) {
+		if (dpaa_svr_family != SVR_LS1046A_FAMILY &&
+		    i < num_bufs - 1) {
+			bp_info = DPAA_BPID_TO_POOL_INFO(dqrr[i + 1]->fd.bpid);
+			ptr = rte_dpaa_mem_ptov(qm_fd_addr(&dqrr[i + 1]->fd));
+			rte_prefetch0((void *)((uint8_t *)ptr +
+					DEFAULT_RX_ICEOF));
+			bufs[i + 1] = (struct rte_mbuf *)((char *)ptr -
+					bp_info->meta_data_size);
+		}
+
+		fd = &dqrr[i]->fd;
+		dpaa_intf = fq[i]->dpaa_intf;
+
+		format = (fd->opaque & DPAA_FD_FORMAT_MASK) >>
+				DPAA_FD_FORMAT_SHIFT;
+		if (unlikely(format == qm_fd_sg)) {
+			bufs[i] = dpaa_eth_sg_to_mbuf(fd, dpaa_intf->ifid);
+			continue;
+		}
+
+		offset = (fd->opaque & DPAA_FD_OFFSET_MASK) >>
+				DPAA_FD_OFFSET_SHIFT;
+		length = fd->opaque & DPAA_FD_LENGTH_MASK;
+
+		mbuf = bufs[i];
+		mbuf->data_off = offset;
+		mbuf->data_len = length;
+		mbuf->pkt_len = length;
+		mbuf->port = dpaa_intf->ifid;
+
+		mbuf->nb_segs = 1;
+		mbuf->ol_flags = 0;
+		mbuf->next = NULL;
+		rte_mbuf_refcnt_set(mbuf, 1);
+		dpaa_eth_packet_info(mbuf, (uint64_t)mbuf->buf_addr);
+	}
+}
+
+void dpaa_rx_cb_prepare(struct qm_dqrr_entry *dq, void **bufs)
+{
+	struct dpaa_bp_info *bp_info = DPAA_BPID_TO_POOL_INFO(dq->fd.bpid);
+	void *ptr = rte_dpaa_mem_ptov(qm_fd_addr(&dq->fd));
+
+	/* In case of LS1046, annotation stashing is disabled due to L2 cache
+	 * being bottleneck in case of multicore scenario for this platform.
+	 * So we prefetch the annotation beforehand, so that it is available
+	 * in cache when accessed.
+	 */
+	if (dpaa_svr_family == SVR_LS1046A_FAMILY)
+		rte_prefetch0((void *)((uint8_t *)ptr + DEFAULT_RX_ICEOF));
+
+	*bufs = (struct rte_mbuf *)((char *)ptr - bp_info->meta_data_size);
 }
 
 static uint16_t
diff --git a/drivers/net/dpaa/dpaa_rxtx.h b/drivers/net/dpaa/dpaa_rxtx.h
index 29d8f95..d3e6351 100644
--- a/drivers/net/dpaa/dpaa_rxtx.h
+++ b/drivers/net/dpaa/dpaa_rxtx.h
@@ -268,9 +268,8 @@ int dpaa_eth_mbuf_to_sg_fd(struct rte_mbuf *mbuf,
 			   struct qm_fd *fd,
 			   uint32_t bpid);
 
-enum qman_cb_dqrr_result dpaa_rx_cb(void *event,
-				    struct qman_portal *qm,
-				    struct qman_fq *fq,
-				    const struct qm_dqrr_entry *dqrr,
-				    void **bd);
+void dpaa_rx_cb(struct qman_fq **fq,
+		struct qm_dqrr_entry **dqrr, void **bufs, int num_bufs);
+
+void dpaa_rx_cb_prepare(struct qm_dqrr_entry *dq, void **bufs);
 #endif
-- 
1.9.1



More information about the dev mailing list