[v3,1/2] eventdev/crypto_adapter: move crypto ops to circular buffer
Commit Message
Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full
---
v3:
* update eca_ops_buffer_flush() to flush all the crypto
ops out of the circular buffer.
* remove freeing of failed crypto ops from eca_ops_enqueue_burst()
and add them to the circular buffer for later processing.
v2:
* reset crypto adapter next cdev id before dequeuing from the
next cdev
---
Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
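For context, the retention scheme described above can be pictured with a minimal, self-contained sketch. This is illustrative only, not the patch code: the struct mirrors the one the patch adds, and enqueue_burst() is a hypothetical stand-in for rte_cryptodev_enqueue_burst() or rte_event_enqueue_burst().

    #include <stdint.h>

    struct rte_crypto_op; /* opaque here; defined by DPDK */

    struct crypto_ops_circular_buffer {
            uint16_t head;   /* index of the oldest buffered op */
            uint16_t tail;   /* one past the newest buffered op */
            uint16_t count;  /* ops currently buffered */
            uint16_t size;   /* capacity of op_buffer */
            struct rte_crypto_op **op_buffer;
    };

    /* Flush the contiguous run starting at head; ops a temporarily
     * full device refuses stay buffered and are retried on a later
     * flush instead of being dropped.
     */
    static void
    flush_retaining(struct crypto_ops_circular_buffer *b,
                    uint16_t (*enqueue_burst)(struct rte_crypto_op **,
                                              uint16_t))
    {
            uint16_t n, done;

            if (b->count == 0)
                    return;
            n = (b->tail > b->head) ? b->tail - b->head
                                    : b->size - b->head;
            done = enqueue_burst(&b->op_buffer[b->head], n);
            b->count -= done;
            b->head = b->count ? (b->head + done) % b->size : 0;
            if (b->count == 0)
                    b->tail = 0;
    }

Because head only advances past accepted ops, a temporarily full device costs nothing more than a retry on the next service-loop iteration.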
Comments
Hi Ganapati,
> -----Original Message-----
> From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> Sent: Tuesday, January 11, 2022 4:07 PM
> To: jerinjacobk@gmail.com; Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> dev@dpdk.org
> Cc: Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>
> Subject: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular
> buffer
>
> Move crypto ops to circular buffer to retain crypto ops when
> cryptodev/eventdev are temporarily full
>
> ---
> v3:
> * update eca_ops_buffer_flush() to flush out all the crypto
> ops out of circular buffer.
> * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
> and add to cirular buffer for later processing.
>
> v2:
> * reset cryptp adapter next cdev id before dequeueing from the
Cryptp -> crypto
> next cdev
> ---
>
> Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
I don't see the sign-off after applying the patch. Could you take a look?
commit 18d9c3b1728f325dba5fe5dbb51df2366b70527a
Author: Ganapati Kundapura <ganapati.kundapura@intel.com>
Date: Tue Jan 11 04:36:30 2022 -0600
eventdev/crypto_adapter: move crypto ops to circular buffer
Move crypto ops to circular buffer to retain crypto
ops when cryptodev/eventdev are temporarily full
>
> diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
> index d840803..9086368 100644
> --- a/lib/eventdev/rte_event_crypto_adapter.c
> +++ b/lib/eventdev/rte_event_crypto_adapter.c
> @@ -25,11 +25,27 @@
> #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
> #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
>
> +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> +#define CRYPTO_ADAPTER_BUFFER_SZ 1024
> +
> /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
> * iterations of eca_crypto_adapter_enq_run()
> */
> #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
>
> +struct crypto_ops_circular_buffer {
> + /* index of head element in circular buffer */
> + uint16_t head;
> + /* index of tail element in circular buffer */
> + uint16_t tail;
> + /* number elements in buffer */
number elements -> number of elements
> + uint16_t count;
> + /* size of circular buffer */
> + uint16_t size;
> + /* Pointer to hold rte_crypto_ops for batching */
> + struct rte_crypto_op **op_buffer;
> +} __rte_cache_aligned;
> +
> struct event_crypto_adapter {
> /* Event device identifier */
> uint8_t eventdev_id;
> @@ -47,6 +63,8 @@ struct event_crypto_adapter {
> struct crypto_device_info *cdevs;
> /* Loop counter to flush crypto ops */
> uint16_t transmit_loop_count;
> + /* Circular buffer for batching crypto ops to eventdev */
> + struct crypto_ops_circular_buffer ebuf;
> /* Per instance stats structure */
> struct rte_event_crypto_adapter_stats crypto_stats;
> /* Configuration callback for rte_service configuration */
> @@ -93,8 +111,8 @@ struct crypto_device_info {
> struct crypto_queue_pair_info {
> /* Set to indicate queue pair is enabled */
> bool qp_enabled;
> - /* Pointer to hold rte_crypto_ops for batching */
> - struct rte_crypto_op **op_buffer;
> + /* Circular buffer for batching crypto ops to cdev */
> + struct crypto_ops_circular_buffer cbuf;
> /* No of crypto ops accumulated */
> uint8_t len;
> } __rte_cache_aligned;
> @@ -141,6 +159,77 @@ eca_init(void)
> return 0;
> }
>
> +static inline bool
> +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
> +{
> + return bufp->count >= BATCH_SIZE;
> +}
> +
> +static inline void
> +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
> +{
> + rte_free(bufp->op_buffer);
> +}
> +
> +static inline int
> +eca_circular_buffer_init(const char *name,
> + struct crypto_ops_circular_buffer *bufp,
> + uint16_t sz)
> +{
> + bufp->op_buffer = rte_zmalloc(name,
> + sizeof(struct rte_crypto_op *) * sz,
> + 0);
> + if (bufp->op_buffer == NULL)
> + return -ENOMEM;
> +
> + bufp->size = sz;
> + return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> + struct rte_crypto_op *op)
> +{
> + uint16_t *tailp = &bufp->tail;
> +
> + bufp->op_buffer[*tailp] = op;
> + *tailp = (*tailp + 1) % bufp->size;
Provide a comment saying you are taking care of the buffer rollover condition.
> + bufp->count++;
> +
> + return 0;
> +}
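For illustration, the comment the reviewer asks for might read as follows (same logic as the quoted function; only the comment is new):

    static inline int
    eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
                            struct rte_crypto_op *op)
    {
            uint16_t *tailp = &bufp->tail;

            bufp->op_buffer[*tailp] = op;
            /* wrap tail back to index 0 on buffer rollover */
            *tailp = (*tailp + 1) % bufp->size;
            bufp->count++;

            return 0;
    }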
> +
> +static inline int
> +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
> + uint8_t cdev_id, uint16_t qp_id,
> + uint16_t *nb_ops_flushed)
This function returns a value, but the caller does not check it!
> +{
> + uint16_t n = 0;
> + uint16_t *headp = &bufp->head;
> + uint16_t *tailp = &bufp->tail;
> + struct rte_crypto_op **ops = bufp->op_buffer;
> +
> + if (*tailp > *headp)
> + n = *tailp - *headp;
> + else if (*tailp < *headp)
> + n = bufp->size - *headp;
> + else {
> + *nb_ops_flushed = 0;
> + return 0; /* buffer empty */
> + }
> +
> + *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> + &ops[*headp], n);
> + bufp->count -= *nb_ops_flushed;
> + if (!bufp->count) {
> + *headp = 0;
> + *tailp = 0;
> + } else
> + *headp = (*headp + *nb_ops_flushed) % bufp->size;
> +
> + return *nb_ops_flushed == n ? 0 : -1;
> +}
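Two notes on the function above, as a sketch rather than patch code. When the buffer has wrapped (*tailp < *headp), one call flushes only the contiguous segment from head to the end of the buffer; the next call picks up the remainder from index 0. And since the return value distinguishes a full flush (0) from a partial one (-1), a caller could act on the unchecked return flagged above along these lines, assuming qp_info, cdev_id, qp_id and stats are the caller's locals as in eca_enq_to_cryptodev():

    uint16_t nb_flushed = 0;

    /* Worked wrap-around example with size = 8: head = 6, tail = 2,
     * count = 4. The first call flushes op_buffer[6..7]
     * (n = size - head = 2); after head advances to 0, the next
     * call flushes op_buffer[0..1].
     */
    if (eca_circular_buffer_flush_to_cdev(&qp_info->cbuf, cdev_id, qp_id,
                                          &nb_flushed) < 0) {
            /* Partial flush: the cryptodev is temporarily full. The
             * refused ops stay buffered from the new head and can be
             * retried on a later service-loop iteration.
             */
    }
    stats->crypto_enq_count += nb_flushed;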
> +
> static inline struct event_crypto_adapter *
> eca_id_to_adapter(uint8_t id)
> {
> @@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
> return -ENOMEM;
> }
>
> + if (eca_circular_buffer_init("eca_edev_circular_buffer",
> + &adapter->ebuf,
> + CRYPTO_ADAPTER_BUFFER_SZ)) {
> + RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
Mem -> memory, edev -> eventdev.
> + rte_free(adapter);
> + return -ENOMEM;
> + }
> +
> ret = rte_event_dev_info_get(dev_id, &dev_info);
> if (ret < 0) {
> RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
> dev_id, dev_info.driver_name);
> + eca_circular_buffer_free(&adapter->ebuf);
> rte_free(adapter);
> return ret;
> }
> @@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
> socket_id);
> if (adapter->cdevs == NULL) {
> RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
> + eca_circular_buffer_free(&adapter->ebuf);
> rte_free(adapter);
> return -ENOMEM;
> }
> @@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> struct crypto_queue_pair_info *qp_info = NULL;
> struct rte_crypto_op *crypto_op;
> unsigned int i, n;
> - uint16_t qp_id, len, ret;
> + uint16_t qp_id, len, nb_enqueued = 0;
> uint8_t cdev_id;
>
> len = 0;
> - ret = 0;
> n = 0;
> stats->event_deq_count += cnt;
>
> @@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> continue;
> }
> len = qp_info->len;
> - qp_info->op_buffer[len] = crypto_op;
> + eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
> len++;
> } else if (crypto_op->sess_type ==
> RTE_CRYPTO_OP_SESSIONLESS &&
> crypto_op->private_data_offset) {
> @@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> continue;
> }
> len = qp_info->len;
> - qp_info->op_buffer[len] = crypto_op;
> + eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
> len++;
> } else {
> rte_pktmbuf_free(crypto_op->sym->m_src);
> @@ -391,18 +489,17 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> continue;
> }
>
> - if (len == BATCH_SIZE) {
> - struct rte_crypto_op **op_buffer = qp_info->op_buffer;
> - ret = rte_cryptodev_enqueue_burst(cdev_id,
> + if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> + eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
> + cdev_id,
> qp_id,
> - op_buffer,
> - BATCH_SIZE);
> + &nb_enqueued);
> + stats->crypto_enq_count += nb_enqueued;
> + n += nb_enqueued;
>
> - stats->crypto_enq_count += ret;
> -
> - while (ret < len) {
> + while (nb_enqueued < len) {
> struct rte_crypto_op *op;
> - op = op_buffer[ret++];
> + op = qp_info->cbuf.op_buffer[nb_enqueued++];
> stats->crypto_enq_fail++;
> rte_pktmbuf_free(op->sym->m_src);
> rte_crypto_op_free(op);
Not sure why you are freeing the ops that were not enqueued to the cryptodev.
Isn't the goal of the patch to retain those non-enqueued crypto_ops temporarily and retry them later?
> @@ -413,7 +510,6 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
>
> if (qp_info)
> qp_info->len = len;
> - n += ret;
> }
>
> return n;
> @@ -425,14 +521,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> struct crypto_device_info *curr_dev;
> struct crypto_queue_pair_info *curr_queue;
> - struct rte_crypto_op **op_buffer;
> struct rte_cryptodev *dev;
> uint8_t cdev_id;
> uint16_t qp;
> - uint16_t ret;
> + uint16_t nb_enqueued = 0, nb = 0;
> uint16_t num_cdev = rte_cryptodev_count();
>
> - ret = 0;
> for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
> curr_dev = &adapter->cdevs[cdev_id];
> dev = curr_dev->dev;
> @@ -444,16 +538,17 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> if (!curr_queue->qp_enabled)
> continue;
>
> - op_buffer = curr_queue->op_buffer;
> - ret = rte_cryptodev_enqueue_burst(cdev_id,
> + eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> + cdev_id,
> qp,
> - op_buffer,
> - curr_queue->len);
> - stats->crypto_enq_count += ret;
> + &nb_enqueued);
>
> - while (ret < curr_queue->len) {
> + stats->crypto_enq_count += nb_enqueued;
> + nb += nb_enqueued;
> +
> + while (nb_enqueued < curr_queue->len) {
> struct rte_crypto_op *op;
> - op = op_buffer[ret++];
> + op = curr_queue->cbuf.op_buffer[nb_enqueued++];
> stats->crypto_enq_fail++;
> rte_pktmbuf_free(op->sym->m_src);
> rte_crypto_op_free(op);
> @@ -462,7 +557,7 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> }
> }
>
> - return ret;
> + return nb;
> }
>
> static int
> @@ -499,9 +594,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
> return nb_enqueued;
> }
>
> -static inline void
> +static inline uint16_t
> eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> - struct rte_crypto_op **ops, uint16_t num)
> + struct rte_crypto_op **ops, uint16_t num)
> {
> struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> union rte_event_crypto_metadata *m_data = NULL;
> @@ -518,6 +613,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> num = RTE_MIN(num, BATCH_SIZE);
> for (i = 0; i < num; i++) {
> struct rte_event *ev = &events[nb_ev++];
> +
> + m_data = NULL;
> if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
> m_data = rte_cryptodev_sym_session_get_user_data(
> ops[i]->sym->session);
> @@ -548,21 +645,58 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> event_port_id,
> &events[nb_enqueued],
> nb_ev - nb_enqueued);
> +
> } while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
> nb_enqueued < nb_ev);
>
> - /* Free mbufs and rte_crypto_ops for failed events */
> - for (i = nb_enqueued; i < nb_ev; i++) {
> - struct rte_crypto_op *op = events[i].event_ptr;
> - rte_pktmbuf_free(op->sym->m_src);
> - rte_crypto_op_free(op);
> - }
> -
> stats->event_enq_fail_count += nb_ev - nb_enqueued;
> stats->event_enq_count += nb_enqueued;
> stats->event_enq_retry_count += retry - 1;
> +
> + return nb_enqueued;
> }
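A side note on the m_data = NULL added at the top of the loop above: m_data is declared once outside the loop, so without the reset a sessionless op carrying no private data would silently reuse the previous iteration's metadata instead of being caught by the m_data == NULL check later in the loop. Condensed, the loop shape is roughly this (a sketch of the surrounding function, not the full patch code):

    for (i = 0; i < num; i++) {
            m_data = NULL; /* don't inherit metadata from the previous op */
            if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION)
                    m_data = rte_cryptodev_sym_session_get_user_data(
                                    ops[i]->sym->session);
            else if (ops[i]->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
                     ops[i]->private_data_offset)
                    m_data = (union rte_event_crypto_metadata *)
                             ((uint8_t *)ops[i] +
                              ops[i]->private_data_offset);
            if (m_data == NULL) { /* neither branch matched: drop the op */
                    rte_pktmbuf_free(ops[i]->sym->m_src);
                    rte_crypto_op_free(ops[i]);
                    continue;
            }
            /* ... build the response event from m_data ... */
    }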
>
> +static int
> +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
> + struct crypto_ops_circular_buffer *bufp)
> +{
> + uint16_t n = 0, nb_ops_flushed;
> + uint16_t *headp = &bufp->head;
> + uint16_t *tailp = &bufp->tail;
> + struct rte_crypto_op **ops = bufp->op_buffer;
> +
> + if (*tailp > *headp)
> + n = *tailp - *headp;
> + else if (*tailp < *headp)
> + n = bufp->size - *headp;
> + else
> + return 0; /* buffer empty */
> +
> + nb_ops_flushed = eca_ops_enqueue_burst(adapter, ops, n);
> + bufp->count -= nb_ops_flushed;
> + if (!bufp->count) {
> + *headp = 0;
> + *tailp = 0;
> +
Extra line not required
> + return 0; /* buffer empty */
> + }
> +
> + *headp = (*headp + nb_ops_flushed) % bufp->size;
> +
Extra line not required
> + return 1;
> +}
> +
> +
> +static void
> +eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
> +{
> + if (adapter->ebuf.count == 0)
> + return;
> +
> + while (eca_circular_buffer_flush_to_evdev(adapter,
> + &adapter->ebuf))
> + ;
> +}
> static inline unsigned int
> eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> unsigned int max_deq)
> @@ -571,7 +705,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> struct crypto_device_info *curr_dev;
> struct crypto_queue_pair_info *curr_queue;
> struct rte_crypto_op *ops[BATCH_SIZE];
> - uint16_t n, nb_deq;
> + uint16_t n, nb_deq, nb_enqueued, i;
> struct rte_cryptodev *dev;
> uint8_t cdev_id;
> uint16_t qp, dev_qps;
> @@ -579,16 +713,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> uint16_t num_cdev = rte_cryptodev_count();
>
> nb_deq = 0;
> + eca_ops_buffer_flush(adapter);
> +
> do {
> - uint16_t queues = 0;
> done = true;
>
> for (cdev_id = adapter->next_cdev_id;
> cdev_id < num_cdev; cdev_id++) {
> + uint16_t queues = 0;
> +
> curr_dev = &adapter->cdevs[cdev_id];
> dev = curr_dev->dev;
> if (dev == NULL)
> continue;
> +
> dev_qps = dev->data->nb_queue_pairs;
>
> for (qp = curr_dev->next_queue_pair_id;
> @@ -596,7 +734,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> queues++) {
>
> curr_queue = &curr_dev->qpairs[qp];
> - if (!curr_queue->qp_enabled)
> + if (curr_queue == NULL ||
> + !curr_queue->qp_enabled)
> continue;
>
> n = rte_cryptodev_dequeue_burst(cdev_id, qp,
> @@ -605,11 +744,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> continue;
>
> done = false;
> + nb_enqueued = 0;
> +
> stats->crypto_deq_count += n;
> - eca_ops_enqueue_burst(adapter, ops, n);
> +
> + if (unlikely(!adapter->ebuf.count))
> + nb_enqueued = eca_ops_enqueue_burst(
> + adapter, ops, n);
> +
> + if (nb_enqueued == n)
> + goto check;
> +
> + /* Failed to enqueue events case */
> + for (i = nb_enqueued; i < n; i++)
> + eca_circular_buffer_add(
> + &adapter->ebuf,
> + ops[i]);
> +
> +check:
> nb_deq += n;
>
> - if (nb_deq > max_deq) {
> + if (nb_deq >= max_deq) {
> if ((qp + 1) == dev_qps) {
> adapter->next_cdev_id =
> (cdev_id + 1)
> @@ -622,6 +777,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> }
> }
> }
> + adapter->next_cdev_id = 0;
> } while (done == false);
> return nb_deq;
> }
> @@ -751,11 +907,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
> return -ENOMEM;
>
> qpairs = dev_info->qpairs;
> - qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
> - BATCH_SIZE *
> - sizeof(struct rte_crypto_op *),
> - 0, adapter->socket_id);
> - if (!qpairs->op_buffer) {
> +
> + if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> + &qpairs->cbuf,
> + CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
> rte_free(qpairs);
> return -ENOMEM;
> }
> --
> 2.6.4
Hi Abhi,
> -----Original Message-----
> From: Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>
> Sent: 13 January 2022 16:36
> To: Kundapura, Ganapati <ganapati.kundapura@intel.com>;
> jerinjacobk@gmail.com; Jayatheerthan, Jay <jay.jayatheerthan@intel.com>;
> dev@dpdk.org
> Subject: RE: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to
> circular buffer
>
> Hi Ganapati,
>
>
> > -----Original Message-----
> > From: Kundapura, Ganapati <ganapati.kundapura@intel.com>
> > Sent: Tuesday, January 11, 2022 4:07 PM
> > To: jerinjacobk@gmail.com; Jayatheerthan, Jay
> > <jay.jayatheerthan@intel.com>; dev@dpdk.org
> > Cc: Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>
> > Subject: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to
> > circular buffer
> >
> > Move crypto ops to circular buffer to retain crypto ops when
> > cryptodev/eventdev are temporarily full
> >
> > ---
> > v3:
> > * update eca_ops_buffer_flush() to flush out all the crypto
> > ops out of circular buffer.
> > * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
> > and add to cirular buffer for later processing.
> >
> > v2:
> > * reset cryptp adapter next cdev id before dequeueing from the
> Cryptp -> crypto
Updated in v4
> > next cdev
> > ---
> >
> > Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
> I don't see the sign-off after applying the patch. Could you take a look?
> commit 18d9c3b1728f325dba5fe5dbb51df2366b70527a
Moved the Signed-off-by before the version history
> Author: Ganapati Kundapura <ganapati.kundapura@intel.com>
> Date: Tue Jan 11 04:36:30 2022 -0600
>
> eventdev/crypto_adapter: move crypto ops to circular buffer
>
> Move crypto ops to circular buffer to retain crypto
> ops when cryptodev/eventdev are temporarily full
>
>
> >
> > diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
> > index d840803..9086368 100644
> > --- a/lib/eventdev/rte_event_crypto_adapter.c
> > +++ b/lib/eventdev/rte_event_crypto_adapter.c
> > @@ -25,11 +25,27 @@
> > #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
> > #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
> >
> > +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> > +#define CRYPTO_ADAPTER_BUFFER_SZ 1024
> > +
> > /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
> > * iterations of eca_crypto_adapter_enq_run()
> > */
> > #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
> >
> > +struct crypto_ops_circular_buffer {
> > + /* index of head element in circular buffer */
> > + uint16_t head;
> > + /* index of tail element in circular buffer */
> > + uint16_t tail;
> > + /* number elements in buffer */
> number elements -> number of elements
updated
> > + uint16_t count;
> > + /* size of circular buffer */
> > + uint16_t size;
> > + /* Pointer to hold rte_crypto_ops for batching */
> > + struct rte_crypto_op **op_buffer;
> > +} __rte_cache_aligned;
> > +
> > struct event_crypto_adapter {
> > /* Event device identifier */
> > uint8_t eventdev_id;
> > @@ -47,6 +63,8 @@ struct event_crypto_adapter {
> > struct crypto_device_info *cdevs;
> > /* Loop counter to flush crypto ops */
> > uint16_t transmit_loop_count;
> > + /* Circular buffer for batching crypto ops to eventdev */
> > + struct crypto_ops_circular_buffer ebuf;
> > /* Per instance stats structure */
> > struct rte_event_crypto_adapter_stats crypto_stats;
> > /* Configuration callback for rte_service configuration */
> > @@ -93,8 +111,8 @@ struct crypto_device_info {
> > struct crypto_queue_pair_info {
> > /* Set to indicate queue pair is enabled */
> > bool qp_enabled;
> > - /* Pointer to hold rte_crypto_ops for batching */
> > - struct rte_crypto_op **op_buffer;
> > + /* Circular buffer for batching crypto ops to cdev */
> > + struct crypto_ops_circular_buffer cbuf;
> > /* No of crypto ops accumulated */
> > uint8_t len;
> > } __rte_cache_aligned;
> > @@ -141,6 +159,77 @@ eca_init(void)
> > return 0;
> > }
> >
> > +static inline bool
> > +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
> > +{
> > + return bufp->count >= BATCH_SIZE;
> > +}
> > +
> > +static inline void
> > +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
> > +{
> > + rte_free(bufp->op_buffer);
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_init(const char *name,
> > + struct crypto_ops_circular_buffer *bufp,
> > + uint16_t sz)
> > +{
> > + bufp->op_buffer = rte_zmalloc(name,
> > + sizeof(struct rte_crypto_op *) * sz,
> > + 0);
> > + if (bufp->op_buffer == NULL)
> > + return -ENOMEM;
> > +
> > + bufp->size = sz;
> > + return 0;
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> > + struct rte_crypto_op *op)
> > +{
> > + uint16_t *tailp = &bufp->tail;
> > +
> > + bufp->op_buffer[*tailp] = op;
> > + *tailp = (*tailp + 1) % bufp->size;
> Provide a comment saying you are taking care of the buffer rollover condition.
> > + bufp->count++;
> > +
> > + return 0;
> > +}
> > +
> > +static inline int
> > +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
> > + uint8_t cdev_id, uint16_t qp_id,
> > + uint16_t *nb_ops_flushed)
> This function returns a value, but the caller does not check it!
> > +{
> > + uint16_t n = 0;
> > + uint16_t *headp = &bufp->head;
> > + uint16_t *tailp = &bufp->tail;
> > + struct rte_crypto_op **ops = bufp->op_buffer;
> > +
> > + if (*tailp > *headp)
> > + n = *tailp - *headp;
> > + else if (*tailp < *headp)
> > + n = bufp->size - *headp;
> > + else {
> > + *nb_ops_flushed = 0;
> > + return 0; /* buffer empty */
> > + }
> > +
> > + *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> > + &ops[*headp], n);
> > + bufp->count -= *nb_ops_flushed;
> > + if (!bufp->count) {
> > + *headp = 0;
> > + *tailp = 0;
> > + } else
> > + *headp = (*headp + *nb_ops_flushed) % bufp->size;
> > +
> > + return *nb_ops_flushed == n ? 0 : -1;
> > +}
> > +
> > static inline struct event_crypto_adapter *
> > eca_id_to_adapter(uint8_t id)
> > {
> > @@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
> > return -ENOMEM;
> > }
> >
> > + if (eca_circular_buffer_init("eca_edev_circular_buffer",
> > + &adapter->ebuf,
> > + CRYPTO_ADAPTER_BUFFER_SZ)) {
> > + RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
> Mem -> memory, edev -> eventdev.
Updated in v4
> > + rte_free(adapter);
> > + return -ENOMEM;
> > + }
> > +
> > ret = rte_event_dev_info_get(dev_id, &dev_info);
> > if (ret < 0) {
> > RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d:
> %s!",
> > dev_id, dev_info.driver_name);
> > + eca_circular_buffer_free(&adapter->ebuf);
> > rte_free(adapter);
> > return ret;
> > }
> > @@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
> > socket_id);
> > if (adapter->cdevs == NULL) {
> > RTE_EDEV_LOG_ERR("Failed to get mem for crypto
> devices\n");
> > + eca_circular_buffer_free(&adapter->ebuf);
> > rte_free(adapter);
> > return -ENOMEM;
> > }
> > @@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> > struct crypto_queue_pair_info *qp_info = NULL;
> > struct rte_crypto_op *crypto_op;
> > unsigned int i, n;
> > - uint16_t qp_id, len, ret;
> > + uint16_t qp_id, len, nb_enqueued = 0;
> > uint8_t cdev_id;
> >
> > len = 0;
> > - ret = 0;
> > n = 0;
> > stats->event_deq_count += cnt;
> >
> > @@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> > continue;
> > }
> > len = qp_info->len;
> > - qp_info->op_buffer[len] = crypto_op;
> > + eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
> > len++;
> > } else if (crypto_op->sess_type ==
> > RTE_CRYPTO_OP_SESSIONLESS &&
> > crypto_op->private_data_offset) {
> > @@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> > continue;
> > }
> > len = qp_info->len;
> > - qp_info->op_buffer[len] = crypto_op;
> > + eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
> > len++;
> > } else {
> > rte_pktmbuf_free(crypto_op->sym->m_src);
> > @@ -391,18 +489,17 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> > continue;
> > }
> >
> > - if (len == BATCH_SIZE) {
> > - struct rte_crypto_op **op_buffer = qp_info->op_buffer;
> > - ret = rte_cryptodev_enqueue_burst(cdev_id,
> > + if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> > + eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
> > + cdev_id,
> > qp_id,
> > - op_buffer,
> > - BATCH_SIZE);
> > + &nb_enqueued);
> > + stats->crypto_enq_count += nb_enqueued;
> > + n += nb_enqueued;
> >
> > - stats->crypto_enq_count += ret;
> > -
> > - while (ret < len) {
> > + while (nb_enqueued < len) {
> > struct rte_crypto_op *op;
> > - op = op_buffer[ret++];
> > + op = qp_info->cbuf.op_buffer[nb_enqueued++];
> > stats->crypto_enq_fail++;
> > rte_pktmbuf_free(op->sym->m_src);
> > rte_crypto_op_free(op);
> Not sure why you are freeing the ops that were not enqueued to the cryptodev.
> Isn't the goal of the patch to retain those non-enqueued crypto_ops
> temporarily and retry them later?
Retained the failed ops in v4
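Presumably the v4 fix is simply to drop the free loop: after eca_circular_buffer_flush_to_cdev() returns, the ops the cryptodev refused are still sitting in cbuf (head was advanced only past the accepted ones), so freeing them would destroy ops the buffer is about to retry. A sketch of the retained shape, stated as an assumption rather than the actual v4 hunk:

    if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
            eca_circular_buffer_flush_to_cdev(&qp_info->cbuf, cdev_id,
                                              qp_id, &nb_enqueued);
            stats->crypto_enq_count += nb_enqueued;
            n += nb_enqueued;
            /* Ops not accepted by the cryptodev remain in cbuf and are
             * flushed again on a later service-loop iteration; nothing
             * is freed here any more.
             */
    }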
>
> > @@ -413,7 +510,6 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> >
> > if (qp_info)
> > qp_info->len = len;
> > - n += ret;
> > }
> >
> > return n;
> > @@ -425,14 +521,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> > struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> > struct crypto_device_info *curr_dev;
> > struct crypto_queue_pair_info *curr_queue;
> > - struct rte_crypto_op **op_buffer;
> > struct rte_cryptodev *dev;
> > uint8_t cdev_id;
> > uint16_t qp;
> > - uint16_t ret;
> > + uint16_t nb_enqueued = 0, nb = 0;
> > uint16_t num_cdev = rte_cryptodev_count();
> >
> > - ret = 0;
> > for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
> > curr_dev = &adapter->cdevs[cdev_id];
> > dev = curr_dev->dev;
> > @@ -444,16 +538,17 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> > if (!curr_queue->qp_enabled)
> > continue;
> >
> > - op_buffer = curr_queue->op_buffer;
> > - ret = rte_cryptodev_enqueue_burst(cdev_id,
> > + eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> > + cdev_id,
> > qp,
> > - op_buffer,
> > - curr_queue->len);
> > - stats->crypto_enq_count += ret;
> > + &nb_enqueued);
> >
> > - while (ret < curr_queue->len) {
> > + stats->crypto_enq_count += nb_enqueued;
> > + nb += nb_enqueued;
> > +
> > + while (nb_enqueued < curr_queue->len) {
> > struct rte_crypto_op *op;
> > - op = op_buffer[ret++];
> > + op = curr_queue->cbuf.op_buffer[nb_enqueued++];
> > stats->crypto_enq_fail++;
> > rte_pktmbuf_free(op->sym->m_src);
> > rte_crypto_op_free(op);
> > @@ -462,7 +557,7 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> > }
> > }
> >
> > - return ret;
> > + return nb;
> > }
> >
> > static int
> > @@ -499,9 +594,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
> > return nb_enqueued;
> > }
> >
> > -static inline void
> > +static inline uint16_t
> > eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> > - struct rte_crypto_op **ops, uint16_t num)
> > + struct rte_crypto_op **ops, uint16_t num)
> > {
> > struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> > union rte_event_crypto_metadata *m_data = NULL;
> > @@ -518,6 +613,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> > num = RTE_MIN(num, BATCH_SIZE);
> > for (i = 0; i < num; i++) {
> > struct rte_event *ev = &events[nb_ev++];
> > +
> > + m_data = NULL;
> > if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
> > m_data = rte_cryptodev_sym_session_get_user_data(
> > ops[i]->sym->session);
> > @@ -548,21 +645,58 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> > event_port_id,
> > &events[nb_enqueued],
> > nb_ev - nb_enqueued);
> > +
> > } while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
> > nb_enqueued < nb_ev);
> >
> > - /* Free mbufs and rte_crypto_ops for failed events */
> > - for (i = nb_enqueued; i < nb_ev; i++) {
> > - struct rte_crypto_op *op = events[i].event_ptr;
> > - rte_pktmbuf_free(op->sym->m_src);
> > - rte_crypto_op_free(op);
> > - }
> > -
> > stats->event_enq_fail_count += nb_ev - nb_enqueued;
> > stats->event_enq_count += nb_enqueued;
> > stats->event_enq_retry_count += retry - 1;
> > +
> > + return nb_enqueued;
> > }
> >
> > +static int
> > +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
> > + struct crypto_ops_circular_buffer *bufp)
> > +{
> > + uint16_t n = 0, nb_ops_flushed;
> > + uint16_t *headp = &bufp->head;
> > + uint16_t *tailp = &bufp->tail;
> > + struct rte_crypto_op **ops = bufp->op_buffer;
> > +
> > + if (*tailp > *headp)
> > + n = *tailp - *headp;
> > + else if (*tailp < *headp)
> > + n = bufp->size - *headp;
> > + else
> > + return 0; /* buffer empty */
> > +
> > + nb_ops_flushed = eca_ops_enqueue_burst(adapter, ops, n);
> > + bufp->count -= nb_ops_flushed;
> > + if (!bufp->count) {
> > + *headp = 0;
> > + *tailp = 0;
> > +
> Extra line not required
removed
> > + return 0; /* buffer empty */
> > + }
> > +
> > + *headp = (*headp + nb_ops_flushed) % bufp->size;
> > +
> Extra line not required
removed
> > + return 1;
> > +}
> > +
> > +
> > +static void
> > +eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
> > +{
> > + if (adapter->ebuf.count == 0)
> > + return;
> > +
> > + while (eca_circular_buffer_flush_to_evdev(adapter,
> > + &adapter->ebuf))
> > + ;
> > +}
> > static inline unsigned int
> > eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> > unsigned int max_deq)
> > @@ -571,7 +705,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> > struct crypto_device_info *curr_dev;
> > struct crypto_queue_pair_info *curr_queue;
> > struct rte_crypto_op *ops[BATCH_SIZE];
> > - uint16_t n, nb_deq;
> > + uint16_t n, nb_deq, nb_enqueued, i;
> > struct rte_cryptodev *dev;
> > uint8_t cdev_id;
> > uint16_t qp, dev_qps;
> > @@ -579,16 +713,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> > uint16_t num_cdev = rte_cryptodev_count();
> >
> > nb_deq = 0;
> > + eca_ops_buffer_flush(adapter);
> > +
> > do {
> > - uint16_t queues = 0;
> > done = true;
> >
> > for (cdev_id = adapter->next_cdev_id;
> > cdev_id < num_cdev; cdev_id++) {
> > + uint16_t queues = 0;
> > +
> > curr_dev = &adapter->cdevs[cdev_id];
> > dev = curr_dev->dev;
> > if (dev == NULL)
> > continue;
> > +
> > dev_qps = dev->data->nb_queue_pairs;
> >
> > for (qp = curr_dev->next_queue_pair_id;
> > @@ -596,7 +734,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> > queues++) {
> >
> > curr_queue = &curr_dev->qpairs[qp];
> > - if (!curr_queue->qp_enabled)
> > + if (curr_queue == NULL ||
> > + !curr_queue->qp_enabled)
> > continue;
> >
> > n = rte_cryptodev_dequeue_burst(cdev_id, qp,
> > @@ -605,11 +744,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> > continue;
> >
> > done = false;
> > + nb_enqueued = 0;
> > +
> > stats->crypto_deq_count += n;
> > - eca_ops_enqueue_burst(adapter, ops, n);
> > +
> > + if (unlikely(!adapter->ebuf.count))
> > + nb_enqueued = eca_ops_enqueue_burst(
> > + adapter, ops, n);
> > +
> > + if (nb_enqueued == n)
> > + goto check;
> > +
> > + /* Failed to enqueue events case */
> > + for (i = nb_enqueued; i < n; i++)
> > + eca_circular_buffer_add(
> > + &adapter->ebuf,
> > + ops[i]);
> > +
> > +check:
> > nb_deq += n;
> >
> > - if (nb_deq > max_deq) {
> > + if (nb_deq >= max_deq) {
> > if ((qp + 1) == dev_qps) {
> > adapter->next_cdev_id =
> > (cdev_id + 1)
> > @@ -622,6 +777,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> > }
> > }
> > }
> > + adapter->next_cdev_id = 0;
> > } while (done == false);
> > return nb_deq;
> > }
> > @@ -751,11 +907,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
> > return -ENOMEM;
> >
> > qpairs = dev_info->qpairs;
> > - qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
> > - BATCH_SIZE *
> > - sizeof(struct rte_crypto_op *),
> > - 0, adapter->socket_id);
> > - if (!qpairs->op_buffer) {
> > +
> > + if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> > + &qpairs->cbuf,
> > + CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
> > rte_free(qpairs);
> > return -ENOMEM;
> > }
> > --
> > 2.6.4
@@ -25,11 +25,27 @@
#define CRYPTO_ADAPTER_MEM_NAME_LEN 32
#define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
/* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
* iterations of eca_crypto_adapter_enq_run()
*/
#define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
+struct crypto_ops_circular_buffer {
+ /* index of head element in circular buffer */
+ uint16_t head;
+ /* index of tail element in circular buffer */
+ uint16_t tail;
+ /* number elements in buffer */
+ uint16_t count;
+ /* size of circular buffer */
+ uint16_t size;
+ /* Pointer to hold rte_crypto_ops for batching */
+ struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
struct event_crypto_adapter {
/* Event device identifier */
uint8_t eventdev_id;
@@ -47,6 +63,8 @@ struct event_crypto_adapter {
struct crypto_device_info *cdevs;
/* Loop counter to flush crypto ops */
uint16_t transmit_loop_count;
+ /* Circular buffer for batching crypto ops to eventdev */
+ struct crypto_ops_circular_buffer ebuf;
/* Per instance stats structure */
struct rte_event_crypto_adapter_stats crypto_stats;
/* Configuration callback for rte_service configuration */
@@ -93,8 +111,8 @@ struct crypto_device_info {
struct crypto_queue_pair_info {
/* Set to indicate queue pair is enabled */
bool qp_enabled;
- /* Pointer to hold rte_crypto_ops for batching */
- struct rte_crypto_op **op_buffer;
+ /* Circular buffer for batching crypto ops to cdev */
+ struct crypto_ops_circular_buffer cbuf;
/* No of crypto ops accumulated */
uint8_t len;
} __rte_cache_aligned;
@@ -141,6 +159,77 @@ eca_init(void)
return 0;
}
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+ return bufp->count >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+ rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+ struct crypto_ops_circular_buffer *bufp,
+ uint16_t sz)
+{
+ bufp->op_buffer = rte_zmalloc(name,
+ sizeof(struct rte_crypto_op *) * sz,
+ 0);
+ if (bufp->op_buffer == NULL)
+ return -ENOMEM;
+
+ bufp->size = sz;
+ return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+ struct rte_crypto_op *op)
+{
+ uint16_t *tailp = &bufp->tail;
+
+ bufp->op_buffer[*tailp] = op;
+ *tailp = (*tailp + 1) % bufp->size;
+ bufp->count++;
+
+ return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+ uint8_t cdev_id, uint16_t qp_id,
+ uint16_t *nb_ops_flushed)
+{
+ uint16_t n = 0;
+ uint16_t *headp = &bufp->head;
+ uint16_t *tailp = &bufp->tail;
+ struct rte_crypto_op **ops = bufp->op_buffer;
+
+ if (*tailp > *headp)
+ n = *tailp - *headp;
+ else if (*tailp < *headp)
+ n = bufp->size - *headp;
+ else {
+ *nb_ops_flushed = 0;
+ return 0; /* buffer empty */
+ }
+
+ *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+ &ops[*headp], n);
+ bufp->count -= *nb_ops_flushed;
+ if (!bufp->count) {
+ *headp = 0;
+ *tailp = 0;
+ } else
+ *headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+ return *nb_ops_flushed == n ? 0 : -1;
+}
+
static inline struct event_crypto_adapter *
eca_id_to_adapter(uint8_t id)
{
@@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
return -ENOMEM;
}
+ if (eca_circular_buffer_init("eca_edev_circular_buffer",
+ &adapter->ebuf,
+ CRYPTO_ADAPTER_BUFFER_SZ)) {
+ RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
+ rte_free(adapter);
+ return -ENOMEM;
+ }
+
ret = rte_event_dev_info_get(dev_id, &dev_info);
if (ret < 0) {
RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
dev_id, dev_info.driver_name);
+ eca_circular_buffer_free(&adapter->ebuf);
rte_free(adapter);
return ret;
}
@@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
socket_id);
if (adapter->cdevs == NULL) {
RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+ eca_circular_buffer_free(&adapter->ebuf);
rte_free(adapter);
return -ENOMEM;
}
@@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
struct crypto_queue_pair_info *qp_info = NULL;
struct rte_crypto_op *crypto_op;
unsigned int i, n;
- uint16_t qp_id, len, ret;
+ uint16_t qp_id, len, nb_enqueued = 0;
uint8_t cdev_id;
len = 0;
- ret = 0;
n = 0;
stats->event_deq_count += cnt;
@@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
continue;
}
len = qp_info->len;
- qp_info->op_buffer[len] = crypto_op;
+ eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
len++;
} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
crypto_op->private_data_offset) {
@@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
continue;
}
len = qp_info->len;
- qp_info->op_buffer[len] = crypto_op;
+ eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
len++;
} else {
rte_pktmbuf_free(crypto_op->sym->m_src);
@@ -391,18 +489,17 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
continue;
}
- if (len == BATCH_SIZE) {
- struct rte_crypto_op **op_buffer = qp_info->op_buffer;
- ret = rte_cryptodev_enqueue_burst(cdev_id,
+ if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+ eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+ cdev_id,
qp_id,
- op_buffer,
- BATCH_SIZE);
+ &nb_enqueued);
+ stats->crypto_enq_count += nb_enqueued;
+ n += nb_enqueued;
- stats->crypto_enq_count += ret;
-
- while (ret < len) {
+ while (nb_enqueued < len) {
struct rte_crypto_op *op;
- op = op_buffer[ret++];
+ op = qp_info->cbuf.op_buffer[nb_enqueued++];
stats->crypto_enq_fail++;
rte_pktmbuf_free(op->sym->m_src);
rte_crypto_op_free(op);
@@ -413,7 +510,6 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
if (qp_info)
qp_info->len = len;
- n += ret;
}
return n;
@@ -425,14 +521,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
struct crypto_device_info *curr_dev;
struct crypto_queue_pair_info *curr_queue;
- struct rte_crypto_op **op_buffer;
struct rte_cryptodev *dev;
uint8_t cdev_id;
uint16_t qp;
- uint16_t ret;
+ uint16_t nb_enqueued = 0, nb = 0;
uint16_t num_cdev = rte_cryptodev_count();
- ret = 0;
for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
curr_dev = &adapter->cdevs[cdev_id];
dev = curr_dev->dev;
@@ -444,16 +538,17 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
if (!curr_queue->qp_enabled)
continue;
- op_buffer = curr_queue->op_buffer;
- ret = rte_cryptodev_enqueue_burst(cdev_id,
+ eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+ cdev_id,
qp,
- op_buffer,
- curr_queue->len);
- stats->crypto_enq_count += ret;
+ &nb_enqueued);
- while (ret < curr_queue->len) {
+ stats->crypto_enq_count += nb_enqueued;
+ nb += nb_enqueued;
+
+ while (nb_enqueued < curr_queue->len) {
struct rte_crypto_op *op;
- op = op_buffer[ret++];
+ op = curr_queue->cbuf.op_buffer[nb_enqueued++];
stats->crypto_enq_fail++;
rte_pktmbuf_free(op->sym->m_src);
rte_crypto_op_free(op);
@@ -462,7 +557,7 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
}
}
- return ret;
+ return nb;
}
static int
@@ -499,9 +594,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
return nb_enqueued;
}
-static inline void
+static inline uint16_t
eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
- struct rte_crypto_op **ops, uint16_t num)
+ struct rte_crypto_op **ops, uint16_t num)
{
struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +613,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
num = RTE_MIN(num, BATCH_SIZE);
for (i = 0; i < num; i++) {
struct rte_event *ev = &events[nb_ev++];
+
+ m_data = NULL;
if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
m_data = rte_cryptodev_sym_session_get_user_data(
ops[i]->sym->session);
@@ -548,21 +645,58 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
event_port_id,
&events[nb_enqueued],
nb_ev - nb_enqueued);
+
} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
nb_enqueued < nb_ev);
- /* Free mbufs and rte_crypto_ops for failed events */
- for (i = nb_enqueued; i < nb_ev; i++) {
- struct rte_crypto_op *op = events[i].event_ptr;
- rte_pktmbuf_free(op->sym->m_src);
- rte_crypto_op_free(op);
- }
-
stats->event_enq_fail_count += nb_ev - nb_enqueued;
stats->event_enq_count += nb_enqueued;
stats->event_enq_retry_count += retry - 1;
+
+ return nb_enqueued;
}
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+ struct crypto_ops_circular_buffer *bufp)
+{
+ uint16_t n = 0, nb_ops_flushed;
+ uint16_t *headp = &bufp->head;
+ uint16_t *tailp = &bufp->tail;
+ struct rte_crypto_op **ops = bufp->op_buffer;
+
+ if (*tailp > *headp)
+ n = *tailp - *headp;
+ else if (*tailp < *headp)
+ n = bufp->size - *headp;
+ else
+ return 0; /* buffer empty */
+
+ nb_ops_flushed = eca_ops_enqueue_burst(adapter, ops, n);
+ bufp->count -= nb_ops_flushed;
+ if (!bufp->count) {
+ *headp = 0;
+ *tailp = 0;
+
+ return 0; /* buffer empty */
+ }
+
+ *headp = (*headp + nb_ops_flushed) % bufp->size;
+
+ return 1;
+}
+
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+ if (adapter->ebuf.count == 0)
+ return;
+
+ while (eca_circular_buffer_flush_to_evdev(adapter,
+ &adapter->ebuf))
+ ;
+}
static inline unsigned int
eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
unsigned int max_deq)
@@ -571,7 +705,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
struct crypto_device_info *curr_dev;
struct crypto_queue_pair_info *curr_queue;
struct rte_crypto_op *ops[BATCH_SIZE];
- uint16_t n, nb_deq;
+ uint16_t n, nb_deq, nb_enqueued, i;
struct rte_cryptodev *dev;
uint8_t cdev_id;
uint16_t qp, dev_qps;
@@ -579,16 +713,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
uint16_t num_cdev = rte_cryptodev_count();
nb_deq = 0;
+ eca_ops_buffer_flush(adapter);
+
do {
- uint16_t queues = 0;
done = true;
for (cdev_id = adapter->next_cdev_id;
cdev_id < num_cdev; cdev_id++) {
+ uint16_t queues = 0;
+
curr_dev = &adapter->cdevs[cdev_id];
dev = curr_dev->dev;
if (dev == NULL)
continue;
+
dev_qps = dev->data->nb_queue_pairs;
for (qp = curr_dev->next_queue_pair_id;
@@ -596,7 +734,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
queues++) {
curr_queue = &curr_dev->qpairs[qp];
- if (!curr_queue->qp_enabled)
+ if (curr_queue == NULL ||
+ !curr_queue->qp_enabled)
continue;
n = rte_cryptodev_dequeue_burst(cdev_id, qp,
@@ -605,11 +744,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
continue;
done = false;
+ nb_enqueued = 0;
+
stats->crypto_deq_count += n;
- eca_ops_enqueue_burst(adapter, ops, n);
+
+ if (unlikely(!adapter->ebuf.count))
+ nb_enqueued = eca_ops_enqueue_burst(
+ adapter, ops, n);
+
+ if (nb_enqueued == n)
+ goto check;
+
+ /* Failed to enqueue events case */
+ for (i = nb_enqueued; i < n; i++)
+ eca_circular_buffer_add(
+ &adapter->ebuf,
+ ops[i]);
+
+check:
nb_deq += n;
- if (nb_deq > max_deq) {
+ if (nb_deq >= max_deq) {
if ((qp + 1) == dev_qps) {
adapter->next_cdev_id =
(cdev_id + 1)
@@ -622,6 +777,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
}
}
}
+ adapter->next_cdev_id = 0;
} while (done == false);
return nb_deq;
}
@@ -751,11 +907,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
return -ENOMEM;
qpairs = dev_info->qpairs;
- qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
- BATCH_SIZE *
- sizeof(struct rte_crypto_op *),
- 0, adapter->socket_id);
- if (!qpairs->op_buffer) {
+
+ if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+ &qpairs->cbuf,
+ CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
rte_free(qpairs);
return -ENOMEM;
}
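A closing note on the ordering in eca_crypto_adapter_deq_run() above: freshly dequeued ops are handed straight to eca_ops_enqueue_burst() only while adapter->ebuf is empty; otherwise they are parked in ebuf behind the ops already waiting, so later completions never overtake earlier ones. The per-iteration shape, sketched with the i-indexed add that the failed-enqueue loop presumably intends:

    nb_enqueued = 0;
    if (adapter->ebuf.count == 0) /* nothing waiting: enqueue directly */
            nb_enqueued = eca_ops_enqueue_burst(adapter, ops, n);

    /* eventdev (partially) full: buffer the remainder for the flush
     * that runs at the top of the next eca_crypto_adapter_deq_run().
     */
    for (i = nb_enqueued; i < n; i++)
            eca_circular_buffer_add(&adapter->ebuf, ops[i]);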