[v1] ring: enforce reading the tails before ring operations
Checks
Commit Message
In weak memory models, like arm64, reading the {prod,cons}.tail may get
reordered after reading or writing the ring slots, which corrupts the ring
and stale data is observed.
This issue was reported by NXP on 8-A72 DPAA2 board. The problem is most
likely caused by missing the acquire semantics when reading cons.tail (in
SP enqueue) or prod.tail (in SC dequeue) which makes it possible to read a
stale value from the ring slots. There must be a read fence before writing
or reading the ring slots, rte_atomic32_cmpset() provides the same ordering
for MP (and MC) case. This patch is to enforce this ordering for SP (and
SC) case.
Signed-off-by: gavin hu <gavin.hu@arm.com>
Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
Tested-by: Nipun Gupta <nipun.gupta@nxp.com>
---
lib/librte_ring/rte_ring_generic.h | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
Comments
On 06.03.2019 6:07, gavin hu wrote:
> In weak memory models, like arm64, reading the {prod,cons}.tail may get
> reordered after reading or writing the ring slots, which corrupts the ring
> and stale data is observed.
> This issue was reported by NXP on 8-A72 DPAA2 board. The problem is most
> likely caused by missing the acquire semantics when reading cons.tail (in
> SP enqueue) or prod.tail (in SC dequeue) which makes it possible to read a
> stale value from the ring slots. There must be a read fence before writing
Sorry, but the phrase "There must be a read fence before writing" makes no sense.
Could you please rephrase or describe in details which reads you're trying to
keep in exact order?
> or reading the ring slots, rte_atomic32_cmpset() provides the same ordering
> for MP (and MC) case. This patch is to enforce this ordering for SP (and
> SC) case.
>
> Signed-off-by: gavin hu <gavin.hu@arm.com>
> Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
> Tested-by: Nipun Gupta <nipun.gupta@nxp.com>
> ---
> lib/librte_ring/rte_ring_generic.h | 16 ++++++++++------
> 1 file changed, 10 insertions(+), 6 deletions(-)
>
> diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
> index ea7dbe5..1bd3dfd 100644
> --- a/lib/librte_ring/rte_ring_generic.h
> +++ b/lib/librte_ring/rte_ring_generic.h
> @@ -90,9 +90,11 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
> return 0;
>
> *new_head = *old_head + n;
> - if (is_sp)
> - r->prod.head = *new_head, success = 1;
> - else
> + if (is_sp) {
> + r->prod.head = *new_head;
> + rte_smp_rmb();
> + success = 1;
> + } else
> success = rte_atomic32_cmpset(&r->prod.head,
> *old_head, *new_head);
> } while (unlikely(success == 0));
> @@ -158,9 +160,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
> return 0;
>
> *new_head = *old_head + n;
> - if (is_sc)
> - r->cons.head = *new_head, success = 1;
> - else
> + if (is_sc) {
> + r->cons.head = *new_head;
> + rte_smp_rmb();
> + success = 1;
> + } else
> success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> *new_head);
> } while (unlikely(success == 0));
>
Hi ilya,
Thanks for your comments, inline comments and new commit message in V2.
/Gavin
> -----Original Message-----
> From: Ilya Maximets <i.maximets@samsung.com>
> Sent: Wednesday, March 6, 2019 7:49 PM
> To: Gavin Hu (Arm Technology China) <Gavin.Hu@arm.com>;
> dev@dpdk.org
> Cc: nd <nd@arm.com>; thomas@monjalon.net; jerinj@marvell.com;
> hemant.agrawal@nxp.com; Nipun.gupta@nxp.com; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; olivier.matz@6wind.com
> Subject: Re: [v1] ring: enforce reading the tails before ring operations
>
> On 06.03.2019 6:07, gavin hu wrote:
> > In weak memory models, like arm64, reading the {prod,cons}.tail may get
> > reordered after reading or writing the ring slots, which corrupts the ring
> > and stale data is observed.
> > This issue was reported by NXP on 8-A72 DPAA2 board. The problem is
> most
> > likely caused by missing the acquire semantics when reading cons.tail (in
> > SP enqueue) or prod.tail (in SC dequeue) which makes it possible to read
> a
> > stale value from the ring slots. There must be a read fence before
> writing
>
> Sorry, but the phrase "There must be a read fence before writing" makes
> no sense.
> Could you please rephrase or describe in details which reads you're trying
> to
> keep in exact order?
This patch is to keep in exact order for reading the tails(to calculate the available
/free slots of ring when moving the heads) before reading or writing the ring slots.
I rephrased the commit message in V2, could you have a look?
> > or reading the ring slots, rte_atomic32_cmpset() provides the same
> ordering
> > for MP (and MC) case. This patch is to enforce this ordering for SP (and
> > SC) case.
> >
> > Signed-off-by: gavin hu <gavin.hu@arm.com>
> > Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
> > Tested-by: Nipun Gupta <nipun.gupta@nxp.com>
> > ---
> > lib/librte_ring/rte_ring_generic.h | 16 ++++++++++------
> > 1 file changed, 10 insertions(+), 6 deletions(-)
> >
> > diff --git a/lib/librte_ring/rte_ring_generic.h
> b/lib/librte_ring/rte_ring_generic.h
> > index ea7dbe5..1bd3dfd 100644
> > --- a/lib/librte_ring/rte_ring_generic.h
> > +++ b/lib/librte_ring/rte_ring_generic.h
> > @@ -90,9 +90,11 @@ __rte_ring_move_prod_head(struct rte_ring *r,
> unsigned int is_sp,
> > return 0;
> >
> > *new_head = *old_head + n;
> > - if (is_sp)
> > - r->prod.head = *new_head, success = 1;
> > - else
> > + if (is_sp) {
> > + r->prod.head = *new_head;
> > + rte_smp_rmb();
> > + success = 1;
> > + } else
> > success = rte_atomic32_cmpset(&r->prod.head,
> > *old_head, *new_head);
> > } while (unlikely(success == 0));
> > @@ -158,9 +160,11 @@ __rte_ring_move_cons_head(struct rte_ring *r,
> unsigned int is_sc,
> > return 0;
> >
> > *new_head = *old_head + n;
> > - if (is_sc)
> > - r->cons.head = *new_head, success = 1;
> > - else
> > + if (is_sc) {
> > + r->cons.head = *new_head;
> > + rte_smp_rmb();
> > + success = 1;
> > + } else
> > success = rte_atomic32_cmpset(&r->cons.head,
> *old_head,
> > *new_head);
> > } while (unlikely(success == 0));
> >
@@ -90,9 +90,11 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
return 0;
*new_head = *old_head + n;
- if (is_sp)
- r->prod.head = *new_head, success = 1;
- else
+ if (is_sp) {
+ r->prod.head = *new_head;
+ rte_smp_rmb();
+ success = 1;
+ } else
success = rte_atomic32_cmpset(&r->prod.head,
*old_head, *new_head);
} while (unlikely(success == 0));
@@ -158,9 +160,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
return 0;
*new_head = *old_head + n;
- if (is_sc)
- r->cons.head = *new_head, success = 1;
- else
+ if (is_sc) {
+ r->cons.head = *new_head;
+ rte_smp_rmb();
+ success = 1;
+ } else
success = rte_atomic32_cmpset(&r->cons.head, *old_head,
*new_head);
} while (unlikely(success == 0));