[v2,4/5] compress/zlib: add enq deq apis

Message ID 1530550631-22841-5-git-send-email-shally.verma@caviumnetworks.com (mailing list archive)
State Changes Requested, archived
Delegated to: Pablo de Lara Guarch
Headers
Series compress: add ZLIB compression PMD |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Shally Verma July 2, 2018, 4:57 p.m. UTC
  From: Sunila Sahu <ssahu@caviumnetworks.com>

implement enqueue and dequeue apis

Signed-off-by: Sunila Sahu <sunila.sahu@caviumnetworks.com>
Signed-off-by: Shally Verma <shally.verma@caviumnetworks.com>
Signed-off-by: Ashish Gupta <ashish.gupta@caviumnetworks.com>
---
 drivers/compress/zlib/zlib_pmd.c | 238 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 237 insertions(+), 1 deletion(-)
  

Comments

De Lara Guarch, Pablo July 11, 2018, 1:25 p.m. UTC | #1
Hi,

> -----Original Message-----
> From: Shally Verma [mailto:shally.verma@caviumnetworks.com]
> Sent: Monday, July 2, 2018 5:57 PM
> To: De Lara Guarch, Pablo <pablo.de.lara.guarch@intel.com>
> Cc: dev@dpdk.org; pathreya@caviumnetworks.com;
> mchalla@caviumnetworks.com; Sunila Sahu <ssahu@caviumnetworks.com>;
> Sunila Sahu <sunila.sahu@caviumnetworks.com>; Ashish Gupta
> <ashish.gupta@caviumnetworks.com>
> Subject: [PATCH v2 4/5] compress/zlib: add enq deq apis

Better retitle to "support burst enqueue/dequeue".

> 
> From: Sunila Sahu <ssahu@caviumnetworks.com>
> 
> implement enqueue and dequeue apis

This should start with capital letter, but this is probably not needed anyway.

> 
> Signed-off-by: Sunila Sahu <sunila.sahu@caviumnetworks.com>
> Signed-off-by: Shally Verma <shally.verma@caviumnetworks.com>
> Signed-off-by: Ashish Gupta <ashish.gupta@caviumnetworks.com>
> ---
>  drivers/compress/zlib/zlib_pmd.c | 238
> ++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 237 insertions(+), 1 deletion(-)
> 
> diff --git a/drivers/compress/zlib/zlib_pmd.c b/drivers/compress/zlib/zlib_pmd.c
> index 7c2614e..aef23de 100644
> --- a/drivers/compress/zlib/zlib_pmd.c
> +++ b/drivers/compress/zlib/zlib_pmd.c
> @@ -6,7 +6,198 @@
>  #include <rte_common.h>
>  #include "zlib_pmd_private.h"
> 
> -/** Parse comp xform and set private xform/stream parameters */
> +/** Compute next mbuf in the list, assign data buffer and len */
> +#define COMPUTE_BUF(mbuf, data, len)		\
> +		((mbuf = mbuf->next) ?		\
> +		(data = rte_pktmbuf_mtod(mbuf, uint8_t *)),	\
> +		(len = rte_pktmbuf_data_len(mbuf)) : 0)
> +
> +/** Set op->status to appropriate flag if we run out of mbuf */
> +#define COMPUTE_DST_BUF(mbuf, dst, dlen)		\
> +		((COMPUTE_BUF(mbuf, dst, dlen)) ? 1 :	\
> +		(!(op->status =				\

I see and build error here:

drivers/compress/zlib/zlib_pmd.c(81): error #187: use of "=" where "==" may have been intended
                        COMPUTE_DST_BUF(mbuf_dst,  strm->next_out,
It is a bit confusing macro, but afaik, you should pass op if you want to modify the status
(I suggest making this more readable).
 

> +		RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED)))
> +
> +static void
> +process_zlib_deflate(struct rte_comp_op *op, z_stream *strm) {
> +	int ret, flush, fin_flush;
> +	struct rte_mbuf *mbuf_src = op->m_src;
> +	struct rte_mbuf *mbuf_dst = op->m_dst;
> +
> +	switch (op->flush_flag) {
> +	case RTE_COMP_FLUSH_FULL:
> +	case RTE_COMP_FLUSH_FINAL:
> +		fin_flush = Z_FINISH;
> +		break;
> +	default:
> +		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
> +		ZLIB_PMD_ERR("\nInvalid flush value");

Better to have "\n" at the end for consistency.

> +
> +	}
> +
> +	if (unlikely(!strm)) {
> +		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
> +		ZLIB_PMD_ERR("\nInvalid z_stream");
> +		return;
> +	}
> +	/* Update z_stream with the inputs provided by application */
> +	strm->next_in = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *,
> +			op->src.offset);
> +
> +	strm->avail_in = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;

Shouldn't this be "op->src.length"?

> +
> +	strm->next_out = rte_pktmbuf_mtod_offset(mbuf_dst, uint8_t *,
> +			op->dst.offset);
> +
> +	strm->avail_out = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
> +
> +	/* Set flush value to NO_FLUSH unless it is last mbuf */
> +	flush = Z_NO_FLUSH;
> +	/* Initialize status to SUCCESS */
> +	op->status = RTE_COMP_OP_STATUS_SUCCESS;
> +

...

> +	/* Update source buffer to next mbuf
> +	 * Exit if op->status is not SUCCESS or input buffers are fully consumed
> +	 */
> +	} while ((op->status == RTE_COMP_OP_STATUS_SUCCESS) &&
> +		(COMPUTE_BUF(mbuf_src, strm->next_in, strm->avail_in)));

It looks like you support Scatter Gather here, but according to the documentation, you don't support it...

> +
> +def_end:
> +	/* Update op stats */
> +	switch (op->status) {
> +	case RTE_COMP_OP_STATUS_SUCCESS:
> +		op->consumed += strm->total_in;

I see a compilation error with gcc:

drivers/compress/zlib/zlib_pmd.c:166:16: error:
this statement may fall through [-Werror=implicit-fallthrough=]
   op->consumed += strm->total_in;
   ~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~
drivers/compress/zlib/zlib_pmd.c:167:2: note: here
  case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
  ^~~~

If the intention is to fall-through, you should add a comment before the next case.

> +	case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
> +		op->produced += strm->total_out;
> +		break;
> +	default:
> +		ZLIB_PMD_ERR("stats not updated for status:%d\n",
> +				op->status);
> +	}
> +
> +	deflateReset(strm);
> +}
> +

...

> +
> +/** Process comp operation for mbuf */
> +static inline int
> +process_zlib_op(struct zlib_qp *qp, struct rte_comp_op *op) {
> +	struct zlib_stream *stream;
> +
> +	if ((op->op_type == RTE_COMP_OP_STATEFUL) ||
> +		op->src.offset > rte_pktmbuf_data_len(op->m_src) ||
> +		op->dst.offset > rte_pktmbuf_data_len(op->m_dst)) {

An extra indentation is needed, so it is easy to distinguish between the if statement and the body.
You should also check if (src.offset + src.length > rte_pktmbuf_data_len(op->m_src))

> +		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
> +		ZLIB_PMD_ERR("\nInvalid source or destination buffers or "
> +				"invalid Operation requested");

Better to include the "\n" at the end.

> +	} else {
> +		stream = &((struct zlib_priv_xform *)op->private_xform)-
> >stream;

I think it is more readable to have this line split into two:
First get the private xform and then get zlib_stream pointer.

> +		stream->comp(op, &stream->strm);
> +	}
> +	/* whatever is out of op, put it into completion queue with
> +	 * its status
> +	 */
> +	return rte_ring_enqueue(qp->processed_pkts, (void *)op); }
> +

...

> +static uint16_t
> +zlib_pmd_enqueue_burst(void *queue_pair,
> +			struct rte_comp_op **ops, uint16_t nb_ops) {
> +	struct zlib_qp *qp = queue_pair;
> +	int ret, i;
> +	int enqd = 0;

"i" and "enqd" variables should be uint16_t.

> +	for (i = 0; i < nb_ops; i++) {
> +		ret = process_zlib_op(qp, ops[i]);

I think you should check if there is enough space in the ring for all the operations, to save time.
If there is not, then the PMD would modify the operation unnecessarily and waste a lot of time.
Then, with that check, you can just process the operations that can fit, looping until minimum between
nb_ops and space available in the ring.

> +		if (unlikely(ret < 0)) {
> +			/* increment count if failed to push to completion
> +			 * queue
> +			 */
> +			qp->qp_stats.enqueue_err_count++;

I think this variable should be used when there was an error in the operation but
it was still be enqueued (because there was space in the ring).
So you can increase this count in process_zlib_op when there is an error.

> +		} else {
> +			qp->qp_stats.enqueued_count++;

I think this should be equal to all the operations enqueued (with and without error).

> +			enqd++;
> +		}
> +	}
> +	return enqd;
> +}
  
Verma, Shally July 12, 2018, 5:46 a.m. UTC | #2
>-----Original Message-----
>From: De Lara Guarch, Pablo [mailto:pablo.de.lara.guarch@intel.com]
>Sent: 11 July 2018 18:56
>To: Verma, Shally <Shally.Verma@cavium.com>
>Cc: dev@dpdk.org; Athreya, Narayana Prasad <NarayanaPrasad.Athreya@cavium.com>; Challa, Mahipal
><Mahipal.Challa@cavium.com>; Sahu, Sunila <Sunila.Sahu@cavium.com>; Sahu, Sunila <Sunila.Sahu@cavium.com>; Gupta, Ashish
><Ashish.Gupta@cavium.com>
>Subject: RE: [PATCH v2 4/5] compress/zlib: add enq deq apis
>
>External Email
>
>Hi,
>
>> -----Original Message-----
>> From: Shally Verma [mailto:shally.verma@caviumnetworks.com]
>> Sent: Monday, July 2, 2018 5:57 PM
>> To: De Lara Guarch, Pablo <pablo.de.lara.guarch@intel.com>
>> Cc: dev@dpdk.org; pathreya@caviumnetworks.com;
>> mchalla@caviumnetworks.com; Sunila Sahu <ssahu@caviumnetworks.com>;
>> Sunila Sahu <sunila.sahu@caviumnetworks.com>; Ashish Gupta
>> <ashish.gupta@caviumnetworks.com>
>> Subject: [PATCH v2 4/5] compress/zlib: add enq deq apis
>
>Better retitle to "support burst enqueue/dequeue".
>
>>
>> From: Sunila Sahu <ssahu@caviumnetworks.com>
>>
>> implement enqueue and dequeue apis
>
>This should start with capital letter, but this is probably not needed anyway.
>
>>
>> Signed-off-by: Sunila Sahu <sunila.sahu@caviumnetworks.com>
>> Signed-off-by: Shally Verma <shally.verma@caviumnetworks.com>
>> Signed-off-by: Ashish Gupta <ashish.gupta@caviumnetworks.com>
>> ---
>>  drivers/compress/zlib/zlib_pmd.c | 238
>> ++++++++++++++++++++++++++++++++++++++-
>>  1 file changed, 237 insertions(+), 1 deletion(-)
>>
>> diff --git a/drivers/compress/zlib/zlib_pmd.c b/drivers/compress/zlib/zlib_pmd.c
>> index 7c2614e..aef23de 100644
>> --- a/drivers/compress/zlib/zlib_pmd.c
>> +++ b/drivers/compress/zlib/zlib_pmd.c
>> @@ -6,7 +6,198 @@
>>  #include <rte_common.h>
>>  #include "zlib_pmd_private.h"
>>
>> -/** Parse comp xform and set private xform/stream parameters */
>> +/** Compute next mbuf in the list, assign data buffer and len */
>> +#define COMPUTE_BUF(mbuf, data, len)         \
>> +             ((mbuf = mbuf->next) ?          \
>> +             (data = rte_pktmbuf_mtod(mbuf, uint8_t *)),     \
>> +             (len = rte_pktmbuf_data_len(mbuf)) : 0)
>> +
>> +/** Set op->status to appropriate flag if we run out of mbuf */
>> +#define COMPUTE_DST_BUF(mbuf, dst, dlen)             \
>> +             ((COMPUTE_BUF(mbuf, dst, dlen)) ? 1 :   \
>> +             (!(op->status =                         \
>
>I see and build error here:
>
>drivers/compress/zlib/zlib_pmd.c(81): error #187: use of "=" where "==" may have been intended
>                        COMPUTE_DST_BUF(mbuf_dst,  strm->next_out,
>It is a bit confusing macro, but afaik, you should pass op if you want to modify the status
>(I suggest making this more readable).
>
>
>> +             RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED)))
>> +
>> +static void
>> +process_zlib_deflate(struct rte_comp_op *op, z_stream *strm) {
>> +     int ret, flush, fin_flush;
>> +     struct rte_mbuf *mbuf_src = op->m_src;
>> +     struct rte_mbuf *mbuf_dst = op->m_dst;
>> +
>> +     switch (op->flush_flag) {
>> +     case RTE_COMP_FLUSH_FULL:
>> +     case RTE_COMP_FLUSH_FINAL:
>> +             fin_flush = Z_FINISH;
>> +             break;
>> +     default:
>> +             op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
>> +             ZLIB_PMD_ERR("\nInvalid flush value");
>
>Better to have "\n" at the end for consistency.
>
>> +
>> +     }
>> +
>> +     if (unlikely(!strm)) {
>> +             op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
>> +             ZLIB_PMD_ERR("\nInvalid z_stream");
>> +             return;
>> +     }
>> +     /* Update z_stream with the inputs provided by application */
>> +     strm->next_in = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *,
>> +                     op->src.offset);
>> +
>> +     strm->avail_in = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
>
>Shouldn't this be "op->src.length"?
If packet is segmented, then src.length will give wrong o/p. Though we are not claiming segmented packet support. But this is safer way to do.

>
>> +
>> +     strm->next_out = rte_pktmbuf_mtod_offset(mbuf_dst, uint8_t *,
>> +                     op->dst.offset);
>> +
>> +     strm->avail_out = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
>> +
>> +     /* Set flush value to NO_FLUSH unless it is last mbuf */
>> +     flush = Z_NO_FLUSH;
>> +     /* Initialize status to SUCCESS */
>> +     op->status = RTE_COMP_OP_STATUS_SUCCESS;
>> +
>
>...
>
>> +     /* Update source buffer to next mbuf
>> +      * Exit if op->status is not SUCCESS or input buffers are fully consumed
>> +      */
>> +     } while ((op->status == RTE_COMP_OP_STATUS_SUCCESS) &&
>> +             (COMPUTE_BUF(mbuf_src, strm->next_in, strm->avail_in)));
>
>It looks like you support Scatter Gather here, but according to the documentation, you don't support it...
>
Yes. right now, we don't claim its support as we couldn't test it.

>> +
>> +def_end:
>> +     /* Update op stats */
>> +     switch (op->status) {
>> +     case RTE_COMP_OP_STATUS_SUCCESS:
>> +             op->consumed += strm->total_in;
>
>I see a compilation error with gcc:
>
>drivers/compress/zlib/zlib_pmd.c:166:16: error:
>this statement may fall through [-Werror=implicit-fallthrough=]
>   op->consumed += strm->total_in;
>   ~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~
>drivers/compress/zlib/zlib_pmd.c:167:2: note: here
>  case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
>  ^~~~
>
>If the intention is to fall-through, you should add a comment before the next case.
>
Ok. But then my view is it shouldn't be taken as error here. Is it we don't want fall-through model?

>> +     case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
>> +             op->produced += strm->total_out;
>> +             break;
>> +     default:
>> +             ZLIB_PMD_ERR("stats not updated for status:%d\n",
>> +                             op->status);
>> +     }
>> +
>> +     deflateReset(strm);
>> +}
>> +
>
>...
>
>> +
>> +/** Process comp operation for mbuf */
>> +static inline int
>> +process_zlib_op(struct zlib_qp *qp, struct rte_comp_op *op) {
>> +     struct zlib_stream *stream;
>> +
>> +     if ((op->op_type == RTE_COMP_OP_STATEFUL) ||
>> +             op->src.offset > rte_pktmbuf_data_len(op->m_src) ||
>> +             op->dst.offset > rte_pktmbuf_data_len(op->m_dst)) {
>
>An extra indentation is needed, so it is easy to distinguish between the if statement and the body.
>You should also check if (src.offset + src.length > rte_pktmbuf_data_len(op->m_src))

Should it be checked against pktmbuf_data_len() or pktmbuf_pkt_len()?!

>
>> +             op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
>> +             ZLIB_PMD_ERR("\nInvalid source or destination buffers or "
>> +                             "invalid Operation requested");
>
>Better to include the "\n" at the end.
>
>> +     } else {
>> +             stream = &((struct zlib_priv_xform *)op->private_xform)-
>> >stream;
>
>I think it is more readable to have this line split into two:
>First get the private xform and then get zlib_stream pointer.
>
>> +             stream->comp(op, &stream->strm);
>> +     }
>> +     /* whatever is out of op, put it into completion queue with
>> +      * its status
>> +      */
>> +     return rte_ring_enqueue(qp->processed_pkts, (void *)op); }
>> +
>
>...
>
>> +static uint16_t
>> +zlib_pmd_enqueue_burst(void *queue_pair,
>> +                     struct rte_comp_op **ops, uint16_t nb_ops) {
>> +     struct zlib_qp *qp = queue_pair;
>> +     int ret, i;
>> +     int enqd = 0;
>
>"i" and "enqd" variables should be uint16_t.
>
>> +     for (i = 0; i < nb_ops; i++) {
>> +             ret = process_zlib_op(qp, ops[i]);
>
>I think you should check if there is enough space in the ring for all the operations, to save time.
>If there is not, then the PMD would modify the operation unnecessarily and waste a lot of time.
>Then, with that check, you can just process the operations that can fit, looping until minimum between
>nb_ops and space available in the ring.
I doubt if I should do that. If there's an application with dequeue only thread, then at one point, we may see it 
full and another space available. Since this PMD is using lockless rings, it can be made flexible to produce as many it can.

>
>> +             if (unlikely(ret < 0)) {
>> +                     /* increment count if failed to push to completion
>> +                      * queue
>> +                      */
>> +                     qp->qp_stats.enqueue_err_count++;
>
>I think this variable should be used when there was an error in the operation but
>it was still be enqueued (because there was space in the ring).
>So you can increase this count in process_zlib_op when there is an error.
>
So what is exact purpose of this stat? is  enqueue_err_count supposed to give number of failed operations in processed queue?
Or, just number of operations that couldn't be enqueued/dequeued for processing?


>> +             } else {
>> +                     qp->qp_stats.enqueued_count++;
>
>I think this should be equal to all the operations enqueued (with and without error).
Yes. Right now it does that except for the cases, where op couldn't be pushed into as processing queue, and user cannot deque it.

Thanks
Shally
>
>> +                     enqd++;
>> +             }
>> +     }
>> +     return enqd;
>> +}
  
De Lara Guarch, Pablo July 13, 2018, 3:55 p.m. UTC | #3
Hi Shally,

Sorry for the delay. Comments inline.

> -----Original Message-----
> From: Verma, Shally [mailto:Shally.Verma@cavium.com]
> Sent: Thursday, July 12, 2018 6:46 AM
> To: De Lara Guarch, Pablo <pablo.de.lara.guarch@intel.com>
> Cc: dev@dpdk.org; Athreya, Narayana Prasad
> <NarayanaPrasad.Athreya@cavium.com>; Challa, Mahipal
> <Mahipal.Challa@cavium.com>; Sahu, Sunila <Sunila.Sahu@cavium.com>; Sahu,
> Sunila <Sunila.Sahu@cavium.com>; Gupta, Ashish <Ashish.Gupta@cavium.com>
> Subject: RE: [PATCH v2 4/5] compress/zlib: add enq deq apis
> 
> 
> 
> >-----Original Message-----
> >From: De Lara Guarch, Pablo [mailto:pablo.de.lara.guarch@intel.com]
> >Sent: 11 July 2018 18:56
> >To: Verma, Shally <Shally.Verma@cavium.com>
> >Cc: dev@dpdk.org; Athreya, Narayana Prasad
> ><NarayanaPrasad.Athreya@cavium.com>; Challa, Mahipal
> ><Mahipal.Challa@cavium.com>; Sahu, Sunila <Sunila.Sahu@cavium.com>;
> >Sahu, Sunila <Sunila.Sahu@cavium.com>; Gupta, Ashish
> ><Ashish.Gupta@cavium.com>
> >Subject: RE: [PATCH v2 4/5] compress/zlib: add enq deq apis
> >
> >External Email
> >
> >Hi,
> >
> >> -----Original Message-----
> >> From: Shally Verma [mailto:shally.verma@caviumnetworks.com]
> >> Sent: Monday, July 2, 2018 5:57 PM
> >> To: De Lara Guarch, Pablo <pablo.de.lara.guarch@intel.com>
> >> Cc: dev@dpdk.org; pathreya@caviumnetworks.com;
> >> mchalla@caviumnetworks.com; Sunila Sahu <ssahu@caviumnetworks.com>;
> >> Sunila Sahu <sunila.sahu@caviumnetworks.com>; Ashish Gupta
> >> <ashish.gupta@caviumnetworks.com>
> >> Subject: [PATCH v2 4/5] compress/zlib: add enq deq apis
> >
> >Better retitle to "support burst enqueue/dequeue".
> >
> >>
> >> From: Sunila Sahu <ssahu@caviumnetworks.com>
> >>
> >> implement enqueue and dequeue apis
> >
> >This should start with capital letter, but this is probably not needed anyway.
> >
> >>
> >> Signed-off-by: Sunila Sahu <sunila.sahu@caviumnetworks.com>
> >> Signed-off-by: Shally Verma <shally.verma@caviumnetworks.com>
> >> Signed-off-by: Ashish Gupta <ashish.gupta@caviumnetworks.com>
> >> ---
> >>  drivers/compress/zlib/zlib_pmd.c | 238
> >> ++++++++++++++++++++++++++++++++++++++-
> >>  1 file changed, 237 insertions(+), 1 deletion(-)
> >>
> >> diff --git a/drivers/compress/zlib/zlib_pmd.c
> >> b/drivers/compress/zlib/zlib_pmd.c
> >> index 7c2614e..aef23de 100644
> >> --- a/drivers/compress/zlib/zlib_pmd.c
> >> +++ b/drivers/compress/zlib/zlib_pmd.c
> >> @@ -6,7 +6,198 @@
> >>  #include <rte_common.h>
> >>  #include "zlib_pmd_private.h"
> >>
> >> -/** Parse comp xform and set private xform/stream parameters */
> >> +/** Compute next mbuf in the list, assign data buffer and len */
> >> +#define COMPUTE_BUF(mbuf, data, len)         \
> >> +             ((mbuf = mbuf->next) ?          \
> >> +             (data = rte_pktmbuf_mtod(mbuf, uint8_t *)),     \
> >> +             (len = rte_pktmbuf_data_len(mbuf)) : 0)
> >> +
> >> +/** Set op->status to appropriate flag if we run out of mbuf */
> >> +#define COMPUTE_DST_BUF(mbuf, dst, dlen)             \
> >> +             ((COMPUTE_BUF(mbuf, dst, dlen)) ? 1 :   \
> >> +             (!(op->status =                         \
> >
> >I see and build error here:
> >
> >drivers/compress/zlib/zlib_pmd.c(81): error #187: use of "=" where "==" may
> have been intended
> >                        COMPUTE_DST_BUF(mbuf_dst,  strm->next_out, It
> >is a bit confusing macro, but afaik, you should pass op if you want to
> >modify the status (I suggest making this more readable).
> >
> >
> >> +             RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED)))
> >> +
> >> +static void
> >> +process_zlib_deflate(struct rte_comp_op *op, z_stream *strm) {
> >> +     int ret, flush, fin_flush;
> >> +     struct rte_mbuf *mbuf_src = op->m_src;
> >> +     struct rte_mbuf *mbuf_dst = op->m_dst;
> >> +
> >> +     switch (op->flush_flag) {
> >> +     case RTE_COMP_FLUSH_FULL:
> >> +     case RTE_COMP_FLUSH_FINAL:
> >> +             fin_flush = Z_FINISH;
> >> +             break;
> >> +     default:
> >> +             op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
> >> +             ZLIB_PMD_ERR("\nInvalid flush value");
> >
> >Better to have "\n" at the end for consistency.
> >
> >> +
> >> +     }
> >> +
> >> +     if (unlikely(!strm)) {
> >> +             op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
> >> +             ZLIB_PMD_ERR("\nInvalid z_stream");
> >> +             return;
> >> +     }
> >> +     /* Update z_stream with the inputs provided by application */
> >> +     strm->next_in = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *,
> >> +                     op->src.offset);
> >> +
> >> +     strm->avail_in = rte_pktmbuf_data_len(mbuf_src) -
> >> + op->src.offset;
> >
> >Shouldn't this be "op->src.length"?
> If packet is segmented, then src.length will give wrong o/p. Though we are not
> claiming segmented packet support. But this is safer way to do.

Right, I was assuming that only single segment buffers were supported.
Regardless you take into account single or multi segment buffers,
src.length needs to be taken into consideration.

> 
> >
> >> +
> >> +     strm->next_out = rte_pktmbuf_mtod_offset(mbuf_dst, uint8_t *,
> >> +                     op->dst.offset);
> >> +
> >> +     strm->avail_out = rte_pktmbuf_data_len(mbuf_dst) -
> >> + op->dst.offset;
> >> +
> >> +     /* Set flush value to NO_FLUSH unless it is last mbuf */
> >> +     flush = Z_NO_FLUSH;
> >> +     /* Initialize status to SUCCESS */
> >> +     op->status = RTE_COMP_OP_STATUS_SUCCESS;
> >> +
> >
> >...
> >
> >> +     /* Update source buffer to next mbuf
> >> +      * Exit if op->status is not SUCCESS or input buffers are fully consumed
> >> +      */
> >> +     } while ((op->status == RTE_COMP_OP_STATUS_SUCCESS) &&
> >> +             (COMPUTE_BUF(mbuf_src, strm->next_in,
> >> + strm->avail_in)));
> >
> >It looks like you support Scatter Gather here, but according to the
> documentation, you don't support it...
> >
> Yes. right now, we don't claim its support as we couldn't test it.

Tests for this were sent in this release:
http://patches.dpdk.org/patch/42137/

If you think you support it, you should try the tests and set the flags.

> 
> >> +
> >> +def_end:
> >> +     /* Update op stats */
> >> +     switch (op->status) {
> >> +     case RTE_COMP_OP_STATUS_SUCCESS:
> >> +             op->consumed += strm->total_in;
> >
> >I see a compilation error with gcc:
> >
> >drivers/compress/zlib/zlib_pmd.c:166:16: error:
> >this statement may fall through [-Werror=implicit-fallthrough=]
> >   op->consumed += strm->total_in;
> >   ~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~
> >drivers/compress/zlib/zlib_pmd.c:167:2: note: here
> >  case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
> >  ^~~~
> >
> >If the intention is to fall-through, you should add a comment before the next
> case.
> >
> Ok. But then my view is it shouldn't be taken as error here. Is it we don't want
> fall-through model?

Basically the compiler warns you that you could have forgotten a break statement.
If you want to fall-though, you should add "/* Fall-through */ before the next case.


> 
> >> +     case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
> >> +             op->produced += strm->total_out;
> >> +             break;
> >> +     default:
> >> +             ZLIB_PMD_ERR("stats not updated for status:%d\n",
> >> +                             op->status);
> >> +     }
> >> +
> >> +     deflateReset(strm);
> >> +}
> >> +
> >
> >...
> >
> >> +
> >> +/** Process comp operation for mbuf */ static inline int
> >> +process_zlib_op(struct zlib_qp *qp, struct rte_comp_op *op) {
> >> +     struct zlib_stream *stream;
> >> +
> >> +     if ((op->op_type == RTE_COMP_OP_STATEFUL) ||
> >> +             op->src.offset > rte_pktmbuf_data_len(op->m_src) ||
> >> +             op->dst.offset > rte_pktmbuf_data_len(op->m_dst)) {
> >
> >An extra indentation is needed, so it is easy to distinguish between the if
> statement and the body.
> >You should also check if (src.offset + src.length >
> >rte_pktmbuf_data_len(op->m_src))
> 
> Should it be checked against pktmbuf_data_len() or pktmbuf_pkt_len()?!

Pktlen if you consider multi-segment (which I thought you weren't supporting it).
For a single segment, these two values are the same.

> 
> >
> >> +             op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
> >> +             ZLIB_PMD_ERR("\nInvalid source or destination buffers or "
> >> +                             "invalid Operation requested");
> >
> >Better to include the "\n" at the end.
> >
> >> +     } else {
> >> +             stream = &((struct zlib_priv_xform
> >> + *)op->private_xform)-
> >> >stream;
> >
> >I think it is more readable to have this line split into two:
> >First get the private xform and then get zlib_stream pointer.
> >
> >> +             stream->comp(op, &stream->strm);
> >> +     }
> >> +     /* whatever is out of op, put it into completion queue with
> >> +      * its status
> >> +      */
> >> +     return rte_ring_enqueue(qp->processed_pkts, (void *)op); }
> >> +
> >
> >...
> >
> >> +static uint16_t
> >> +zlib_pmd_enqueue_burst(void *queue_pair,
> >> +                     struct rte_comp_op **ops, uint16_t nb_ops) {
> >> +     struct zlib_qp *qp = queue_pair;
> >> +     int ret, i;
> >> +     int enqd = 0;
> >
> >"i" and "enqd" variables should be uint16_t.
> >
> >> +     for (i = 0; i < nb_ops; i++) {
> >> +             ret = process_zlib_op(qp, ops[i]);
> >
> >I think you should check if there is enough space in the ring for all the
> operations, to save time.
> >If there is not, then the PMD would modify the operation unnecessarily and
> waste a lot of time.
> >Then, with that check, you can just process the operations that can
> >fit, looping until minimum between nb_ops and space available in the ring.
> I doubt if I should do that. If there's an application with dequeue only thread,
> then at one point, we may see it full and another space available. Since this PMD
> is using lockless rings, it can be made flexible to produce as many it can.

Right, I see your point. My concern is that if there is no space in the ring,
you would have modified the operation. The expectation is that, if an operation cannot be enqueued,
you can retry and enqueue it again, but if you have modified it, that won't be possible.

An alternative is to move the processing into the dequeue function.

> 
> >
> >> +             if (unlikely(ret < 0)) {
> >> +                     /* increment count if failed to push to completion
> >> +                      * queue
> >> +                      */
> >> +                     qp->qp_stats.enqueue_err_count++;
> >
> >I think this variable should be used when there was an error in the
> >operation but it was still be enqueued (because there was space in the ring).
> >So you can increase this count in process_zlib_op when there is an error.
> >
> So what is exact purpose of this stat? is  enqueue_err_count supposed to give
> number of failed operations in processed queue?
> Or, just number of operations that couldn't be enqueued/dequeued for
> processing?

For me, it is the number of operations that were enqueued, but that contains an error.
This actually raises a concern. For software devices, operations with errors can be enqueued and not processed,
but for hardware devices, this might not work (arguments could be invalid) and then operation couldn't be "enqueued".
In that case, I believe that the faulty operations and the next ones won't be enqueued into the device,
making it inconsistent with software devices.

Any opinions on this?

> 
> 
> >> +             } else {
> >> +                     qp->qp_stats.enqueued_count++;
> >
> >I think this should be equal to all the operations enqueued (with and without
> error).
> Yes. Right now it does that except for the cases, where op couldn't be pushed
> into as processing queue, and user cannot deque it.
> 
> Thanks
> Shally
> >
> >> +                     enqd++;
> >> +             }
> >> +     }
> >> +     return enqd;
> >> +}
  

Patch

diff --git a/drivers/compress/zlib/zlib_pmd.c b/drivers/compress/zlib/zlib_pmd.c
index 7c2614e..aef23de 100644
--- a/drivers/compress/zlib/zlib_pmd.c
+++ b/drivers/compress/zlib/zlib_pmd.c
@@ -6,7 +6,198 @@ 
 #include <rte_common.h>
 #include "zlib_pmd_private.h"
 
-/** Parse comp xform and set private xform/stream parameters */
+/** Compute next mbuf in the list, assign data buffer and len */
+#define COMPUTE_BUF(mbuf, data, len)		\
+		((mbuf = mbuf->next) ?		\
+		(data = rte_pktmbuf_mtod(mbuf, uint8_t *)),	\
+		(len = rte_pktmbuf_data_len(mbuf)) : 0)
+
+/** Set op->status to appropriate flag if we run out of mbuf */
+#define COMPUTE_DST_BUF(mbuf, dst, dlen)		\
+		((COMPUTE_BUF(mbuf, dst, dlen)) ? 1 :	\
+		(!(op->status =				\
+		RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED)))
+
+static void
+process_zlib_deflate(struct rte_comp_op *op, z_stream *strm)
+{
+	int ret, flush, fin_flush;
+	struct rte_mbuf *mbuf_src = op->m_src;
+	struct rte_mbuf *mbuf_dst = op->m_dst;
+
+	switch (op->flush_flag) {
+	case RTE_COMP_FLUSH_FULL:
+	case RTE_COMP_FLUSH_FINAL:
+		fin_flush = Z_FINISH;
+		break;
+	default:
+		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+		ZLIB_PMD_ERR("\nInvalid flush value");
+
+	}
+
+	if (unlikely(!strm)) {
+		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+		ZLIB_PMD_ERR("\nInvalid z_stream");
+		return;
+	}
+	/* Update z_stream with the inputs provided by application */
+	strm->next_in = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *,
+			op->src.offset);
+
+	strm->avail_in = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
+
+	strm->next_out = rte_pktmbuf_mtod_offset(mbuf_dst, uint8_t *,
+			op->dst.offset);
+
+	strm->avail_out = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
+
+	/* Set flush value to NO_FLUSH unless it is last mbuf */
+	flush = Z_NO_FLUSH;
+	/* Initialize status to SUCCESS */
+	op->status = RTE_COMP_OP_STATUS_SUCCESS;
+
+	do {
+		/* Set flush value to Z_FINISH for last block */
+		if ((op->src.length - strm->total_in) <= strm->avail_in) {
+			strm->avail_in = (op->src.length - strm->total_in);
+			flush = fin_flush;
+		}
+		do {
+			ret = deflate(strm, flush);
+			if (unlikely(ret == Z_STREAM_ERROR)) {
+				/* error return, do not process further */
+				op->status =  RTE_COMP_OP_STATUS_ERROR;
+				break;
+			}
+			/* Break if Z_STREAM_END is encountered */
+			if (ret == Z_STREAM_END)
+				goto def_end;
+
+		 /* Break if current input buffer is consumed or
+		  * there is no more space for compressed output
+		  */
+		} while ((strm->avail_out == 0) &&
+			COMPUTE_DST_BUF(mbuf_dst,  strm->next_out,
+				strm->avail_out));
+
+	/* Update source buffer to next mbuf
+	 * Exit if op->status is not SUCCESS or input buffers are fully consumed
+	 */
+	} while ((op->status == RTE_COMP_OP_STATUS_SUCCESS) &&
+		(COMPUTE_BUF(mbuf_src, strm->next_in, strm->avail_in)));
+
+def_end:
+	/* Update op stats */
+	switch (op->status) {
+	case RTE_COMP_OP_STATUS_SUCCESS:
+		op->consumed += strm->total_in;
+	case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
+		op->produced += strm->total_out;
+		break;
+	default:
+		ZLIB_PMD_ERR("stats not updated for status:%d\n",
+				op->status);
+	}
+
+	deflateReset(strm);
+}
+
+static void
+process_zlib_inflate(struct rte_comp_op *op, z_stream *strm)
+{
+	int ret, flush;
+	struct rte_mbuf *mbuf_src = op->m_src;
+	struct rte_mbuf *mbuf_dst = op->m_dst;
+
+	if (unlikely(!strm)) {
+		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+		ZLIB_PMD_ERR("\nInvalid z_stream");
+		return;
+	}
+	strm->next_in = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *,
+			op->src.offset);
+
+	strm->avail_in = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
+
+	strm->next_out = rte_pktmbuf_mtod_offset(mbuf_dst, uint8_t *,
+			op->dst.offset);
+
+	strm->avail_out = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
+
+	/** Ignoring flush value provided from application for decompression */
+	flush = Z_NO_FLUSH;
+	/* initialize status to SUCCESS */
+	op->status = RTE_COMP_OP_STATUS_SUCCESS;
+
+	do {
+		do {
+			ret = inflate(strm, flush);
+
+			switch (ret) {
+			case Z_NEED_DICT:
+				ret = Z_DATA_ERROR;     /* and fall through */
+			case Z_DATA_ERROR:
+			case Z_MEM_ERROR:
+			case Z_STREAM_ERROR:
+				op->status = RTE_COMP_OP_STATUS_ERROR;
+				break;
+			default:
+				/* Break if Z_STREAM_END is encountered */
+				if (ret == Z_STREAM_END)
+					goto inf_end;
+			}
+		/* Break if Z_STREAM_END is encountered or dst mbuf gets over */
+		} while ((strm->avail_out == 0) &&
+			COMPUTE_DST_BUF(mbuf_dst, strm->next_out,
+				strm->avail_out));
+
+		/* Exit if op->status is not SUCCESS.
+		 * Read next input buffer to be processed, exit if compressed
+		 * blocks are fully read
+		 */
+	} while ((op->status == RTE_COMP_OP_STATUS_SUCCESS) &&
+		(COMPUTE_BUF(mbuf_src, strm->next_in, strm->avail_in)));
+
+inf_end:
+	/* Update op stats */
+	switch (op->status) {
+	case RTE_COMP_OP_STATUS_SUCCESS:
+		op->consumed += strm->total_in;
+	case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
+		op->produced += strm->total_out;
+		break;
+	default:
+		ZLIB_PMD_ERR("stats not produced for status:%d\n",
+				op->status);
+	}
+
+	inflateReset(strm);
+}
+
+/** Process comp operation for mbuf */
+static inline int
+process_zlib_op(struct zlib_qp *qp, struct rte_comp_op *op)
+{
+	struct zlib_stream *stream;
+
+	if ((op->op_type == RTE_COMP_OP_STATEFUL) ||
+		op->src.offset > rte_pktmbuf_data_len(op->m_src) ||
+		op->dst.offset > rte_pktmbuf_data_len(op->m_dst)) {
+		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+		ZLIB_PMD_ERR("\nInvalid source or destination buffers or "
+				"invalid Operation requested");
+	} else {
+		stream = &((struct zlib_priv_xform *)op->private_xform)->stream;
+		stream->comp(op, &stream->strm);
+	}
+	/* whatever is out of op, put it into completion queue with
+	 * its status
+	 */
+	return rte_ring_enqueue(qp->processed_pkts, (void *)op);
+}
+
+/** Parse comp xform and set private xform/Stream parameters */
 int
 zlib_set_stream_parameters(const struct rte_comp_xform *xform,
 		struct zlib_stream *stream)
@@ -21,6 +212,8 @@  zlib_set_stream_parameters(const struct rte_comp_xform *xform,
 
 	switch (xform->type) {
 	case RTE_COMP_COMPRESS:
+		stream->comp = process_zlib_deflate;
+		stream->free = deflateEnd;
 		/** Compression window bits */
 		switch (xform->compress.algo) {
 		case RTE_COMP_ALGO_DEFLATE:
@@ -78,6 +271,8 @@  zlib_set_stream_parameters(const struct rte_comp_xform *xform,
 	break;
 
 	case RTE_COMP_DECOMPRESS:
+		stream->comp = process_zlib_inflate;
+		stream->free = inflateEnd;
 		/** window bits */
 		switch (xform->decompress.algo) {
 		case RTE_COMP_ALGO_DEFLATE:
@@ -99,6 +294,43 @@  zlib_set_stream_parameters(const struct rte_comp_xform *xform,
 	return 0;
 }
 
+static uint16_t
+zlib_pmd_enqueue_burst(void *queue_pair,
+			struct rte_comp_op **ops, uint16_t nb_ops)
+{
+	struct zlib_qp *qp = queue_pair;
+	int ret, i;
+	int enqd = 0;
+	for (i = 0; i < nb_ops; i++) {
+		ret = process_zlib_op(qp, ops[i]);
+		if (unlikely(ret < 0)) {
+			/* increment count if failed to push to completion
+			 * queue
+			 */
+			qp->qp_stats.enqueue_err_count++;
+		} else {
+			qp->qp_stats.enqueued_count++;
+			enqd++;
+		}
+	}
+	return enqd;
+}
+
+static uint16_t
+zlib_pmd_dequeue_burst(void *queue_pair,
+			struct rte_comp_op **ops, uint16_t nb_ops)
+{
+	struct zlib_qp *qp = queue_pair;
+
+	unsigned int nb_dequeued = 0;
+
+	nb_dequeued = rte_ring_dequeue_burst(qp->processed_pkts,
+			(void **)ops, nb_ops, NULL);
+	qp->qp_stats.dequeued_count += nb_dequeued;
+
+	return nb_dequeued;
+}
+
 static int
 zlib_create(const char *name,
 		struct rte_vdev_device *vdev,
@@ -115,6 +347,10 @@  zlib_create(const char *name,
 
 	dev->dev_ops = rte_zlib_pmd_ops;
 
+	/* register rx/tx burst functions for data path */
+	dev->dequeue_burst = zlib_pmd_dequeue_burst;
+	dev->enqueue_burst = zlib_pmd_enqueue_burst;
+
 	dev->feature_flags = RTE_COMP_FF_NONCOMPRESSED_BLOCKS;
 
 	return 0;