[dpdk-dev] [Patch 1/2] i40e RX Bulk Alloc: Larger list size (33 to 128) throughput optimization

Polehn, Mike A mike.a.polehn at intel.com
Tue Oct 27 21:56:36 CET 2015


Combined two subroutines into one subroutine that performs a single descriptor read operation 
followed by the buffer allocate and load loop.

Eliminated the staging queue and its subroutine, which removes the extra pointer-list copies 
and reduces the number of active variable cache pages during the call.

Reduced the queue position variables to just two, the next read point and the last NIC RX 
descriptor position, and changed the logic so the NIC descriptor table no longer always needs 
to be kept filled.
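
As an illustrative sketch (not the patch code itself; the equivalent calculation is inside 
i40e_recv_pkts_bulk_alloc() below, and ring_buf_count() is a hypothetical name), the count of 
descriptors currently holding buffers can be derived from just these two positions, including 
the wrap case:

static inline unsigned
ring_buf_count(unsigned rx_tail, unsigned rx_last_pos, unsigned nb_rx_desc)
{
	/* Descriptors from rx_tail up to and including rx_last_pos,
	 * handling wrap around the end of the descriptor ring. */
	unsigned n_buf = rx_last_pos + 1;

	if (rx_tail <= n_buf)
		return n_buf - rx_tail;
	return n_buf + nb_rx_desc - rx_tail;
}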

Moved the NIC tail register update from one write per loop to one write per driver receive 
call, to minimize CPU stalls waiting on multiple SMB synchronization points and on earlier NIC 
register writes, which often take large cycle counts to complete. For example, with an input 
packet list of 33 and the default loop size of 32, the second NIC register write would occur 
just after RX processing of only 1 packet, resulting in a large CPU stall.
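
The resulting pattern, as a simplified sketch (refill_block() and write_tail() are hypothetical 
stand-ins for i40e_rx_alloc_bufs() and the rte_wmb()/I40E_PCI_REG_WRITE() pair in the patch 
below), performs at most one tail register write per receive call:

struct ring_state {
	unsigned rx_free_thresh; /* refill block size */
};

int refill_block(struct ring_state *r);  /* stand-in: allocate one block of buffers */
void write_tail(struct ring_state *r);   /* stand-in: write barrier + MMIO tail write */

static void
refill_and_update_tail(struct ring_state *r, unsigned n_empty)
{
	int alloced = 0;

	/* Queue as many full refill blocks as fit, leaving at least one
	 * descriptor empty, without touching the NIC register yet. */
	while (n_empty > r->rx_free_thresh) {
		if (refill_block(r) != 0)
			break;
		alloced = 1;
		n_empty -= r->rx_free_thresh;
	}

	/* At most one write barrier and one tail register write per call. */
	if (alloced)
		write_tail(r);
}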

Eliminated the initial rx-packet-present test before the rx processing loop, since the loop 
performs the same check and less free CPU time is generally available when packets are present 
than when no input packets are being processed.

Used standard native-width (unsigned) variables in places to reduce the overhead of non-native 
variable sizes.

Reduced the number of queue variables, reordered the queue structure to put the most active 
variables in the first cache line and to better utilize the bytes inside that cache line, and 
reduced the active cache line count to one during the processing call. Other RX subroutine 
sets might still use more than one variable cache line.
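
One way to sanity-check such a layout at build time (illustrative only; the structure and field 
names here are hypothetical stand-ins, not the i40e_rx_queue definition) is a compile-time 
assertion that the first seldom-used field starts at or before the 64-byte boundary, so the hot 
fields provably fit in one cache line:

#include <stddef.h>

struct rxq_layout_sketch {
	/* hot fields, touched on every receive call */
	void *rx_ring;
	void *sw_ring;
	volatile void *qrx_tail;
	unsigned nb_rx_desc;
	unsigned rx_free_thresh;
	unsigned rx_tail;
	unsigned rx_last_pos;
	/* setup and seldom-used fields start here */
	unsigned long rx_ring_phys_addr;
};

/* All hot fields must sit inside the first 64-byte cache line. */
_Static_assert(offsetof(struct rxq_layout_sketch, rx_ring_phys_addr) <= 64,
	       "hot RX queue fields spill out of the first cache line");

Such a check only helps, of course, if the structure itself is allocated cache-line aligned.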

Signed-off-by: Mike A. Polehn <mike.a.polehn at intel.com>

diff --git a/drivers/net/i40e/i40e_rxtx.c b/drivers/net/i40e/i40e_rxtx.c
index fd656d5..ea63f2f 100644
--- a/drivers/net/i40e/i40e_rxtx.c
+++ b/drivers/net/i40e/i40e_rxtx.c
@@ -63,6 +63,7 @@
 #define DEFAULT_TX_RS_THRESH   32
 #define DEFAULT_TX_FREE_THRESH 32
 #define I40E_MAX_PKT_TYPE      256
+#define I40E_RX_INPUT_BUF_MAX  256
 
 #define I40E_TX_MAX_BURST  32
 
@@ -959,115 +960,97 @@ check_rx_burst_bulk_alloc_preconditions(__rte_unused struct i40e_rx_queue *rxq)
 }
 
 #ifdef RTE_LIBRTE_I40E_RX_ALLOW_BULK_ALLOC
-#define I40E_LOOK_AHEAD 8
-#if (I40E_LOOK_AHEAD != 8)
-#error "PMD I40E: I40E_LOOK_AHEAD must be 8\n"
-#endif
-static inline int
-i40e_rx_scan_hw_ring(struct i40e_rx_queue *rxq)
+
+static inline unsigned
+i40e_rx_scan_hw_ring(struct i40e_rx_queue *rxq, struct rte_mbuf **rx_pkts,
+       unsigned nb_pkts)
 {
 	volatile union i40e_rx_desc *rxdp;
 	struct i40e_rx_entry *rxep;
-	struct rte_mbuf *mb;
-	uint16_t pkt_len;
-	uint64_t qword1;
-	uint32_t rx_status;
-	int32_t s[I40E_LOOK_AHEAD], nb_dd;
-	int32_t i, j, nb_rx = 0;
-	uint64_t pkt_flags;
+	unsigned i, n, tail;
 
-	rxdp = &rxq->rx_ring[rxq->rx_tail];
-	rxep = &rxq->sw_ring[rxq->rx_tail];
-
-	qword1 = rte_le_to_cpu_64(rxdp->wb.qword1.status_error_len);
-	rx_status = (qword1 & I40E_RXD_QW1_STATUS_MASK) >>
-				I40E_RXD_QW1_STATUS_SHIFT;
+	/* Wrap tail */
+	if (rxq->rx_tail >= rxq->nb_rx_desc)
+		tail = 0;
+	else
+		tail = rxq->rx_tail;
+
+	/* Stop at end of Q; after the end, the next read is aligned at Q start */
+	n = rxq->nb_rx_desc - tail;
+	if (n < nb_pkts)
+		nb_pkts = n;
+
+	rxdp = &rxq->rx_ring[tail];
+	rte_prefetch0(rxdp);
+	rxep = &rxq->sw_ring[tail];
+	rte_prefetch0(rxep);
+
+	i = 0;
+	while (nb_pkts > 0) {
+		/* Prefetch NIC descriptors and packet list */
+		if (likely(nb_pkts > 4)) {
+			rte_prefetch0(&rxdp[4]);
+			if (likely(nb_pkts > 8)) {
+				rte_prefetch0(&rxdp[8]);
+				rte_prefetch0(&rxep[8]);
+			}
+		}
 
-	/* Make sure there is at least 1 packet to receive */
-	if (!(rx_status & (1 << I40E_RX_DESC_STATUS_DD_SHIFT)))
-		return 0;
+		for (n = 0; (nb_pkts > 0) && (n < 8); n++, nb_pkts--, i++) {
+			uint64_t qword1;
+			uint64_t pkt_flags;
+			uint16_t pkt_len;
+			struct rte_mbuf *mb = rxep->mbuf;
+			rxep++;
 
-	/**
-	 * Scan LOOK_AHEAD descriptors at a time to determine which
-	 * descriptors reference packets that are ready to be received.
-	 */
-	for (i = 0; i < RTE_PMD_I40E_RX_MAX_BURST; i+=I40E_LOOK_AHEAD,
-			rxdp += I40E_LOOK_AHEAD, rxep += I40E_LOOK_AHEAD) {
-		/* Read desc statuses backwards to avoid race condition */
-		for (j = I40E_LOOK_AHEAD - 1; j >= 0; j--) {
+			/* Translate descriptor info to mbuf parameters */
 			qword1 = rte_le_to_cpu_64(\
-				rxdp[j].wb.qword1.status_error_len);
-			s[j] = (qword1 & I40E_RXD_QW1_STATUS_MASK) >>
-					I40E_RXD_QW1_STATUS_SHIFT;
-		}
+			rxdp->wb.qword1.status_error_len);
 
-		/* Compute how many status bits were set */
-		for (j = 0, nb_dd = 0; j < I40E_LOOK_AHEAD; j++)
-			nb_dd += s[j] & (1 << I40E_RX_DESC_STATUS_DD_SHIFT);
+			if (!(((qword1 & I40E_RXD_QW1_STATUS_MASK) >>
+				I40E_RXD_QW1_STATUS_SHIFT)
+				& (1 << I40E_RX_DESC_STATUS_DD_SHIFT)))
+				goto DONE; /* Packet not yet completed */
 
-		nb_rx += nb_dd;
 
-		/* Translate descriptor info to mbuf parameters */
-		for (j = 0; j < nb_dd; j++) {
-			mb = rxep[j].mbuf;
-			qword1 = rte_le_to_cpu_64(\
-				rxdp[j].wb.qword1.status_error_len);
 			pkt_len = ((qword1 & I40E_RXD_QW1_LENGTH_PBUF_MASK) >>
 				I40E_RXD_QW1_LENGTH_PBUF_SHIFT) - rxq->crc_len;
-			mb->data_len = pkt_len;
 			mb->pkt_len = pkt_len;
-			mb->ol_flags = 0;
-			i40e_rxd_to_vlan_tci(mb, &rxdp[j]);
+			mb->data_len = pkt_len;
+			i40e_rxd_to_vlan_tci(mb, rxdp);
 			pkt_flags = i40e_rxd_status_to_pkt_flags(qword1);
 			pkt_flags |= i40e_rxd_error_to_pkt_flags(qword1);
 			mb->packet_type =
 				i40e_rxd_pkt_type_mapping((uint8_t)((qword1 &
-						I40E_RXD_QW1_PTYPE_MASK) >>
-						I40E_RXD_QW1_PTYPE_SHIFT));
+					I40E_RXD_QW1_PTYPE_MASK) >>
+					I40E_RXD_QW1_PTYPE_SHIFT));
 			if (pkt_flags & PKT_RX_RSS_HASH)
 				mb->hash.rss = rte_le_to_cpu_32(\
-					rxdp[j].wb.qword0.hi_dword.rss);
+					rxdp->wb.qword0.hi_dword.rss);
 			if (pkt_flags & PKT_RX_FDIR)
-				pkt_flags |= i40e_rxd_build_fdir(&rxdp[j], mb);
+				pkt_flags |= i40e_rxd_build_fdir(rxdp, mb);
+			rxdp++;
 
 #ifdef RTE_LIBRTE_IEEE1588
 			pkt_flags |= i40e_get_iee15888_flags(mb, qword1);
 #endif
-			mb->ol_flags |= pkt_flags;
-
+			mb->ol_flags = pkt_flags;
 		}
-
-		for (j = 0; j < I40E_LOOK_AHEAD; j++)
-			rxq->rx_stage[i + j] = rxep[j].mbuf;
-
-		if (nb_dd != I40E_LOOK_AHEAD)
-			break;
 	}
 
-	/* Clear software ring entries */
-	for (i = 0; i < nb_rx; i++)
-		rxq->sw_ring[rxq->rx_tail + i].mbuf = NULL;
-
-	return nb_rx;
-}
-
-static inline uint16_t
-i40e_rx_fill_from_stage(struct i40e_rx_queue *rxq,
-			struct rte_mbuf **rx_pkts,
-			uint16_t nb_pkts)
-{
-	uint16_t i;
-	struct rte_mbuf **stage = &rxq->rx_stage[rxq->rx_next_avail];
-
-	nb_pkts = (uint16_t)RTE_MIN(nb_pkts, rxq->rx_nb_avail);
-
-	for (i = 0; i < nb_pkts; i++)
-		rx_pkts[i] = stage[i];
+DONE:
+	/* Copy packets to output list and clear NIC list */
+	rxep = &rxq->sw_ring[tail];
+	for (n = 0; n < i; n++) {
+		*rx_pkts++ = rxep->mbuf;
+		rxep->mbuf = NULL;
+		rxep++;
+	}
 
-	rxq->rx_nb_avail = (uint16_t)(rxq->rx_nb_avail - nb_pkts);
-	rxq->rx_next_avail = (uint16_t)(rxq->rx_next_avail + nb_pkts);
+	if (i)  /* Don't wrap if no packets received */
+		rxq->rx_tail = tail + i; /* Includes pointer wrap */
 
-	return nb_pkts;
+	return i;
 }
 
 static inline int
@@ -1076,13 +1059,15 @@ i40e_rx_alloc_bufs(struct i40e_rx_queue *rxq)
 	volatile union i40e_rx_desc *rxdp;
 	struct i40e_rx_entry *rxep;
 	struct rte_mbuf *mb;
-	uint16_t alloc_idx, i;
+	unsigned alloc_idx, i;
 	uint64_t dma_addr;
 	int diag;
 
 	/* Allocate buffers in bulk */
-	alloc_idx = (uint16_t)(rxq->rx_free_trigger -
-				(rxq->rx_free_thresh - 1));
+	alloc_idx = rxq->rx_last_pos + 1;
+	if (alloc_idx >= rxq->nb_rx_desc)
+		alloc_idx = 0;
+
 	rxep = &(rxq->sw_ring[alloc_idx]);
 	diag = rte_mempool_get_bulk(rxq->mp, (void *)rxep,
 					rxq->rx_free_thresh);
@@ -1109,84 +1094,72 @@ i40e_rx_alloc_bufs(struct i40e_rx_queue *rxq)
 		rxdp[i].read.pkt_addr = dma_addr;
 	}
 
-	/* Update rx tail regsiter */
-	rte_wmb();
-	I40E_PCI_REG_WRITE(rxq->qrx_tail, rxq->rx_free_trigger);
-
-	rxq->rx_free_trigger =
-		(uint16_t)(rxq->rx_free_trigger + rxq->rx_free_thresh);
-	if (rxq->rx_free_trigger >= rxq->nb_rx_desc)
-		rxq->rx_free_trigger = (uint16_t)(rxq->rx_free_thresh - 1);
+	rxq->rx_last_pos = alloc_idx + rxq->rx_free_thresh - 1;
 
 	return 0;
 }
 
-static inline uint16_t
-rx_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
+static uint16_t
+i40e_recv_pkts_bulk_alloc(void *rx_queue, struct rte_mbuf **rx_pkts, 
+	uint16_t nb_pkts)
 {
 	struct i40e_rx_queue *rxq = (struct i40e_rx_queue *)rx_queue;
-	uint16_t nb_rx = 0;
-
-	if (!nb_pkts)
-		return 0;
-
-	if (rxq->rx_nb_avail)
-		return i40e_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);
+	unsigned nb_rx, n_buf, n_empty, n, max_alloc;
+	uint8_t alloced = 0;
 
-	nb_rx = (uint16_t)i40e_rx_scan_hw_ring(rxq);
-	rxq->rx_next_avail = 0;
-	rxq->rx_nb_avail = nb_rx;
-	rxq->rx_tail = (uint16_t)(rxq->rx_tail + nb_rx);
+	/* Note: to calc n_buf correctly, tail wraps at start of RX operation */
+	/* Note 2: rxq->rx_last_pos is last packet buf location of NIC */
 
-	if (rxq->rx_tail > rxq->rx_free_trigger) {
-		if (i40e_rx_alloc_bufs(rxq) != 0) {
-			uint16_t i, j;
-
-			PMD_RX_LOG(DEBUG, "Rx mbuf alloc failed for "
-				   "port_id=%u, queue_id=%u",
-				   rxq->port_id, rxq->queue_id);
-			rxq->rx_nb_avail = 0;
-			rxq->rx_tail = (uint16_t)(rxq->rx_tail - nb_rx);
-			for (i = 0, j = rxq->rx_tail; i < nb_rx; i++, j++)
-				rxq->sw_ring[j].mbuf = rxq->rx_stage[i];
-
-			return 0;
+	/* Calculate current number of buffers */
+	n_buf = rxq->rx_last_pos + 1;
+	if (rxq->rx_tail <= n_buf)
+		n_buf = n_buf - rxq->rx_tail;
+	else
+		n_buf = n_buf + rxq->nb_rx_desc - rxq->rx_tail;
+
+	n = nb_pkts;
+	max_alloc = n + rxq->rx_free_thresh; /* Round up, finish in loop */
+	if (unlikely(n_buf < n)) /* Cannot receive more than buffer count */
+		n = n_buf;
+
+	/* Receive packets */
+	if (likely(n)) {
+		if ((unlikely(n > I40E_RX_INPUT_BUF_MAX))) { /* Limit rx count */
+			n = I40E_RX_INPUT_BUF_MAX;
+			max_alloc = I40E_RX_INPUT_BUF_MAX + 1;
 		}
-	}
 
-	if (rxq->rx_tail >= rxq->nb_rx_desc)
-		rxq->rx_tail = 0;
-
-	if (rxq->rx_nb_avail)
-		return i40e_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);
+		nb_rx = i40e_rx_scan_hw_ring(rxq, rx_pkts, n);
+	} else {
+		nb_rx = 0;
+		if (unlikely(!nb_pkts)) /* Input rx of 0, allow 1 alloc block */
+			max_alloc = rxq->rx_free_thresh + 1; 
+	}
 
-	return 0;
-}
+	/* Determine empty count */
+	n_empty = rxq->nb_rx_desc - n_buf + nb_rx;
 
-static uint16_t
-i40e_recv_pkts_bulk_alloc(void *rx_queue,
-			  struct rte_mbuf **rx_pkts,
-			  uint16_t nb_pkts)
-{
-	uint16_t nb_rx = 0, n, count;
+	if (n_empty > max_alloc) /* Limit alloc to rounded up rx receive count */
+		n_empty = max_alloc;
 
-	if (unlikely(nb_pkts == 0))
-		return 0;
+	/* Add empty buffers to NIC descriptor table */
+	while (n_empty > rxq->rx_free_thresh) { /* Round and/or leave 1 empty */
+		if (i40e_rx_alloc_bufs(rxq) != 0)
+			break;
 
-	if (likely(nb_pkts <= RTE_PMD_I40E_RX_MAX_BURST))
-		return rx_recv_pkts(rx_queue, rx_pkts, nb_pkts);
+		alloced = 1;
+		n_empty -= rxq->rx_free_thresh;
+	}
 
-	while (nb_pkts) {
-		n = RTE_MIN(nb_pkts, RTE_PMD_I40E_RX_MAX_BURST);
-		count = rx_recv_pkts(rx_queue, &rx_pkts[nb_rx], n);
-		nb_rx = (uint16_t)(nb_rx + count);
-		nb_pkts = (uint16_t)(nb_pkts - count);
-		if (count < n)
-			break;
+	if (alloced) {
+		/* Update NIC rx tail register */
+		rte_wmb();
+		I40E_PCI_REG_WRITE(rxq->qrx_tail, rxq->rx_last_pos);
 	}
 
-	return nb_rx;
+	return (uint16_t)nb_rx;
 }
+
 #endif /* RTE_LIBRTE_I40E_RX_ALLOW_BULK_ALLOC */
 
 uint16_t
@@ -1296,7 +1269,7 @@ i40e_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
 	nb_hold = (uint16_t)(nb_hold + rxq->nb_rx_hold);
 	if (nb_hold > rxq->rx_free_thresh) {
 		rx_id = (uint16_t) ((rx_id == 0) ?
-			(rxq->nb_rx_desc - 1) : (rx_id - 1));
+			(uint16_t)(rxq->nb_rx_desc - 1) : (rx_id - 1));
 		I40E_PCI_REG_WRITE(rxq->qrx_tail, rx_id);
 		nb_hold = 0;
 	}
@@ -1468,7 +1441,7 @@ i40e_recv_scattered_pkts(void *rx_queue,
 	nb_hold = (uint16_t)(nb_hold + rxq->nb_rx_hold);
 	if (nb_hold > rxq->rx_free_thresh) {
 		rx_id = (uint16_t)(rx_id == 0 ?
-			(rxq->nb_rx_desc - 1) : (rx_id - 1));
+			(uint16_t)(rxq->nb_rx_desc - 1) : (rx_id - 1));
 		I40E_PCI_REG_WRITE(rxq->qrx_tail, rx_id);
 		nb_hold = 0;
 	}
@@ -2578,17 +2551,6 @@ i40e_rx_queue_release_mbufs(struct i40e_rx_queue *rxq)
 			rxq->sw_ring[i].mbuf = NULL;
 		}
 	}
-#ifdef RTE_LIBRTE_I40E_RX_ALLOW_BULK_ALLOC
-	if (rxq->rx_nb_avail == 0)
-		return;
-	for (i = 0; i < rxq->rx_nb_avail; i++) {
-		struct rte_mbuf *mbuf;
-
-		mbuf = rxq->rx_stage[rxq->rx_next_avail + i];
-		rte_pktmbuf_free_seg(mbuf);
-	}
-	rxq->rx_nb_avail = 0;
-#endif /* RTE_LIBRTE_I40E_RX_ALLOW_BULK_ALLOC */
 }
 
 void
@@ -2617,9 +2579,7 @@ i40e_reset_rx_queue(struct i40e_rx_queue *rxq)
 	for (i = 0; i < RTE_PMD_I40E_RX_MAX_BURST; ++i)
 		rxq->sw_ring[rxq->nb_rx_desc + i].mbuf = &rxq->fake_mbuf;
 
-	rxq->rx_nb_avail = 0;
-	rxq->rx_next_avail = 0;
-	rxq->rx_free_trigger = (uint16_t)(rxq->rx_free_thresh - 1);
+	rxq->rx_last_pos = rxq->nb_rx_desc - 1;
 #endif /* RTE_LIBRTE_I40E_RX_ALLOW_BULK_ALLOC */
 	rxq->rx_tail = 0;
 	rxq->nb_rx_hold = 0;
diff --git a/drivers/net/i40e/i40e_rxtx.h b/drivers/net/i40e/i40e_rxtx.h
index 4385142..4146a63 100644
--- a/drivers/net/i40e/i40e_rxtx.h
+++ b/drivers/net/i40e/i40e_rxtx.h
@@ -85,34 +85,35 @@ struct i40e_rx_entry {
 struct i40e_rx_queue {
 	struct rte_mempool *mp; /**< mbuf pool to populate RX ring */
 	volatile union i40e_rx_desc *rx_ring;/**< RX ring virtual address */
-	uint64_t rx_ring_phys_addr; /**< RX ring DMA address */
 	struct i40e_rx_entry *sw_ring; /**< address of RX soft ring */
-	uint16_t nb_rx_desc; /**< number of RX descriptors */
-	uint16_t rx_free_thresh; /**< max free RX desc to hold */
-	uint16_t rx_tail; /**< current value of tail */
-	uint16_t nb_rx_hold; /**< number of held free RX desc */
-	struct rte_mbuf *pkt_first_seg; /**< first segment of current packet */
-	struct rte_mbuf *pkt_last_seg; /**< last segment of current packet */
+	volatile uint8_t *qrx_tail; /**< register address of tail */
+	unsigned nb_rx_desc; /**< number of RX descriptors */
+	unsigned rx_free_thresh; /**< max free RX desc to hold */
+	unsigned rx_tail; /**< current value of tail */
 #ifdef RTE_LIBRTE_I40E_RX_ALLOW_BULK_ALLOC
-	uint16_t rx_nb_avail; /**< number of staged packets ready */
-	uint16_t rx_next_avail; /**< index of next staged packets */
-	uint16_t rx_free_trigger; /**< triggers rx buffer allocation */
-	struct rte_mbuf fake_mbuf; /**< dummy mbuf */
-	struct rte_mbuf *rx_stage[RTE_PMD_I40E_RX_MAX_BURST * 2];
+	unsigned rx_last_pos; /* Position of last packet buf: NIC reg value */
 #endif
 	uint8_t port_id; /**< device port ID */
 	uint8_t crc_len; /**< 0 if CRC stripped, 4 otherwise */
-	uint16_t queue_id; /**< RX queue index */
-	uint16_t reg_idx; /**< RX queue register index */
+	uint8_t hs_mode; /* Header Split mode */
 	uint8_t drop_en; /**< if not 0, set register bit */
-	volatile uint8_t *qrx_tail; /**< register address of tail */
+	uint16_t nb_rx_hold; /**< number of held free RX desc */
+	uint16_t queue_id; /**< RX queue index */
+	struct rte_mbuf *pkt_first_seg; /**< first segment of current packet */
+	struct rte_mbuf *pkt_last_seg; /**< last segment of current packet */
+
+	/* Setup and seldom used variables */
+	uint64_t rx_ring_phys_addr; /**< RX ring DMA address */
 	struct i40e_vsi *vsi; /**< the VSI this queue belongs to */
 	uint16_t rx_buf_len; /* The packet buffer size */
 	uint16_t rx_hdr_len; /* The header buffer size */
+	uint16_t reg_idx; /**< RX queue register index */
 	uint16_t max_pkt_len; /* Maximum packet length */
-	uint8_t hs_mode; /* Header Split mode */
 	bool q_set; /**< indicate if rx queue has been configured */
 	bool rx_deferred_start; /**< don't start this queue in dev start */
+#ifdef RTE_LIBRTE_I40E_RX_ALLOW_BULK_ALLOC
+	struct rte_mbuf fake_mbuf; /**< dummy mbuf */
+#endif
 };
 
 struct i40e_tx_entry {

