[dpdk-dev] [PATCH] crypto/scheduler: optimize crypto op ordering

Fan Zhang roy.fan.zhang at intel.com
Thu Mar 2 15:18:34 CET 2017


This patch optimizes crypto op ordering by replacing the
rte_reorder-based ordering method with an rte_ring, avoiding the
unnecessary cost of storing and recovering crypto ops. Instead of
attaching every op to its mbuf and cycling it through a reorder
buffer, the scheduler now records op pointers in an order ring at
enqueue time and, at dequeue time, releases only the in-order prefix
of ops that the slaves have finished processing.

Signed-off-by: Fan Zhang <roy.fan.zhang at intel.com>
Signed-off-by: Sergio Gonzalez Monroy <sergio.gonzalez.monroy at intel.com>
---
 drivers/crypto/scheduler/scheduler_pmd_ops.c     |  42 +++---
 drivers/crypto/scheduler/scheduler_pmd_private.h |  49 +++++-
 drivers/crypto/scheduler/scheduler_roundrobin.c  | 181 ++---------------------
 3 files changed, 79 insertions(+), 193 deletions(-)
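
As a usage note for reviewers: with the helpers this patch adds to
scheduler_pmd_private.h, a scheduler mode's ordered enqueue/dequeue
pair reduces to the sketch below. The my_mode_* functions are
hypothetical placeholders for a mode's own slave-dispatch routines;
the scheduler_roundrobin.c hunks further down follow exactly this
pattern.

/*
 * Sketch only -- not part of this patch.  my_mode_enqueue() and
 * my_mode_dequeue() stand in for a mode's slave-dispatch functions,
 * e.g. the round-robin schedule_enqueue()/schedule_dequeue() below.
 */
#include <rte_crypto.h>
#include <rte_ring.h>

#include "scheduler_pmd_private.h"

static uint16_t my_mode_enqueue(void *qp_ctx, struct rte_crypto_op **ops,
		uint16_t nb_ops);
static uint16_t my_mode_dequeue(void *qp_ctx, struct rte_crypto_op **ops,
		uint16_t nb_ops);

static uint16_t
my_mode_enqueue_ordering(void *qp_ctx, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp_ctx)->order_ring;
	/* Never accept more ops than the order ring can track. */
	uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
			nb_ops);
	/* Hand the clamped burst to the slave PMDs (mode specific). */
	uint16_t nb_ops_enqd = my_mode_enqueue(qp_ctx, ops, nb_ops_to_enq);

	/* Record the submission order of the accepted ops. */
	scheduler_order_insert(order_ring, ops, nb_ops_enqd);

	return nb_ops_enqd;
}

static uint16_t
my_mode_dequeue_ordering(void *qp_ctx, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp_ctx)->order_ring;

	/* Collect completions from the slaves; the slaves move each
	 * op's status away from RTE_CRYPTO_OP_STATUS_NOT_PROCESSED.
	 */
	my_mode_dequeue(qp_ctx, ops, nb_ops);

	/* Return only the in-order prefix of processed ops. */
	return scheduler_order_drain(order_ring, ops, nb_ops);
}

The drain releases only the in-order head of the ring: it peeks at
the queued entries and stops at the first op still marked
RTE_CRYPTO_OP_STATUS_NOT_PROCESSED, so completions arriving early
from a faster slave are held back until their predecessors finish.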

diff --git a/drivers/crypto/scheduler/scheduler_pmd_ops.c b/drivers/crypto/scheduler/scheduler_pmd_ops.c
index 56624c7..287b2fb 100644
--- a/drivers/crypto/scheduler/scheduler_pmd_ops.c
+++ b/drivers/crypto/scheduler/scheduler_pmd_ops.c
@@ -63,24 +63,25 @@ scheduler_pmd_config(struct rte_cryptodev *dev)
 }
 
 static int
-update_reorder_buff(struct rte_cryptodev *dev, uint16_t qp_id)
+update_order_ring(struct rte_cryptodev *dev, uint16_t qp_id)
 {
 	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
 	struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
 
 	if (sched_ctx->reordering_enabled) {
-		char reorder_buff_name[RTE_CRYPTODEV_NAME_MAX_LEN];
-		uint32_t buff_size = sched_ctx->nb_slaves * PER_SLAVE_BUFF_SIZE;
+		char order_ring_name[RTE_CRYPTODEV_NAME_MAX_LEN];
+		uint32_t buff_size = rte_align32pow2(
+			sched_ctx->nb_slaves * PER_SLAVE_BUFF_SIZE);
 
-		if (qp_ctx->reorder_buf) {
-			rte_reorder_free(qp_ctx->reorder_buf);
-			qp_ctx->reorder_buf = NULL;
+		if (qp_ctx->order_ring) {
+			rte_ring_free(qp_ctx->order_ring);
+			qp_ctx->order_ring = NULL;
 		}
 
 		if (!buff_size)
 			return 0;
 
-		if (snprintf(reorder_buff_name, RTE_CRYPTODEV_NAME_MAX_LEN,
+		if (snprintf(order_ring_name, RTE_CRYPTODEV_NAME_MAX_LEN,
 			"%s_rb_%u_%u", RTE_STR(CRYPTODEV_NAME_SCHEDULER_PMD),
 			dev->data->dev_id, qp_id) < 0) {
 			CS_LOG_ERR("failed to create unique reorder buffer "
@@ -88,16 +89,17 @@ update_reorder_buff(struct rte_cryptodev *dev, uint16_t qp_id)
 			return -ENOMEM;
 		}
 
-		qp_ctx->reorder_buf = rte_reorder_create(reorder_buff_name,
-				rte_socket_id(), buff_size);
-		if (!qp_ctx->reorder_buf) {
-			CS_LOG_ERR("failed to create reorder buffer");
+		qp_ctx->order_ring = rte_ring_create(order_ring_name,
+				buff_size, rte_socket_id(),
+				RING_F_SP_ENQ | RING_F_SC_DEQ);
+		if (!qp_ctx->order_ring) {
+			CS_LOG_ERR("failed to create order ring");
 			return -ENOMEM;
 		}
 	} else {
-		if (qp_ctx->reorder_buf) {
-			rte_reorder_free(qp_ctx->reorder_buf);
-			qp_ctx->reorder_buf = NULL;
+		if (qp_ctx->order_ring) {
+			rte_ring_free(qp_ctx->order_ring);
+			qp_ctx->order_ring = NULL;
 		}
 	}
 
@@ -116,7 +118,7 @@ scheduler_pmd_start(struct rte_cryptodev *dev)
 		return 0;
 
 	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
-		ret = update_reorder_buff(dev, i);
+		ret = update_order_ring(dev, i);
 		if (ret < 0) {
 			CS_LOG_ERR("Failed to update reorder buffer");
 			return ret;
@@ -224,9 +226,9 @@ scheduler_pmd_close(struct rte_cryptodev *dev)
 	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
 		struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
 
-		if (qp_ctx->reorder_buf) {
-			rte_reorder_free(qp_ctx->reorder_buf);
-			qp_ctx->reorder_buf = NULL;
+		if (qp_ctx->order_ring) {
+			rte_ring_free(qp_ctx->order_ring);
+			qp_ctx->order_ring = NULL;
 		}
 
 		if (qp_ctx->private_qp_ctx) {
@@ -324,8 +326,8 @@ scheduler_pmd_qp_release(struct rte_cryptodev *dev, uint16_t qp_id)
 	if (!qp_ctx)
 		return 0;
 
-	if (qp_ctx->reorder_buf)
-		rte_reorder_free(qp_ctx->reorder_buf);
+	if (qp_ctx->order_ring)
+		rte_ring_free(qp_ctx->order_ring);
 	if (qp_ctx->private_qp_ctx)
 		rte_free(qp_ctx->private_qp_ctx);
 
diff --git a/drivers/crypto/scheduler/scheduler_pmd_private.h b/drivers/crypto/scheduler/scheduler_pmd_private.h
index ac4690e..f5348f6 100644
--- a/drivers/crypto/scheduler/scheduler_pmd_private.h
+++ b/drivers/crypto/scheduler/scheduler_pmd_private.h
@@ -34,9 +34,7 @@
 #ifndef _SCHEDULER_PMD_PRIVATE_H
 #define _SCHEDULER_PMD_PRIVATE_H
 
-#include <rte_hash.h>
-#include <rte_reorder.h>
-#include <rte_cryptodev_scheduler.h>
+#include "rte_cryptodev_scheduler.h"
 
 /**< Maximum number of bonded devices per devices */
 #ifndef MAX_SLAVES_NUM
@@ -101,7 +99,7 @@ struct scheduler_qp_ctx {
 	rte_cryptodev_scheduler_burst_enqueue_t schedule_enqueue;
 	rte_cryptodev_scheduler_burst_dequeue_t schedule_dequeue;
 
-	struct rte_reorder_buffer *reorder_buf;
+	struct rte_ring *order_ring;
 	uint32_t seqn;
 } __rte_cache_aligned;
 
@@ -109,6 +107,50 @@ struct scheduler_session {
 	struct rte_cryptodev_sym_session *sessions[MAX_SLAVES_NUM];
 };
 
+static inline uint16_t __attribute__((always_inline))
+get_max_enqueue_order_count(struct rte_ring *order_ring, uint16_t nb_ops)
+{
+	uint32_t count = rte_ring_free_count(order_ring);
+
+	return count > nb_ops ? nb_ops : count;
+}
+
+static inline void __attribute__((always_inline))
+scheduler_order_insert(struct rte_ring *order_ring,
+		struct rte_crypto_op **ops, uint16_t nb_ops)
+{
+	rte_ring_sp_enqueue_burst(order_ring, (void **)ops, nb_ops);
+}
+
+#define SCHEDULER_GET_RING_OBJ(order_ring, pos)		\
+	order_ring->ring[(order_ring->cons.head + pos) & order_ring->prod.mask]
+
+static inline uint16_t __attribute__((always_inline))
+scheduler_order_drain(struct rte_ring *order_ring,
+		struct rte_crypto_op **ops, uint16_t nb_ops)
+{
+	struct rte_crypto_op *op;
+	uint32_t nb_objs = rte_ring_count(order_ring);
+	uint32_t nb_ops_to_deq = 0;
+	int status = -1;
+
+	if (nb_objs > nb_ops)
+		nb_objs = nb_ops;
+
+	while (nb_ops_to_deq < nb_objs) {
+		op = SCHEDULER_GET_RING_OBJ(order_ring, nb_ops_to_deq);
+		if (op->status == RTE_CRYPTO_OP_STATUS_NOT_PROCESSED)
+			break;
+		nb_ops_to_deq++;
+	}
+
+	if (nb_ops_to_deq)
+		status = rte_ring_sc_dequeue_bulk(order_ring, (void **)ops,
+				nb_ops_to_deq);
+
+	return (status == 0) ? nb_ops_to_deq : 0;
+}
+
 /** device specific operations function pointer structure */
 extern struct rte_cryptodev_ops *rte_crypto_scheduler_pmd_ops;
 
diff --git a/drivers/crypto/scheduler/scheduler_roundrobin.c b/drivers/crypto/scheduler/scheduler_roundrobin.c
index 9545aa9..52f8c5e 100644
--- a/drivers/crypto/scheduler/scheduler_roundrobin.c
+++ b/drivers/crypto/scheduler/scheduler_roundrobin.c
@@ -115,80 +115,16 @@ static uint16_t
 schedule_enqueue_ordering(void *qp_ctx, struct rte_crypto_op **ops,
 		uint16_t nb_ops)
 {
-	struct scheduler_qp_ctx *gen_qp_ctx = qp_ctx;
-	struct rr_scheduler_qp_ctx *rr_qp_ctx =
-			gen_qp_ctx->private_qp_ctx;
-	uint32_t slave_idx = rr_qp_ctx->last_enq_slave_idx;
-	struct scheduler_slave *slave = &rr_qp_ctx->slaves[slave_idx];
-	uint16_t i, processed_ops;
-	struct rte_cryptodev_sym_session *sessions[nb_ops];
-	struct scheduler_session *sess0, *sess1, *sess2, *sess3;
-
-	if (unlikely(nb_ops == 0))
-		return 0;
-
-	for (i = 0; i < nb_ops && i < 4; i++) {
-		rte_prefetch0(ops[i]->sym->session);
-		rte_prefetch0(ops[i]->sym->m_src);
-	}
-
-	for (i = 0; (i < (nb_ops - 8)) && (nb_ops > 8); i += 4) {
-		sess0 = (struct scheduler_session *)
-				ops[i]->sym->session->_private;
-		sess1 = (struct scheduler_session *)
-				ops[i+1]->sym->session->_private;
-		sess2 = (struct scheduler_session *)
-				ops[i+2]->sym->session->_private;
-		sess3 = (struct scheduler_session *)
-				ops[i+3]->sym->session->_private;
-
-		sessions[i] = ops[i]->sym->session;
-		sessions[i + 1] = ops[i + 1]->sym->session;
-		sessions[i + 2] = ops[i + 2]->sym->session;
-		sessions[i + 3] = ops[i + 3]->sym->session;
-
-		ops[i]->sym->session = sess0->sessions[slave_idx];
-		ops[i]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-		ops[i + 1]->sym->session = sess1->sessions[slave_idx];
-		ops[i + 1]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-		ops[i + 2]->sym->session = sess2->sessions[slave_idx];
-		ops[i + 2]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-		ops[i + 3]->sym->session = sess3->sessions[slave_idx];
-		ops[i + 3]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-
-		rte_prefetch0(ops[i + 4]->sym->session);
-		rte_prefetch0(ops[i + 4]->sym->m_src);
-		rte_prefetch0(ops[i + 5]->sym->session);
-		rte_prefetch0(ops[i + 5]->sym->m_src);
-		rte_prefetch0(ops[i + 6]->sym->session);
-		rte_prefetch0(ops[i + 6]->sym->m_src);
-		rte_prefetch0(ops[i + 7]->sym->session);
-		rte_prefetch0(ops[i + 7]->sym->m_src);
-	}
-
-	for (; i < nb_ops; i++) {
-		sess0 = (struct scheduler_session *)
-				ops[i]->sym->session->_private;
-		sessions[i] = ops[i]->sym->session;
-		ops[i]->sym->session = sess0->sessions[slave_idx];
-		ops[i]->sym->m_src->seqn = gen_qp_ctx->seqn++;
-	}
-
-	processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id,
-			slave->qp_id, ops, nb_ops);
-
-	slave->nb_inflight_cops += processed_ops;
+	struct rte_ring *order_ring =
+			((struct scheduler_qp_ctx *)qp_ctx)->order_ring;
+	uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
+			nb_ops);
+	uint16_t nb_ops_enqd = schedule_enqueue(qp_ctx, ops,
+			nb_ops_to_enq);
 
-	rr_qp_ctx->last_enq_slave_idx += 1;
-	rr_qp_ctx->last_enq_slave_idx %= rr_qp_ctx->nb_slaves;
+	scheduler_order_insert(order_ring, ops, nb_ops_enqd);
 
-	/* recover session if enqueue is failed */
-	if (unlikely(processed_ops < nb_ops)) {
-		for (i = processed_ops; i < nb_ops; i++)
-			ops[i]->sym->session = sessions[i];
-	}
-
-	return processed_ops;
+	return nb_ops_enqd;
 }
 
 
@@ -233,105 +169,12 @@ static uint16_t
 schedule_dequeue_ordering(void *qp_ctx, struct rte_crypto_op **ops,
 		uint16_t nb_ops)
 {
-	struct scheduler_qp_ctx *gen_qp_ctx = (struct scheduler_qp_ctx *)qp_ctx;
-	struct rr_scheduler_qp_ctx *rr_qp_ctx = (gen_qp_ctx->private_qp_ctx);
-	struct scheduler_slave *slave;
-	struct rte_reorder_buffer *reorder_buff = gen_qp_ctx->reorder_buf;
-	struct rte_mbuf *mbuf0, *mbuf1, *mbuf2, *mbuf3;
-	uint16_t nb_deq_ops, nb_drained_mbufs;
-	const uint16_t nb_op_ops = nb_ops;
-	struct rte_crypto_op *op_ops[nb_op_ops];
-	struct rte_mbuf *reorder_mbufs[nb_op_ops];
-	uint32_t last_slave_idx = rr_qp_ctx->last_deq_slave_idx;
-	uint16_t i;
+	struct rte_ring *order_ring =
+			((struct scheduler_qp_ctx *)qp_ctx)->order_ring;
 
-	if (unlikely(rr_qp_ctx->slaves[last_slave_idx].nb_inflight_cops == 0)) {
-		do {
-			last_slave_idx += 1;
-
-			if (unlikely(last_slave_idx >= rr_qp_ctx->nb_slaves))
-				last_slave_idx = 0;
-			/* looped back, means no inflight cops in the queue */
-			if (last_slave_idx == rr_qp_ctx->last_deq_slave_idx)
-				return 0;
-		} while (rr_qp_ctx->slaves[last_slave_idx].nb_inflight_cops
-				== 0);
-	}
-
-	slave = &rr_qp_ctx->slaves[last_slave_idx];
-
-	nb_deq_ops = rte_cryptodev_dequeue_burst(slave->dev_id,
-			slave->qp_id, op_ops, nb_ops);
-
-	rr_qp_ctx->last_deq_slave_idx += 1;
-	rr_qp_ctx->last_deq_slave_idx %= rr_qp_ctx->nb_slaves;
-
-	slave->nb_inflight_cops -= nb_deq_ops;
-
-	for (i = 0; i < nb_deq_ops && i < 4; i++)
-		rte_prefetch0(op_ops[i]->sym->m_src);
-
-	for (i = 0; (i < (nb_deq_ops - 8)) && (nb_deq_ops > 8); i += 4) {
-		mbuf0 = op_ops[i]->sym->m_src;
-		mbuf1 = op_ops[i + 1]->sym->m_src;
-		mbuf2 = op_ops[i + 2]->sym->m_src;
-		mbuf3 = op_ops[i + 3]->sym->m_src;
-
-		mbuf0->userdata = op_ops[i];
-		mbuf1->userdata = op_ops[i + 1];
-		mbuf2->userdata = op_ops[i + 2];
-		mbuf3->userdata = op_ops[i + 3];
-
-		rte_reorder_insert(reorder_buff, mbuf0);
-		rte_reorder_insert(reorder_buff, mbuf1);
-		rte_reorder_insert(reorder_buff, mbuf2);
-		rte_reorder_insert(reorder_buff, mbuf3);
-
-		rte_prefetch0(op_ops[i + 4]->sym->m_src);
-		rte_prefetch0(op_ops[i + 5]->sym->m_src);
-		rte_prefetch0(op_ops[i + 6]->sym->m_src);
-		rte_prefetch0(op_ops[i + 7]->sym->m_src);
-	}
-
-	for (; i < nb_deq_ops; i++) {
-		mbuf0 = op_ops[i]->sym->m_src;
-		mbuf0->userdata = op_ops[i];
-		rte_reorder_insert(reorder_buff, mbuf0);
-	}
-
-	nb_drained_mbufs = rte_reorder_drain(reorder_buff, reorder_mbufs,
-			nb_ops);
-	for (i = 0; i < nb_drained_mbufs && i < 4; i++)
-		rte_prefetch0(reorder_mbufs[i]);
-
-	for (i = 0; (i < (nb_drained_mbufs - 8)) && (nb_drained_mbufs > 8);
-			i += 4) {
-		ops[i] = *(struct rte_crypto_op **)reorder_mbufs[i]->userdata;
-		ops[i + 1] = *(struct rte_crypto_op **)
-			reorder_mbufs[i + 1]->userdata;
-		ops[i + 2] = *(struct rte_crypto_op **)
-			reorder_mbufs[i + 2]->userdata;
-		ops[i + 3] = *(struct rte_crypto_op **)
-			reorder_mbufs[i + 3]->userdata;
-
-		reorder_mbufs[i]->userdata = NULL;
-		reorder_mbufs[i + 1]->userdata = NULL;
-		reorder_mbufs[i + 2]->userdata = NULL;
-		reorder_mbufs[i + 3]->userdata = NULL;
-
-		rte_prefetch0(reorder_mbufs[i + 4]);
-		rte_prefetch0(reorder_mbufs[i + 5]);
-		rte_prefetch0(reorder_mbufs[i + 6]);
-		rte_prefetch0(reorder_mbufs[i + 7]);
-	}
-
-	for (; i < nb_drained_mbufs; i++) {
-		ops[i] = *(struct rte_crypto_op **)
-			reorder_mbufs[i]->userdata;
-		reorder_mbufs[i]->userdata = NULL;
-	}
+	schedule_dequeue(qp_ctx, ops, nb_ops);
 
-	return nb_drained_mbufs;
+	return scheduler_order_drain(order_ring, ops, nb_ops);
 }
 
 static int
-- 
2.7.4


