[dpdk-dev] [PATCH] net/memif: relax barrier for zero copy path

Phil Yang phil.yang at arm.com
Fri Sep 11 07:38:19 CEST 2020


Using 'rte_mb' to synchronize the shared ring head/tail between producer
and consumer will stall the pipeline and damage performance on the weak
memory model platforms, such like aarch64.

Relax the expensive barrier with c11 atomic with explicit memory
ordering can improve 3.6% performance on throughput.

Signed-off-by: Phil Yang <phil.yang at arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
---
 drivers/net/memif/rte_eth_memif.c | 35 +++++++++++++++++++++++++----------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git a/drivers/net/memif/rte_eth_memif.c b/drivers/net/memif/rte_eth_memif.c
index c1c7e9f..a19c0f3 100644
--- a/drivers/net/memif/rte_eth_memif.c
+++ b/drivers/net/memif/rte_eth_memif.c
@@ -253,7 +253,12 @@ memif_free_stored_mbufs(struct pmd_process_private *proc_private, struct memif_q
 	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
 
 	/* FIXME: improve performance */
-	while (mq->last_tail != ring->tail) {
+	/* The ring->tail acts as a guard variable between Tx and Rx
+	 * threads, so using load-acquire pairs with store-release
+	 * to synchronize it between threads.
+	 */
+	while (mq->last_tail != __atomic_load_n(&ring->tail,
+						__ATOMIC_ACQUIRE)) {
 		RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail + 1) & mask]);
 		/* Decrement refcnt and free mbuf. (current segment) */
 		rte_mbuf_refcnt_update(mq->buffers[mq->last_tail & mask], -1);
@@ -455,7 +460,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	mask = ring_size - 1;
 
 	cur_slot = mq->last_tail;
-	last_slot = ring->tail;
+	/* The ring->tail acts as a guard variable between Tx and Rx
+	 * threads, so using load-acquire pairs with store-release
+	 * to synchronize it between threads.
+	 */
+	last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
 	if (cur_slot == last_slot)
 		goto refill;
 	n_slots = last_slot - cur_slot;
@@ -501,7 +510,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 /* Supply master with new buffers */
 refill:
-	head = ring->head;
+	/* The ring->head acts as a guard variable between Tx and Rx
+	 * threads, so using load-acquire pairs with store-release
+	 * to synchronize it between threads.
+	 */
+	head = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
 	n_slots = ring_size - head + mq->last_tail;
 
 	if (n_slots < 32)
@@ -526,8 +539,7 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 			(uint8_t *)proc_private->regions[d0->region]->addr;
 	}
 no_free_mbufs:
-	rte_mb();
-	ring->head = head;
+	__atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
 
 	mq->n_pkts += n_rx_pkts;
 
@@ -723,8 +735,12 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	memif_free_stored_mbufs(proc_private, mq);
 
 	/* ring type always MEMIF_RING_S2M */
-	slot = ring->head;
-	n_free = ring_size - ring->head + mq->last_tail;
+	/* The ring->head acts as a guard variable between Tx and Rx
+	 * threads, so using load-acquire pairs with store-release
+	 * to synchronize it between threads.
+	 */
+	slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
+	n_free = ring_size - slot + mq->last_tail;
 
 	int used_slots;
 
@@ -778,12 +794,11 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	}
 
 no_free_slots:
-	rte_mb();
 	/* update ring pointers */
 	if (type == MEMIF_RING_S2M)
-		ring->head = slot;
+		__atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
 	else
-		ring->tail = slot;
+		__atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE);
 
 	/* Send interrupt, if enabled. */
 	if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
-- 
2.7.4



More information about the dev mailing list