[v2,5/6] net/af_xdp: enable zero copy

Message ID 20190319071256.26302-6-xiaolong.ye@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Ferruh Yigit
Series Introduce AF_XDP PMD

Checks

Context               Check    Description
ci/Intel-compilation  fail     Compilation issues
ci/checkpatch         warning  coding style issues

Commit Message

Xiaolong Ye March 19, 2019, 7:12 a.m. UTC
  Check whether the external mempool (from rx_queue_setup) is fit for
af_xdp; if it is, it will be registered to the af_xdp socket directly
and there will be no packet data copy on Rx and Tx.

Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 128 ++++++++++++++++++++--------
 1 file changed, 91 insertions(+), 37 deletions(-)
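
For context, an application-provided mempool only takes the zero-copy path
when it satisfies the constraints checked by check_mempool_zc() in this
patch: a single contiguous memory chunk, a page-aligned base address, a
cache-line-sized object header, and a total element size that is a multiple
of the AF_XDP frame size. A rough sketch of creating such a pool on the
application side follows; the names and sizes are illustrative assumptions,
not part of the patch:

#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_memory.h>

/* Assumed to match the driver's ETH_AF_XDP_FRAME_SIZE. */
#define ZC_FRAME_SIZE 2048

static struct rte_mempool *
create_zc_candidate_pool(void)
{
	/* Choose the data room so that the mbuf header, data room and
	 * mempool object header add up to a multiple of the frame size;
	 * 1856 works for a 128-byte rte_mbuf and a 64-byte object header,
	 * but the exact figure depends on the DPDK build.
	 */
	uint16_t data_room = ZC_FRAME_SIZE - sizeof(struct rte_mbuf) -
			     RTE_CACHE_LINE_SIZE;

	/* The pool must also end up in one page-aligned memory chunk;
	 * a small pool backed by a single hugepage usually does.
	 */
	return rte_pktmbuf_pool_create("af_xdp_zc_pool", 1024, 250, 0,
				       data_room, SOCKET_ID_ANY);
}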
  

Comments

Mattias Rönnblom March 19, 2019, 8:12 a.m. UTC | #1
On 2019-03-19 08:12, Xiaolong Ye wrote:
> Try to check if external mempool (from rx_queue_setup) is fit for
> af_xdp, if it is, it will be registered to af_xdp socket directly and
> there will be no packet data copy on Rx and Tx.
> 
> Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
> ---
>   drivers/net/af_xdp/rte_eth_af_xdp.c | 128 ++++++++++++++++++++--------
>   1 file changed, 91 insertions(+), 37 deletions(-)
> 
> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
> index fc60cb5c5..c22791e51 100644
> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> @@ -62,6 +62,7 @@ struct xsk_umem_info {
>   	struct xsk_umem *umem;
>   	struct rte_mempool *mb_pool;
>   	void *buffer;
> +	uint8_t zc;
>   };
>   
>   struct pkt_rx_queue {
> @@ -76,6 +77,7 @@ struct pkt_rx_queue {
>   
>   	struct pkt_tx_queue *pair;
>   	uint16_t queue_idx;
> +	uint8_t zc;
>   };
>   
>   struct pkt_tx_queue {
> @@ -191,17 +193,24 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>   		uint32_t len = xsk_ring_cons__rx_desc(rx, idx_rx++)->len;
>   		char *pkt = xsk_umem__get_data(rxq->umem->buffer, addr);
>   
> -		mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
> -		if (mbuf) {
> -			memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt, len);
> +		if (rxq->zc) {
> +			mbuf = addr_to_mbuf(rxq->umem, addr);
>   			rte_pktmbuf_pkt_len(mbuf) =
>   				rte_pktmbuf_data_len(mbuf) = len;
> -			rx_bytes += len;
>   			bufs[count++] = mbuf;
>   		} else {
> -			dropped++;
> +			mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
> +			if (mbuf) {

if (likely(mbuf != NULL))

> +				memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt, len);

Use rte_memcpy.

> +				rte_pktmbuf_pkt_len(mbuf) =
> +					rte_pktmbuf_data_len(mbuf) = len;
> +				rx_bytes += len;
> +				bufs[count++] = mbuf;
> +			} else {
> +				dropped++;
> +			}
> +			rte_pktmbuf_free(addr_to_mbuf(umem, addr));
>   		}
> -		rte_pktmbuf_free(addr_to_mbuf(umem, addr));
>   	}
>   
>   	xsk_ring_cons__release(rx, rcvd);
> @@ -285,22 +294,29 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>   					- ETH_AF_XDP_DATA_HEADROOM;
>   		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
>   		mbuf = bufs[i];
> -		if (mbuf->pkt_len <= buf_len) {
> -			mbuf_to_tx = rte_pktmbuf_alloc(umem->mb_pool);
> -			if (!mbuf_to_tx) {
> -				rte_pktmbuf_free(mbuf);
> -				continue;
> -			}
> -			desc->addr = mbuf_to_addr(umem, mbuf_to_tx);
> +		if (txq->pair->zc && mbuf->pool == umem->mb_pool) {
> +			desc->addr = mbuf_to_addr(umem, mbuf);
>   			desc->len = mbuf->pkt_len;
> -			pkt = xsk_umem__get_data(umem->buffer,
> -						 desc->addr);
> -			memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
> -			       desc->len);
>   			valid++;
>   			tx_bytes += mbuf->pkt_len;
> +		} else {
> +			if (mbuf->pkt_len <= buf_len) {
> +				mbuf_to_tx = rte_pktmbuf_alloc(umem->mb_pool);
> +				if (!mbuf_to_tx) {

if (unlikely(mbuf_to_tx == NULL))

See DPDK coding conventions 1.8.1 for how to do pointer comparisons.

> +					rte_pktmbuf_free(mbuf);
> +					continue;
> +				}
> +				desc->addr = mbuf_to_addr(umem, mbuf_to_tx);
> +				desc->len = mbuf->pkt_len;
> +				pkt = xsk_umem__get_data(umem->buffer,
> +							 desc->addr);
> +				memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
> +				       desc->len);

rte_memcpy()

> +				valid++;
> +				tx_bytes += mbuf->pkt_len;
> +			}
> +			rte_pktmbuf_free(mbuf);
>   		}
> -		rte_pktmbuf_free(mbuf);
>   	}
>   
>   	xsk_ring_prod__submit(&txq->tx, nb_pkts);
> @@ -479,7 +495,7 @@ static inline uint64_t get_len(struct rte_mempool *mp)
>   	return (uint64_t)(memhdr->len);
>   }
>   
> -static struct xsk_umem_info *xdp_umem_configure(void)
> +static struct xsk_umem_info *xdp_umem_configure(struct rte_mempool *mb_pool)
>   {
>   	struct xsk_umem_info *umem;
>   	struct xsk_umem_config usr_config = {
> @@ -497,20 +513,25 @@ static struct xsk_umem_info *xdp_umem_configure(void)
>   		return NULL;
>   	}
>   
> -	snprintf(pool_name, 0x100, "af_xdp_ring");
> -	umem->mb_pool = rte_pktmbuf_pool_create_with_flags(pool_name,
> -			ETH_AF_XDP_NUM_BUFFERS,
> -			250, 0,
> -			ETH_AF_XDP_FRAME_SIZE -
> -			ETH_AF_XDP_MBUF_OVERHEAD,
> -			MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
> -			SOCKET_ID_ANY);
> -
> -	if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) {
> -		RTE_LOG(ERR, AF_XDP,
> -			"Failed to create rte_mempool\n");
> -		goto err;
> +	if (!mb_pool) {

1.8.1

> +		snprintf(pool_name, 0x100, "af_xdp_ring");

0x100??

> +		umem->mb_pool = rte_pktmbuf_pool_create_with_flags(pool_name,
> +				ETH_AF_XDP_NUM_BUFFERS,
> +				250, 0,
> +				ETH_AF_XDP_FRAME_SIZE -
> +				ETH_AF_XDP_MBUF_OVERHEAD,
> +				MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
> +				SOCKET_ID_ANY);
> +
> +		if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) {

1.8.1

> +			RTE_LOG(ERR, AF_XDP, "Failed to create rte_mempool\n");
> +			goto err;
> +		}
> +	} else {
> +		umem->mb_pool = mb_pool;
> +		umem->zc = 1;
>   	}
> +
>   	base_addr = (void *)get_base_addr(umem->mb_pool);
>   
>   	ret = xsk_umem__create(&umem->umem, base_addr,
> @@ -531,16 +552,43 @@ static struct xsk_umem_info *xdp_umem_configure(void)
>   	return NULL;
>   }
>   
> +static uint8_t
> +check_mempool_zc(struct rte_mempool *mp)
> +{
> +	RTE_ASSERT(mp);
> +
> +	/* must continues */
> +	if (mp->nb_mem_chunks > 1)
> +		return 0;
> +
> +	/* check header size */
> +	if (mp->header_size != RTE_CACHE_LINE_SIZE)
> +		return 0;
> +
> +	/* check base address */
> +	if ((uint64_t)get_base_addr(mp) % getpagesize() != 0)

This should be an uintptr_t cast.
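
I.e., roughly (sketch only):

	if ((uintptr_t)get_base_addr(mp) % getpagesize() != 0)
		return 0;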

> +		return 0;
> +
> +	/* check chunk size */
> +	if ((mp->elt_size + mp->header_size + mp->trailer_size) %
> +	    ETH_AF_XDP_FRAME_SIZE != 0)
> +		return 0;
> +
> +	return 1;
> +}
> +
>   static int
>   xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
> -	      int ring_size)
> +	      int ring_size, struct rte_mempool *mb_pool)
>   {
>   	struct xsk_socket_config cfg;
>   	struct pkt_tx_queue *txq = rxq->pair;
> +	struct rte_mempool *mp;
>   	int ret = 0;
>   	int reserve_size;
>   
> -	rxq->umem = xdp_umem_configure();
> +	mp = check_mempool_zc(mb_pool) ? mb_pool : NULL;
> +	rxq->umem = xdp_umem_configure(mp);
>   	if (!rxq->umem) {
>   		ret = -ENOMEM;
>   		goto err;
> @@ -627,15 +675,21 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
>   
>   	rxq->mb_pool = mb_pool;
>   
> -	if (xsk_configure(internals, rxq, nb_rx_desc)) {
> -		RTE_LOG(ERR, AF_XDP,
> -			"Failed to configure xdp socket\n");
> +	if (xsk_configure(internals, rxq, nb_rx_desc, mb_pool)) {
> +		RTE_LOG(ERR, AF_XDP, "Failed to configure xdp socket\n");
>   		ret = -EINVAL;
>   		goto err;
>   	}
>   
>   	internals->umem = rxq->umem;
>   
> +	if (mb_pool == internals->umem->mb_pool)
> +		rxq->zc = internals->umem->zc;
> +
> +	if (rxq->zc)
> +		RTE_LOG(INFO, AF_XDP,
> +			"zero copy enabled on rx queue %d\n", rx_queue_id);
> +
>   	dev->data->rx_queues[rx_queue_id] = rxq;
>   	return 0;
>   
>
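
For illustration, the copy branch of eth_af_xdp_rx() with the above comments
folded in would read roughly as follows; this is a sketch against the quoted
hunk, not the posted follow-up (rte_memcpy() comes from rte_memcpy.h):

		} else {
			mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
			if (likely(mbuf != NULL)) {
				rte_memcpy(rte_pktmbuf_mtod(mbuf, void *),
					   pkt, len);
				rte_pktmbuf_pkt_len(mbuf) =
					rte_pktmbuf_data_len(mbuf) = len;
				rx_bytes += len;
				bufs[count++] = mbuf;
			} else {
				dropped++;
			}
			rte_pktmbuf_free(addr_to_mbuf(umem, addr));
		}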
  
Xiaolong Ye March 19, 2019, 8:39 a.m. UTC | #2
Hi, Mattias

Thanks for the review.

On 03/19, Mattias Rönnblom wrote:
>On 2019-03-19 08:12, Xiaolong Ye wrote:
>> Try to check if external mempool (from rx_queue_setup) is fit for
>> af_xdp, if it is, it will be registered to af_xdp socket directly and
>> there will be no packet data copy on Rx and Tx.
>> 
>> Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
>> ---
>>   drivers/net/af_xdp/rte_eth_af_xdp.c | 128 ++++++++++++++++++++--------
>>   1 file changed, 91 insertions(+), 37 deletions(-)
>> 
>> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
>> index fc60cb5c5..c22791e51 100644
>> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
>> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
>> @@ -62,6 +62,7 @@ struct xsk_umem_info {
>>   	struct xsk_umem *umem;
>>   	struct rte_mempool *mb_pool;
>>   	void *buffer;
>> +	uint8_t zc;
>>   };
>>   struct pkt_rx_queue {
>> @@ -76,6 +77,7 @@ struct pkt_rx_queue {
>>   	struct pkt_tx_queue *pair;
>>   	uint16_t queue_idx;
>> +	uint8_t zc;
>>   };
>>   struct pkt_tx_queue {
>> @@ -191,17 +193,24 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>>   		uint32_t len = xsk_ring_cons__rx_desc(rx, idx_rx++)->len;
>>   		char *pkt = xsk_umem__get_data(rxq->umem->buffer, addr);
>> -		mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
>> -		if (mbuf) {
>> -			memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt, len);
>> +		if (rxq->zc) {
>> +			mbuf = addr_to_mbuf(rxq->umem, addr);
>>   			rte_pktmbuf_pkt_len(mbuf) =
>>   				rte_pktmbuf_data_len(mbuf) = len;
>> -			rx_bytes += len;
>>   			bufs[count++] = mbuf;
>>   		} else {
>> -			dropped++;
>> +			mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
>> +			if (mbuf) {
>
>if (likely(mbuf != NULL))

Got it.

>
>> +				memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt, len);
>
>Use rte_memcpy.

Got it.

>
>> +				rte_pktmbuf_pkt_len(mbuf) =
>> +					rte_pktmbuf_data_len(mbuf) = len;
>> +				rx_bytes += len;
>> +				bufs[count++] = mbuf;
>> +			} else {
>> +				dropped++;
>> +			}
>> +			rte_pktmbuf_free(addr_to_mbuf(umem, addr));
>>   		}
>> -		rte_pktmbuf_free(addr_to_mbuf(umem, addr));
>>   	}
>>   	xsk_ring_cons__release(rx, rcvd);
>> @@ -285,22 +294,29 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>>   					- ETH_AF_XDP_DATA_HEADROOM;
>>   		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
>>   		mbuf = bufs[i];
>> -		if (mbuf->pkt_len <= buf_len) {
>> -			mbuf_to_tx = rte_pktmbuf_alloc(umem->mb_pool);
>> -			if (!mbuf_to_tx) {
>> -				rte_pktmbuf_free(mbuf);
>> -				continue;
>> -			}
>> -			desc->addr = mbuf_to_addr(umem, mbuf_to_tx);
>> +		if (txq->pair->zc && mbuf->pool == umem->mb_pool) {
>> +			desc->addr = mbuf_to_addr(umem, mbuf);
>>   			desc->len = mbuf->pkt_len;
>> -			pkt = xsk_umem__get_data(umem->buffer,
>> -						 desc->addr);
>> -			memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
>> -			       desc->len);
>>   			valid++;
>>   			tx_bytes += mbuf->pkt_len;
>> +		} else {
>> +			if (mbuf->pkt_len <= buf_len) {
>> +				mbuf_to_tx = rte_pktmbuf_alloc(umem->mb_pool);
>> +				if (!mbuf_to_tx) {
>
>if (unlikely(mbuf_to_tx == NULL))
>
>See DPDK coding conventions 1.8.1 for how to do pointer comparisons.

I'll check it and do it correctly in the next version.

>
>> +					rte_pktmbuf_free(mbuf);
>> +					continue;
>> +				}
>> +				desc->addr = mbuf_to_addr(umem, mbuf_to_tx);
>> +				desc->len = mbuf->pkt_len;
>> +				pkt = xsk_umem__get_data(umem->buffer,
>> +							 desc->addr);
>> +				memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
>> +				       desc->len);
>
>rte_memcpy()

Got it.

>
>> +				valid++;
>> +				tx_bytes += mbuf->pkt_len;
>> +			}
>> +			rte_pktmbuf_free(mbuf);
>>   		}
>> -		rte_pktmbuf_free(mbuf);
>>   	}
>>   	xsk_ring_prod__submit(&txq->tx, nb_pkts);
>> @@ -479,7 +495,7 @@ static inline uint64_t get_len(struct rte_mempool *mp)
>>   	return (uint64_t)(memhdr->len);
>>   }
>> -static struct xsk_umem_info *xdp_umem_configure(void)
>> +static struct xsk_umem_info *xdp_umem_configure(struct rte_mempool *mb_pool)
>>   {
>>   	struct xsk_umem_info *umem;
>>   	struct xsk_umem_config usr_config = {
>> @@ -497,20 +513,25 @@ static struct xsk_umem_info *xdp_umem_configure(void)
>>   		return NULL;
>>   	}
>> -	snprintf(pool_name, 0x100, "af_xdp_ring");
>> -	umem->mb_pool = rte_pktmbuf_pool_create_with_flags(pool_name,
>> -			ETH_AF_XDP_NUM_BUFFERS,
>> -			250, 0,
>> -			ETH_AF_XDP_FRAME_SIZE -
>> -			ETH_AF_XDP_MBUF_OVERHEAD,
>> -			MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
>> -			SOCKET_ID_ANY);
>> -
>> -	if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) {
>> -		RTE_LOG(ERR, AF_XDP,
>> -			"Failed to create rte_mempool\n");
>> -		goto err;
>> +	if (!mb_pool) {
>
>1.8.1

Got it.

>
>> +		snprintf(pool_name, 0x100, "af_xdp_ring");
>
>0x100??

Will use RTE_MEMPOOL_NAMESIZE instead.
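
That is, roughly:

	snprintf(pool_name, RTE_MEMPOOL_NAMESIZE, "af_xdp_ring");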

>
>> +		umem->mb_pool = rte_pktmbuf_pool_create_with_flags(pool_name,
>> +				ETH_AF_XDP_NUM_BUFFERS,
>> +				250, 0,
>> +				ETH_AF_XDP_FRAME_SIZE -
>> +				ETH_AF_XDP_MBUF_OVERHEAD,
>> +				MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
>> +				SOCKET_ID_ANY);
>> +
>> +		if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) {
>
>1.8.1

Got it.

>
>> +			RTE_LOG(ERR, AF_XDP, "Failed to create rte_mempool\n");
>> +			goto err;
>> +		}
>> +	} else {
>> +		umem->mb_pool = mb_pool;
>> +		umem->zc = 1;
>>   	}
>> +
>>   	base_addr = (void *)get_base_addr(umem->mb_pool);
>>   	ret = xsk_umem__create(&umem->umem, base_addr,
>> @@ -531,16 +552,43 @@ static struct xsk_umem_info *xdp_umem_configure(void)
>>   	return NULL;
>>   }
>> +static uint8_t
>> +check_mempool_zc(struct rte_mempool *mp)
>> +{
>> +	RTE_ASSERT(mp);
>> +
>> +	/* must continues */
>> +	if (mp->nb_mem_chunks > 1)
>> +		return 0;
>> +
>> +	/* check header size */
>> +	if (mp->header_size != RTE_CACHE_LINE_SIZE)
>> +		return 0;
>> +
>> +	/* check base address */
>> +	if ((uint64_t)get_base_addr(mp) % getpagesize() != 0)
>
>This should be an uintptr_t cast.

Got it.

Thanks,
Xiaolong
>
>> +		return 0;
>> +
>> +	/* check chunk size */
>> +	if ((mp->elt_size + mp->header_size + mp->trailer_size) %
>> +	    ETH_AF_XDP_FRAME_SIZE != 0)
>> +		return 0;
>> +
>> +	return 1;
>> +}
>> +
>>   static int
>>   xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
>> -	      int ring_size)
>> +	      int ring_size, struct rte_mempool *mb_pool)
>>   {
>>   	struct xsk_socket_config cfg;
>>   	struct pkt_tx_queue *txq = rxq->pair;
>> +	struct rte_mempool *mp;
>>   	int ret = 0;
>>   	int reserve_size;
>> -	rxq->umem = xdp_umem_configure();
>> +	mp = check_mempool_zc(mb_pool) ? mb_pool : NULL;
>> +	rxq->umem = xdp_umem_configure(mp);
>>   	if (!rxq->umem) {
>>   		ret = -ENOMEM;
>>   		goto err;
>> @@ -627,15 +675,21 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
>>   	rxq->mb_pool = mb_pool;
>> -	if (xsk_configure(internals, rxq, nb_rx_desc)) {
>> -		RTE_LOG(ERR, AF_XDP,
>> -			"Failed to configure xdp socket\n");
>> +	if (xsk_configure(internals, rxq, nb_rx_desc, mb_pool)) {
>> +		RTE_LOG(ERR, AF_XDP, "Failed to configure xdp socket\n");
>>   		ret = -EINVAL;
>>   		goto err;
>>   	}
>>   	internals->umem = rxq->umem;
>> +	if (mb_pool == internals->umem->mb_pool)
>> +		rxq->zc = internals->umem->zc;
>> +
>> +	if (rxq->zc)
>> +		RTE_LOG(INFO, AF_XDP,
>> +			"zero copy enabled on rx queue %d\n", rx_queue_id);
>> +
>>   	dev->data->rx_queues[rx_queue_id] = rxq;
>>   	return 0;
>>
  
David Marchand March 20, 2019, 9:22 a.m. UTC | #3
On Tue, Mar 19, 2019 at 8:17 AM Xiaolong Ye <xiaolong.ye@intel.com> wrote:

> Try to check if external mempool (from rx_queue_setup) is fit for
> af_xdp, if it is, it will be registered to af_xdp socket directly and
> there will be no packet data copy on Rx and Tx.
>
> Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
> ---
>  drivers/net/af_xdp/rte_eth_af_xdp.c | 128 ++++++++++++++++++++--------
>  1 file changed, 91 insertions(+), 37 deletions(-)
>
> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> b/drivers/net/af_xdp/rte_eth_af_xdp.c
> index fc60cb5c5..c22791e51 100644
> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> @@ -62,6 +62,7 @@ struct xsk_umem_info {
>         struct xsk_umem *umem;
>         struct rte_mempool *mb_pool;
>         void *buffer;
> +       uint8_t zc;
>  };
>
>  struct pkt_rx_queue {
> @@ -76,6 +77,7 @@ struct pkt_rx_queue {
>
>         struct pkt_tx_queue *pair;
>         uint16_t queue_idx;
> +       uint8_t zc;
>  };
>
>  struct pkt_tx_queue {
> @@ -191,17 +193,24 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs,
> uint16_t nb_pkts)
>                 uint32_t len = xsk_ring_cons__rx_desc(rx, idx_rx++)->len;
>                 char *pkt = xsk_umem__get_data(rxq->umem->buffer, addr);
>
> -               mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
> -               if (mbuf) {
> -                       memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt, len);
> +               if (rxq->zc) {
> +                       mbuf = addr_to_mbuf(rxq->umem, addr);
>                         rte_pktmbuf_pkt_len(mbuf) =
>                                 rte_pktmbuf_data_len(mbuf) = len;
> -                       rx_bytes += len;
>                         bufs[count++] = mbuf;
>                 } else {
> -                       dropped++;
> +                       mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
> +                       if (mbuf) {
> +                               memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt,
> len);
> +                               rte_pktmbuf_pkt_len(mbuf) =
> +                                       rte_pktmbuf_data_len(mbuf) = len;
> +                               rx_bytes += len;
> +                               bufs[count++] = mbuf;
> +                       } else {
> +                               dropped++;
> +                       }
> +                       rte_pktmbuf_free(addr_to_mbuf(umem, addr));
>                 }
> -               rte_pktmbuf_free(addr_to_mbuf(umem, addr));
>         }
>

Did not understand how the zc parts are working, but at least looking at
the rx_burst function, when multi q will be supported, is there any reason
we would have zc enabled on one rxq and not others?
If the answer is that we would have either all or none rxq with zc, we
could have dedicated rx_burst functions and avoid this per mbuf test on
rxq->zc.


For the tx part, I don't understand the relation between rx and tx.
Should not the zc capability be global to the ethdev port ?

You might also want to look at "simple" tx burst functions like in i40e so
that you only need to look at the first mbuf to check its originating pool.
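
For illustration, the i40e-style shortcut amounts to checking only the first
mbuf of a burst; a rough sketch using the names from this patch (not actual
driver code):

static inline int
tx_burst_is_zc(struct xsk_umem_info *umem, struct rte_mbuf **bufs,
	       uint16_t nb_pkts)
{
	/* Assume the whole burst shares the pool of its first mbuf, as
	 * "simple" Tx paths do, and decide on zero copy once per burst
	 * instead of once per packet.
	 */
	return nb_pkts > 0 && bufs[0]->pool == umem->mb_pool;
}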
  
Qi Zhang March 20, 2019, 9:48 a.m. UTC | #4
From: David Marchand [mailto:david.marchand@redhat.com] 
Sent: Wednesday, March 20, 2019 5:22 PM
To: Ye, Xiaolong <xiaolong.ye@intel.com>
Cc: dev <dev@dpdk.org>; Zhang, Qi Z <qi.z.zhang@intel.com>; Karlsson, Magnus <magnus.karlsson@intel.com>; Topel, Bjorn <bjorn.topel@intel.com>
Subject: Re: [dpdk-dev] [PATCH v2 5/6] net/af_xdp: enable zero copy



On Tue, Mar 19, 2019 at 8:17 AM Xiaolong Ye <xiaolong.ye@intel.com> wrote:
Try to check if external mempool (from rx_queue_setup) is fit for
af_xdp, if it is, it will be registered to af_xdp socket directly and
there will be no packet data copy on Rx and Tx.

Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 128 ++++++++++++++++++++--------
 1 file changed, 91 insertions(+), 37 deletions(-)

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index fc60cb5c5..c22791e51 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -62,6 +62,7 @@ struct xsk_umem_info {
        struct xsk_umem *umem;
        struct rte_mempool *mb_pool;
        void *buffer;
+       uint8_t zc;
 };

 struct pkt_rx_queue {
@@ -76,6 +77,7 @@ struct pkt_rx_queue {

        struct pkt_tx_queue *pair;
        uint16_t queue_idx;
+       uint8_t zc;
 };

 struct pkt_tx_queue {
@@ -191,17 +193,24 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
                uint32_t len = xsk_ring_cons__rx_desc(rx, idx_rx++)->len;
                char *pkt = xsk_umem__get_data(rxq->umem->buffer, addr);

-               mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
-               if (mbuf) {
-                       memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt, len);
+               if (rxq->zc) {
+                       mbuf = addr_to_mbuf(rxq->umem, addr);
                        rte_pktmbuf_pkt_len(mbuf) =
                                rte_pktmbuf_data_len(mbuf) = len;
-                       rx_bytes += len;
                        bufs[count++] = mbuf;
                } else {
-                       dropped++;
+                       mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
+                       if (mbuf) {
+                               memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt, len);
+                               rte_pktmbuf_pkt_len(mbuf) =
+                                       rte_pktmbuf_data_len(mbuf) = len;
+                               rx_bytes += len;
+                               bufs[count++] = mbuf;
+                       } else {
+                               dropped++;
+                       }
+                       rte_pktmbuf_free(addr_to_mbuf(umem, addr));
                }
-               rte_pktmbuf_free(addr_to_mbuf(umem, addr));
        }

Did not understand how the zc parts are working, but at least looking at the rx_burst function, when multi q will be supported, is there any reason we would have zc enabled on one rxq and not others?

[Qi:] The answer is no. We can't anticipate which memory pool the application will use during rx queue setup, and even if multiple queues share the same memory pool, the umem still can't be shared due to race conditions, so only one queue could be zc. To make all queues zero copy, we would have to assign each queue a different memory pool.

If the answer is that we would have either all or none rxq with zc, we could have dedicated rx_burst functions and avoid this per mbuf test on rxq->zc.




For the tx part, I don't understand the relation between rx and tx.
Should not the zc capability be global to the ethdev port ?

You might also want to look at "simple" tx burst functions like in i40e so that you only need to look at the first mbuf to check its originating pool.

[Qi:] If you mean DEV_TX_OFFLOAD_MBUF_FAST_FREE, yes, I think that's a good point.
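
For reference, that capability is normally advertised by the PMD in its
dev_infos_get callback and requested by the application through
txmode.offloads, roughly as follows (a sketch, not code from this series):

	/* PMD side, in the .dev_infos_get callback: */
	dev_info->tx_offload_capa |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;

	/* Application side, before rte_eth_dev_configure(): */
	struct rte_eth_conf port_conf = { 0 };

	port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;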
  

Patch

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index fc60cb5c5..c22791e51 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -62,6 +62,7 @@  struct xsk_umem_info {
 	struct xsk_umem *umem;
 	struct rte_mempool *mb_pool;
 	void *buffer;
+	uint8_t zc;
 };
 
 struct pkt_rx_queue {
@@ -76,6 +77,7 @@  struct pkt_rx_queue {
 
 	struct pkt_tx_queue *pair;
 	uint16_t queue_idx;
+	uint8_t zc;
 };
 
 struct pkt_tx_queue {
@@ -191,17 +193,24 @@  eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		uint32_t len = xsk_ring_cons__rx_desc(rx, idx_rx++)->len;
 		char *pkt = xsk_umem__get_data(rxq->umem->buffer, addr);
 
-		mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
-		if (mbuf) {
-			memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt, len);
+		if (rxq->zc) {
+			mbuf = addr_to_mbuf(rxq->umem, addr);
 			rte_pktmbuf_pkt_len(mbuf) =
 				rte_pktmbuf_data_len(mbuf) = len;
-			rx_bytes += len;
 			bufs[count++] = mbuf;
 		} else {
-			dropped++;
+			mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
+			if (mbuf) {
+				memcpy(rte_pktmbuf_mtod(mbuf, void*), pkt, len);
+				rte_pktmbuf_pkt_len(mbuf) =
+					rte_pktmbuf_data_len(mbuf) = len;
+				rx_bytes += len;
+				bufs[count++] = mbuf;
+			} else {
+				dropped++;
+			}
+			rte_pktmbuf_free(addr_to_mbuf(umem, addr));
 		}
-		rte_pktmbuf_free(addr_to_mbuf(umem, addr));
 	}
 
 	xsk_ring_cons__release(rx, rcvd);
@@ -285,22 +294,29 @@  eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 					- ETH_AF_XDP_DATA_HEADROOM;
 		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
 		mbuf = bufs[i];
-		if (mbuf->pkt_len <= buf_len) {
-			mbuf_to_tx = rte_pktmbuf_alloc(umem->mb_pool);
-			if (!mbuf_to_tx) {
-				rte_pktmbuf_free(mbuf);
-				continue;
-			}
-			desc->addr = mbuf_to_addr(umem, mbuf_to_tx);
+		if (txq->pair->zc && mbuf->pool == umem->mb_pool) {
+			desc->addr = mbuf_to_addr(umem, mbuf);
 			desc->len = mbuf->pkt_len;
-			pkt = xsk_umem__get_data(umem->buffer,
-						 desc->addr);
-			memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
-			       desc->len);
 			valid++;
 			tx_bytes += mbuf->pkt_len;
+		} else {
+			if (mbuf->pkt_len <= buf_len) {
+				mbuf_to_tx = rte_pktmbuf_alloc(umem->mb_pool);
+				if (!mbuf_to_tx) {
+					rte_pktmbuf_free(mbuf);
+					continue;
+				}
+				desc->addr = mbuf_to_addr(umem, mbuf_to_tx);
+				desc->len = mbuf->pkt_len;
+				pkt = xsk_umem__get_data(umem->buffer,
+							 desc->addr);
+				memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
+				       desc->len);
+				valid++;
+				tx_bytes += mbuf->pkt_len;
+			}
+			rte_pktmbuf_free(mbuf);
 		}
-		rte_pktmbuf_free(mbuf);
 	}
 
 	xsk_ring_prod__submit(&txq->tx, nb_pkts);
@@ -479,7 +495,7 @@  static inline uint64_t get_len(struct rte_mempool *mp)
 	return (uint64_t)(memhdr->len);
 }
 
-static struct xsk_umem_info *xdp_umem_configure(void)
+static struct xsk_umem_info *xdp_umem_configure(struct rte_mempool *mb_pool)
 {
 	struct xsk_umem_info *umem;
 	struct xsk_umem_config usr_config = {
@@ -497,20 +513,25 @@  static struct xsk_umem_info *xdp_umem_configure(void)
 		return NULL;
 	}
 
-	snprintf(pool_name, 0x100, "af_xdp_ring");
-	umem->mb_pool = rte_pktmbuf_pool_create_with_flags(pool_name,
-			ETH_AF_XDP_NUM_BUFFERS,
-			250, 0,
-			ETH_AF_XDP_FRAME_SIZE -
-			ETH_AF_XDP_MBUF_OVERHEAD,
-			MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
-			SOCKET_ID_ANY);
-
-	if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) {
-		RTE_LOG(ERR, AF_XDP,
-			"Failed to create rte_mempool\n");
-		goto err;
+	if (!mb_pool) {
+		snprintf(pool_name, 0x100, "af_xdp_ring");
+		umem->mb_pool = rte_pktmbuf_pool_create_with_flags(pool_name,
+				ETH_AF_XDP_NUM_BUFFERS,
+				250, 0,
+				ETH_AF_XDP_FRAME_SIZE -
+				ETH_AF_XDP_MBUF_OVERHEAD,
+				MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
+				SOCKET_ID_ANY);
+
+		if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) {
+			RTE_LOG(ERR, AF_XDP, "Failed to create rte_mempool\n");
+			goto err;
+		}
+	} else {
+		umem->mb_pool = mb_pool;
+		umem->zc = 1;
 	}
+
 	base_addr = (void *)get_base_addr(umem->mb_pool);
 
 	ret = xsk_umem__create(&umem->umem, base_addr,
@@ -531,16 +552,43 @@  static struct xsk_umem_info *xdp_umem_configure(void)
 	return NULL;
 }
 
+static uint8_t
+check_mempool_zc(struct rte_mempool *mp)
+{
+	RTE_ASSERT(mp);
+
+	/* must continues */
+	if (mp->nb_mem_chunks > 1)
+		return 0;
+
+	/* check header size */
+	if (mp->header_size != RTE_CACHE_LINE_SIZE)
+		return 0;
+
+	/* check base address */
+	if ((uint64_t)get_base_addr(mp) % getpagesize() != 0)
+		return 0;
+
+	/* check chunk size */
+	if ((mp->elt_size + mp->header_size + mp->trailer_size) %
+	    ETH_AF_XDP_FRAME_SIZE != 0)
+		return 0;
+
+	return 1;
+}
+
 static int
 xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
-	      int ring_size)
+	      int ring_size, struct rte_mempool *mb_pool)
 {
 	struct xsk_socket_config cfg;
 	struct pkt_tx_queue *txq = rxq->pair;
+	struct rte_mempool *mp;
 	int ret = 0;
 	int reserve_size;
 
-	rxq->umem = xdp_umem_configure();
+	mp = check_mempool_zc(mb_pool) ? mb_pool : NULL;
+	rxq->umem = xdp_umem_configure(mp);
 	if (!rxq->umem) {
 		ret = -ENOMEM;
 		goto err;
@@ -627,15 +675,21 @@  eth_rx_queue_setup(struct rte_eth_dev *dev,
 
 	rxq->mb_pool = mb_pool;
 
-	if (xsk_configure(internals, rxq, nb_rx_desc)) {
-		RTE_LOG(ERR, AF_XDP,
-			"Failed to configure xdp socket\n");
+	if (xsk_configure(internals, rxq, nb_rx_desc, mb_pool)) {
+		RTE_LOG(ERR, AF_XDP, "Failed to configure xdp socket\n");
 		ret = -EINVAL;
 		goto err;
 	}
 
 	internals->umem = rxq->umem;
 
+	if (mb_pool == internals->umem->mb_pool)
+		rxq->zc = internals->umem->zc;
+
+	if (rxq->zc)
+		RTE_LOG(INFO, AF_XDP,
+			"zero copy enabled on rx queue %d\n", rx_queue_id);
+
 	dev->data->rx_queues[rx_queue_id] = rxq;
 	return 0;