[dpdk-stable] [PATCH] rte_ring: fix racy dequeue/enqueue in ppc64

Takeshi Yoshimura t.yoshimura8869 at gmail.com
Thu Jul 12 04:44:14 CEST 2018


SPDK blobfs encountered a crash around rte_ring dequeues in ppc64.
It uses a single consumer and multiple producers for a rte_ring.
The problem was a load-load reorder in rte_ring_sc_dequeue_bulk().

The reordered loads happened on r->prod.tail in
__rte_ring_move_cons_head() (rte_ring_generic.h) and ring[idx] in
DEQUEUE_PTRS() (rte_ring.h). They have a load-load control
dependency, but the code does not satisfy it. Note that they are
not reordered if __rte_ring_move_cons_head() with is_sc != 1 because
cmpset invokes a read barrier.

The paired stores on these loads are in ENQUEUE_PTRS() and
update_tail(). Simplified code around the reorder is the following.

Consumer             Producer
load idx[ring]
                     store idx[ring]
                     store r->prod.tail
load r->prod.tail

In this case, the consumer loads old idx[ring] and confirms the load
is valid with the new r->prod.tail.

I added a read barrier in the case where __IS_SC is passed to
__rte_ring_move_cons_head(). I also fixed __rte_ring_move_prod_head()
to avoid similar problems with a single producer.

Cc: stable at dpdk.org

Signed-off-by: Takeshi Yoshimura <tyos at jp.ibm.com>
---
 lib/librte_ring/rte_ring_generic.h | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index ea7dbe5b9..477326180 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -90,9 +90,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_sp)
+		if (is_sp) {
+			rte_smp_rmb();
 			r->prod.head = *new_head, success = 1;
-		else
+		} else
 			success = rte_atomic32_cmpset(&r->prod.head,
 					*old_head, *new_head);
 	} while (unlikely(success == 0));
@@ -158,9 +159,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_sc)
+		if (is_sc) {
+			rte_smp_rmb();
 			r->cons.head = *new_head, success = 1;
-		else
+		} else
 			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
 					*new_head);
 	} while (unlikely(success == 0));
-- 
2.17.1



More information about the stable mailing list