[dpdk-stable] [PATCH] rte_ring: fix racy dequeue/enqueue in ppc64
Takeshi Yoshimura
t.yoshimura8869 at gmail.com
Thu Jul 12 04:44:14 CEST 2018
SPDK blobfs encountered a crash around rte_ring dequeues on ppc64.
It uses a single consumer and multiple producers for an rte_ring.
The problem was a load-load reorder in rte_ring_sc_dequeue_bulk().
The reordered loads were the load of r->prod.tail in
__rte_ring_move_cons_head() (rte_ring_generic.h) and the load of
ring[idx] in DEQUEUE_PTRS() (rte_ring.h). The second load is
control-dependent on the first, but a control dependency alone does
not order two loads on ppc64, and the code has no barrier between
them. Note that the loads are not reordered when
__rte_ring_move_cons_head() is called with is_sc != 1, because
cmpset implies a read barrier.
The stores paired with these loads are in ENQUEUE_PTRS() and
update_tail(). Simplified, the reordered execution looks like this:

    Consumer                Producer
    load ring[idx]
                            store ring[idx]
                            store r->prod.tail
    load r->prod.tail

In this case, the consumer loads the old ring[idx] and then validates
that stale load against the new r->prod.tail.
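The ordering requirement above can be sketched in portable C11. This is a
minimal single-producer/single-consumer model, not the rte_ring code: the
names mini_ring, mini_enqueue and mini_dequeue are hypothetical, the ring
has no full-ring check, and an acquire fence stands in for the read barrier
that the consumer needs between the two loads.

```c
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define MINI_RING_SIZE 8u                     /* hypothetical; power of two */
#define MINI_RING_MASK (MINI_RING_SIZE - 1u)

/* Hypothetical stand-in for struct rte_ring, reduced to what the race needs. */
struct mini_ring {
	_Atomic uint32_t prod_tail;           /* published by the producer */
	uint32_t cons_head;                   /* private to the single consumer */
	void *slots[MINI_RING_SIZE];
};

/* Producer: store the slot first, then publish the new tail.
 * No full-ring check; this sketch only models the ordering. */
static void mini_enqueue(struct mini_ring *r, void *obj)
{
	uint32_t tail = atomic_load_explicit(&r->prod_tail,
					     memory_order_relaxed);

	r->slots[tail & MINI_RING_MASK] = obj;
	/* Release keeps the slot store ahead of the tail store. */
	atomic_store_explicit(&r->prod_tail, tail + 1, memory_order_release);
}

/* Consumer: returns NULL when the ring is empty. */
static void *mini_dequeue(struct mini_ring *r)
{
	uint32_t tail = atomic_load_explicit(&r->prod_tail,
					     memory_order_relaxed);

	if (tail == r->cons_head)
		return NULL;
	/* Without this fence a weakly ordered CPU may satisfy the slot
	 * load below before the tail load above, returning a stale slot. */
	atomic_thread_fence(memory_order_acquire);
	return r->slots[r->cons_head++ & MINI_RING_MASK];
}
```

Removing the fence in mini_dequeue() gives the interleaving shown in the
diagram: the slot load may be satisfied early, and the later tail load then
wrongly confirms it.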
I added a read barrier in the case where __IS_SC is passed to
__rte_ring_move_cons_head(). I also fixed __rte_ring_move_prod_head()
to avoid similar problems with a single producer.
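As a rough illustration of where the barrier sits, here is a simplified C11
model of the consumer head move. It is a sketch under assumptions, not the
patched function: mini_headtail and mini_move_cons_head are hypothetical
names, an acquire fence stands in for rte_smp_rmb(), and a sequentially
consistent compare-exchange stands in for rte_atomic32_cmpset().

```c
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-in for the prod/cons head-tail pairs of a ring. */
struct mini_headtail {
	_Atomic uint32_t head;
	_Atomic uint32_t tail;
};

/* Claim exactly n entries for the consumer; returns n, or 0 if fewer
 * than n entries are available. */
static unsigned int
mini_move_cons_head(struct mini_headtail *cons, struct mini_headtail *prod,
		    int is_sc, unsigned int n,
		    uint32_t *old_head, uint32_t *new_head)
{
	int success;

	do {
		*old_head = atomic_load_explicit(&cons->head,
						 memory_order_relaxed);
		uint32_t prod_tail = atomic_load_explicit(&prod->tail,
							  memory_order_relaxed);
		/* Unsigned subtraction handles index wrap-around. */
		uint32_t entries = prod_tail - *old_head;

		if (n > entries)
			return 0;

		*new_head = *old_head + n;
		if (is_sc) {
			/* Stand-in for rte_smp_rmb(): order the prod->tail
			 * load before the caller's later slot loads. */
			atomic_thread_fence(memory_order_acquire);
			atomic_store_explicit(&cons->head, *new_head,
					      memory_order_relaxed);
			success = 1;
		} else {
			/* The compare-exchange already orders the loads. */
			uint32_t expected = *old_head;

			success = atomic_compare_exchange_strong(&cons->head,
					&expected, *new_head);
		}
	} while (!success);

	return n;
}
```

The multi-consumer branch needs no extra fence in this model because the
compare-exchange already provides the ordering, which matches the
observation that the race only shows up on the single-consumer path.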
Cc: stable at dpdk.org
Signed-off-by: Takeshi Yoshimura <tyos at jp.ibm.com>
---
lib/librte_ring/rte_ring_generic.h | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index ea7dbe5b9..477326180 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -90,9 +90,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_sp)
+		if (is_sp) {
+			rte_smp_rmb();
 			r->prod.head = *new_head, success = 1;
-		else
+		} else
 			success = rte_atomic32_cmpset(&r->prod.head,
 					*old_head, *new_head);
 	} while (unlikely(success == 0));
@@ -158,9 +159,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_sc)
+		if (is_sc) {
+			rte_smp_rmb();
 			r->cons.head = *new_head, success = 1;
-		else
+		} else
 			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
 					*new_head);
 	} while (unlikely(success == 0));
--
2.17.1