[dpdk-dev] [PATCH v2] ring: guarantee ordering of cons/prod loading when doing

Jia He hejianet at gmail.com
Thu Nov 2 09:43:30 CET 2017


We observed an rte panic in mbuf_autotest on our Qualcomm arm64 server.
As for the possible race condition, please refer to [1].

Furthermore, there are 2 options as suggested by Jerin:
1. use rte_smp_rmb
2. use load_acquire/store_release (refer to [2]).
CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided to select
option 2, and by default it is n.

Two options are provided because the performance benchmark results
differ across different arm machines; please refer to [3].

Already functionally tested on the following machines:
on X86 (passed the compilation)
on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y
on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n

[1] http://dpdk.org/ml/archives/dev/2017-October/078366.html
[2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170
[3] http://dpdk.org/ml/archives/dev/2017-October/080861.html

---
Changelog:
V2: let users choose whether using load_acquire/store_release
V1: rte_smp_rmb() between 2 loads

Signed-off-by: Jia He <hejianet at gmail.com>
Signed-off-by: jie2.liu at hxt-semitech.com
Signed-off-by: bing.zhao at hxt-semitech.com
Signed-off-by: jia.he at hxt-semitech.com
Suggested-by: jerin.jacob at caviumnetworks.com
---
 lib/librte_ring/Makefile           |  4 +++-
 lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
 lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
 4 files changed, 127 insertions(+), 8 deletions(-)
 create mode 100644 lib/librte_ring/rte_ring_arm64.h
 create mode 100644 lib/librte_ring/rte_ring_generic.h

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index e34d9d9..fa57a86 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -45,6 +45,8 @@ LIBABIVER := 1
 SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 
 # install includes
-SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
+					rte_ring_arm64.h \
+					rte_ring_generic.h
 
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 5e9b3b7..943b1f9 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -103,6 +103,18 @@ extern "C" {
 #include <rte_memzone.h>
 #include <rte_pause.h>
 
+/* In strong memory models (e.g. x86), there is no need to add rmb()
+ * between load and load.
+ * In weak memory models (powerpc/arm), there are 2 choices for the users:
+ * 1. use the rmb() memory barrier
+ * 2. use the one-direction load_acquire/store_release barrier
+ * The choice depends on performance test results. */
+#ifdef RTE_ARCH_ARM64
+#include "rte_ring_arm64.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
 #define RTE_TAILQ_RING_NAME "RTE_RING"
 
 enum rte_ring_queue_behavior {
@@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
 		while (unlikely(ht->tail != old_val))
 			rte_pause();
 
-	ht->tail = new_val;
+	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
 }
 
 /**
@@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
 		/* Reset n to the initial burst count */
 		n = max;
 
-		*old_head = r->prod.head;
+		*old_head = arch_rte_atomic_load(&r->prod.head,
+						__ATOMIC_ACQUIRE);
 		const uint32_t cons_tail = r->cons.tail;
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
@@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
 		if (is_sp)
 			r->prod.head = *new_head, success = 1;
 		else
-			success = rte_atomic32_cmpset(&r->prod.head,
-					*old_head, *new_head);
+			success = arch_rte_atomic32_cmpset(&r->prod.head,
+					old_head, *new_head,
+					0, __ATOMIC_ACQUIRE,
+					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
 }
@@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		goto end;
 
 	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
+
+#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
 	rte_smp_wmb();
+#endif
 
 	update_tail(&r->prod, prod_head, prod_next, is_sp);
 end:
@@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 		/* Restore n as it may change every loop */
 		n = max;
 
-		*old_head = r->cons.head;
+		*old_head = arch_rte_atomic_load(&r->cons.head,
+						__ATOMIC_ACQUIRE);
 		const uint32_t prod_tail = r->prod.tail;
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
@@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 		if (is_sc)
 			r->cons.head = *new_head, success = 1;
 		else
-			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
-					*new_head);
+			success = arch_rte_atomic32_cmpset(&r->cons.head,
+					old_head, *new_head,
+					0, __ATOMIC_ACQUIRE,
+					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
 }
@@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
 		goto end;
 
 	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
+
+#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
 	rte_smp_rmb();
+#endif
 
 	update_tail(&r->cons, cons_head, cons_next, is_sc);
 
diff --git a/lib/librte_ring/rte_ring_arm64.h b/lib/librte_ring/rte_ring_arm64.h
new file mode 100644
index 0000000..09438e5
--- /dev/null
+++ b/lib/librte_ring/rte_ring_arm64.h
@@ -0,0 +1,48 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of hxt-semitech nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_ARM64_H_
+#define _RTE_RING_ARM64_H_
+
+#define arch_rte_atomic_store __atomic_store_n
+#define arch_rte_atomic32_cmpset __atomic_compare_exchange_n
+#ifdef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
+#define arch_rte_atomic_load __atomic_load_n
+#else
+/* Load a 32-bit index, then rmb; a single expression safe in any context. */
+#define arch_rte_atomic_load(a, b) __extension__ ({ \
+	uint32_t val_ = *(a); \
+	rte_smp_rmb(); val_; })
+#endif
+
+#endif /* _RTE_RING_ARM64_H_ */
+
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
new file mode 100644
index 0000000..4fb777c
--- /dev/null
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -0,0 +1,45 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of hxt-semitech nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_GENERIC_H_
+#define _RTE_RING_GENERIC_H_
+
+#define arch_rte_atomic_load(a, b) (*(a)) /* plain load; order arg b unused */
+#define arch_rte_atomic_store(a, b, c) /* plain store; order arg c unused */ \
+do { \
+	*(a) = (b); \
+} while (0)
+#define arch_rte_atomic32_cmpset(a, b, c, d, e, f) /* d, e, f are unused */ \
+	rte_atomic32_cmpset((a), *(b), (c))
+
+#endif /* _RTE_RING_GENERIC_H_ */
+
-- 
2.7.4



More information about the dev mailing list