[v2] lib/timer: relax barrier for status update

Message ID 1587398752-9345-1-git-send-email-phil.yang@arm.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series [v2] lib/timer: relax barrier for status update |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-nxp-Performance success Performance Testing PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/Intel-compilation success Compilation OK
ci/travis-robot warning Travis build: failed
ci/iol-testing fail Testing issues

Commit Message

Phil Yang April 20, 2020, 4:05 p.m. UTC
  Volatile has no ordering semantics. The rte_timer structure defines
timer status as a volatile variable and uses the rte_r/wmb barrier
to guarantee inter-thread visibility.

This patch optimized the volatile operation with c11 atomic operations
and one-way barrier to save the performance penalty. According to the
timer_perf_autotest benchmarking results, this patch can uplift 10%~16%
timer appending performance, 3%~20% timer resetting performance and 45%
timer callbacks scheduling performance on aarch64 and no loss in
performance for x86.

Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Signed-off-by: Phil Yang <phil.yang@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>

---
This patch depends on patch:
http://patchwork.dpdk.org/patch/65997/

v2:
1. Changed the memory ordering comment in timer_set_config_state.
2. It is still using built-ins as the wrapper functions for C11 built-ins
are not defined yet.

 lib/librte_timer/rte_timer.c | 85 ++++++++++++++++++++++++++++++--------------
 lib/librte_timer/rte_timer.h |  2 +-
 2 files changed, 60 insertions(+), 27 deletions(-)
  

Comments

Honnappa Nagarahalli April 23, 2020, 8:06 p.m. UTC | #1
Hi Erik,

> Subject: [PATCH v2] lib/timer: relax barrier for status update
> 
> Volatile has no ordering semantics. The rte_timer structure defines timer
> status as a volatile variable and uses the rte_r/wmb barrier to guarantee
> inter-thread visibility.
> 
> This patch optimized the volatile operation with c11 atomic operations and
> one-way barrier to save the performance penalty. According to the
> timer_perf_autotest benchmarking results, this patch can uplift 10%~16%
> timer appending performance, 3%~20% timer resetting performance and 45%
> timer callbacks scheduling performance on aarch64 and no loss in
> performance for x86.
> 
> Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Signed-off-by: Phil Yang <phil.yang@arm.com>
> Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> 
> ---
> This patch depends on patch:
> http://patchwork.dpdk.org/patch/65997/
> 
> v2:
> 1. Changed the memory ordering comment in timer_set_config_state.
> 2. It is still using built-ins as the wrapper functions for C11 built-ins are not
> defined yet.
It is too late to get the wrapper functions done for 20.05. It was decided in yesterday's tech board meeting to go ahead with C11 atomic built-ins (since there is lot of code in DPDK that uses C11 built-ins). If there are no further comments, can you please provide your ack?

> 
>  lib/librte_timer/rte_timer.c | 85 ++++++++++++++++++++++++++++++-----------
> ---
>  lib/librte_timer/rte_timer.h |  2 +-
>  2 files changed, 60 insertions(+), 27 deletions(-)
> 
> diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c index
> 269e921..ba17216 100644
> --- a/lib/librte_timer/rte_timer.c
> +++ b/lib/librte_timer/rte_timer.c
> @@ -10,7 +10,6 @@
>  #include <assert.h>
>  #include <sys/queue.h>
> 
> -#include <rte_atomic.h>
>  #include <rte_common.h>
>  #include <rte_cycles.h>
>  #include <rte_eal_memconfig.h>
> @@ -218,7 +217,7 @@ rte_timer_init(struct rte_timer *tim)
> 
>  	status.state = RTE_TIMER_STOP;
>  	status.owner = RTE_TIMER_NO_OWNER;
> -	tim->status.u32 = status.u32;
> +	__atomic_store_n(&tim->status.u32, status.u32,
> __ATOMIC_RELAXED);
>  }
> 
>  /*
> @@ -239,9 +238,9 @@ timer_set_config_state(struct rte_timer *tim,
> 
>  	/* wait that the timer is in correct status before update,
>  	 * and mark it as being configured */
> -	while (success == 0) {
> -		prev_status.u32 = tim->status.u32;
> +	prev_status.u32 = __atomic_load_n(&tim->status.u32,
> __ATOMIC_RELAXED);
> 
> +	while (success == 0) {
>  		/* timer is running on another core
>  		 * or ready to run on local core, exit
>  		 */
> @@ -258,9 +257,15 @@ timer_set_config_state(struct rte_timer *tim,
>  		 * mark it atomically as being configured */
>  		status.state = RTE_TIMER_CONFIG;
>  		status.owner = (int16_t)lcore_id;
> -		success = rte_atomic32_cmpset(&tim->status.u32,
> -					      prev_status.u32,
> -					      status.u32);
> +		/* CONFIG states are acting as locked states. If the
> +		 * timer is in CONFIG state, the state cannot be changed
> +		 * by other threads. So, we should use ACQUIRE here.
> +		 */
> +		success = __atomic_compare_exchange_n(&tim->status.u32,
> +					      &prev_status.u32,
> +					      status.u32, 0,
> +					      __ATOMIC_ACQUIRE,
> +					      __ATOMIC_RELAXED);
>  	}
> 
>  	ret_prev_status->u32 = prev_status.u32; @@ -279,20 +284,27 @@
> timer_set_running_state(struct rte_timer *tim)
> 
>  	/* wait that the timer is in correct status before update,
>  	 * and mark it as running */
> -	while (success == 0) {
> -		prev_status.u32 = tim->status.u32;
> +	prev_status.u32 = __atomic_load_n(&tim->status.u32,
> __ATOMIC_RELAXED);
> 
> +	while (success == 0) {
>  		/* timer is not pending anymore */
>  		if (prev_status.state != RTE_TIMER_PENDING)
>  			return -1;
> 
>  		/* here, we know that timer is stopped or pending,
> -		 * mark it atomically as being configured */
> +		 * mark it atomically as being running
> +		 */
>  		status.state = RTE_TIMER_RUNNING;
>  		status.owner = (int16_t)lcore_id;
> -		success = rte_atomic32_cmpset(&tim->status.u32,
> -					      prev_status.u32,
> -					      status.u32);
> +		/* RUNNING states are acting as locked states. If the
> +		 * timer is in RUNNING state, the state cannot be changed
> +		 * by other threads. So, we should use ACQUIRE here.
> +		 */
> +		success = __atomic_compare_exchange_n(&tim->status.u32,
> +					      &prev_status.u32,
> +					      status.u32, 0,
> +					      __ATOMIC_ACQUIRE,
> +					      __ATOMIC_RELAXED);
>  	}
> 
>  	return 0;
> @@ -520,10 +532,12 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t
> expire,
> 
>  	/* update state: as we are in CONFIG state, only us can modify
>  	 * the state so we don't need to use cmpset() here */
> -	rte_wmb();
>  	status.state = RTE_TIMER_PENDING;
>  	status.owner = (int16_t)tim_lcore;
> -	tim->status.u32 = status.u32;
> +	/* The "RELEASE" ordering guarantees the memory operations above
> +	 * the status update are observed before the update by all threads
> +	 */
> +	__atomic_store_n(&tim->status.u32, status.u32, __ATOMIC_RELEASE);
> 
>  	if (tim_lcore != lcore_id || !local_is_locked)
>  		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
> @@ -600,10 +614,12 @@ __rte_timer_stop(struct rte_timer *tim, int
> local_is_locked,
>  	}
> 
>  	/* mark timer as stopped */
> -	rte_wmb();
>  	status.state = RTE_TIMER_STOP;
>  	status.owner = RTE_TIMER_NO_OWNER;
> -	tim->status.u32 = status.u32;
> +	/* The "RELEASE" ordering guarantees the memory operations above
> +	 * the status update are observed before the update by all threads
> +	 */
> +	__atomic_store_n(&tim->status.u32, status.u32, __ATOMIC_RELEASE);
> 
>  	return 0;
>  }
> @@ -637,7 +653,8 @@ rte_timer_stop_sync(struct rte_timer *tim)  int
> rte_timer_pending(struct rte_timer *tim)  {
> -	return tim->status.state == RTE_TIMER_PENDING;
> +	return __atomic_load_n(&tim->status.state,
> +				__ATOMIC_RELAXED) ==
> RTE_TIMER_PENDING;
>  }
> 
>  /* must be called periodically, run all timer that expired */ @@ -739,8
> +756,12 @@ __rte_timer_manage(struct rte_timer_data *timer_data)
>  			/* remove from done list and mark timer as stopped
> */
>  			status.state = RTE_TIMER_STOP;
>  			status.owner = RTE_TIMER_NO_OWNER;
> -			rte_wmb();
> -			tim->status.u32 = status.u32;
> +			/* The "RELEASE" ordering guarantees the memory
> +			 * operations above the status update are observed
> +			 * before the update by all threads
> +			 */
> +			__atomic_store_n(&tim->status.u32, status.u32,
> +				__ATOMIC_RELEASE);
>  		}
>  		else {
>  			/* keep it in list and mark timer as pending */ @@ -
> 748,8 +769,12 @@ __rte_timer_manage(struct rte_timer_data *timer_data)
>  			status.state = RTE_TIMER_PENDING;
>  			__TIMER_STAT_ADD(priv_timer, pending, 1);
>  			status.owner = (int16_t)lcore_id;
> -			rte_wmb();
> -			tim->status.u32 = status.u32;
> +			/* The "RELEASE" ordering guarantees the memory
> +			 * operations above the status update are observed
> +			 * before the update by all threads
> +			 */
> +			__atomic_store_n(&tim->status.u32, status.u32,
> +				__ATOMIC_RELEASE);
>  			__rte_timer_reset(tim, tim->expire + tim->period,
>  				tim->period, lcore_id, tim->f, tim->arg, 1,
>  				timer_data);
> @@ -919,8 +944,12 @@ rte_timer_alt_manage(uint32_t timer_data_id,
>  			/* remove from done list and mark timer as stopped
> */
>  			status.state = RTE_TIMER_STOP;
>  			status.owner = RTE_TIMER_NO_OWNER;
> -			rte_wmb();
> -			tim->status.u32 = status.u32;
> +			/* The "RELEASE" ordering guarantees the memory
> +			 * operations above the status update are observed
> +			 * before the update by all threads
> +			 */
> +			__atomic_store_n(&tim->status.u32, status.u32,
> +				__ATOMIC_RELEASE);
>  		} else {
>  			/* keep it in list and mark timer as pending */
>  			rte_spinlock_lock(
> @@ -928,8 +957,12 @@ rte_timer_alt_manage(uint32_t timer_data_id,
>  			status.state = RTE_TIMER_PENDING;
>  			__TIMER_STAT_ADD(data->priv_timer, pending, 1);
>  			status.owner = (int16_t)this_lcore;
> -			rte_wmb();
> -			tim->status.u32 = status.u32;
> +			/* The "RELEASE" ordering guarantees the memory
> +			 * operations above the status update are observed
> +			 * before the update by all threads
> +			 */
> +			__atomic_store_n(&tim->status.u32, status.u32,
> +				__ATOMIC_RELEASE);
>  			__rte_timer_reset(tim, tim->expire + tim->period,
>  				tim->period, this_lcore, tim->f, tim->arg, 1,
>  				data);
> diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h index
> c6b3d45..df533fa 100644
> --- a/lib/librte_timer/rte_timer.h
> +++ b/lib/librte_timer/rte_timer.h
> @@ -101,7 +101,7 @@ struct rte_timer
>  {
>  	uint64_t expire;       /**< Time when timer expire. */
>  	struct rte_timer *sl_next[MAX_SKIPLIST_DEPTH];
> -	volatile union rte_timer_status status; /**< Status of timer. */
> +	union rte_timer_status status; /**< Status of timer. */
>  	uint64_t period;       /**< Period of timer (0 if not periodic). */
>  	rte_timer_cb_t f;      /**< Callback function. */
>  	void *arg;             /**< Argument to callback function. */
> --
> 2.7.4
  
Carrillo, Erik G April 24, 2020, 1:26 a.m. UTC | #2
Hi Honnappa,

> -----Original Message-----
> From: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
> Sent: Thursday, April 23, 2020 3:06 PM
> To: Phil Yang <Phil.Yang@arm.com>; Carrillo, Erik G
> <erik.g.carrillo@intel.com>; rsanford@akamai.com; dev@dpdk.org
> Cc: thomas@monjalon.net; david.marchand@redhat.com; Ananyev,
> Konstantin <konstantin.ananyev@intel.com>; jerinj@marvell.com;
> hemant.agrawal@nxp.com; Gavin Hu <Gavin.Hu@arm.com>; nd
> <nd@arm.com>; Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>;
> nd <nd@arm.com>
> Subject: RE: [PATCH v2] lib/timer: relax barrier for status update
> 
> Hi Erik,
> 
> > Subject: [PATCH v2] lib/timer: relax barrier for status update
> >
> > Volatile has no ordering semantics. The rte_timer structure defines
> > timer status as a volatile variable and uses the rte_r/wmb barrier to
> > guarantee inter-thread visibility.
> >
> > This patch optimized the volatile operation with c11 atomic operations
> > and one-way barrier to save the performance penalty. According to the
> > timer_perf_autotest benchmarking results, this patch can uplift
> > 10%~16% timer appending performance, 3%~20% timer resetting
> > performance and 45% timer callbacks scheduling performance on aarch64
> > and no loss in performance for x86.
> >
> > Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > Signed-off-by: Phil Yang <phil.yang@arm.com>
> > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> >
> > ---
> > This patch depends on patch:
> > http://patchwork.dpdk.org/patch/65997/
> >
> > v2:
> > 1. Changed the memory ordering comment in timer_set_config_state.
> > 2. It is still using built-ins as the wrapper functions for C11
> > built-ins are not defined yet.
> It is too late to get the wrapper functions done for 20.05. It was decided in
> yesterday's tech board meeting to go ahead with C11 atomic built-ins (since
> there is lot of code in DPDK that uses C11 built-ins). If there are no further
> comments, can you please provide your ack?
> 

Ok, thanks for letting me know.  Based on that decision,  I've taken another look 
and done some testing and it looks good to me.  I've made one comment in-line
below and acked it.

<... snipped ...>

> > @@ -258,9 +257,15 @@ timer_set_config_state(struct rte_timer *tim,
> >  		 * mark it atomically as being configured */
> >  		status.state = RTE_TIMER_CONFIG;
> >  		status.owner = (int16_t)lcore_id;
> > -		success = rte_atomic32_cmpset(&tim->status.u32,
> > -					      prev_status.u32,
> > -					      status.u32);
> > +		/* CONFIG states are acting as locked states. If the
> > +		 * timer is in CONFIG state, the state cannot be changed
> > +		 * by other threads. So, we should use ACQUIRE here.
> > +		 */
> > +		success = __atomic_compare_exchange_n(&tim-
> >status.u32,
> > +					      &prev_status.u32,
> > +					      status.u32, 0,
> > +					      __ATOMIC_ACQUIRE,
> > +					      __ATOMIC_RELAXED);
> >  	}
> >
> >  	ret_prev_status->u32 = prev_status.u32; @@ -279,20 +284,27 @@
> > timer_set_running_state(struct rte_timer *tim)
> >
> >  	/* wait that the timer is in correct status before update,
> >  	 * and mark it as running */
> > -	while (success == 0) {
> > -		prev_status.u32 = tim->status.u32;
> > +	prev_status.u32 = __atomic_load_n(&tim->status.u32,
> > __ATOMIC_RELAXED);
> >
> > +	while (success == 0) {
> >  		/* timer is not pending anymore */
> >  		if (prev_status.state != RTE_TIMER_PENDING)
> >  			return -1;
> >
> >  		/* here, we know that timer is stopped or pending,

We know that the timer will be pending at this point... Since we're correcting the comment below, we can correct this part too.

With that change:
Acked-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>

> > -		 * mark it atomically as being configured */
> > +		 * mark it atomically as being running
> > +		 */
> >  		status.state = RTE_TIMER_RUNNING;
> >  		status.owner = (int16_t)lcore_id;
> > -		success = rte_atomic32_cmpset(&tim->status.u32,
> > -					      prev_status.u32,
> > -					      status.u32);
> > +		/* RUNNING states are acting as locked states. If the
> > +		 * timer is in RUNNING state, the state cannot be changed
> > +		 * by other threads. So, we should use ACQUIRE here.
> > +		 */
> > +		success = __atomic_compare_exchange_n(&tim-
> >status.u32,
> > +					      &prev_status.u32,
> > +					      status.u32, 0,
> > +					      __ATOMIC_ACQUIRE,
> > +					      __ATOMIC_RELAXED);
> >  	}
> >
> >  	return 0;

Thanks,
Erik
  
Phil Yang April 24, 2020, 7:27 a.m. UTC | #3
> -----Original Message-----
> From: Carrillo, Erik G <erik.g.carrillo@intel.com>
> Sent: Friday, April 24, 2020 9:27 AM
> To: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Phil Yang
> <Phil.Yang@arm.com>; rsanford@akamai.com; dev@dpdk.org
> Cc: thomas@monjalon.net; david.marchand@redhat.com; Ananyev,
> Konstantin <konstantin.ananyev@intel.com>; jerinj@marvell.com;
> hemant.agrawal@nxp.com; Gavin Hu <Gavin.Hu@arm.com>; nd
> <nd@arm.com>; nd <nd@arm.com>
> Subject: RE: [PATCH v2] lib/timer: relax barrier for status update
> 
> Hi Honnappa,
> 
> > -----Original Message-----
> > From: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
> > Sent: Thursday, April 23, 2020 3:06 PM
> > To: Phil Yang <Phil.Yang@arm.com>; Carrillo, Erik G
> > <erik.g.carrillo@intel.com>; rsanford@akamai.com; dev@dpdk.org
> > Cc: thomas@monjalon.net; david.marchand@redhat.com; Ananyev,
> > Konstantin <konstantin.ananyev@intel.com>; jerinj@marvell.com;
> > hemant.agrawal@nxp.com; Gavin Hu <Gavin.Hu@arm.com>; nd
> > <nd@arm.com>; Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>;
> > nd <nd@arm.com>
> > Subject: RE: [PATCH v2] lib/timer: relax barrier for status update
> >
> > Hi Erik,
> >
> > > Subject: [PATCH v2] lib/timer: relax barrier for status update
> > >
> > > Volatile has no ordering semantics. The rte_timer structure defines
> > > timer status as a volatile variable and uses the rte_r/wmb barrier to
> > > guarantee inter-thread visibility.
> > >
> > > This patch optimized the volatile operation with c11 atomic operations
> > > and one-way barrier to save the performance penalty. According to the
> > > timer_perf_autotest benchmarking results, this patch can uplift
> > > 10%~16% timer appending performance, 3%~20% timer resetting
> > > performance and 45% timer callbacks scheduling performance on aarch64
> > > and no loss in performance for x86.
> > >
> > > Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > > Signed-off-by: Phil Yang <phil.yang@arm.com>
> > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > >
> > > ---
> > > This patch depends on patch:
> > > http://patchwork.dpdk.org/patch/65997/
> > >
> > > v2:
> > > 1. Changed the memory ordering comment in timer_set_config_state.
> > > 2. It is still using built-ins as the wrapper functions for C11
> > > built-ins are not defined yet.
> > It is too late to get the wrapper functions done for 20.05. It was decided in
> > yesterday's tech board meeting to go ahead with C11 atomic built-ins (since
> > there is lot of code in DPDK that uses C11 built-ins). If there are no further
> > comments, can you please provide your ack?
> >
> 
> Ok, thanks for letting me know.  Based on that decision,  I've taken another
> look
> and done some testing and it looks good to me.  I've made one comment in-
> line
> below and acked it.
> 
> <... snipped ...>
> 
> > > @@ -258,9 +257,15 @@ timer_set_config_state(struct rte_timer *tim,
> > >  		 * mark it atomically as being configured */
> > >  		status.state = RTE_TIMER_CONFIG;
> > >  		status.owner = (int16_t)lcore_id;
> > > -		success = rte_atomic32_cmpset(&tim->status.u32,
> > > -					      prev_status.u32,
> > > -					      status.u32);
> > > +		/* CONFIG states are acting as locked states. If the
> > > +		 * timer is in CONFIG state, the state cannot be changed
> > > +		 * by other threads. So, we should use ACQUIRE here.
> > > +		 */
> > > +		success = __atomic_compare_exchange_n(&tim-
> > >status.u32,
> > > +					      &prev_status.u32,
> > > +					      status.u32, 0,
> > > +					      __ATOMIC_ACQUIRE,
> > > +					      __ATOMIC_RELAXED);
> > >  	}
> > >
> > >  	ret_prev_status->u32 = prev_status.u32; @@ -279,20 +284,27 @@
> > > timer_set_running_state(struct rte_timer *tim)
> > >
> > >  	/* wait that the timer is in correct status before update,
> > >  	 * and mark it as running */
> > > -	while (success == 0) {
> > > -		prev_status.u32 = tim->status.u32;
> > > +	prev_status.u32 = __atomic_load_n(&tim->status.u32,
> > > __ATOMIC_RELAXED);
> > >
> > > +	while (success == 0) {
> > >  		/* timer is not pending anymore */
> > >  		if (prev_status.state != RTE_TIMER_PENDING)
> > >  			return -1;
> > >
> > >  		/* here, we know that timer is stopped or pending,
> 
> We know that the timer will be pending at this point... Since we're correcting
> the comment below, we can correct this part too.
> 
> With that change:
> Acked-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>

Thanks Erik.
Updated in v3.

Thanks,
Phil 

> 
> > > -		 * mark it atomically as being configured */
> > > +		 * mark it atomically as being running
> > > +		 */
> > >  		status.state = RTE_TIMER_RUNNING;
> > >  		status.owner = (int16_t)lcore_id;
> > > -		success = rte_atomic32_cmpset(&tim->status.u32,
> > > -					      prev_status.u32,
> > > -					      status.u32);
> > > +		/* RUNNING states are acting as locked states. If the
> > > +		 * timer is in RUNNING state, the state cannot be changed
> > > +		 * by other threads. So, we should use ACQUIRE here.
> > > +		 */
> > > +		success = __atomic_compare_exchange_n(&tim-
> > >status.u32,
> > > +					      &prev_status.u32,
> > > +					      status.u32, 0,
> > > +					      __ATOMIC_ACQUIRE,
> > > +					      __ATOMIC_RELAXED);
> > >  	}
> > >
> > >  	return 0;
> 
> Thanks,
> Erik
  
Thomas Monjalon April 25, 2020, 2:36 p.m. UTC | #4
20/04/2020 18:05, Phil Yang:
> This patch depends on patch:
> http://patchwork.dpdk.org/patch/65997/

In order to ease patch tracking, you should have kept the first patch
in the next version of your series. We don't split series in general.
  
Phil Yang April 25, 2020, 3:51 p.m. UTC | #5
> -----Original Message-----
> From: Thomas Monjalon <thomas@monjalon.net>
> Sent: Saturday, April 25, 2020 10:36 PM
> To: Phil Yang <Phil.Yang@arm.com>
> Cc: erik.g.carrillo@intel.com; rsanford@akamai.com;
> david.marchand@redhat.com; konstantin.ananyev@intel.com;
> jerinj@marvell.com; hemant.agrawal@nxp.com; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; Gavin Hu <Gavin.Hu@arm.com>; nd
> <nd@arm.com>; dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v2] lib/timer: relax barrier for status update
> 
> 20/04/2020 18:05, Phil Yang:
> > This patch depends on patch:
> > http://patchwork.dpdk.org/patch/65997/
> 
> In order to ease patch tracking, you should have kept the first patch
> in the next version of your series. We don't split series in general.

Thanks for reminding me.
I will update the patch series in the new version.

Thanks,
Phil
> 
>
  
Thomas Monjalon April 25, 2020, 4:07 p.m. UTC | #6
25/04/2020 17:51, Phil Yang:
> From: Thomas Monjalon <thomas@monjalon.net>
> > 20/04/2020 18:05, Phil Yang:
> > > This patch depends on patch:
> > > http://patchwork.dpdk.org/patch/65997/
> > 
> > In order to ease patch tracking, you should have kept the first patch
> > in the next version of your series. We don't split series in general.
> 
> Thanks for reminding me.
> I will update the patch series in the new version.

No that's fine, I'm merging it already.
Next time :-)
  

Patch

diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
index 269e921..ba17216 100644
--- a/lib/librte_timer/rte_timer.c
+++ b/lib/librte_timer/rte_timer.c
@@ -10,7 +10,6 @@ 
 #include <assert.h>
 #include <sys/queue.h>
 
-#include <rte_atomic.h>
 #include <rte_common.h>
 #include <rte_cycles.h>
 #include <rte_eal_memconfig.h>
@@ -218,7 +217,7 @@  rte_timer_init(struct rte_timer *tim)
 
 	status.state = RTE_TIMER_STOP;
 	status.owner = RTE_TIMER_NO_OWNER;
-	tim->status.u32 = status.u32;
+	__atomic_store_n(&tim->status.u32, status.u32, __ATOMIC_RELAXED);
 }
 
 /*
@@ -239,9 +238,9 @@  timer_set_config_state(struct rte_timer *tim,
 
 	/* wait that the timer is in correct status before update,
 	 * and mark it as being configured */
-	while (success == 0) {
-		prev_status.u32 = tim->status.u32;
+	prev_status.u32 = __atomic_load_n(&tim->status.u32, __ATOMIC_RELAXED);
 
+	while (success == 0) {
 		/* timer is running on another core
 		 * or ready to run on local core, exit
 		 */
@@ -258,9 +257,15 @@  timer_set_config_state(struct rte_timer *tim,
 		 * mark it atomically as being configured */
 		status.state = RTE_TIMER_CONFIG;
 		status.owner = (int16_t)lcore_id;
-		success = rte_atomic32_cmpset(&tim->status.u32,
-					      prev_status.u32,
-					      status.u32);
+		/* CONFIG states are acting as locked states. If the
+		 * timer is in CONFIG state, the state cannot be changed
+		 * by other threads. So, we should use ACQUIRE here.
+		 */
+		success = __atomic_compare_exchange_n(&tim->status.u32,
+					      &prev_status.u32,
+					      status.u32, 0,
+					      __ATOMIC_ACQUIRE,
+					      __ATOMIC_RELAXED);
 	}
 
 	ret_prev_status->u32 = prev_status.u32;
@@ -279,20 +284,27 @@  timer_set_running_state(struct rte_timer *tim)
 
 	/* wait that the timer is in correct status before update,
 	 * and mark it as running */
-	while (success == 0) {
-		prev_status.u32 = tim->status.u32;
+	prev_status.u32 = __atomic_load_n(&tim->status.u32, __ATOMIC_RELAXED);
 
+	while (success == 0) {
 		/* timer is not pending anymore */
 		if (prev_status.state != RTE_TIMER_PENDING)
 			return -1;
 
 		/* here, we know that timer is stopped or pending,
-		 * mark it atomically as being configured */
+		 * mark it atomically as being running
+		 */
 		status.state = RTE_TIMER_RUNNING;
 		status.owner = (int16_t)lcore_id;
-		success = rte_atomic32_cmpset(&tim->status.u32,
-					      prev_status.u32,
-					      status.u32);
+		/* RUNNING states are acting as locked states. If the
+		 * timer is in RUNNING state, the state cannot be changed
+		 * by other threads. So, we should use ACQUIRE here.
+		 */
+		success = __atomic_compare_exchange_n(&tim->status.u32,
+					      &prev_status.u32,
+					      status.u32, 0,
+					      __ATOMIC_ACQUIRE,
+					      __ATOMIC_RELAXED);
 	}
 
 	return 0;
@@ -520,10 +532,12 @@  __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
 
 	/* update state: as we are in CONFIG state, only us can modify
 	 * the state so we don't need to use cmpset() here */
-	rte_wmb();
 	status.state = RTE_TIMER_PENDING;
 	status.owner = (int16_t)tim_lcore;
-	tim->status.u32 = status.u32;
+	/* The "RELEASE" ordering guarantees the memory operations above
+	 * the status update are observed before the update by all threads
+	 */
+	__atomic_store_n(&tim->status.u32, status.u32, __ATOMIC_RELEASE);
 
 	if (tim_lcore != lcore_id || !local_is_locked)
 		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
@@ -600,10 +614,12 @@  __rte_timer_stop(struct rte_timer *tim, int local_is_locked,
 	}
 
 	/* mark timer as stopped */
-	rte_wmb();
 	status.state = RTE_TIMER_STOP;
 	status.owner = RTE_TIMER_NO_OWNER;
-	tim->status.u32 = status.u32;
+	/* The "RELEASE" ordering guarantees the memory operations above
+	 * the status update are observed before the update by all threads
+	 */
+	__atomic_store_n(&tim->status.u32, status.u32, __ATOMIC_RELEASE);
 
 	return 0;
 }
@@ -637,7 +653,8 @@  rte_timer_stop_sync(struct rte_timer *tim)
 int
 rte_timer_pending(struct rte_timer *tim)
 {
-	return tim->status.state == RTE_TIMER_PENDING;
+	return __atomic_load_n(&tim->status.state,
+				__ATOMIC_RELAXED) == RTE_TIMER_PENDING;
 }
 
 /* must be called periodically, run all timer that expired */
@@ -739,8 +756,12 @@  __rte_timer_manage(struct rte_timer_data *timer_data)
 			/* remove from done list and mark timer as stopped */
 			status.state = RTE_TIMER_STOP;
 			status.owner = RTE_TIMER_NO_OWNER;
-			rte_wmb();
-			tim->status.u32 = status.u32;
+			/* The "RELEASE" ordering guarantees the memory
+			 * operations above the status update are observed
+			 * before the update by all threads
+			 */
+			__atomic_store_n(&tim->status.u32, status.u32,
+				__ATOMIC_RELEASE);
 		}
 		else {
 			/* keep it in list and mark timer as pending */
@@ -748,8 +769,12 @@  __rte_timer_manage(struct rte_timer_data *timer_data)
 			status.state = RTE_TIMER_PENDING;
 			__TIMER_STAT_ADD(priv_timer, pending, 1);
 			status.owner = (int16_t)lcore_id;
-			rte_wmb();
-			tim->status.u32 = status.u32;
+			/* The "RELEASE" ordering guarantees the memory
+			 * operations above the status update are observed
+			 * before the update by all threads
+			 */
+			__atomic_store_n(&tim->status.u32, status.u32,
+				__ATOMIC_RELEASE);
 			__rte_timer_reset(tim, tim->expire + tim->period,
 				tim->period, lcore_id, tim->f, tim->arg, 1,
 				timer_data);
@@ -919,8 +944,12 @@  rte_timer_alt_manage(uint32_t timer_data_id,
 			/* remove from done list and mark timer as stopped */
 			status.state = RTE_TIMER_STOP;
 			status.owner = RTE_TIMER_NO_OWNER;
-			rte_wmb();
-			tim->status.u32 = status.u32;
+			/* The "RELEASE" ordering guarantees the memory
+			 * operations above the status update are observed
+			 * before the update by all threads
+			 */
+			__atomic_store_n(&tim->status.u32, status.u32,
+				__ATOMIC_RELEASE);
 		} else {
 			/* keep it in list and mark timer as pending */
 			rte_spinlock_lock(
@@ -928,8 +957,12 @@  rte_timer_alt_manage(uint32_t timer_data_id,
 			status.state = RTE_TIMER_PENDING;
 			__TIMER_STAT_ADD(data->priv_timer, pending, 1);
 			status.owner = (int16_t)this_lcore;
-			rte_wmb();
-			tim->status.u32 = status.u32;
+			/* The "RELEASE" ordering guarantees the memory
+			 * operations above the status update are observed
+			 * before the update by all threads
+			 */
+			__atomic_store_n(&tim->status.u32, status.u32,
+				__ATOMIC_RELEASE);
 			__rte_timer_reset(tim, tim->expire + tim->period,
 				tim->period, this_lcore, tim->f, tim->arg, 1,
 				data);
diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h
index c6b3d45..df533fa 100644
--- a/lib/librte_timer/rte_timer.h
+++ b/lib/librte_timer/rte_timer.h
@@ -101,7 +101,7 @@  struct rte_timer
 {
 	uint64_t expire;       /**< Time when timer expire. */
 	struct rte_timer *sl_next[MAX_SKIPLIST_DEPTH];
-	volatile union rte_timer_status status; /**< Status of timer. */
+	union rte_timer_status status; /**< Status of timer. */
 	uint64_t period;       /**< Period of timer (0 if not periodic). */
 	rte_timer_cb_t f;      /**< Callback function. */
 	void *arg;             /**< Argument to callback function. */