[dpdk-dev] [PATCH v1] crypto/scheduler: fix multicore scheduler reordering

Kirill Rybalchenko kirill.rybalchenko at intel.com
Tue Jul 18 12:35:38 CEST 2017


Operations can be dequeued from the reordering ring only after they
have been dequeued from the crypto PMD with the
rte_cryptodev_dequeue_burst() function. It is not correct to dequeue
them as soon as their status changes from
RTE_CRYPTO_OP_STATUS_NOT_PROCESSED to any other value, as the crypto
PMD may still be processing the operations internally.
Now the multicore scheduler workers mark the status of all operations
dequeued from the crypto PMD by setting the
CRYPTO_OP_STATUS_BIT_COMPLETE bit. The scheduler dequeues crypto
operations from the reordering ring only when this status bit is set.
Before placing an operation in the output buffer, the scheduler clears
this bit, so the application gets the unmodified status from the
crypto PMD.

Fixes: 4c07e0552f0a ("crypto/scheduler: add multicore scheduling mode")

Signed-off-by: Kirill Rybalchenko <kirill.rybalchenko at intel.com>
---
 drivers/crypto/scheduler/scheduler_multicore.c | 94 ++++++++++++++++++--------
 1 file changed, 67 insertions(+), 27 deletions(-)

diff --git a/drivers/crypto/scheduler/scheduler_multicore.c b/drivers/crypto/scheduler/scheduler_multicore.c
index bed9a8f..0cd5bce 100644
--- a/drivers/crypto/scheduler/scheduler_multicore.c
+++ b/drivers/crypto/scheduler/scheduler_multicore.c
@@ -42,6 +42,8 @@
 
 #define MC_SCHED_BUFFER_SIZE 32
 
+#define CRYPTO_OP_STATUS_BIT_COMPLETE	0x80
+
 /** multi-core scheduler context */
 struct mc_scheduler_ctx {
 	uint32_t num_workers;             /**< Number of workers polling */
@@ -136,10 +138,31 @@ static uint16_t
 schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
 		uint16_t nb_ops)
 {
-	struct rte_ring *order_ring =
-			((struct scheduler_qp_ctx *)qp)->order_ring;
+	struct rte_ring *order_ring = ((struct scheduler_qp_ctx *)qp)->order_ring;
+	struct rte_crypto_op *op;
+	uint32_t nb_objs = rte_ring_count(order_ring);
+	uint32_t nb_ops_to_deq = 0;
+	uint32_t nb_ops_deqd = 0;
+
+	if (nb_objs > nb_ops)
+		nb_objs = nb_ops;
+
+	while (nb_ops_to_deq < nb_objs) {
+		SCHEDULER_GET_RING_OBJ(order_ring, nb_ops_to_deq, op);
+
+		if (!(op->status & CRYPTO_OP_STATUS_BIT_COMPLETE))
+			break;
+
+		op->status &= ~CRYPTO_OP_STATUS_BIT_COMPLETE;
+		nb_ops_to_deq++;
+	}
+
+	if (nb_ops_to_deq) {
+		nb_ops_deqd = rte_ring_sc_dequeue_bulk(order_ring,
+				(void **)ops, nb_ops_to_deq, NULL);
+	}
 
-	return scheduler_order_drain(order_ring, ops, nb_ops);
+	return nb_ops_deqd;
 }
 
 static int
@@ -169,9 +192,12 @@ mc_scheduler_worker(struct rte_cryptodev *dev)
 	struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE];
 	struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE];
 	uint16_t processed_ops;
-	uint16_t left_op = 0;
-	uint16_t left_op_idx = 0;
+	uint16_t pending_enq_ops = 0;
+	uint16_t pending_enq_ops_idx = 0;
+	uint16_t pending_deq_ops = 0;
+	uint16_t pending_deq_ops_idx = 0;
 	uint16_t inflight_ops = 0;
+	const uint8_t reordering_enabled = sched_ctx->reordering_enabled;
 
 	for (i = 0; i < (int)sched_ctx->nb_wc; i++) {
 		if (sched_ctx->wc_pool[i] == core_id) {
@@ -189,37 +215,51 @@ mc_scheduler_worker(struct rte_cryptodev *dev)
 	deq_ring = mc_ctx->sched_deq_ring[worker_idx];
 
 	while (!mc_ctx->stop_signal) {
-		if (left_op) {
+		if (pending_enq_ops) {
 			processed_ops =
 				rte_cryptodev_enqueue_burst(slave->dev_id,
-						slave->qp_id,
-						&enq_ops[left_op_idx], left_op);
-
-			left_op -= processed_ops;
-			left_op_idx += processed_ops;
+					slave->qp_id, &enq_ops[pending_enq_ops_idx],
+					pending_enq_ops);
+			pending_enq_ops -= processed_ops;
+			pending_enq_ops_idx += processed_ops;
+			inflight_ops += processed_ops;
 		} else {
-			uint16_t nb_deq_ops = rte_ring_dequeue_burst(enq_ring,
-				(void *)enq_ops, MC_SCHED_BUFFER_SIZE, NULL);
-			if (nb_deq_ops) {
-				processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id,
-						slave->qp_id, enq_ops, nb_deq_ops);
-
-				if (unlikely(processed_ops < nb_deq_ops)) {
-					left_op = nb_deq_ops - processed_ops;
-					left_op_idx = processed_ops;
-				}
-
-				inflight_ops += processed_ops;
+			processed_ops = rte_ring_dequeue_burst(enq_ring, (void *)enq_ops,
+							MC_SCHED_BUFFER_SIZE, NULL);
+			if (processed_ops) {
+				pending_enq_ops_idx = rte_cryptodev_enqueue_burst(
+							slave->dev_id, slave->qp_id,
+							enq_ops, processed_ops);
+				pending_enq_ops = processed_ops - pending_enq_ops_idx;
+				inflight_ops += pending_enq_ops_idx;
 			}
 		}
 
-		if (inflight_ops > 0) {
+		if (pending_deq_ops) {
+			processed_ops = rte_ring_enqueue_burst(
+					deq_ring, (void *)&deq_ops[pending_deq_ops_idx],
+							pending_deq_ops, NULL);
+			pending_deq_ops -= processed_ops;
+			pending_deq_ops_idx += processed_ops;
+		} else if (inflight_ops) {
 			processed_ops = rte_cryptodev_dequeue_burst(slave->dev_id,
 					slave->qp_id, deq_ops, MC_SCHED_BUFFER_SIZE);
 			if (processed_ops) {
-				uint16_t nb_enq_ops = rte_ring_enqueue_burst(deq_ring,
-					(void *)deq_ops, processed_ops, NULL);
-				inflight_ops -= nb_enq_ops;
+				inflight_ops -= processed_ops;
+				if (reordering_enabled) {
+					uint16_t j;
+
+					for (j = 0; j < processed_ops; j++) {
+						deq_ops[j]->status |=
+							CRYPTO_OP_STATUS_BIT_COMPLETE;
+					}
+				} else {
+					pending_deq_ops_idx = rte_ring_enqueue_burst(
+						deq_ring, (void *)deq_ops, processed_ops,
+						NULL);
+					pending_deq_ops = processed_ops -
+								pending_deq_ops_idx;
+				}
 			}
 		}
 
-- 
2.5.5



More information about the dev mailing list