[dpdk-dev] [PATCH 4/8] net/mlx4: optimize Tx multi-segment case

Adrien Mazarguil adrien.mazarguil at 6wind.com
Wed Dec 6 11:58:31 CET 2017


On Tue, Nov 28, 2017 at 12:19:26PM +0000, Matan Azrad wrote:
> mlx4 Tx block can handle up to 4 data segments or control segment + up
> to 3 data segments. The first data segment in each not first Tx block
> must validate Tx queue wraparound and must use IO memory barrier before
> writing the byte count.
> 
> The previous multi-segment code used "for" loop to iterate over all
> packet segments and separated first Tx block data case by "if"
> statments.

statments => statements

> 
> Use switch case and unconditional branches instead of "for" loop can
> optimize the case and prevents the unnecessary checks for each data
> segment; This hints to compiler to create opitimized jump table.

opitimized => optimized

> 
> Optimize this case by switch case and unconditional branches usage.
> 
> Signed-off-by: Matan Azrad <matan at mellanox.com>
> ---
>  drivers/net/mlx4/mlx4_rxtx.c | 165 +++++++++++++++++++++++++------------------
>  drivers/net/mlx4/mlx4_rxtx.h |  33 +++++++++
>  2 files changed, 128 insertions(+), 70 deletions(-)
> 
> diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
> index 1d8240a..b9cb2fc 100644
> --- a/drivers/net/mlx4/mlx4_rxtx.c
> +++ b/drivers/net/mlx4/mlx4_rxtx.c
> @@ -432,15 +432,14 @@ struct pv {
>  	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
>  	volatile struct mlx4_wqe_ctrl_seg *ctrl;
>  	volatile struct mlx4_wqe_data_seg *dseg;
> -	struct rte_mbuf *sbuf;
> +	struct rte_mbuf *sbuf = buf;
>  	uint32_t lkey;
> -	uintptr_t addr;
> -	uint32_t byte_count;
>  	int pv_counter = 0;
> +	int nb_segs = buf->nb_segs;
>  
>  	/* Calculate the needed work queue entry size for this packet. */
>  	wqe_real_size = sizeof(volatile struct mlx4_wqe_ctrl_seg) +
> -		buf->nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> +		nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
>  	nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
>  	/*
>  	 * Check that there is room for this WQE in the send queue and that
> @@ -457,67 +456,99 @@ struct pv {
>  	dseg = (volatile struct mlx4_wqe_data_seg *)
>  			((uintptr_t)ctrl + sizeof(struct mlx4_wqe_ctrl_seg));
>  	*pctrl = ctrl;
> -	/* Fill the data segments with buffer information. */
> -	for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> -		addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> -		rte_prefetch0((volatile void *)addr);
> -		/* Memory region key (big endian) for this memory pool. */
> +	/*
> +	 * Fill the data segments with buffer information.
> +	 * First WQE TXBB head segment is always control segment,
> +	 * so jump to tail TXBB data segments code for the first
> +	 * WQE data segments filling.
> +	 */
> +	goto txbb_tail_segs;
> +txbb_head_seg:

Unlike a lot of people out there, I'm not fundamentally opposed to "goto",
but this doesn't look good. It's OK to use goto for error cases, to
extricate yourself when trapped in an inner loop, or in some optimization
scenarios where it genuinely makes sense, but not when the same result can
be achieved through standard loop constructs and keywords.

In this case, I'm under the impression you could have managed with a
do { ... } while (...) construct. You need to try harder to reorganize
these changes, or prove it can't be done without negatively impacting
performance.

Doing so should make this patch shorter as well.
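
Something along these lines perhaps; this is untested and abridged, the
segment-filling details are elided since they are the same as in this patch,
and "head_seg_pending" is just an arbitrary name for illustration:

        unsigned int head_seg_pending = 0;

        do {
                if (head_seg_pending) {
                        /*
                         * Data segment starting a new TXBB: check for SQ
                         * wraparound and postpone its byte_count through
                         * pv[], exactly as in the txbb_head_seg block.
                         */
                        sbuf = sbuf->next;
                        dseg++;
                        nb_segs--;
                }
                head_seg_pending = 1;
                /* Up to three remaining data segments in this TXBB. */
                switch (nb_segs) {
                default:
                        /* Fill one segment, advance sbuf/dseg/nb_segs. */
                        /* fallthrough */
                case 2:
                        /* Fill one segment, advance sbuf/dseg/nb_segs. */
                        /* fallthrough */
                case 1:
                        /* Fill the last data segment of this TXBB. */
                        nb_segs--;
                        if (nb_segs) {
                                sbuf = sbuf->next;
                                dseg++;
                        }
                        /* fallthrough */
                case 0:
                        break;
                }
        } while (nb_segs);

Whether the extra conditional branch costs anything compared to the goto
version is precisely what a benchmark should tell.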

> +	/* Memory region key (big endian) for this memory pool. */
> +	lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> +	if (unlikely(lkey == (uint32_t)-1)) {
> +		DEBUG("%p: unable to get MP <-> MR association",
> +		      (void *)txq);
> +		return -1;
> +	}
> +	/* Handle WQE wraparound. */
> +	if (dseg >=
> +		(volatile struct mlx4_wqe_data_seg *)sq->eob)
> +		dseg = (volatile struct mlx4_wqe_data_seg *)
> +			sq->buf;
> +	dseg->addr = rte_cpu_to_be_64(rte_pktmbuf_mtod(sbuf, uintptr_t));
> +	dseg->lkey = rte_cpu_to_be_32(lkey);
> +	/*
> +	 * This data segment starts at the beginning of a new
> +	 * TXBB, so we need to postpone its byte_count writing
> +	 * for later.
> +	 */
> +	pv[pv_counter].dseg = dseg;
> +	/*
> +	 * Zero length segment is treated as inline segment
> +	 * with zero data.
> +	 */
> +	pv[pv_counter++].val = rte_cpu_to_be_32(sbuf->data_len ?
> +						sbuf->data_len : 0x80000000);
> +	sbuf = sbuf->next;
> +	dseg++;
> +	nb_segs--;
> +txbb_tail_segs:
> +	/* Jump to default if there are more than two segments remaining. */
> +	switch (nb_segs) {
> +	default:
>  		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> -		dseg->lkey = rte_cpu_to_be_32(lkey);
> -		/* Calculate the needed work queue entry size for this packet */
> -		if (unlikely(lkey == rte_cpu_to_be_32((uint32_t)-1))) {
> -			/* MR does not exist. */
> +		if (unlikely(lkey == (uint32_t)-1)) {
>  			DEBUG("%p: unable to get MP <-> MR association",
>  			      (void *)txq);
>  			return -1;
>  		}
> -		if (likely(sbuf->data_len)) {
> -			byte_count = rte_cpu_to_be_32(sbuf->data_len);
> -		} else {
> -			/*
> -			 * Zero length segment is treated as inline segment
> -			 * with zero data.
> -			 */
> -			byte_count = RTE_BE32(0x80000000);
> +		mlx4_fill_tx_data_seg(dseg, lkey,
> +				      rte_pktmbuf_mtod(sbuf, uintptr_t),
> +				      rte_cpu_to_be_32(sbuf->data_len ?
> +						       sbuf->data_len :
> +						       0x80000000));
> +		sbuf = sbuf->next;
> +		dseg++;
> +		nb_segs--;
> +		/* fallthrough */
> +	case 2:
> +		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> +		if (unlikely(lkey == (uint32_t)-1)) {
> +			DEBUG("%p: unable to get MP <-> MR association",
> +			      (void *)txq);
> +			return -1;
>  		}
> -		/*
> -		 * If the data segment is not at the beginning of a
> -		 * Tx basic block (TXBB) then write the byte count,
> -		 * else postpone the writing to just before updating the
> -		 * control segment.
> -		 */
> -		if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> -			dseg->addr = rte_cpu_to_be_64(addr);
> -			dseg->lkey = rte_cpu_to_be_32(lkey);
> -#if RTE_CACHE_LINE_SIZE < 64
> -			/*
> -			 * Need a barrier here before writing the byte_count
> -			 * fields to make sure that all the data is visible
> -			 * before the byte_count field is set.
> -			 * Otherwise, if the segment begins a new cacheline,
> -			 * the HCA prefetcher could grab the 64-byte chunk and
> -			 * get a valid (!= 0xffffffff) byte count but stale
> -			 * data, and end up sending the wrong data.
> -			 */
> -			rte_io_wmb();
> -#endif /* RTE_CACHE_LINE_SIZE */
> -			dseg->byte_count = byte_count;
> -		} else {
> -			/*
> -			 * This data segment starts at the beginning of a new
> -			 * TXBB, so we need to postpone its byte_count writing
> -			 * for later.
> -			 */
> -			/* Handle WQE wraparound. */
> -			if (dseg >=
> -			    (volatile struct mlx4_wqe_data_seg *)sq->eob)
> -				dseg = (volatile struct mlx4_wqe_data_seg *)
> -					sq->buf;
> -			dseg->addr = rte_cpu_to_be_64(addr);
> -			dseg->lkey = rte_cpu_to_be_32(lkey);
> -			pv[pv_counter].dseg = dseg;
> -			pv[pv_counter++].val = byte_count;
> +		mlx4_fill_tx_data_seg(dseg, lkey,
> +				      rte_pktmbuf_mtod(sbuf, uintptr_t),
> +				      rte_cpu_to_be_32(sbuf->data_len ?
> +						       sbuf->data_len :
> +						       0x80000000));
> +		sbuf = sbuf->next;
> +		dseg++;
> +		nb_segs--;
> +		/* fallthrough */
> +	case 1:
> +		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> +		if (unlikely(lkey == (uint32_t)-1)) {
> +			DEBUG("%p: unable to get MP <-> MR association",
> +			      (void *)txq);
> +			return -1;
> +		}
> +		mlx4_fill_tx_data_seg(dseg, lkey,
> +				      rte_pktmbuf_mtod(sbuf, uintptr_t),
> +				      rte_cpu_to_be_32(sbuf->data_len ?
> +						       sbuf->data_len :
> +						       0x80000000));
> +		nb_segs--;
> +		if (nb_segs) {
> +			sbuf = sbuf->next;
> +			dseg++;
> +			goto txbb_head_seg;
>  		}
> +		/* fallthrough */
> +	case 0:
> +		break;
>  	}

I think this "switch (nb_segs)" idea is an interesting approach, but should
occur inside a loop construct as previously described.

>  	/* Write the first DWORD of each TXBB save earlier. */
>  	if (pv_counter) {
> @@ -583,7 +614,6 @@ struct pv {
>  		} srcrb;
>  		uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
>  		uint32_t lkey;
> -		uintptr_t addr;
>  
>  		/* Clean up old buffer. */
>  		if (likely(elt->buf != NULL)) {
> @@ -618,24 +648,19 @@ struct pv {
>  			dseg = (volatile struct mlx4_wqe_data_seg *)
>  					((uintptr_t)ctrl +
>  					sizeof(struct mlx4_wqe_ctrl_seg));
> -			addr = rte_pktmbuf_mtod(buf, uintptr_t);
> -			rte_prefetch0((volatile void *)addr);
> -			dseg->addr = rte_cpu_to_be_64(addr);
> -			/* Memory region key (big endian). */
> +
> +			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
>  			lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
> -			dseg->lkey = rte_cpu_to_be_32(lkey);
> -			if (unlikely(dseg->lkey ==
> -				rte_cpu_to_be_32((uint32_t)-1))) {
> +			if (unlikely(lkey == (uint32_t)-1)) {
>  				/* MR does not exist. */
>  				DEBUG("%p: unable to get MP <-> MR association",
>  				      (void *)txq);
>  				elt->buf = NULL;
>  				break;
>  			}
> -			/* Never be TXBB aligned, no need compiler barrier. */
> -			dseg->byte_count = rte_cpu_to_be_32(buf->data_len);
> -			/* Fill the control parameters for this packet. */
> -			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
> +			mlx4_fill_tx_data_seg(dseg, lkey,
> +					      rte_pktmbuf_mtod(buf, uintptr_t),
> +					      rte_cpu_to_be_32(buf->data_len));
>  			nr_txbbs = 1;
>  		} else {
>  			nr_txbbs = mlx4_tx_burst_segs(buf, txq, &ctrl);
> diff --git a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h
> index 463df2b..8207232 100644
> --- a/drivers/net/mlx4/mlx4_rxtx.h
> +++ b/drivers/net/mlx4/mlx4_rxtx.h
> @@ -212,4 +212,37 @@ int mlx4_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx,
>  	return mlx4_txq_add_mr(txq, mp, i);
>  }
>  
> +/**
> + * Write Tx data segment to the SQ.
> + *
> + * @param dseg
> + *   Pointer to data segment in SQ.
> + * @param lkey
> + *   Memory region lkey.
> + * @param addr
> + *   Data address.
> + * @param byte_count
> + *   Big Endian bytes count of the data to send.

Big Endian => Big endian

How about using the dedicated type to properly document it?
See rte_be32_t from rte_byteorder.h.
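
i.e. something like:

        static inline void
        mlx4_fill_tx_data_seg(volatile struct mlx4_wqe_data_seg *dseg,
                              uint32_t lkey, uintptr_t addr,
                              rte_be32_t byte_count);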

> + */
> +static inline void
> +mlx4_fill_tx_data_seg(volatile struct mlx4_wqe_data_seg *dseg,
> +		       uint32_t lkey, uintptr_t addr, uint32_t byte_count)
> +{
> +	dseg->addr = rte_cpu_to_be_64(addr);
> +	dseg->lkey = rte_cpu_to_be_32(lkey);
> +#if RTE_CACHE_LINE_SIZE < 64
> +	/*
> +	 * Need a barrier here before writing the byte_count
> +	 * fields to make sure that all the data is visible
> +	 * before the byte_count field is set.
> +	 * Otherwise, if the segment begins a new cacheline,
> +	 * the HCA prefetcher could grab the 64-byte chunk and
> +	 * get a valid (!= 0xffffffff) byte count but stale
> +	 * data, and end up sending the wrong data.
> +	 */
> +	rte_io_wmb();
> +#endif /* RTE_CACHE_LINE_SIZE */
> +	dseg->byte_count = byte_count;
> +}
> +

No need to expose this function in a header file. Note that rte_cpu_*() and
rte_io*() require the inclusion of rte_byteorder.h and rte_atomic.h,
respectively.
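
i.e. wherever this helper ends up, the file defining it has to pull these in
explicitly:

        #include <rte_atomic.h>
        #include <rte_byteorder.h>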

>  #endif /* MLX4_RXTX_H_ */
> -- 
> 1.8.3.1
> 

-- 
Adrien Mazarguil
6WIND

