[2/2] af_xdp: avoid to unnecessary allocation and free mbuf

Message ID 1600319462-2053-2-git-send-email-lirongqing@baidu.com (mailing list archive)
State Rejected, archived
Delegated to: Ferruh Yigit
Series [1/2] af_xdp: not return a negative value in af_xdp_rx_zc

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/iol-testing success Testing PASS
ci/travis-robot success Travis build: passed

Commit Message

Li RongQing Sept. 17, 2020, 5:11 a.m. UTC
  Optimize Rx performance by allocating mbufs based on the result
of xsk_ring_cons__peek, avoiding the redundant mbuf allocation
and free on every receive call.

Signed-off-by: Li RongQing <lirongqing@baidu.com>
Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 64 ++++++++++++++++---------------------
 1 file changed, 27 insertions(+), 37 deletions(-)
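
In short, the patch peeks the Rx ring first and then allocates exactly
nb_pkts mbufs for the fill-queue refill, instead of allocating a full
batch up front and returning the surplus to the mempool after every
burst. A condensed sketch of the resulting zero-copy path (names follow
the diff below; the wakeup handling and the descriptor-copy loop are
trimmed, so this is a reading aid rather than the driver source):

    nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
    if (nb_pkts == 0)
        return 0;                /* nothing received */

    /* allocate only what was actually received */
    if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts))
        return 0;                /* the peeked descriptors are dropped
                                  * here; see the discussion below */

    /* ... attach the nb_pkts received descriptors to bufs[] ... */

    xsk_ring_cons__release(rx, nb_pkts);
    (void)reserve_fill_queue(umem, nb_pkts, fq_bufs);
    return nb_pkts;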
  

Comments

Loftus, Ciara Sept. 18, 2020, 9:38 a.m. UTC | #1
> 
> optimize rx performance, by allocating mbuf based on result
> of xsk_ring_cons__peek, to avoid to redundancy allocation,
> and free mbuf when receive packets
> 
> Signed-off-by: Li RongQing <lirongqing@baidu.com>
> Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
> ---
>  drivers/net/af_xdp/rte_eth_af_xdp.c | 64 ++++++++++++++++---------------
> ------
>  1 file changed, 27 insertions(+), 37 deletions(-)
> 
> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> b/drivers/net/af_xdp/rte_eth_af_xdp.c
> index 7ce4ad04a..48824050e 100644
> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> @@ -229,28 +229,29 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>  	struct xsk_umem_info *umem = rxq->umem;
>  	uint32_t idx_rx = 0;
>  	unsigned long rx_bytes = 0;
> -	int rcvd, i;
> +	int i;
>  	struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
> 
> -	/* allocate bufs for fill queue replenishment after rx */
> -	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> -		AF_XDP_LOG(DEBUG,
> -			"Failed to get enough buffers for fq.\n");
> -		return 0;
> -	}
> 
> -	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> +	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> 
> -	if (rcvd == 0) {
> +	if (nb_pkts == 0) {
>  #if defined(XDP_USE_NEED_WAKEUP)
>  		if (xsk_ring_prod__needs_wakeup(&umem->fq))
>  			(void)poll(rxq->fds, 1, 1000);
>  #endif
> 
> -		goto out;
> +		return 0;
>  	}
> 
> -	for (i = 0; i < rcvd; i++) {
> +	/* allocate bufs for fill queue replenishment after rx */
> +	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> +		AF_XDP_LOG(DEBUG,
> +			"Failed to get enough buffers for fq.\n");

Thanks for this patch. I've considered this in the past.
There is a problem if we hit this condition.
We advance the rx producer @ xsk_ring_cons__peek.
But if we have no mbufs to hold the rx data, it is lost.
That's why we allocate the mbufs up front now.
Agree that we might have wasteful allocations and it's not the most optimal, but we don't drop packets due to failed mbuf allocs.

> +		return 0;
> +	}
> +
> +	for (i = 0; i < nb_pkts; i++) {
>  		const struct xdp_desc *desc;
>  		uint64_t addr;
>  		uint32_t len;
> @@ -275,20 +276,15 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>  		rx_bytes += len;
>  	}
> 
> -	xsk_ring_cons__release(rx, rcvd);
> +	xsk_ring_cons__release(rx, nb_pkts);
> 
> -	(void)reserve_fill_queue(umem, rcvd, fq_bufs);
> +	(void)reserve_fill_queue(umem, nb_pkts, fq_bufs);
> 
>  	/* statistics */
> -	rxq->stats.rx_pkts += rcvd;
> +	rxq->stats.rx_pkts += nb_pkts;
>  	rxq->stats.rx_bytes += rx_bytes;
> 
> -out:
> -	if (rcvd != nb_pkts)
> -		rte_mempool_put_bulk(umem->mb_pool, (void
> **)&fq_bufs[rcvd],
> -				     nb_pkts - rcvd);
> -
> -	return rcvd;
> +	return nb_pkts;
>  }
>  #else
>  static uint16_t
> @@ -300,27 +296,26 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>  	struct xsk_ring_prod *fq = &umem->fq;
>  	uint32_t idx_rx = 0;
>  	unsigned long rx_bytes = 0;
> -	int rcvd, i;
> +	int i;
>  	uint32_t free_thresh = fq->size >> 1;
>  	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
> 
> -	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts)
> != 0))
> -		return 0;
> -
> -	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> -	if (rcvd == 0) {
> +	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> +	if (nb_pkts == 0) {
>  #if defined(XDP_USE_NEED_WAKEUP)
>  		if (xsk_ring_prod__needs_wakeup(fq))
>  			(void)poll(rxq->fds, 1, 1000);
>  #endif
> -
> -		goto out;
> +		return 0;
>  	}
> 
> +	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts)
> != 0))
> +		return 0;
> +
>  	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
>  		(void)reserve_fill_queue(umem,
> ETH_AF_XDP_RX_BATCH_SIZE, NULL);
> 
> -	for (i = 0; i < rcvd; i++) {
> +	for (i = 0; i < nb_pkts; i++) {
>  		const struct xdp_desc *desc;
>  		uint64_t addr;
>  		uint32_t len;
> @@ -339,18 +334,13 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>  		bufs[i] = mbufs[i];
>  	}
> 
> -	xsk_ring_cons__release(rx, rcvd);
> +	xsk_ring_cons__release(rx, nb_pkts);
> 
>  	/* statistics */
> -	rxq->stats.rx_pkts += rcvd;
> +	rxq->stats.rx_pkts += nb_pkts;
>  	rxq->stats.rx_bytes += rx_bytes;
> 
> -out:
> -	if (rcvd != nb_pkts)
> -		rte_mempool_put_bulk(rxq->mb_pool, (void
> **)&mbufs[rcvd],
> -				     nb_pkts - rcvd);
> -
> -	return rcvd;
> +	return nb_pkts;
>  }
>  #endif
> 
> --
> 2.16.2
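
To make the ordering problem described above concrete, here is a small
self-contained toy (plain C, not libbpf: toy_ring and toy_alloc are
made-up stand-ins for the Rx ring and the mempool). It shows that once
the cached consumer index has been advanced by a peek, a failed buffer
allocation loses the peeked entries unless the index is rolled back:

    #include <stdio.h>
    #include <stdbool.h>

    /* Toy stand-in for the Rx ring: the kernel side bumps prod, the
     * application advances cached_cons when it peeks, mimicking what
     * xsk_ring_cons__peek() does in the driver code quoted above.
     */
    struct toy_ring {
            unsigned int prod;
            unsigned int cached_cons;
    };

    static unsigned int toy_peek(struct toy_ring *r, unsigned int want)
    {
            unsigned int avail = r->prod - r->cached_cons;
            unsigned int n = want < avail ? want : avail;

            r->cached_cons += n;    /* the cursor moves even if we fail later */
            return n;
    }

    /* Pretend bulk allocation: succeeds only if the pool has enough buffers. */
    static bool toy_alloc(unsigned int n, unsigned int pool_free)
    {
            return n <= pool_free;
    }

    static void rx_burst(struct toy_ring *r, unsigned int pool_free, bool rollback)
    {
            unsigned int n = toy_peek(r, 4);

            if (n != 0 && !toy_alloc(n, pool_free) && rollback)
                    r->cached_cons -= n;    /* undo the peek: nothing is lost */
    }

    int main(void)
    {
            struct toy_ring a = { .prod = 4, .cached_cons = 0 };
            struct toy_ring b = { .prod = 4, .cached_cons = 0 };

            rx_burst(&a, 0, false); /* empty pool, no rollback */
            rx_burst(&b, 0, true);  /* empty pool, roll back the peek */

            printf("no rollback:   %u descriptors left to retry\n", a.prod - a.cached_cons);
            printf("with rollback: %u descriptors left to retry\n", b.prod - b.cached_cons);
            return 0;
    }

The first line prints 0 (those descriptors can never be retried by the
application), the second prints 4. The up-front allocation in the
current driver avoids the first outcome, at the cost of the extra
alloc/free that this patch is trying to remove.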
  
Li RongQing Sept. 18, 2020, 11:16 a.m. UTC | #2
> -----Original Message-----
> From: Loftus, Ciara [mailto:ciara.loftus@intel.com]
> Sent: Friday, September 18, 2020 5:39 PM
> To: Li,Rongqing <lirongqing@baidu.com>
> Cc: dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH 2/2] af_xdp: avoid to unnecessary allocation and
> free mbuf
> 
> >
> > optimize rx performance, by allocating mbuf based on result of
> > xsk_ring_cons__peek, to avoid to redundancy allocation, and free mbuf
> > when receive packets
> >
> > Signed-off-by: Li RongQing <lirongqing@baidu.com>
> > Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
> > ---
> >  drivers/net/af_xdp/rte_eth_af_xdp.c | 64
> > ++++++++++++++++---------------
> > ------
> >  1 file changed, 27 insertions(+), 37 deletions(-)
> >
> > diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> > b/drivers/net/af_xdp/rte_eth_af_xdp.c
> > index 7ce4ad04a..48824050e 100644
> > --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> > +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> > @@ -229,28 +229,29 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> > **bufs, uint16_t nb_pkts)
> >  	struct xsk_umem_info *umem = rxq->umem;
> >  	uint32_t idx_rx = 0;
> >  	unsigned long rx_bytes = 0;
> > -	int rcvd, i;
> > +	int i;
> >  	struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
> >
> > -	/* allocate bufs for fill queue replenishment after rx */
> > -	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> > -		AF_XDP_LOG(DEBUG,
> > -			"Failed to get enough buffers for fq.\n");
> > -		return 0;
> > -	}
> >
> > -	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> > +	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> >
> > -	if (rcvd == 0) {
> > +	if (nb_pkts == 0) {
> >  #if defined(XDP_USE_NEED_WAKEUP)
> >  		if (xsk_ring_prod__needs_wakeup(&umem->fq))
> >  			(void)poll(rxq->fds, 1, 1000);
> >  #endif
> >
> > -		goto out;
> > +		return 0;
> >  	}
> >
> > -	for (i = 0; i < rcvd; i++) {
> > +	/* allocate bufs for fill queue replenishment after rx */
> > +	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> > +		AF_XDP_LOG(DEBUG,
> > +			"Failed to get enough buffers for fq.\n");
> 
> Thanks for this patch. I've considered this in the past.
> There is a problem if we hit this condition.
> We advance the rx producer @ xsk_ring_cons__peek.
> But if we have no mbufs to hold the rx data, it is lost.
> That's why we allocate the mbufs up front now.

True, thanks

-Li
  
Li RongQing Sept. 20, 2020, 6:02 a.m. UTC | #3
> -----Original Message-----
> From: Loftus, Ciara [mailto:ciara.loftus@intel.com]
> Sent: Friday, September 18, 2020 5:39 PM
> To: Li,Rongqing <lirongqing@baidu.com>
> Cc: dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH 2/2] af_xdp: avoid to unnecessary allocation and
> free mbuf
> 
> >
> > optimize rx performance, by allocating mbuf based on result of
> > xsk_ring_cons__peek, to avoid to redundancy allocation, and free mbuf
> > when receive packets
> >
> > Signed-off-by: Li RongQing <lirongqing@baidu.com>
> > Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
> > ---
> >  drivers/net/af_xdp/rte_eth_af_xdp.c | 64
> > ++++++++++++++++---------------
> > ------
> >  1 file changed, 27 insertions(+), 37 deletions(-)
> >
> > diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> > b/drivers/net/af_xdp/rte_eth_af_xdp.c
> > index 7ce4ad04a..48824050e 100644
> > --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> > +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> > @@ -229,28 +229,29 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> > **bufs, uint16_t nb_pkts)
> >  	struct xsk_umem_info *umem = rxq->umem;
> >  	uint32_t idx_rx = 0;
> >  	unsigned long rx_bytes = 0;
> > -	int rcvd, i;
> > +	int i;
> >  	struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
> >
> > -	/* allocate bufs for fill queue replenishment after rx */
> > -	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> > -		AF_XDP_LOG(DEBUG,
> > -			"Failed to get enough buffers for fq.\n");
> > -		return 0;
> > -	}
> >
> > -	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> > +	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> >
> > -	if (rcvd == 0) {
> > +	if (nb_pkts == 0) {
> >  #if defined(XDP_USE_NEED_WAKEUP)
> >  		if (xsk_ring_prod__needs_wakeup(&umem->fq))
> >  			(void)poll(rxq->fds, 1, 1000);
> >  #endif
> >
> > -		goto out;
> > +		return 0;
> >  	}
> >
> > -	for (i = 0; i < rcvd; i++) {
> > +	/* allocate bufs for fill queue replenishment after rx */
> > +	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> > +		AF_XDP_LOG(DEBUG,
> > +			"Failed to get enough buffers for fq.\n");
> 
> Thanks for this patch. I've considered this in the past.
> There is a problem if we hit this condition.
> We advance the rx producer @ xsk_ring_cons__peek.
> But if we have no mbufs to hold the rx data, it is lost.
> That's why we allocate the mbufs up front now.
> Agree that we might have wasteful allocations and it's not the most optimal,
> but we don't drop packets due to failed mbuf allocs.
> 
> > +		return 0;

xsk_ring_cons__peek advances rx cached_cons and rx cached_prod. I think it is harmless to advance cached_prod,
so we can restore rx cached_cons if the mbufs fail to be allocated, with a fix like:


    if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0)) {
        rx->cached_cons -= nb_pkts;    /* roll back the peek */
        return 0;
    }

Is it right?

-Li
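
For reference, a minimal sketch of how that rollback might slot into the
patched af_xdp_rx_zc(). This is only a sketch, assuming struct
xsk_ring_cons exposes cached_cons (as libbpf's xsk.h does); it is not
part of the submitted patch:

    nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
    ...
    /* allocate bufs for fill queue replenishment after rx */
    if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
        AF_XDP_LOG(DEBUG,
            "Failed to get enough buffers for fq.\n");
        /* Undo the peek: the descriptors stay in the ring and can be
         * retried on the next burst instead of being dropped.
         */
        rx->cached_cons -= nb_pkts;
        return 0;
    }

Newer libbpf (and libxdp) headers also carry a helper,
xsk_ring_cons__cancel(), which performs exactly this cached_cons
adjustment; whether it can be used here depends on the minimum libbpf
version the driver is built against.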
  

Patch

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 7ce4ad04a..48824050e 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -229,28 +229,29 @@  af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct xsk_umem_info *umem = rxq->umem;
 	uint32_t idx_rx = 0;
 	unsigned long rx_bytes = 0;
-	int rcvd, i;
+	int i;
 	struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
-	/* allocate bufs for fill queue replenishment after rx */
-	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
-		AF_XDP_LOG(DEBUG,
-			"Failed to get enough buffers for fq.\n");
-		return 0;
-	}
 
-	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
+	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
 
-	if (rcvd == 0) {
+	if (nb_pkts == 0) {
 #if defined(XDP_USE_NEED_WAKEUP)
 		if (xsk_ring_prod__needs_wakeup(&umem->fq))
 			(void)poll(rxq->fds, 1, 1000);
 #endif
 
-		goto out;
+		return 0;
 	}
 
-	for (i = 0; i < rcvd; i++) {
+	/* allocate bufs for fill queue replenishment after rx */
+	if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
+		AF_XDP_LOG(DEBUG,
+			"Failed to get enough buffers for fq.\n");
+		return 0;
+	}
+
+	for (i = 0; i < nb_pkts; i++) {
 		const struct xdp_desc *desc;
 		uint64_t addr;
 		uint32_t len;
@@ -275,20 +276,15 @@  af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		rx_bytes += len;
 	}
 
-	xsk_ring_cons__release(rx, rcvd);
+	xsk_ring_cons__release(rx, nb_pkts);
 
-	(void)reserve_fill_queue(umem, rcvd, fq_bufs);
+	(void)reserve_fill_queue(umem, nb_pkts, fq_bufs);
 
 	/* statistics */
-	rxq->stats.rx_pkts += rcvd;
+	rxq->stats.rx_pkts += nb_pkts;
 	rxq->stats.rx_bytes += rx_bytes;
 
-out:
-	if (rcvd != nb_pkts)
-		rte_mempool_put_bulk(umem->mb_pool, (void **)&fq_bufs[rcvd],
-				     nb_pkts - rcvd);
-
-	return rcvd;
+	return nb_pkts;
 }
 #else
 static uint16_t
@@ -300,27 +296,26 @@  af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct xsk_ring_prod *fq = &umem->fq;
 	uint32_t idx_rx = 0;
 	unsigned long rx_bytes = 0;
-	int rcvd, i;
+	int i;
 	uint32_t free_thresh = fq->size >> 1;
 	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
-	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
-		return 0;
-
-	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
-	if (rcvd == 0) {
+	nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
+	if (nb_pkts == 0) {
 #if defined(XDP_USE_NEED_WAKEUP)
 		if (xsk_ring_prod__needs_wakeup(fq))
 			(void)poll(rxq->fds, 1, 1000);
 #endif
-
-		goto out;
+		return 0;
 	}
 
+	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
+		return 0;
+
 	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
 		(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE, NULL);
 
-	for (i = 0; i < rcvd; i++) {
+	for (i = 0; i < nb_pkts; i++) {
 		const struct xdp_desc *desc;
 		uint64_t addr;
 		uint32_t len;
@@ -339,18 +334,13 @@  af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		bufs[i] = mbufs[i];
 	}
 
-	xsk_ring_cons__release(rx, rcvd);
+	xsk_ring_cons__release(rx, nb_pkts);
 
 	/* statistics */
-	rxq->stats.rx_pkts += rcvd;
+	rxq->stats.rx_pkts += nb_pkts;
 	rxq->stats.rx_bytes += rx_bytes;
 
-out:
-	if (rcvd != nb_pkts)
-		rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[rcvd],
-				     nb_pkts - rcvd);
-
-	return rcvd;
+	return nb_pkts;
 }
 #endif