[dpdk-stable] patch 'eventdev: relax SMP barriers with C11 atomics' has been queued to stable release 19.11.4

luca.boccassi at gmail.com
Fri Jul 24 13:59:01 CEST 2020


Hi,

FYI, your patch has been queued to stable release 19.11.4

Note it hasn't been pushed to http://dpdk.org/browse/dpdk-stable yet.
It will be pushed if I get no objections before 07/26/20. So please
shout if anyone has objections.

Also note that after the patch there's a diff of the upstream commit vs the
patch applied to the branch. This will indicate whether any rebasing was
needed to apply to the stable branch. If there were code changes for rebasing
(i.e. not only metadata diffs), please double-check that the rebase was
done correctly.

Thanks.

Luca Boccassi

---
From 0a4303995ca42e53c942a6f79a4850f7e8df429b Mon Sep 17 00:00:00 2001
From: Phil Yang <phil.yang at arm.com>
Date: Tue, 7 Jul 2020 23:54:53 +0800
Subject: [PATCH] eventdev: relax SMP barriers with C11 atomics

[ upstream commit 030c2164117b87b7e81d06be9c228b555b00963e ]

The impl_opaque field is shared between the timer arm and cancel
operations. Meanwhile, the state flag acts as a guard variable to
make sure the update of impl_opaque is synchronized. The original
code uses rte_smp barriers to achieve that. This patch uses C11
atomics with explicit one-way ordering instead of the full barriers
rte_smp_wmb()/rte_smp_rmb(), avoiding unnecessary barriers on aarch64.

Since compilers can generate the same instructions for volatile and
non-volatile variables with C11 __atomic built-ins, the volatile
keyword is kept in front of the state enum to avoid an ABI break.

Signed-off-by: Phil Yang <phil.yang at arm.com>
Reviewed-by: Dharmik Thakkar <dharmik.thakkar at arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
Acked-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
---
 lib/librte_eventdev/rte_event_timer_adapter.c | 55 +++++++++++++------
 1 file changed, 37 insertions(+), 18 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index af5087132..36c13fe3b 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -625,7 +625,8 @@ swtim_callback(struct rte_timer *tim)
 		sw->expired_timers[sw->n_expired_timers++] = tim;
 		sw->stats.evtim_exp_count++;
 
-		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
+		__atomic_store_n(&evtim->state, RTE_EVENT_TIMER_NOT_ARMED,
+				__ATOMIC_RELEASE);
 	}
 
 	if (event_buffer_batch_ready(&sw->buffer)) {
@@ -1016,6 +1017,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
 	int n_lcores;
 	/* Timer list for this lcore is not in use. */
 	uint16_t exp_state = 0;
+	enum rte_event_timer_state n_state;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1056,30 +1058,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
 	}
 
 	for (i = 0; i < nb_evtims; i++) {
-		/* Don't modify the event timer state in these cases */
-		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
+		n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
+		if (n_state == RTE_EVENT_TIMER_ARMED) {
 			rte_errno = EALREADY;
 			break;
-		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
-			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
+		} else if (!(n_state == RTE_EVENT_TIMER_NOT_ARMED ||
+			     n_state == RTE_EVENT_TIMER_CANCELED)) {
 			rte_errno = EINVAL;
 			break;
 		}
 
 		ret = check_timeout(evtims[i], adapter);
 		if (unlikely(ret == -1)) {
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR_TOOLATE,
+					__ATOMIC_RELAXED);
 			rte_errno = EINVAL;
 			break;
 		} else if (unlikely(ret == -2)) {
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR_TOOEARLY,
+					__ATOMIC_RELAXED);
 			rte_errno = EINVAL;
 			break;
 		}
 
 		if (unlikely(check_destination_event_queue(evtims[i],
 							   adapter) < 0)) {
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR,
+					__ATOMIC_RELAXED);
 			rte_errno = EINVAL;
 			break;
 		}
@@ -1095,13 +1103,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
 					  SINGLE, lcore_id, NULL, evtims[i]);
 		if (ret < 0) {
 			/* tim was in RUNNING or CONFIG state */
-			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
+			__atomic_store_n(&evtims[i]->state,
+					RTE_EVENT_TIMER_ERROR,
+					__ATOMIC_RELEASE);
 			break;
 		}
 
-		rte_smp_wmb();
 		EVTIM_LOG_DBG("armed an event timer");
-		evtims[i]->state = RTE_EVENT_TIMER_ARMED;
+		/* RELEASE ordering guarantees that the adapter-specific
+		 * data changes are observed before the state update.
+		 */
+		__atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_ARMED,
+				__ATOMIC_RELEASE);
 	}
 
 	if (i < nb_evtims)
@@ -1128,6 +1141,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 	struct rte_timer *timp;
 	uint64_t opaque;
 	struct swtim *sw = swtim_pmd_priv(adapter);
+	enum rte_event_timer_state n_state;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 	/* Check that the service is running. */
@@ -1139,16 +1153,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 
 	for (i = 0; i < nb_evtims; i++) {
 		/* Don't modify the event timer state in these cases */
-		if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
+		/* ACQUIRE ordering guarantees that the implementation
+		 * specific opaque data is accessed under the correct state.
+		 */
+		n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
+		if (n_state == RTE_EVENT_TIMER_CANCELED) {
 			rte_errno = EALREADY;
 			break;
-		} else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) {
+		} else if (n_state != RTE_EVENT_TIMER_ARMED) {
 			rte_errno = EINVAL;
 			break;
 		}
 
-		rte_smp_rmb();
-
 		opaque = evtims[i]->impl_opaque[0];
 		timp = (struct rte_timer *)(uintptr_t)opaque;
 		RTE_ASSERT(timp != NULL);
@@ -1162,9 +1178,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 
 		rte_mempool_put(sw->tim_pool, (void **)timp);
 
-		evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
-
-		rte_smp_wmb();
+		/* The RELEASE ordering here pairs with the ACQUIRE
+		 * ordering above to make sure the data updated before
+		 * the state change is observed across threads.
+		 */
+		__atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_CANCELED,
+				__ATOMIC_RELEASE);
 	}
 
 	return i;
-- 
2.20.1

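For readers less familiar with the C11 memory model, here is a minimal,
self-contained sketch of the guard-variable pattern the patch switches to.
It is not DPDK code: the names (guard, impl_opaque) and the pthread
scaffolding are illustrative only. A writer publishes its payload with a
RELEASE store to the guard variable, and a reader that observes the new
guard value through an ACQUIRE load is guaranteed to also see the payload;
this is what replaces the rte_smp_wmb()/rte_smp_rmb() pairs.

/* build: gcc -O2 -pthread guard_sketch.c */
#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

enum timer_state { NOT_ARMED, ARMED };

/* volatile is kept, as in the patch, only to preserve the ABI; the
 * __atomic built-ins below provide the actual ordering guarantees. */
static volatile enum timer_state guard = NOT_ARMED;
static uint64_t impl_opaque;	/* payload protected by 'guard' */

static void *arm_thread(void *arg)
{
	(void)arg;
	impl_opaque = 0xdeadbeef;	/* plain store to the payload */
	/* RELEASE: the payload store above cannot be reordered past this
	 * store, so any reader that observes ARMED also sees the payload.
	 * This replaces rte_smp_wmb() followed by a plain store. */
	__atomic_store_n(&guard, ARMED, __ATOMIC_RELEASE);
	return NULL;
}

static void *cancel_thread(void *arg)
{
	(void)arg;
	/* ACQUIRE: pairs with the RELEASE store; reads after this load
	 * cannot be hoisted before it. This replaces a plain load
	 * followed by rte_smp_rmb(). */
	while (__atomic_load_n(&guard, __ATOMIC_ACQUIRE) != ARMED)
		;
	printf("impl_opaque = 0x%" PRIx64 "\n", impl_opaque);
	return NULL;
}

int main(void)
{
	pthread_t arm, cancel;
	pthread_create(&cancel, NULL, cancel_thread, NULL);
	pthread_create(&arm, NULL, arm_thread, NULL);
	pthread_join(arm, NULL);
	pthread_join(cancel, NULL);
	return 0;
}

On aarch64 the RELEASE store typically compiles to a single stlr
instruction, whereas rte_smp_wmb() emits a full dmb ishst barrier in
addition to a plain str; that one-way barrier is the saving the commit
message refers to.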
---
  Diff of the applied patch vs upstream commit (please double-check if non-empty):
---
--- -	2020-07-24 12:53:52.515148907 +0100
+++ 0103-eventdev-relax-SMP-barriers-with-C11-atomics.patch	2020-07-24 12:53:48.347007578 +0100
@@ -1,8 +1,10 @@
-From 030c2164117b87b7e81d06be9c228b555b00963e Mon Sep 17 00:00:00 2001
+From 0a4303995ca42e53c942a6f79a4850f7e8df429b Mon Sep 17 00:00:00 2001
 From: Phil Yang <phil.yang at arm.com>
 Date: Tue, 7 Jul 2020 23:54:53 +0800
 Subject: [PATCH] eventdev: relax SMP barriers with C11 atomics
 
+[ upstream commit 030c2164117b87b7e81d06be9c228b555b00963e ]
+
 The impl_opaque field is shared between the timer arm and cancel
 operations. Meanwhile, the state flag acts as a guard variable to
 make sure the update of impl_opaque is synchronized. The original
@@ -14,8 +16,6 @@
 non-volatile variables with C11 __atomic built-ins, the volatile
 keyword is kept in front of the state enum to avoid an ABI break.
 
-Cc: stable at dpdk.org
-
 Signed-off-by: Phil Yang <phil.yang at arm.com>
 Reviewed-by: Dharmik Thakkar <dharmik.thakkar at arm.com>
 Reviewed-by: Ruifeng Wang <ruifeng.wang at arm.com>
@@ -25,10 +25,10 @@
  1 file changed, 37 insertions(+), 18 deletions(-)
 
 diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
-index aa01b4d9f..4c5e49ea3 100644
+index af5087132..36c13fe3b 100644
 --- a/lib/librte_eventdev/rte_event_timer_adapter.c
 +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
-@@ -629,7 +629,8 @@ swtim_callback(struct rte_timer *tim)
+@@ -625,7 +625,8 @@ swtim_callback(struct rte_timer *tim)
  		sw->expired_timers[sw->n_expired_timers++] = tim;
  		sw->stats.evtim_exp_count++;
  
@@ -38,7 +38,7 @@
  	}
  
  	if (event_buffer_batch_ready(&sw->buffer)) {
-@@ -1020,6 +1021,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+@@ -1016,6 +1017,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
  	int n_lcores;
  	/* Timer list for this lcore is not in use. */
  	uint16_t exp_state = 0;
@@ -46,7 +46,7 @@
  
  #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
  	/* Check that the service is running. */
-@@ -1060,30 +1062,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+@@ -1056,30 +1058,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
  	}
  
  	for (i = 0; i < nb_evtims; i++) {
@@ -90,7 +90,7 @@
  			rte_errno = EINVAL;
  			break;
  		}
-@@ -1099,13 +1107,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+@@ -1095,13 +1103,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
  					  SINGLE, lcore_id, NULL, evtims[i]);
  		if (ret < 0) {
  			/* tim was in RUNNING or CONFIG state */
@@ -112,7 +112,7 @@
  	}
  
  	if (i < nb_evtims)
-@@ -1132,6 +1145,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
+@@ -1128,6 +1141,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
  	struct rte_timer *timp;
  	uint64_t opaque;
  	struct swtim *sw = swtim_pmd_priv(adapter);
@@ -120,7 +120,7 @@
  
  #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
  	/* Check that the service is running. */
-@@ -1143,16 +1157,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
+@@ -1139,16 +1153,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
  
  	for (i = 0; i < nb_evtims; i++) {
  		/* Don't modify the event timer state in these cases */
@@ -143,7 +143,7 @@
  		opaque = evtims[i]->impl_opaque[0];
  		timp = (struct rte_timer *)(uintptr_t)opaque;
  		RTE_ASSERT(timp != NULL);
-@@ -1166,9 +1182,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
+@@ -1162,9 +1178,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
  
  		rte_mempool_put(sw->tim_pool, (void **)timp);
  

