[dpdk-dev] [PATCH v2] ring: fix c11 memory ordering issue
He, Jia
jia.he at hxt-semitech.com
Tue Aug 7 07:56:36 CEST 2018
Hi Gavin
> -----Original Message-----
> From: Gavin Hu [mailto:gavin.hu at arm.com]
> Sent: August 7, 2018 11:20
> To: dev at dpdk.org
> Cc: gavin.hu at arm.com; Honnappa.Nagarahalli at arm.com;
> steve.capper at arm.com; Ola.Liljedahl at arm.com;
> jerin.jacob at caviumnetworks.com; hemant.agrawal at nxp.com; He, Jia
> <jia.he at hxt-semitech.com>; stable at dpdk.org
> Subject: [PATCH v2] ring: fix c11 memory ordering issue
>
> This patch includes two bug fixes (#1 and #2) and two optimisations (#3 and #4).
It may be better to split this into separate, smaller patches.
> 1) In update_tail, read ht->tail using __atomic_load. Although the
> compiler currently seems to be doing the right thing even without
> __atomic_load, we don't want to give the compiler freedom to optimise
> what should be an atomic load; it should not be arbitrarily moved
> around.
> 2) Synchronize the load-acquire of the tail with the store-release
> within update_tail. The store-release ensures that all ring operations,
> enqueue or dequeue, are seen by the observers as soon as they see
> the updated tail. The load-acquire is required to correctly compute
> free_entries or avail_entries, for enqueue and dequeue operations
> respectively. The data dependency is not reliable for ordering,
> as the compiler might break it by saving to temporary values to boost
> performance.
Could you describe the race condition in detail?
e.g.
cpu1            cpu2
code1
                code2
Cheers,
Jia
> 3) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
> the do {} while loop: when the compare-exchange fails, old_head is
> updated, so another load is costly and not necessary.
> 4) When calling __atomic_compare_exchange_n, use relaxed ordering for both
> success and failure, as multiple threads can work independently on
> the same end of the ring (either enqueue or dequeue) without
> synchronization, unlike operations on the tail, which have to be
> finished in sequence.
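To make the pairing in 1) and 2) concrete, here is a minimal sketch of how I understand it. It is not the DPDK code: toy_ring, toy_publish and toy_observe are hypothetical names, and the ring is reduced to one producer and one consumer. The producer fills a slot with a plain store and then publishes the new tail with a store-release; the consumer reads that tail with a load-acquire before touching the slot. If the consumer used a plain load of the tail instead, the compiler or a weakly ordered CPU would be allowed to perform the slot read ahead of the tail read, so it could observe the new tail together with stale slot contents.

```c
#include <assert.h>
#include <stdint.h>

#define TOY_SLOTS 8u                 /* hypothetical size, power of two */
#define TOY_MASK  (TOY_SLOTS - 1u)

struct toy_ring {
	uint32_t slots[TOY_SLOTS];
	uint32_t prod_tail;  /* written by the producer, read by the consumer */
	uint32_t cons_tail;  /* written by the consumer, read by the producer */
};

/* Producer side: fill one slot, then publish it.
 * Returns 1 on success, 0 if the ring is full. */
static int toy_publish(struct toy_ring *r, uint32_t val)
{
	assert(r);
	uint32_t tail = r->prod_tail;  /* only the producer writes prod_tail */
	/* Acquire pairs with the consumer's store-release of cons_tail, so a
	 * slot is not overwritten before the consumer has finished with it. */
	uint32_t cons = __atomic_load_n(&r->cons_tail, __ATOMIC_ACQUIRE);

	if (tail - cons == TOY_SLOTS)
		return 0;
	r->slots[tail & TOY_MASK] = val;  /* plain store of the payload */
	/* Release: the payload store is visible to anyone who sees this tail. */
	__atomic_store_n(&r->prod_tail, tail + 1, __ATOMIC_RELEASE);
	return 1;
}

/* Consumer side: read the tail with acquire, then read the slot.
 * Returns 1 and fills *val on success, 0 if the ring is empty. */
static int toy_observe(struct toy_ring *r, uint32_t *val)
{
	assert(r && val);
	uint32_t head = r->cons_tail;  /* only the consumer writes cons_tail */
	/* Acquire pairs with the producer's store-release of prod_tail. */
	uint32_t prod = __atomic_load_n(&r->prod_tail, __ATOMIC_ACQUIRE);

	if (prod == head)
		return 0;
	*val = r->slots[head & TOY_MASK];
	__atomic_store_n(&r->cons_tail, head + 1, __ATOMIC_RELEASE);
	return 1;
}
```

In the sketch, toy_publish would run on one lcore and toy_observe on another. The multi-producer/multi-consumer head handling from the patch is left out on purpose.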
>
> The patch was benchmarked with test/ring_perf_autotest and it decreases the
> enqueue/dequeue latency by 5% to 24.6% with two lcores. The real gains
> depend on the number of lcores, the depth of the ring, and whether the ring
> is SPSC or MPMC. For 1 lcore, it also improves a little, about 3% to 4%.
> It is a big improvement in the MPMC case: with a ring size of 32, it saves
> latency up to (6.90-5.20)/6.90 = 24.6%.
>
> Test result data:
>
> SP/SC bulk enq/dequeue (size: 8): 13.19
> MP/MC bulk enq/dequeue (size: 8): 25.79
> SP/SC bulk enq/dequeue (size: 32): 3.85
> MP/MC bulk enq/dequeue (size: 32): 6.90
>
> SP/SC bulk enq/dequeue (size: 8): 12.05
> MP/MC bulk enq/dequeue (size: 8): 23.06
> SP/SC bulk enq/dequeue (size: 32): 3.62
> MP/MC bulk enq/dequeue (size: 32): 5.20
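For reference, the percentages can be recomputed from the two result blocks above. This is only a sketch, and it assumes that the first block is the baseline and the second one is with the patch applied, which is what the (6.90-5.20)/6.90 calculation suggests. latency_saving_pct is a hypothetical helper, not part of the test suite.

```c
#include <assert.h>

/* Relative latency saving in percent: (before - after) / before * 100. */
static double latency_saving_pct(double before, double after)
{
	assert(before > 0.0);
	return (before - after) / before * 100.0;
}
```

With the numbers above, latency_saving_pct(6.90, 5.20) gives about 24.6 for MP/MC with size 32, and latency_saving_pct(3.85, 3.62) gives about 6.0 for SP/SC with size 32. The two size 8 rows come out at about 8.6 (SP/SC) and 10.6 (MP/MC).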
>
> Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
> Cc: stable at dpdk.org
>
> Signed-off-by: Gavin Hu <gavin.hu at arm.com>
> Reviewed-by: Honnappa Nagarahalli <Honnappa.Nagarahalli at arm.com>
> Reviewed-by: Steve Capper <steve.capper at arm.com>
> Reviewed-by: Ola Liljedahl <Ola.Liljedahl at arm.com>
> ---
> lib/librte_ring/rte_ring_c11_mem.h | 38 +++++++++++++++++++++++++-------------
> 1 file changed, 25 insertions(+), 13 deletions(-)
>
> diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
> index 94df3c4a6..cfa3be4a7 100644
> --- a/lib/librte_ring/rte_ring_c11_mem.h
> +++ b/lib/librte_ring/rte_ring_c11_mem.h
> @@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
> * we need to wait for them to complete
> */
> if (!single)
> - while (unlikely(ht->tail != old_val))
> + while (unlikely(old_val != __atomic_load_n(&ht->tail,
> + __ATOMIC_RELAXED)))
> rte_pause();
>
> __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
> @@ -60,20 +61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
> unsigned int max = n;
> int success;
>
> + *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
> do {
> /* Reset n to the initial burst count */
> n = max;
>
> - *old_head = __atomic_load_n(&r->prod.head,
> - __ATOMIC_ACQUIRE);
>
> - /*
> - * The subtraction is done between two unsigned 32bits value
> + /* load-acquire synchronize with store-release of ht->tail
> + * in update_tail.
> + */
> + const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
> + __ATOMIC_ACQUIRE);
> +
> + /* The subtraction is done between two unsigned 32bits value
> * (the result is always modulo 32 bits even if we have
> * *old_head > cons_tail). So 'free_entries' is always between 0
> * and capacity (which is < size).
> */
> - *free_entries = (capacity + r->cons.tail - *old_head);
> + *free_entries = (capacity + cons_tail - *old_head);
>
> /* check that we have enough room in ring */
> if (unlikely(n > *free_entries))
> @@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
> if (is_sp)
> r->prod.head = *new_head, success = 1;
> else
> + /* on failure, *old_head is updated */
> success = __atomic_compare_exchange_n(&r->prod.head,
> old_head, *new_head,
> - 0, __ATOMIC_ACQUIRE,
> + /*weak=*/0, __ATOMIC_RELAXED,
> __ATOMIC_RELAXED);
> } while (unlikely(success == 0));
> return n;
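A small sketch of why the load can be hoisted out of the loop as described in 3): when __atomic_compare_exchange_n fails, it writes the current value of the head back into the "expected" argument, so the next iteration already starts from a fresh value. toy_move_head and toy_try_once are hypothetical names; the first is a simplified stand-in for __rte_ring_move_prod_head without the free-space check.

```c
#include <assert.h>
#include <stdint.h>

/* Reserve n entries by advancing *head; returns the old head value. */
static uint32_t toy_move_head(uint32_t *head, uint32_t n)
{
	uint32_t old_head, new_head;

	assert(head);
	/* Loaded once, before the loop. */
	old_head = __atomic_load_n(head, __ATOMIC_RELAXED);
	do {
		new_head = old_head + n;
		/* On failure the builtin stores the current *head into
		 * old_head, so no explicit reload is needed here. */
	} while (!__atomic_compare_exchange_n(head, &old_head, new_head,
			/*weak=*/0, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
	return old_head;
}

/* One attempt with a caller-supplied expected value, to make the
 * write-back on failure visible. Returns the builtin's success flag. */
static int toy_try_once(uint32_t *head, uint32_t *expected, uint32_t desired)
{
	assert(head && expected);
	return __atomic_compare_exchange_n(head, expected, desired,
			/*weak=*/0, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
}
```

Calling toy_try_once with a stale expected value returns 0, leaves the head unchanged and overwrites the expected value with the current head. The relaxed orderings follow the patch; the ordering against the other end of the ring comes from the load-acquire of the tail, which this sketch does not show.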
> @@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> int success;
>
> /* move cons.head atomically */
> + *old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
> do {
> /* Restore n as it may change every loop */
> n = max;
> - *old_head = __atomic_load_n(&r->cons.head,
> - __ATOMIC_ACQUIRE);
> +
> + /* this load-acquire synchronize with store-release of ht->tail
> + * in update_tail.
> + */
> + const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
> + __ATOMIC_ACQUIRE);
>
> /* The subtraction is done between two unsigned 32bits value
> * (the result is always modulo 32 bits even if we have
> * cons_head > prod_tail). So 'entries' is always between 0
> * and size(ring)-1.
> */
> - *entries = (r->prod.tail - *old_head);
> + *entries = (prod_tail - *old_head);
>
> /* Set the actual entries for dequeue */
> if (n > *entries)
> @@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> if (is_sc)
> r->cons.head = *new_head, success = 1;
> else
> + /* on failure, *old_head will be updated */
> success = __atomic_compare_exchange_n(&r->cons.head,
> - old_head, *new_head,
> - 0, __ATOMIC_ACQUIRE,
> - __ATOMIC_RELAXED);
> + old_head, *new_head,
> + /*weak=*/0, __ATOMIC_RELAXED,
> + __ATOMIC_RELAXED);
> } while (unlikely(success == 0));
> return n;
> }
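The comments about the unsigned subtraction can be checked with a small example. This is a sketch with hypothetical helper names, assuming free-running 32-bit indices as in the ring code: even when one index has wrapped past UINT32_MAX and the other has not, the modulo 2^32 arithmetic still yields the right count.

```c
#include <assert.h>
#include <stdint.h>

/* Entries available to dequeue: prod_tail - cons_head, modulo 2^32. */
static uint32_t toy_entries(uint32_t prod_tail, uint32_t cons_head)
{
	return prod_tail - cons_head;
}

/* Free entries for enqueue: capacity + cons_tail - prod_head, modulo 2^32. */
static uint32_t toy_free_entries(uint32_t capacity, uint32_t cons_tail,
		uint32_t prod_head)
{
	uint32_t free_entries = capacity + cons_tail - prod_head;

	/* Holds as long as at most 'capacity' entries are in use. */
	assert(free_entries <= capacity);
	return free_entries;
}
```

For example, with capacity 7, cons_tail = 0xFFFFFFFE and prod_head = 2 (four entries in use across the wrap), toy_free_entries returns 3, and toy_entries(2, 0xFFFFFFFE) returns 4.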
> --
> 2.11.0