[dpdk-dev] [PATCH] atomic: clarify use of memory barriers

Olivier Matz olivier.matz at 6wind.com
Tue May 20 11:36:28 CEST 2014


This commit introduces rte_smp_mb(), rte_smp_wmb() and rte_smp_rmb(), in
order to differentiate memory barriers used between lcores, and memory
barriers used between a lcore and a device. The patch does not provide
any functional change, the goal is to have more explicit calls, making
the code more readable.

Note that on x86 CPUs, memory barriers between different cores can be
guaranteed by a simple compiler barrier. If the memory is also accessed
by a device, a real memory barrier instruction has to be used.

Signed-off-by: Olivier Matz <olivier.matz at 6wind.com>
---
 app/test/test_mbuf.c                       |  2 +-
 lib/librte_eal/bsdapp/eal/eal_thread.c     |  2 +-
 lib/librte_eal/common/eal_common_launch.c  |  2 +-
 lib/librte_eal/common/include/rte_atomic.h | 24 ++++++++++++++++++++++++
 lib/librte_eal/linuxapp/eal/eal_thread.c   |  2 +-
 lib/librte_pmd_virtio/virtqueue.h          |  6 +++---
 lib/librte_pmd_xenvirt/rte_eth_xenvirt.c   |  4 ++--
 lib/librte_pmd_xenvirt/virtqueue.h         |  2 +-
 lib/librte_ring/rte_ring.h                 |  8 ++++----
 lib/librte_timer/rte_timer.c               |  8 ++++----
 10 files changed, 42 insertions(+), 18 deletions(-)

diff --git a/app/test/test_mbuf.c b/app/test/test_mbuf.c
index ba2f2d9..7945069 100644
--- a/app/test/test_mbuf.c
+++ b/app/test/test_mbuf.c
@@ -603,7 +603,7 @@ test_refcnt_master(void)
 		test_refcnt_iter(lcore, i);
 
 	refcnt_stop_slaves = 1;
-	rte_wmb();
+	rte_smp_wmb();
 
 	printf("%s finished at lcore %u\n", __func__, lcore);
 	return (0);
diff --git a/lib/librte_eal/bsdapp/eal/eal_thread.c b/lib/librte_eal/bsdapp/eal/eal_thread.c
index d2bec2e..00e1647 100644
--- a/lib/librte_eal/bsdapp/eal/eal_thread.c
+++ b/lib/librte_eal/bsdapp/eal/eal_thread.c
@@ -223,7 +223,7 @@ eal_thread_loop(__attribute__((unused)) void *arg)
 		fct_arg = lcore_config[lcore_id].arg;
 		ret = lcore_config[lcore_id].f(fct_arg);
 		lcore_config[lcore_id].ret = ret;
-		rte_wmb();
+		rte_smp_wmb();
 		lcore_config[lcore_id].state = FINISHED;
 	}
 
diff --git a/lib/librte_eal/common/eal_common_launch.c b/lib/librte_eal/common/eal_common_launch.c
index 6f7c696..a3cc282 100644
--- a/lib/librte_eal/common/eal_common_launch.c
+++ b/lib/librte_eal/common/eal_common_launch.c
@@ -57,7 +57,7 @@ rte_eal_wait_lcore(unsigned slave_id)
 	while (lcore_config[slave_id].state != WAIT &&
 	       lcore_config[slave_id].state != FINISHED);
 
-	rte_rmb();
+	rte_smp_rmb();
 
 	/* we are in finished state, go to wait state */
 	lcore_config[slave_id].state = WAIT;
diff --git a/lib/librte_eal/common/include/rte_atomic.h b/lib/librte_eal/common/include/rte_atomic.h
index ef95160..4056e53 100644
--- a/lib/librte_eal/common/include/rte_atomic.h
+++ b/lib/librte_eal/common/include/rte_atomic.h
@@ -82,6 +82,30 @@ extern "C" {
 #define	rte_rmb() _mm_lfence()
 
 /**
+ * General memory barrier between 2 lcores.
+ *
+ * x86 may reorder a store with a later load (store-load), so a compiler
+ * barrier is not sufficient here: a real fence (rte_mb()) is required.
+ */
+#define	rte_smp_mb() rte_mb()
+
+/**
+ * Write memory barrier between 2 lcores.
+ *
+ * On Intel CPUs, it's already guaranteed by the hardware so only a
+ * compiler barrier is needed.
+ */
+#define	rte_smp_wmb() rte_compiler_barrier()
+
+/**
+ * Read memory barrier between 2 lcores.
+ *
+ * On Intel CPUs, it's already guaranteed by the hardware so only a
+ * compiler barrier is needed.
+ */
+#define	rte_smp_rmb() rte_compiler_barrier()
+
+/**
  * Compiler barrier.
  *
  * Guarantees that operation reordering does not occur at compile time 
diff --git a/lib/librte_eal/linuxapp/eal/eal_thread.c b/lib/librte_eal/linuxapp/eal/eal_thread.c
index bf77873..2d839e6 100644
--- a/lib/librte_eal/linuxapp/eal/eal_thread.c
+++ b/lib/librte_eal/linuxapp/eal/eal_thread.c
@@ -223,7 +223,7 @@ eal_thread_loop(__attribute__((unused)) void *arg)
 		fct_arg = lcore_config[lcore_id].arg;
 		ret = lcore_config[lcore_id].f(fct_arg);
 		lcore_config[lcore_id].ret = ret;
-		rte_wmb();
+		rte_smp_wmb();
 		lcore_config[lcore_id].state = FINISHED;
 	}
 
diff --git a/lib/librte_pmd_virtio/virtqueue.h b/lib/librte_pmd_virtio/virtqueue.h
index 11f908c..f45dd47 100644
--- a/lib/librte_pmd_virtio/virtqueue.h
+++ b/lib/librte_pmd_virtio/virtqueue.h
@@ -46,9 +46,9 @@
 #include "virtio_ring.h"
 #include "virtio_logs.h"
 
-#define mb()  rte_mb()
-#define wmb() rte_wmb()
-#define rmb() rte_rmb()
+#define mb()  rte_smp_mb()
+#define wmb() rte_smp_wmb()
+#define rmb() rte_smp_rmb()
 
 #ifdef RTE_PMD_PACKET_PREFETCH
 #define rte_packet_prefetch(p)  rte_prefetch1(p)
diff --git a/lib/librte_pmd_xenvirt/rte_eth_xenvirt.c b/lib/librte_pmd_xenvirt/rte_eth_xenvirt.c
index 4f8b712..7ebde3b 100644
--- a/lib/librte_pmd_xenvirt/rte_eth_xenvirt.c
+++ b/lib/librte_pmd_xenvirt/rte_eth_xenvirt.c
@@ -98,7 +98,7 @@ eth_xenvirt_rx(void *q, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
 
 	nb_used = VIRTQUEUE_NUSED(rxvq);
 
-	rte_compiler_barrier(); /* rmb */
+	rte_smp_rmb();
 	num = (uint16_t)(likely(nb_used <= nb_pkts) ? nb_used : nb_pkts);
 	num = (uint16_t)(likely(num <= VIRTIO_MBUF_BURST_SZ) ? num : VIRTIO_MBUF_BURST_SZ);
 	if (unlikely(num == 0)) return 0;
@@ -149,7 +149,7 @@ eth_xenvirt_tx(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
 	nb_used = VIRTQUEUE_NUSED(txvq);
 
-	rte_compiler_barrier();   /* rmb */
+	rte_smp_rmb();
 
 	num = (uint16_t)(likely(nb_used <= VIRTIO_MBUF_BURST_SZ) ? nb_used : VIRTIO_MBUF_BURST_SZ);
 	num = virtqueue_dequeue_burst(txvq, snd_pkts, len, num);
diff --git a/lib/librte_pmd_xenvirt/virtqueue.h b/lib/librte_pmd_xenvirt/virtqueue.h
index f36030f..59b4469 100644
--- a/lib/librte_pmd_xenvirt/virtqueue.h
+++ b/lib/librte_pmd_xenvirt/virtqueue.h
@@ -150,7 +150,7 @@ vq_ring_update_avail(struct virtqueue *vq, uint16_t desc_idx)
 	 */
 	avail_idx = (uint16_t)(vq->vq_ring.avail->idx & (vq->vq_nentries - 1));
 	vq->vq_ring.avail->ring[avail_idx] = desc_idx;
-	rte_compiler_barrier();  /* wmb , for IA memory model barrier is enough*/
+	rte_smp_wmb();
 	vq->vq_ring.avail->idx++;
 }
 
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index da54e34..1cbf8a9 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -451,7 +451,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 
 	/* write entries in ring */
 	ENQUEUE_PTRS();
-	rte_compiler_barrier();
+	rte_smp_wmb();
 
 	/* if we exceed the watermark */
 	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
@@ -537,7 +537,7 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 
 	/* write entries in ring */
 	ENQUEUE_PTRS();
-	rte_compiler_barrier();
+	rte_smp_wmb();
 
 	/* if we exceed the watermark */
 	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
@@ -628,7 +628,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 
 	/* copy in table */
 	DEQUEUE_PTRS();
-	rte_compiler_barrier();
+	rte_smp_rmb();
 
 	/*
 	 * If there are other dequeues in progress that preceded us,
@@ -703,7 +703,7 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
 
 	/* copy in table */
 	DEQUEUE_PTRS();
-	rte_compiler_barrier();
+	rte_smp_rmb();
 
 	__RING_STAT_ADD(r, deq_success, n);
 	r->cons.tail = cons_next;
diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
index 884ee0e..520360e 100755
--- a/lib/librte_timer/rte_timer.c
+++ b/lib/librte_timer/rte_timer.c
@@ -396,7 +396,7 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
 
 	/* update state: as we are in CONFIG state, only us can modify
 	 * the state so we don't need to use cmpset() here */
-	rte_wmb();
+	rte_smp_wmb();
 	status.state = RTE_TIMER_PENDING;
 	status.owner = (int16_t)tim_lcore;
 	tim->status.u32 = status.u32;
@@ -462,7 +462,7 @@ rte_timer_stop(struct rte_timer *tim)
 	}
 
 	/* mark timer as stopped */
-	rte_wmb();
+	rte_smp_wmb();
 	status.state = RTE_TIMER_STOP;
 	status.owner = RTE_TIMER_NO_OWNER;
 	tim->status.u32 = status.u32;
@@ -558,14 +558,14 @@ void rte_timer_manage(void)
 			__TIMER_STAT_ADD(pending, -1);
 			status.state = RTE_TIMER_STOP;
 			status.owner = RTE_TIMER_NO_OWNER;
-			rte_wmb();
+			rte_smp_wmb();
 			tim->status.u32 = status.u32;
 		}
 		else {
 			/* keep it in list and mark timer as pending */
 			status.state = RTE_TIMER_PENDING;
 			status.owner = (int16_t)lcore_id;
-			rte_wmb();
+			rte_smp_wmb();
 			tim->status.u32 = status.u32;
 			__rte_timer_reset(tim, cur_time + tim->period,
 					tim->period, lcore_id, tim->f, tim->arg, 1);
-- 
1.9.2



More information about the dev mailing list