[dpdk-dev] [PATCH v2] ring: fix c11 memory ordering issue

He, Jia jia.he at hxt-semitech.com
Tue Aug 7 07:56:36 CEST 2018


Hi Gavin
> -----Original Message-----
> From: Gavin Hu [mailto:gavin.hu at arm.com]
> Sent: 7 August 2018 11:20
> To: dev at dpdk.org
> Cc: gavin.hu at arm.com; Honnappa.Nagarahalli at arm.com;
> steve.capper at arm.com; Ola.Liljedahl at arm.com;
> jerin.jacob at caviumnetworks.com; hemant.agrawal at nxp.com; He, Jia
> <jia.he at hxt-semitech.com>; stable at dpdk.org
> Subject: [PATCH v2] ring: fix c11 memory ordering issue
> 
> This patch includes two bug fixes (#1 and #2) and two optimisations (#3 and #4).

Maybe you need to split this into smaller parts.

> 1) In update_tail, read ht->tail using __atomic_load_n. Although the
>    compiler currently seems to be doing the right thing even without
>    __atomic_load_n, we don't want to give the compiler freedom to
>    optimise what should be an atomic load; it should not be arbitrarily
>    moved around.
> 2) Synchronize the load-acquire of the tail with the store-release
>    within update_tail. The store-release ensures all the ring operations,
>    enqueue or dequeue, are seen by the observers as soon as they see
>    the updated tail. The load-acquire is required to correctly compute
>    the free_entries or avail_entries, respectively for enqueue and
>    dequeue operations; the data dependency is not reliable for ordering,
>    as the compiler might break it by caching values in temporaries to
>    boost performance.
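
To make #1 and #2 concrete for other readers, the update_tail side of the
pattern after this patch looks like the sketch below (a minimal sketch using
the rte_ring types; the enqueue/dequeue sides pair with it via load-acquire,
and "update_tail_sketch" is my own name, not the patch's):

#include <rte_ring.h>
#include <rte_pause.h>

static inline void
update_tail_sketch(struct rte_ring_headtail *ht, uint32_t old_val,
		uint32_t new_val, int single)
{
	if (!single)
		/* Relaxed is enough for the wait loop: the atomic load
		 * stops the compiler from hoisting ht->tail out of the
		 * loop (spinning forever on a stale value), and no
		 * ordering is needed while merely waiting our turn. */
		while (old_val != __atomic_load_n(&ht->tail,
						__ATOMIC_RELAXED))
			rte_pause();

	/* Release: every ring-slot write made before this store is
	 * visible to any thread that load-acquires ht->tail. */
	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}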

Could you describe the race condition in detail?
e.g.
cpu 1			cpu2
code1
				code2

Cheers,
Jia
> 3) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
>    the do {} while loop, as upon failure the old_head will be updated by
>    the CAS; another load is costly and not necessary.
> 4) When calling __atomic_compare_exchange_n, use relaxed ordering for
>    both success and failure, as multiple threads can work independently
>    on the same end of the ring (either enqueue or dequeue) without
>    synchronization; this is unlike operating on the tail, which has to
>    be finished in sequence.
> 
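
For #3/#4, the resulting head-claim loop is then (a simplified sketch of
the producer side; capacity checks and the SP path are elided, and `r' and
`n' are assumed in scope as in rte_ring_c11_mem.h):

	uint32_t old_head, new_head;

	/* Load once before the loop: on CAS failure,
	 * __atomic_compare_exchange_n writes the value it actually
	 * observed in r->prod.head back into old_head, so no explicit
	 * reload is needed per iteration. */
	old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
	do {
		new_head = old_head + n;
		/* Relaxed on both success and failure: producers only
		 * contend with other producers here; ordering against
		 * the consumer comes from the acquire on cons.tail and
		 * the release in update_tail, not from this CAS. */
	} while (!__atomic_compare_exchange_n(&r->prod.head, &old_head,
			new_head, 0 /* strong */,
			__ATOMIC_RELAXED, __ATOMIC_RELAXED));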
> The patch was benchmarked with test/ring_perf_autotest; it decreases the
> enqueue/dequeue latency by 5% ~ 24.6% with two lcores. The real gains
> depend on the number of lcores, the depth of the ring, and SP/SC vs MP/MC.
> For 1 lcore it also improves a little, about 3 ~ 4%.
> The biggest improvement is for MP/MC with a ring size of 32, where it cuts
> latency by up to (6.90-5.20)/6.90 = 24.6%.
> 
> Test result data:
> 
> Before the patch:
> SP/SC bulk enq/dequeue (size: 8): 13.19
> MP/MC bulk enq/dequeue (size: 8): 25.79
> SP/SC bulk enq/dequeue (size: 32): 3.85
> MP/MC bulk enq/dequeue (size: 32): 6.90
> 
> After the patch:
> SP/SC bulk enq/dequeue (size: 8): 12.05
> MP/MC bulk enq/dequeue (size: 8): 23.06
> SP/SC bulk enq/dequeue (size: 32): 3.62
> MP/MC bulk enq/dequeue (size: 32): 5.20
> 
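
(If anyone wants to reproduce the numbers: ring_perf_autotest is run from
the DPDK test binary; the exact path depends on your target and build
setup, e.g.:

	$ ./build/app/test
	RTE>> ring_perf_autotest
)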
> Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
> Cc: stable at dpdk.org
> 
> Signed-off-by: Gavin Hu <gavin.hu at arm.com>
> Reviewed-by: Honnappa Nagarahalli <Honnappa.Nagarahalli at arm.com>
> Reviewed-by: Steve Capper <steve.capper at arm.com>
> Reviewed-by: Ola Liljedahl <Ola.Liljedahl at arm.com>
> ---
>  lib/librte_ring/rte_ring_c11_mem.h | 38 +++++++++++++++++++++++++-------------
>  1 file changed, 25 insertions(+), 13 deletions(-)
> 
> diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
> index 94df3c4a6..cfa3be4a7 100644
> --- a/lib/librte_ring/rte_ring_c11_mem.h
> +++ b/lib/librte_ring/rte_ring_c11_mem.h
> @@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>  	 * we need to wait for them to complete
>  	 */
>  	if (!single)
> -		while (unlikely(ht->tail != old_val))
> +		while (unlikely(old_val != __atomic_load_n(&ht->tail,
> +						__ATOMIC_RELAXED)))
>  			rte_pause();
> 
>  	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
> @@ -60,20 +61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
>  	unsigned int max = n;
>  	int success;
> 
> +	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
>  	do {
>  		/* Reset n to the initial burst count */
>  		n = max;
> 
> -		*old_head = __atomic_load_n(&r->prod.head,
> -					__ATOMIC_ACQUIRE);
> 
> -		/*
> -		 *  The subtraction is done between two unsigned 32bits value
> +		/* load-acquire synchronize with store-release of ht->tail
> +		 * in update_tail.
> +		 */
> +		const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
> +							__ATOMIC_ACQUIRE);
> +
> +		/* The subtraction is done between two unsigned 32bits value
>  		 * (the result is always modulo 32 bits even if we have
>  		 * *old_head > cons_tail). So 'free_entries' is always between 0
>  		 * and capacity (which is < size).
>  		 */
> -		*free_entries = (capacity + r->cons.tail - *old_head);
> +		*free_entries = (capacity + cons_tail - *old_head);
> 
>  		/* check that we have enough room in ring */
>  		if (unlikely(n > *free_entries))
> @@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
>  		if (is_sp)
>  			r->prod.head = *new_head, success = 1;
>  		else
> +			/* on failure, *old_head is updated */
>  			success = __atomic_compare_exchange_n(&r->prod.head,
>  					old_head, *new_head,
> -					0, __ATOMIC_ACQUIRE,
> +					/*weak=*/0, __ATOMIC_RELAXED,
>  					__ATOMIC_RELAXED);
>  	} while (unlikely(success == 0));
>  	return n;
> @@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>  	int success;
> 
>  	/* move cons.head atomically */
> +	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
>  	do {
>  		/* Restore n as it may change every loop */
>  		n = max;
> -		*old_head = __atomic_load_n(&r->cons.head,
> -					__ATOMIC_ACQUIRE);
> +
> +		/* this load-acquire synchronize with store-release of ht->tail
> +		 * in update_tail.
> +		 */
> +		const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
> +							__ATOMIC_ACQUIRE);
> 
>  		/* The subtraction is done between two unsigned 32bits value
>  		 * (the result is always modulo 32 bits even if we have
>  		 * cons_head > prod_tail). So 'entries' is always between 0
>  		 * and size(ring)-1.
>  		 */
> -		*entries = (r->prod.tail - *old_head);
> +		*entries = (prod_tail - *old_head);
> 
>  		/* Set the actual entries for dequeue */
>  		if (n > *entries)
> @@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>  		if (is_sc)
>  			r->cons.head = *new_head, success = 1;
>  		else
> +			/* on failure, *old_head will be updated */
>  			success = __atomic_compare_exchange_n(&r->cons.head,
> -							old_head, *new_head,
> -							0, __ATOMIC_ACQUIRE,
> -							__ATOMIC_RELAXED);
> +						old_head, *new_head,
> +						/*weak=*/0, __ATOMIC_RELAXED,
> +						__ATOMIC_RELAXED);
>  	} while (unlikely(success == 0));
>  	return n;
>  }
> --
> 2.11.0
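
For the archives, the acquire/release pairing the patch relies on can be
demonstrated standalone (a toy C program with the same GCC builtins, not
DPDK code; a single slot stands in for the ring, build with gcc -pthread):

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

static uint32_t slot;  /* stands in for a ring entry */
static uint32_t tail;  /* stands in for prod.tail */

static void *producer(void *arg)
{
	slot = 42;  /* plain write of the payload... */
	/* ...published by the release store on the tail */
	__atomic_store_n(&tail, 1, __ATOMIC_RELEASE);
	return arg;
}

static void *consumer(void *arg)
{
	/* Pairs with the release above: once tail == 1 is observed,
	 * the write to slot is guaranteed to be visible too. */
	while (__atomic_load_n(&tail, __ATOMIC_ACQUIRE) == 0)
		;
	printf("slot = %u\n", slot);  /* always 42, never 0 */
	return arg;
}

int main(void)
{
	pthread_t p, c;
	pthread_create(&p, NULL, producer, NULL);
	pthread_create(&c, NULL, consumer, NULL);
	pthread_join(p, NULL);
	pthread_join(c, NULL);
	return 0;
}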


