[dpdk-dev,5/8] net/mlx4: merge Tx queue rings management

Message ID 1511871570-16826-6-git-send-email-matan@mellanox.com (mailing list archive)
State Superseded, archived
Delegated to: Shahaf Shuler

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Matan Azrad Nov. 28, 2017, 12:19 p.m. UTC
  The Tx queue send ring was managed by Tx block head, tail, count and mask
management variables, which were used for managing the send queue's
remaining space and the next places of empty or completed work queue
entries.

This method suffered from an address recalculation per packet,
unnecessary Tx block based calculations and an expensive dual
management of the Tx rings.

Move the send queue ring calculation to be based on actual addresses,
while managing it by descriptor ring indexes.

Add a new work queue entry pointer to the descriptor element to hold
the appropriate entry in the send queue.

Signed-off-by: Matan Azrad <matan@mellanox.com>
---
 drivers/net/mlx4/mlx4_prm.h  |  20 ++--
 drivers/net/mlx4/mlx4_rxtx.c | 241 +++++++++++++++++++------------------------
 drivers/net/mlx4/mlx4_rxtx.h |   1 +
 drivers/net/mlx4/mlx4_txq.c  |  23 +++--
 4 files changed, 126 insertions(+), 159 deletions(-)
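
For reference, a minimal sketch of the bookkeeping change described above
(struct and field names follow the diff; MLX4_TXBB_SIZE and the rest are
simplified illustration, not the driver code):

 #include <stdint.h>

 #define MLX4_TXBB_SIZE 64 /* Tx basic block size in bytes (assumed). */

 /* Before: the ring is tracked in TXBB units and every packet converts
  * an index back into an address. */
 struct mlx4_sq_old {
 	volatile uint8_t *buf;  /* SQ buffer. */
 	uint32_t head;          /* Head counter, in TXBBs. */
 	uint32_t tail;          /* Tail counter, in TXBBs. */
 	uint32_t txbb_cnt;      /* Number of TXBBs (power of two). */
 	uint32_t txbb_cnt_mask; /* txbb_cnt - 1. */
 };
 #define old_get_send_wqe(sq, n) ((sq)->buf + ((n) * MLX4_TXBB_SIZE))

 /* After: each descriptor element caches the address of its WQE, while
  * the SQ tracks only its total size and remaining room, in bytes. */
 struct mlx4_wqe_ctrl_seg; /* Opaque here; defined by the PRM header. */
 struct rte_mbuf;          /* Opaque here; defined by DPDK. */
 struct mlx4_sq_new {
 	volatile uint8_t *buf;  /* SQ buffer. */
 	volatile uint8_t *eob;  /* End of SQ buffer, for wrap-around. */
 	uint32_t size;          /* SQ size in bytes, includes headroom. */
 	int32_t remain_size;    /* Remaining WQE room in bytes. */
 };
 struct txq_elt_new {
 	struct rte_mbuf *buf;                   /* Buffer. */
 	volatile struct mlx4_wqe_ctrl_seg *wqe; /* Saved WQE address. */
 };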
  

Comments

Adrien Mazarguil Dec. 6, 2017, 10:58 a.m. UTC | #1
On Tue, Nov 28, 2017 at 12:19:27PM +0000, Matan Azrad wrote:
> The Tx queue send ring was managed by Tx block head,tail,count and mask
> management variables which were used for managing the send queue remain
> space and next places of empty or completted work queue entries.

completted => completed

> 
> This method suffered from an actual addresses recalculation per packet,
> an unnecessary Tx block based calculations and an expensive dual
> management of Tx rings.
> 
> Move send queue ring calculation to be based on actual addresses while
> managing it by descriptors ring indexes.
> 
> Add new work queue entry pointer to the descriptor element to hold the
> appropriate entry in the send queue.
> 
> Signed-off-by: Matan Azrad <matan@mellanox.com>
> ---
>  drivers/net/mlx4/mlx4_prm.h  |  20 ++--
>  drivers/net/mlx4/mlx4_rxtx.c | 241 +++++++++++++++++++------------------------
>  drivers/net/mlx4/mlx4_rxtx.h |   1 +
>  drivers/net/mlx4/mlx4_txq.c  |  23 +++--
>  4 files changed, 126 insertions(+), 159 deletions(-)
> 
> diff --git a/drivers/net/mlx4/mlx4_prm.h b/drivers/net/mlx4/mlx4_prm.h
> index fcc7c12..2ca303a 100644
> --- a/drivers/net/mlx4/mlx4_prm.h
> +++ b/drivers/net/mlx4/mlx4_prm.h
> @@ -54,22 +54,18 @@
>  
>  /* Typical TSO descriptor with 16 gather entries is 352 bytes. */
>  #define MLX4_MAX_WQE_SIZE 512
> -#define MLX4_MAX_WQE_TXBBS (MLX4_MAX_WQE_SIZE / MLX4_TXBB_SIZE)
> +#define MLX4_SEG_SHIFT 4
>  
>  /* Send queue stamping/invalidating information. */
>  #define MLX4_SQ_STAMP_STRIDE 64
>  #define MLX4_SQ_STAMP_DWORDS (MLX4_SQ_STAMP_STRIDE / 4)
> -#define MLX4_SQ_STAMP_SHIFT 31
> +#define MLX4_SQ_OWNER_BIT 31
>  #define MLX4_SQ_STAMP_VAL 0x7fffffff
>  
>  /* Work queue element (WQE) flags. */
> -#define MLX4_BIT_WQE_OWN 0x80000000
>  #define MLX4_WQE_CTRL_IIP_HDR_CSUM (1 << 28)
>  #define MLX4_WQE_CTRL_IL4_HDR_CSUM (1 << 27)
>  
> -#define MLX4_SIZE_TO_TXBBS(size) \
> -	(RTE_ALIGN((size), (MLX4_TXBB_SIZE)) >> (MLX4_TXBB_SHIFT))
> -
>  /* CQE checksum flags. */
>  enum {
>  	MLX4_CQE_L2_TUNNEL_IPV4 = (int)(1u << 25),
> @@ -98,17 +94,15 @@ enum {
>  struct mlx4_sq {
>  	volatile uint8_t *buf; /**< SQ buffer. */
>  	volatile uint8_t *eob; /**< End of SQ buffer */
> -	uint32_t head; /**< SQ head counter in units of TXBBS. */
> -	uint32_t tail; /**< SQ tail counter in units of TXBBS. */
> -	uint32_t txbb_cnt; /**< Num of WQEBB in the Q (should be ^2). */
> -	uint32_t txbb_cnt_mask; /**< txbbs_cnt mask (txbb_cnt is ^2). */
> -	uint32_t headroom_txbbs; /**< Num of txbbs that should be kept free. */
> +	uint32_t size; /**< SQ size includes headroom. */
> +	int32_t remain_size; /**< Remain WQE size in SQ. */

Remain => Remaining?

By "size", do you mean "room" as there could be several WQEs in there?

Note before reviewing the rest of this patch, the fact it's a signed integer
bothers me; it's probably a mistake. You should standardize on unsigned
values everywhere.

> +	/**< Default owner opcode with HW valid owner bit. */

The "/**<" syntax requires the comment to come after the documented
field. You should either move this line below "owner_opcode" or use "/**".
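
For reference, both accepted forms (plain Doxygen syntax):

 struct example {
 	/** Default owner opcode with HW valid owner bit. */
 	uint32_t owner_opcode;
 	uint32_t stamp; /**< Stamp value with an invalid HW owner bit. */
 };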

> +	uint32_t owner_opcode;
> +	uint32_t stamp; /**< Stamp value with an invalid HW owner bit. */
>  	volatile uint32_t *db; /**< Pointer to the doorbell. */
>  	uint32_t doorbell_qpn; /**< qp number to write to the doorbell. */
>  };
>  
> -#define mlx4_get_send_wqe(sq, n) ((sq)->buf + ((n) * (MLX4_TXBB_SIZE)))
> -
>  /* Completion queue events, numbers and masks. */
>  #define MLX4_CQ_DB_GEQ_N_MASK 0x3
>  #define MLX4_CQ_DOORBELL 0x20
> diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
> index b9cb2fc..0a8ef93 100644
> --- a/drivers/net/mlx4/mlx4_rxtx.c
> +++ b/drivers/net/mlx4/mlx4_rxtx.c
> @@ -61,9 +61,6 @@
>  #include "mlx4_rxtx.h"
>  #include "mlx4_utils.h"
>  
> -#define WQE_ONE_DATA_SEG_SIZE \
> -	(sizeof(struct mlx4_wqe_ctrl_seg) + sizeof(struct mlx4_wqe_data_seg))
> -
>  /**
>   * Pointer-value pair structure used in tx_post_send for saving the first
>   * DWORD (32 byte) of a TXBB.
> @@ -268,52 +265,48 @@ struct pv {
>   *
>   * @param sq
>   *   Pointer to the SQ structure.
> - * @param index
> - *   Index of the freed WQE.
> - * @param num_txbbs
> - *   Number of blocks to stamp.
> - *   If < 0 the routine will use the size written in the WQ entry.
> - * @param owner
> - *   The value of the WQE owner bit to use in the stamp.
> + * @param wqe
> + *   Pointer of WQE to stamp.

Looks like it's not just a simple pointer to the WQE to stamp seeing this
function also stores the address of the next WQE in the provided buffer
(uint32_t **wqe). It's not documented as such.
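
Something along these lines would make that explicit (a suggestion only,
not the final wording):

 /**
  * @param sq
  *   Pointer to the SQ structure.
  * @param[in, out] wqe
  *   Points to the WQE to stamp on input; updated on return to the
  *   address of the WQE following the stamped one.
  *
  * @return
  *   WQE size in bytes.
  */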

>   *
>   * @return
> - *   The number of Tx basic blocs (TXBB) the WQE contained.
> + *   WQE size.
>   */
> -static int
> -mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, uint16_t index, uint8_t owner)
> +static uint32_t
> +mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, volatile uint32_t **wqe)
>  {
> -	uint32_t stamp = rte_cpu_to_be_32(MLX4_SQ_STAMP_VAL |
> -					  (!!owner << MLX4_SQ_STAMP_SHIFT));
> -	volatile uint8_t *wqe = mlx4_get_send_wqe(sq,
> -						(index & sq->txbb_cnt_mask));
> -	volatile uint32_t *ptr = (volatile uint32_t *)wqe;
> -	int i;
> -	int txbbs_size;
> -	int num_txbbs;
> -
> +	uint32_t stamp = sq->stamp;
> +	volatile uint32_t *next_txbb = *wqe;
>  	/* Extract the size from the control segment of the WQE. */
> -	num_txbbs = MLX4_SIZE_TO_TXBBS((((volatile struct mlx4_wqe_ctrl_seg *)
> -					 wqe)->fence_size & 0x3f) << 4);
> -	txbbs_size = num_txbbs * MLX4_TXBB_SIZE;
> +	uint32_t size = RTE_ALIGN((uint32_t)
> +				  ((((volatile struct mlx4_wqe_ctrl_seg *)
> +				     next_txbb)->fence_size & 0x3f) << 4),
> +				  MLX4_TXBB_SIZE);
> +	uint32_t size_cd = size;
> +
>  	/* Optimize the common case when there is no wrap-around. */
> -	if (wqe + txbbs_size <= sq->eob) {
> +	if ((uintptr_t)next_txbb + size < (uintptr_t)sq->eob) {
>  		/* Stamp the freed descriptor. */
> -		for (i = 0; i < txbbs_size; i += MLX4_SQ_STAMP_STRIDE) {
> -			*ptr = stamp;
> -			ptr += MLX4_SQ_STAMP_DWORDS;
> -		}
> +		do {
> +			*next_txbb = stamp;
> +			next_txbb += MLX4_SQ_STAMP_DWORDS;
> +			size_cd -= MLX4_TXBB_SIZE;
> +		} while (size_cd);
>  	} else {
>  		/* Stamp the freed descriptor. */
> -		for (i = 0; i < txbbs_size; i += MLX4_SQ_STAMP_STRIDE) {
> -			*ptr = stamp;
> -			ptr += MLX4_SQ_STAMP_DWORDS;
> -			if ((volatile uint8_t *)ptr >= sq->eob) {
> -				ptr = (volatile uint32_t *)sq->buf;
> -				stamp ^= RTE_BE32(0x80000000);
> +		do {
> +			*next_txbb = stamp;
> +			next_txbb += MLX4_SQ_STAMP_DWORDS;
> +			if ((volatile uint8_t *)next_txbb >= sq->eob) {
> +				next_txbb = (volatile uint32_t *)sq->buf;
> +				/* Flip invalid stamping ownership. */
> +				stamp ^= RTE_BE32(0x1 << MLX4_SQ_OWNER_BIT);
> +				sq->stamp = stamp;
>  			}
> -		}
> +			size_cd -= MLX4_TXBB_SIZE;
> +		} while (size_cd);
>  	}
> -	return num_txbbs;
> +	*wqe = next_txbb;
> +	return size;
>  }
>  
>  /**
> @@ -326,24 +319,22 @@ struct pv {
>   *
>   * @param txq
>   *   Pointer to Tx queue structure.
> - *
> - * @return
> - *   0 on success, -1 on failure.
>   */
> -static int
> +static void
>  mlx4_txq_complete(struct txq *txq, const unsigned int elts_n,
>  				  struct mlx4_sq *sq)
>  {
> -	unsigned int elts_comp = txq->elts_comp;
>  	unsigned int elts_tail = txq->elts_tail;
> -	unsigned int sq_tail = sq->tail;
>  	struct mlx4_cq *cq = &txq->mcq;
>  	volatile struct mlx4_cqe *cqe;
>  	uint32_t cons_index = cq->cons_index;
> -	uint16_t new_index;
> -	uint16_t nr_txbbs = 0;
> -	int pkts = 0;
> -
> +	volatile uint32_t *first_wqe;
> +	volatile uint32_t *next_wqe = (volatile uint32_t *)
> +			((&(*txq->elts)[elts_tail])->wqe);
> +	volatile uint32_t *last_wqe;
> +	uint16_t mask = (((uintptr_t)sq->eob - (uintptr_t)sq->buf) >>
> +			 MLX4_TXBB_SHIFT) - 1;
> +	uint32_t pkts = 0;
>  	/*
>  	 * Traverse over all CQ entries reported and handle each WQ entry
>  	 * reported by them.
> @@ -353,11 +344,11 @@ struct pv {
>  		if (unlikely(!!(cqe->owner_sr_opcode & MLX4_CQE_OWNER_MASK) ^
>  		    !!(cons_index & cq->cqe_cnt)))
>  			break;
> +#ifndef NDEBUG
>  		/*
>  		 * Make sure we read the CQE after we read the ownership bit.
>  		 */
>  		rte_io_rmb();
> -#ifndef NDEBUG
>  		if (unlikely((cqe->owner_sr_opcode & MLX4_CQE_OPCODE_MASK) ==
>  			     MLX4_CQE_OPCODE_ERROR)) {
>  			volatile struct mlx4_err_cqe *cqe_err =
> @@ -366,41 +357,32 @@ struct pv {
>  			      " syndrome: 0x%x\n",
>  			      (void *)txq, cqe_err->vendor_err,
>  			      cqe_err->syndrome);
> +			break;
>  		}
>  #endif /* NDEBUG */
> -		/* Get WQE index reported in the CQE. */
> -		new_index =
> -			rte_be_to_cpu_16(cqe->wqe_index) & sq->txbb_cnt_mask;
> +		/* Get WQE address by index from the CQE. */
> +		last_wqe = (volatile uint32_t *)((uintptr_t)sq->buf +
> +			((rte_be_to_cpu_16(cqe->wqe_index) & mask) <<
> +			 MLX4_TXBB_SHIFT));
>  		do {
>  			/* Free next descriptor. */
> -			sq_tail += nr_txbbs;
> -			nr_txbbs =
> -				mlx4_txq_stamp_freed_wqe(sq,
> -				     sq_tail & sq->txbb_cnt_mask,
> -				     !!(sq_tail & sq->txbb_cnt));
> +			first_wqe = next_wqe;
> +			sq->remain_size +=
> +				mlx4_txq_stamp_freed_wqe(sq, &next_wqe);
>  			pkts++;
> -		} while ((sq_tail & sq->txbb_cnt_mask) != new_index);
> +		} while (first_wqe != last_wqe);
>  		cons_index++;
>  	} while (1);
>  	if (unlikely(pkts == 0))
> -		return 0;
> -	/* Update CQ. */
> +		return;
> +	/* Update CQ consumer index. */
>  	cq->cons_index = cons_index;
> -	*cq->set_ci_db = rte_cpu_to_be_32(cq->cons_index & MLX4_CQ_DB_CI_MASK);
> -	sq->tail = sq_tail + nr_txbbs;
> -	/* Update the list of packets posted for transmission. */
> -	elts_comp -= pkts;
> -	assert(elts_comp <= txq->elts_comp);
> -	/*
> -	 * Assume completion status is successful as nothing can be done about
> -	 * it anyway.
> -	 */
> +	*cq->set_ci_db = rte_cpu_to_be_32(cons_index & MLX4_CQ_DB_CI_MASK);
> +	txq->elts_comp -= pkts;
>  	elts_tail += pkts;
>  	if (elts_tail >= elts_n)
>  		elts_tail -= elts_n;
>  	txq->elts_tail = elts_tail;
> -	txq->elts_comp = elts_comp;
> -	return 0;
>  }
>  
>  /**
> @@ -421,41 +403,27 @@ struct pv {
>  	return buf->pool;
>  }
>  
> -static int
> +static volatile struct mlx4_wqe_ctrl_seg *
>  mlx4_tx_burst_segs(struct rte_mbuf *buf, struct txq *txq,
> -		   volatile struct mlx4_wqe_ctrl_seg **pctrl)
> +		   volatile struct mlx4_wqe_ctrl_seg *ctrl)

Can you use this opportunity to document this function?
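
For instance a header along these lines (a sketch based only on the
behavior visible in this diff):

 /**
  * Write data segments of a multi-segment packet.
  *
  * @param buf
  *   Pointer to the first packet mbuf.
  * @param txq
  *   Pointer to Tx queue structure.
  * @param ctrl
  *   Pointer to the WQE control segment to fill.
  *
  * @return
  *   Pointer to the control segment of the next WQE on success, NULL
  *   when the WQE does not fit in the SQ or exceeds MLX4_MAX_WQE_SIZE.
  */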

>  {
> -	int wqe_real_size;
> -	int nr_txbbs;
>  	struct pv *pv = (struct pv *)txq->bounce_buf;
>  	struct mlx4_sq *sq = &txq->msq;
> -	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> -	volatile struct mlx4_wqe_ctrl_seg *ctrl;
> -	volatile struct mlx4_wqe_data_seg *dseg;
>  	struct rte_mbuf *sbuf = buf;
>  	uint32_t lkey;
>  	int pv_counter = 0;
>  	int nb_segs = buf->nb_segs;
> +	int32_t wqe_size;
> +	volatile struct mlx4_wqe_data_seg *dseg =
> +		(volatile struct mlx4_wqe_data_seg *)(ctrl + 1);
>  
> -	/* Calculate the needed work queue entry size for this packet. */
> -	wqe_real_size = sizeof(volatile struct mlx4_wqe_ctrl_seg) +
> -		nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> -	nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> -	/*
> -	 * Check that there is room for this WQE in the send queue and that
> -	 * the WQE size is legal.
> -	 */
> -	if (((sq->head - sq->tail) + nr_txbbs +
> -				sq->headroom_txbbs) >= sq->txbb_cnt ||
> -			nr_txbbs > MLX4_MAX_WQE_TXBBS) {
> -		return -1;
> -	}
> -	/* Get the control and data entries of the WQE. */
> -	ctrl = (volatile struct mlx4_wqe_ctrl_seg *)
> -			mlx4_get_send_wqe(sq, head_idx);
> -	dseg = (volatile struct mlx4_wqe_data_seg *)
> -			((uintptr_t)ctrl + sizeof(struct mlx4_wqe_ctrl_seg));
> -	*pctrl = ctrl;
> +	ctrl->fence_size = 1 + nb_segs;
> +	wqe_size = RTE_ALIGN((int32_t)(ctrl->fence_size << MLX4_SEG_SHIFT),
> +			     MLX4_TXBB_SIZE);
> +	/* Validate WQE size and WQE space in the send queue. */
> +	if (sq->remain_size < wqe_size ||
> +	    wqe_size > MLX4_MAX_WQE_SIZE)
> +		return NULL;
>  	/*
>  	 * Fill the data segments with buffer information.
>  	 * First WQE TXBB head segment is always control segment,
> @@ -469,7 +437,7 @@ struct pv {
>  	if (unlikely(lkey == (uint32_t)-1)) {
>  		DEBUG("%p: unable to get MP <-> MR association",
>  		      (void *)txq);
> -		return -1;
> +		return NULL;
>  	}
>  	/* Handle WQE wraparound. */
>  	if (dseg >=
> @@ -501,7 +469,7 @@ struct pv {
>  		if (unlikely(lkey == (uint32_t)-1)) {
>  			DEBUG("%p: unable to get MP <-> MR association",
>  			      (void *)txq);
> -			return -1;
> +			return NULL;
>  		}
>  		mlx4_fill_tx_data_seg(dseg, lkey,
>  				      rte_pktmbuf_mtod(sbuf, uintptr_t),
> @@ -517,7 +485,7 @@ struct pv {
>  		if (unlikely(lkey == (uint32_t)-1)) {
>  			DEBUG("%p: unable to get MP <-> MR association",
>  			      (void *)txq);
> -			return -1;
> +			return NULL;
>  		}
>  		mlx4_fill_tx_data_seg(dseg, lkey,
>  				      rte_pktmbuf_mtod(sbuf, uintptr_t),
> @@ -533,7 +501,7 @@ struct pv {
>  		if (unlikely(lkey == (uint32_t)-1)) {
>  			DEBUG("%p: unable to get MP <-> MR association",
>  			      (void *)txq);
> -			return -1;
> +			return NULL;
>  		}
>  		mlx4_fill_tx_data_seg(dseg, lkey,
>  				      rte_pktmbuf_mtod(sbuf, uintptr_t),
> @@ -557,9 +525,10 @@ struct pv {
>  		for (--pv_counter; pv_counter  >= 0; pv_counter--)
>  			pv[pv_counter].dseg->byte_count = pv[pv_counter].val;
>  	}
> -	/* Fill the control parameters for this packet. */
> -	ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> -	return nr_txbbs;
> +	sq->remain_size -= wqe_size;
> +	/* Align next WQE address to the next TXBB. */
> +	return (volatile struct mlx4_wqe_ctrl_seg *)
> +		((volatile uint8_t *)ctrl + wqe_size);
>  }
>  
>  /**
> @@ -585,7 +554,8 @@ struct pv {
>  	unsigned int i;
>  	unsigned int max;
>  	struct mlx4_sq *sq = &txq->msq;
> -	int nr_txbbs;
> +	volatile struct mlx4_wqe_ctrl_seg *ctrl;
> +	struct txq_elt *elt;
>  
>  	assert(txq->elts_comp_cd != 0);
>  	if (likely(txq->elts_comp != 0))
> @@ -599,29 +569,30 @@ struct pv {
>  	--max;
>  	if (max > pkts_n)
>  		max = pkts_n;
> +	elt = &(*txq->elts)[elts_head];
> +	/* Each element saves its appropriate work queue. */
> +	ctrl = elt->wqe;
>  	for (i = 0; (i != max); ++i) {
>  		struct rte_mbuf *buf = pkts[i];
>  		unsigned int elts_head_next =
>  			(((elts_head + 1) == elts_n) ? 0 : elts_head + 1);
>  		struct txq_elt *elt_next = &(*txq->elts)[elts_head_next];
> -		struct txq_elt *elt = &(*txq->elts)[elts_head];
> -		uint32_t owner_opcode = MLX4_OPCODE_SEND;
> -		volatile struct mlx4_wqe_ctrl_seg *ctrl;
> -		volatile struct mlx4_wqe_data_seg *dseg;
> +		uint32_t owner_opcode = sq->owner_opcode;
> +		volatile struct mlx4_wqe_data_seg *dseg =
> +				(volatile struct mlx4_wqe_data_seg *)(ctrl + 1);
> +		volatile struct mlx4_wqe_ctrl_seg *ctrl_next;
>  		union {
>  			uint32_t flags;
>  			uint16_t flags16[2];
>  		} srcrb;
> -		uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
>  		uint32_t lkey;
>  
>  		/* Clean up old buffer. */
>  		if (likely(elt->buf != NULL)) {
>  			struct rte_mbuf *tmp = elt->buf;
> -

Empty line following variable declarations should stay.

>  #ifndef NDEBUG
>  			/* Poisoning. */
> -			memset(elt, 0x66, sizeof(*elt));
> +			elt->buf = (struct rte_mbuf *)0x6666666666666666;

Note this address depends on pointer size, which may in turn trigger a
compilation warning/error. Keep memset() on elt->buf.
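
I.e. something like (sketch):

 #ifndef NDEBUG
 			/* Poisoning. */
 			memset(&elt->buf, 0x66, sizeof(struct rte_mbuf *));
 #endif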

>  #endif
>  			/* Faster than rte_pktmbuf_free(). */
>  			do {
> @@ -633,23 +604,11 @@ struct pv {
>  		}
>  		RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
>  		if (buf->nb_segs == 1) {
> -			/*
> -			 * Check that there is room for this WQE in the send
> -			 * queue and that the WQE size is legal
> -			 */
> -			if (((sq->head - sq->tail) + 1 + sq->headroom_txbbs) >=
> -			     sq->txbb_cnt || 1 > MLX4_MAX_WQE_TXBBS) {
> +			/* Validate WQE space in the send queue. */
> +			if (sq->remain_size < MLX4_TXBB_SIZE) {
>  				elt->buf = NULL;
>  				break;
>  			}
> -			/* Get the control and data entries of the WQE. */
> -			ctrl = (volatile struct mlx4_wqe_ctrl_seg *)
> -					mlx4_get_send_wqe(sq, head_idx);
> -			dseg = (volatile struct mlx4_wqe_data_seg *)
> -					((uintptr_t)ctrl +
> -					sizeof(struct mlx4_wqe_ctrl_seg));
> -
> -			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
>  			lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
>  			if (unlikely(lkey == (uint32_t)-1)) {
>  				/* MR does not exist. */
> @@ -658,23 +617,33 @@ struct pv {
>  				elt->buf = NULL;
>  				break;
>  			}
> -			mlx4_fill_tx_data_seg(dseg, lkey,
> +			mlx4_fill_tx_data_seg(dseg++, lkey,
>  					      rte_pktmbuf_mtod(buf, uintptr_t),
>  					      rte_cpu_to_be_32(buf->data_len));
> -			nr_txbbs = 1;
> +			/* Set WQE size in 16-byte units. */
> +			ctrl->fence_size = 0x2;
> +			sq->remain_size -= MLX4_TXBB_SIZE;
> +			/* Align next WQE address to the next TXBB. */
> +			ctrl_next = ctrl + 0x4;
>  		} else {
> -			nr_txbbs = mlx4_tx_burst_segs(buf, txq, &ctrl);
> -			if (nr_txbbs < 0) {
> +			ctrl_next = mlx4_tx_burst_segs(buf, txq, ctrl);
> +			if (!ctrl_next) {
>  				elt->buf = NULL;
>  				break;
>  			}
>  		}
> +		/* Hold SQ ring wrap around. */
> +		if ((volatile uint8_t *)ctrl_next >= sq->eob) {
> +			ctrl_next = (volatile struct mlx4_wqe_ctrl_seg *)
> +				((volatile uint8_t *)ctrl_next - sq->size);
> +			/* Flip HW valid ownership. */
> +			sq->owner_opcode ^= 0x1 << MLX4_SQ_OWNER_BIT;
> +		}
>  		/*
>  		 * For raw Ethernet, the SOLICIT flag is used to indicate
>  		 * that no ICRC should be calculated.
>  		 */
> -		txq->elts_comp_cd -= nr_txbbs;
> -		if (unlikely(txq->elts_comp_cd <= 0)) {
> +		if (--txq->elts_comp_cd == 0) {
>  			txq->elts_comp_cd = txq->elts_comp_cd_init;
>  			srcrb.flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT |
>  					       MLX4_WQE_CTRL_CQ_UPDATE);
> @@ -720,13 +689,13 @@ struct pv {
>  		 * executing as soon as we do).
>  		 */
>  		rte_io_wmb();
> -		ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode |
> -					      ((sq->head & sq->txbb_cnt) ?
> -						       MLX4_BIT_WQE_OWN : 0));
> -		sq->head += nr_txbbs;
> +		ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode);
>  		elt->buf = buf;
>  		bytes_sent += buf->pkt_len;
>  		elts_head = elts_head_next;
> +		elt_next->wqe = ctrl_next;
> +		ctrl = ctrl_next;
> +		elt = elt_next;
>  	}
>  	/* Take a shortcut if nothing must be sent. */
>  	if (unlikely(i == 0))
> diff --git a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h
> index 8207232..c092afa 100644
> --- a/drivers/net/mlx4/mlx4_rxtx.h
> +++ b/drivers/net/mlx4/mlx4_rxtx.h
> @@ -105,6 +105,7 @@ struct mlx4_rss {
>  /** Tx element. */
>  struct txq_elt {
>  	struct rte_mbuf *buf; /**< Buffer. */
> +	volatile struct mlx4_wqe_ctrl_seg *wqe; /**< SQ WQE. */
>  };
>  
>  /** Rx queue counters. */
> diff --git a/drivers/net/mlx4/mlx4_txq.c b/drivers/net/mlx4/mlx4_txq.c
> index 7882a4d..4c7b62a 100644
> --- a/drivers/net/mlx4/mlx4_txq.c
> +++ b/drivers/net/mlx4/mlx4_txq.c
> @@ -84,6 +84,7 @@
>  		assert(elt->buf != NULL);
>  		rte_pktmbuf_free(elt->buf);
>  		elt->buf = NULL;
> +		elt->wqe = NULL;
>  		if (++elts_tail == RTE_DIM(*elts))
>  			elts_tail = 0;
>  	}
> @@ -163,20 +164,19 @@ struct txq_mp2mr_mbuf_check_data {
>  	struct mlx4_cq *cq = &txq->mcq;
>  	struct mlx4dv_qp *dqp = mlxdv->qp.out;
>  	struct mlx4dv_cq *dcq = mlxdv->cq.out;
> -	uint32_t sq_size = (uint32_t)dqp->rq.offset - (uint32_t)dqp->sq.offset;
>  
> -	sq->buf = (uint8_t *)dqp->buf.buf + dqp->sq.offset;
>  	/* Total length, including headroom and spare WQEs. */
> -	sq->eob = sq->buf + sq_size;
> -	sq->head = 0;
> -	sq->tail = 0;
> -	sq->txbb_cnt =
> -		(dqp->sq.wqe_cnt << dqp->sq.wqe_shift) >> MLX4_TXBB_SHIFT;
> -	sq->txbb_cnt_mask = sq->txbb_cnt - 1;
> +	sq->size = (uint32_t)dqp->rq.offset - (uint32_t)dqp->sq.offset;
> +	sq->buf = (uint8_t *)dqp->buf.buf + dqp->sq.offset;
> +	sq->eob = sq->buf + sq->size;
> +	uint32_t headroom_size = 2048 + (1 << dqp->sq.wqe_shift);
> +	/* Continuous headroom size bytes must always stay free. */
> +	sq->remain_size = sq->size - headroom_size;
> +	sq->owner_opcode = MLX4_OPCODE_SEND | (0 << MLX4_SQ_OWNER_BIT);
> +	sq->stamp = rte_cpu_to_be_32(MLX4_SQ_STAMP_VAL |
> +				     (0 << MLX4_SQ_OWNER_BIT));
>  	sq->db = dqp->sdb;
>  	sq->doorbell_qpn = dqp->doorbell_qpn;
> -	sq->headroom_txbbs =
> -		(2048 + (1 << dqp->sq.wqe_shift)) >> MLX4_TXBB_SHIFT;
>  	cq->buf = dcq->buf.buf;
>  	cq->cqe_cnt = dcq->cqe_cnt;
>  	cq->set_ci_db = dcq->set_ci_db;
> @@ -362,6 +362,9 @@ struct txq_mp2mr_mbuf_check_data {
>  		goto error;
>  	}
>  	mlx4_txq_fill_dv_obj_info(txq, &mlxdv);
> +	/* Save first wqe pointer in the first element. */
> +	(&(*txq->elts)[0])->wqe =
> +		(volatile struct mlx4_wqe_ctrl_seg *)txq->msq.buf;
>  	/* Pre-register known mempools. */
>  	rte_mempool_walk(mlx4_txq_mp2mr_iter, txq);
>  	DEBUG("%p: adding Tx queue %p to list", (void *)dev, (void *)txq);
> -- 
> 1.8.3.1
> 

Otherwise this patch looks OK.
  
Matan Azrad Dec. 6, 2017, 11:43 a.m. UTC | #2
Hi Adrien

> -----Original Message-----
> From: Adrien Mazarguil [mailto:adrien.mazarguil@6wind.com]
> Sent: Wednesday, December 6, 2017 12:59 PM
> To: Matan Azrad <matan@mellanox.com>
> Cc: dev@dpdk.org
> Subject: Re: [PATCH 5/8] net/mlx4: merge Tx queue rings management
> 
> On Tue, Nov 28, 2017 at 12:19:27PM +0000, Matan Azrad wrote:
> > The Tx queue send ring was managed by Tx block head,tail,count and
> > mask management variables which were used for managing the send
> queue
> > remain space and next places of empty or completted work queue entries.
> 
> completted => completed
> 
> >
> > This method suffered from an actual addresses recalculation per
> > packet, an unnecessary Tx block based calculations and an expensive
> > dual management of Tx rings.
> >
> > Move send queue ring calculation to be based on actual addresses while
> > managing it by descriptors ring indexes.
> >
> > Add new work queue entry pointer to the descriptor element to hold the
> > appropriate entry in the send queue.
> >
> > Signed-off-by: Matan Azrad <matan@mellanox.com>
> > ---
> >  drivers/net/mlx4/mlx4_prm.h  |  20 ++--  drivers/net/mlx4/mlx4_rxtx.c
> > | 241 +++++++++++++++++++------------------------
> >  drivers/net/mlx4/mlx4_rxtx.h |   1 +
> >  drivers/net/mlx4/mlx4_txq.c  |  23 +++--
> >  4 files changed, 126 insertions(+), 159 deletions(-)
> >
> > diff --git a/drivers/net/mlx4/mlx4_prm.h b/drivers/net/mlx4/mlx4_prm.h
> > index fcc7c12..2ca303a 100644
> > --- a/drivers/net/mlx4/mlx4_prm.h
> > +++ b/drivers/net/mlx4/mlx4_prm.h
> > @@ -54,22 +54,18 @@
> >
> >  /* Typical TSO descriptor with 16 gather entries is 352 bytes. */
> > #define MLX4_MAX_WQE_SIZE 512 -#define MLX4_MAX_WQE_TXBBS
> > (MLX4_MAX_WQE_SIZE / MLX4_TXBB_SIZE)
> > +#define MLX4_SEG_SHIFT 4
> >
> >  /* Send queue stamping/invalidating information. */  #define
> > MLX4_SQ_STAMP_STRIDE 64  #define MLX4_SQ_STAMP_DWORDS
> > (MLX4_SQ_STAMP_STRIDE / 4) -#define MLX4_SQ_STAMP_SHIFT 31
> > +#define MLX4_SQ_OWNER_BIT 31
> >  #define MLX4_SQ_STAMP_VAL 0x7fffffff
> >
> >  /* Work queue element (WQE) flags. */ -#define MLX4_BIT_WQE_OWN
> > 0x80000000  #define MLX4_WQE_CTRL_IIP_HDR_CSUM (1 << 28)  #define
> > MLX4_WQE_CTRL_IL4_HDR_CSUM (1 << 27)
> >
> > -#define MLX4_SIZE_TO_TXBBS(size) \
> > -	(RTE_ALIGN((size), (MLX4_TXBB_SIZE)) >> (MLX4_TXBB_SHIFT))
> > -
> >  /* CQE checksum flags. */
> >  enum {
> >  	MLX4_CQE_L2_TUNNEL_IPV4 = (int)(1u << 25), @@ -98,17 +94,15
> @@ enum
> > {  struct mlx4_sq {
> >  	volatile uint8_t *buf; /**< SQ buffer. */
> >  	volatile uint8_t *eob; /**< End of SQ buffer */
> > -	uint32_t head; /**< SQ head counter in units of TXBBS. */
> > -	uint32_t tail; /**< SQ tail counter in units of TXBBS. */
> > -	uint32_t txbb_cnt; /**< Num of WQEBB in the Q (should be ^2). */
> > -	uint32_t txbb_cnt_mask; /**< txbbs_cnt mask (txbb_cnt is ^2). */
> > -	uint32_t headroom_txbbs; /**< Num of txbbs that should be kept free. */
> > +	uint32_t size; /**< SQ size includes headroom. */
> > +	int32_t remain_size; /**< Remain WQE size in SQ. */
> 
> Remain => Remaining?
> 
OK
> By "size", do you mean "room" as there could be several WQEs in there?
> 
Size in bytes.
remaining size | remaining space | remaining room | remaining bytes, which one do you prefer?

> Note before reviewing the rest of this patch, the fact it's a signed integer
> bothers me; it's probably a mistake.

There are places in the code where this variable can be used for signed calculations.

> You should standardize on unsigned values everywhere.

Why?
Each field with the most appropriate type.

> 

> > +	/**< Default owner opcode with HW valid owner bit. */
> 
> The "/**<" syntax requires the comment to come after the documented
> field. You should either move this line below "owner_opcode" or use "/**".
> 
OK

> > +	uint32_t owner_opcode;
> > +	uint32_t stamp; /**< Stamp value with an invalid HW owner bit. */
> >  	volatile uint32_t *db; /**< Pointer to the doorbell. */
> >  	uint32_t doorbell_qpn; /**< qp number to write to the doorbell. */
> > };
> >
> > -#define mlx4_get_send_wqe(sq, n) ((sq)->buf + ((n) *
> > (MLX4_TXBB_SIZE)))
> > -
> >  /* Completion queue events, numbers and masks. */  #define
> > MLX4_CQ_DB_GEQ_N_MASK 0x3  #define MLX4_CQ_DOORBELL 0x20 diff -
> -git
> > a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c index
> > b9cb2fc..0a8ef93 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.c
> > +++ b/drivers/net/mlx4/mlx4_rxtx.c
> > @@ -61,9 +61,6 @@
> >  #include "mlx4_rxtx.h"
> >  #include "mlx4_utils.h"
> >
> > -#define WQE_ONE_DATA_SEG_SIZE \
> > -	(sizeof(struct mlx4_wqe_ctrl_seg) + sizeof(struct
> mlx4_wqe_data_seg))
> > -
> >  /**
> >   * Pointer-value pair structure used in tx_post_send for saving the first
> >   * DWORD (32 byte) of a TXBB.
> > @@ -268,52 +265,48 @@ struct pv {
> >   *
> >   * @param sq
> >   *   Pointer to the SQ structure.
> > - * @param index
> > - *   Index of the freed WQE.
> > - * @param num_txbbs
> > - *   Number of blocks to stamp.
> > - *   If < 0 the routine will use the size written in the WQ entry.
> > - * @param owner
> > - *   The value of the WQE owner bit to use in the stamp.
> > + * @param wqe
> > + *   Pointer of WQE to stamp.
> 
> Looks like it's not just a simple pointer to the WQE to stamp seeing this
> function also stores the address of the next WQE in the provided buffer
> (uint32_t **wqe). It's not documented as such.
> 
Yes, you're right, I will change it; it is going to be changed in the next series patch :)

> >   *
> >   * @return
> > - *   The number of Tx basic blocs (TXBB) the WQE contained.
> > + *   WQE size.
> >   */
> > -static int
> > -mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, uint16_t index, uint8_t
> > owner)
> > +static uint32_t
> > +mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, volatile uint32_t
> **wqe)
> >  {
> > -	uint32_t stamp = rte_cpu_to_be_32(MLX4_SQ_STAMP_VAL |
> > -					  (!!owner <<
> MLX4_SQ_STAMP_SHIFT));
> > -	volatile uint8_t *wqe = mlx4_get_send_wqe(sq,
> > -						(index & sq-
> >txbb_cnt_mask));
> > -	volatile uint32_t *ptr = (volatile uint32_t *)wqe;
> > -	int i;
> > -	int txbbs_size;
> > -	int num_txbbs;
> > -
> > +	uint32_t stamp = sq->stamp;
> > +	volatile uint32_t *next_txbb = *wqe;
> >  	/* Extract the size from the control segment of the WQE. */
> > -	num_txbbs = MLX4_SIZE_TO_TXBBS((((volatile struct
> mlx4_wqe_ctrl_seg *)
> > -					 wqe)->fence_size & 0x3f) << 4);
> > -	txbbs_size = num_txbbs * MLX4_TXBB_SIZE;
> > +	uint32_t size = RTE_ALIGN((uint32_t)
> > +				  ((((volatile struct mlx4_wqe_ctrl_seg *)
> > +				     next_txbb)->fence_size & 0x3f) << 4),
> > +				  MLX4_TXBB_SIZE);
> > +	uint32_t size_cd = size;
> > +
> >  	/* Optimize the common case when there is no wrap-around. */
> > -	if (wqe + txbbs_size <= sq->eob) {
> > +	if ((uintptr_t)next_txbb + size < (uintptr_t)sq->eob) {
> >  		/* Stamp the freed descriptor. */
> > -		for (i = 0; i < txbbs_size; i += MLX4_SQ_STAMP_STRIDE) {
> > -			*ptr = stamp;
> > -			ptr += MLX4_SQ_STAMP_DWORDS;
> > -		}
> > +		do {
> > +			*next_txbb = stamp;
> > +			next_txbb += MLX4_SQ_STAMP_DWORDS;
> > +			size_cd -= MLX4_TXBB_SIZE;
> > +		} while (size_cd);
> >  	} else {
> >  		/* Stamp the freed descriptor. */
> > -		for (i = 0; i < txbbs_size; i += MLX4_SQ_STAMP_STRIDE) {
> > -			*ptr = stamp;
> > -			ptr += MLX4_SQ_STAMP_DWORDS;
> > -			if ((volatile uint8_t *)ptr >= sq->eob) {
> > -				ptr = (volatile uint32_t *)sq->buf;
> > -				stamp ^= RTE_BE32(0x80000000);
> > +		do {
> > +			*next_txbb = stamp;
> > +			next_txbb += MLX4_SQ_STAMP_DWORDS;
> > +			if ((volatile uint8_t *)next_txbb >= sq->eob) {
> > +				next_txbb = (volatile uint32_t *)sq->buf;
> > +				/* Flip invalid stamping ownership. */
> > +				stamp ^= RTE_BE32(0x1 <<
> MLX4_SQ_OWNER_BIT);
> > +				sq->stamp = stamp;
> >  			}
> > -		}
> > +			size_cd -= MLX4_TXBB_SIZE;
> > +		} while (size_cd);
> >  	}
> > -	return num_txbbs;
> > +	*wqe = next_txbb;
> > +	return size;
> >  }
> >
> >  /**
> > @@ -326,24 +319,22 @@ struct pv {
> >   *
> >   * @param txq
> >   *   Pointer to Tx queue structure.
> > - *
> > - * @return
> > - *   0 on success, -1 on failure.
> >   */
> > -static int
> > +static void
> >  mlx4_txq_complete(struct txq *txq, const unsigned int elts_n,
> >  				  struct mlx4_sq *sq)
> >  {
> > -	unsigned int elts_comp = txq->elts_comp;
> >  	unsigned int elts_tail = txq->elts_tail;
> > -	unsigned int sq_tail = sq->tail;
> >  	struct mlx4_cq *cq = &txq->mcq;
> >  	volatile struct mlx4_cqe *cqe;
> >  	uint32_t cons_index = cq->cons_index;
> > -	uint16_t new_index;
> > -	uint16_t nr_txbbs = 0;
> > -	int pkts = 0;
> > -
> > +	volatile uint32_t *first_wqe;
> > +	volatile uint32_t *next_wqe = (volatile uint32_t *)
> > +			((&(*txq->elts)[elts_tail])->wqe);
> > +	volatile uint32_t *last_wqe;
> > +	uint16_t mask = (((uintptr_t)sq->eob - (uintptr_t)sq->buf) >>
> > +			 MLX4_TXBB_SHIFT) - 1;
> > +	uint32_t pkts = 0;
> >  	/*
> >  	 * Traverse over all CQ entries reported and handle each WQ entry
> >  	 * reported by them.
> > @@ -353,11 +344,11 @@ struct pv {
> >  		if (unlikely(!!(cqe->owner_sr_opcode &
> MLX4_CQE_OWNER_MASK) ^
> >  		    !!(cons_index & cq->cqe_cnt)))
> >  			break;
> > +#ifndef NDEBUG
> >  		/*
> >  		 * Make sure we read the CQE after we read the ownership
> bit.
> >  		 */
> >  		rte_io_rmb();
> > -#ifndef NDEBUG
> >  		if (unlikely((cqe->owner_sr_opcode &
> MLX4_CQE_OPCODE_MASK) ==
> >  			     MLX4_CQE_OPCODE_ERROR)) {
> >  			volatile struct mlx4_err_cqe *cqe_err = @@ -366,41
> +357,32 @@
> > struct pv {
> >  			      " syndrome: 0x%x\n",
> >  			      (void *)txq, cqe_err->vendor_err,
> >  			      cqe_err->syndrome);
> > +			break;
> >  		}
> >  #endif /* NDEBUG */
> > -		/* Get WQE index reported in the CQE. */
> > -		new_index =
> > -			rte_be_to_cpu_16(cqe->wqe_index) & sq-
> >txbb_cnt_mask;
> > +		/* Get WQE address by index from the CQE. */
> > +		last_wqe = (volatile uint32_t *)((uintptr_t)sq->buf +
> > +			((rte_be_to_cpu_16(cqe->wqe_index) & mask) <<
> > +			 MLX4_TXBB_SHIFT));
> >  		do {
> >  			/* Free next descriptor. */
> > -			sq_tail += nr_txbbs;
> > -			nr_txbbs =
> > -				mlx4_txq_stamp_freed_wqe(sq,
> > -				     sq_tail & sq->txbb_cnt_mask,
> > -				     !!(sq_tail & sq->txbb_cnt));
> > +			first_wqe = next_wqe;
> > +			sq->remain_size +=
> > +				mlx4_txq_stamp_freed_wqe(sq,
> &next_wqe);
> >  			pkts++;
> > -		} while ((sq_tail & sq->txbb_cnt_mask) != new_index);
> > +		} while (first_wqe != last_wqe);
> >  		cons_index++;
> >  	} while (1);
> >  	if (unlikely(pkts == 0))
> > -		return 0;
> > -	/* Update CQ. */
> > +		return;
> > +	/* Update CQ consumer index. */
> >  	cq->cons_index = cons_index;
> > -	*cq->set_ci_db = rte_cpu_to_be_32(cq->cons_index &
> MLX4_CQ_DB_CI_MASK);
> > -	sq->tail = sq_tail + nr_txbbs;
> > -	/* Update the list of packets posted for transmission. */
> > -	elts_comp -= pkts;
> > -	assert(elts_comp <= txq->elts_comp);
> > -	/*
> > -	 * Assume completion status is successful as nothing can be done
> about
> > -	 * it anyway.
> > -	 */
> > +	*cq->set_ci_db = rte_cpu_to_be_32(cons_index &
> MLX4_CQ_DB_CI_MASK);
> > +	txq->elts_comp -= pkts;
> >  	elts_tail += pkts;
> >  	if (elts_tail >= elts_n)
> >  		elts_tail -= elts_n;
> >  	txq->elts_tail = elts_tail;
> > -	txq->elts_comp = elts_comp;
> > -	return 0;
> >  }
> >
> >  /**
> > @@ -421,41 +403,27 @@ struct pv {
> >  	return buf->pool;
> >  }
> >
> > -static int
> > +static volatile struct mlx4_wqe_ctrl_seg *
> >  mlx4_tx_burst_segs(struct rte_mbuf *buf, struct txq *txq,
> > -		   volatile struct mlx4_wqe_ctrl_seg **pctrl)
> > +		   volatile struct mlx4_wqe_ctrl_seg *ctrl)
> 
> Can you use this opportunity to document this function?
> 
Sure, new patch for this?

> >  {
> > -	int wqe_real_size;
> > -	int nr_txbbs;
> >  	struct pv *pv = (struct pv *)txq->bounce_buf;
> >  	struct mlx4_sq *sq = &txq->msq;
> > -	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> > -	volatile struct mlx4_wqe_ctrl_seg *ctrl;
> > -	volatile struct mlx4_wqe_data_seg *dseg;
> >  	struct rte_mbuf *sbuf = buf;
> >  	uint32_t lkey;
> >  	int pv_counter = 0;
> >  	int nb_segs = buf->nb_segs;
> > +	int32_t wqe_size;
> > +	volatile struct mlx4_wqe_data_seg *dseg =
> > +		(volatile struct mlx4_wqe_data_seg *)(ctrl + 1);
> >
> > -	/* Calculate the needed work queue entry size for this packet. */
> > -	wqe_real_size = sizeof(volatile struct mlx4_wqe_ctrl_seg) +
> > -		nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> > -	nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> > -	/*
> > -	 * Check that there is room for this WQE in the send queue and that
> > -	 * the WQE size is legal.
> > -	 */
> > -	if (((sq->head - sq->tail) + nr_txbbs +
> > -				sq->headroom_txbbs) >= sq->txbb_cnt ||
> > -			nr_txbbs > MLX4_MAX_WQE_TXBBS) {
> > -		return -1;
> > -	}
> > -	/* Get the control and data entries of the WQE. */
> > -	ctrl = (volatile struct mlx4_wqe_ctrl_seg *)
> > -			mlx4_get_send_wqe(sq, head_idx);
> > -	dseg = (volatile struct mlx4_wqe_data_seg *)
> > -			((uintptr_t)ctrl + sizeof(struct mlx4_wqe_ctrl_seg));
> > -	*pctrl = ctrl;
> > +	ctrl->fence_size = 1 + nb_segs;
> > +	wqe_size = RTE_ALIGN((int32_t)(ctrl->fence_size <<
> MLX4_SEG_SHIFT),
> > +			     MLX4_TXBB_SIZE);
> > +	/* Validate WQE size and WQE space in the send queue. */
> > +	if (sq->remain_size < wqe_size ||
> > +	    wqe_size > MLX4_MAX_WQE_SIZE)
> > +		return NULL;
> >  	/*
> >  	 * Fill the data segments with buffer information.
> >  	 * First WQE TXBB head segment is always control segment, @@ -
> 469,7
> > +437,7 @@ struct pv {
> >  	if (unlikely(lkey == (uint32_t)-1)) {
> >  		DEBUG("%p: unable to get MP <-> MR association",
> >  		      (void *)txq);
> > -		return -1;
> > +		return NULL;
> >  	}
> >  	/* Handle WQE wraparound. */
> >  	if (dseg >=
> > @@ -501,7 +469,7 @@ struct pv {
> >  		if (unlikely(lkey == (uint32_t)-1)) {
> >  			DEBUG("%p: unable to get MP <-> MR association",
> >  			      (void *)txq);
> > -			return -1;
> > +			return NULL;
> >  		}
> >  		mlx4_fill_tx_data_seg(dseg, lkey,
> >  				      rte_pktmbuf_mtod(sbuf, uintptr_t), @@ -
> 517,7 +485,7 @@
> > struct pv {
> >  		if (unlikely(lkey == (uint32_t)-1)) {
> >  			DEBUG("%p: unable to get MP <-> MR association",
> >  			      (void *)txq);
> > -			return -1;
> > +			return NULL;
> >  		}
> >  		mlx4_fill_tx_data_seg(dseg, lkey,
> >  				      rte_pktmbuf_mtod(sbuf, uintptr_t), @@ -
> 533,7 +501,7 @@
> > struct pv {
> >  		if (unlikely(lkey == (uint32_t)-1)) {
> >  			DEBUG("%p: unable to get MP <-> MR association",
> >  			      (void *)txq);
> > -			return -1;
> > +			return NULL;
> >  		}
> >  		mlx4_fill_tx_data_seg(dseg, lkey,
> >  				      rte_pktmbuf_mtod(sbuf, uintptr_t), @@ -
> 557,9 +525,10 @@
> > struct pv {
> >  		for (--pv_counter; pv_counter  >= 0; pv_counter--)
> >  			pv[pv_counter].dseg->byte_count =
> pv[pv_counter].val;
> >  	}
> > -	/* Fill the control parameters for this packet. */
> > -	ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> > -	return nr_txbbs;
> > +	sq->remain_size -= wqe_size;
> > +	/* Align next WQE address to the next TXBB. */
> > +	return (volatile struct mlx4_wqe_ctrl_seg *)
> > +		((volatile uint8_t *)ctrl + wqe_size);
> >  }
> >
> >  /**
> > @@ -585,7 +554,8 @@ struct pv {
> >  	unsigned int i;
> >  	unsigned int max;
> >  	struct mlx4_sq *sq = &txq->msq;
> > -	int nr_txbbs;
> > +	volatile struct mlx4_wqe_ctrl_seg *ctrl;
> > +	struct txq_elt *elt;
> >
> >  	assert(txq->elts_comp_cd != 0);
> >  	if (likely(txq->elts_comp != 0))
> > @@ -599,29 +569,30 @@ struct pv {
> >  	--max;
> >  	if (max > pkts_n)
> >  		max = pkts_n;
> > +	elt = &(*txq->elts)[elts_head];
> > +	/* Each element saves its appropriate work queue. */
> > +	ctrl = elt->wqe;
> >  	for (i = 0; (i != max); ++i) {
> >  		struct rte_mbuf *buf = pkts[i];
> >  		unsigned int elts_head_next =
> >  			(((elts_head + 1) == elts_n) ? 0 : elts_head + 1);
> >  		struct txq_elt *elt_next = &(*txq->elts)[elts_head_next];
> > -		struct txq_elt *elt = &(*txq->elts)[elts_head];
> > -		uint32_t owner_opcode = MLX4_OPCODE_SEND;
> > -		volatile struct mlx4_wqe_ctrl_seg *ctrl;
> > -		volatile struct mlx4_wqe_data_seg *dseg;
> > +		uint32_t owner_opcode = sq->owner_opcode;
> > +		volatile struct mlx4_wqe_data_seg *dseg =
> > +				(volatile struct mlx4_wqe_data_seg *)(ctrl +
> 1);
> > +		volatile struct mlx4_wqe_ctrl_seg *ctrl_next;
> >  		union {
> >  			uint32_t flags;
> >  			uint16_t flags16[2];
> >  		} srcrb;
> > -		uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> >  		uint32_t lkey;
> >
> >  		/* Clean up old buffer. */
> >  		if (likely(elt->buf != NULL)) {
> >  			struct rte_mbuf *tmp = elt->buf;
> > -
> 
> Empty line following variable declarations should stay.
> 
> >  #ifndef NDEBUG
> >  			/* Poisoning. */
> > -			memset(elt, 0x66, sizeof(*elt));
> > +			elt->buf = (struct rte_mbuf *)0x6666666666666666;
> 
> Note this address depends on pointer size, which may in turn trigger a
> compilation warning/error. Keep memset() on elt->buf.
>

 ok

> >  #endif
> >  			/* Faster than rte_pktmbuf_free(). */
> >  			do {
> > @@ -633,23 +604,11 @@ struct pv {
> >  		}
> >  		RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
> >  		if (buf->nb_segs == 1) {
> > -			/*
> > -			 * Check that there is room for this WQE in the send
> > -			 * queue and that the WQE size is legal
> > -			 */
> > -			if (((sq->head - sq->tail) + 1 + sq->headroom_txbbs)
> >=
> > -			     sq->txbb_cnt || 1 > MLX4_MAX_WQE_TXBBS) {
> > +			/* Validate WQE space in the send queue. */
> > +			if (sq->remain_size < MLX4_TXBB_SIZE) {
> >  				elt->buf = NULL;
> >  				break;
> >  			}
> > -			/* Get the control and data entries of the WQE. */
> > -			ctrl = (volatile struct mlx4_wqe_ctrl_seg *)
> > -					mlx4_get_send_wqe(sq, head_idx);
> > -			dseg = (volatile struct mlx4_wqe_data_seg *)
> > -					((uintptr_t)ctrl +
> > -					sizeof(struct mlx4_wqe_ctrl_seg));
> > -
> > -			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4)
> & 0x3f;
> >  			lkey = mlx4_txq_mp2mr(txq,
> mlx4_txq_mb2mp(buf));
> >  			if (unlikely(lkey == (uint32_t)-1)) {
> >  				/* MR does not exist. */
> > @@ -658,23 +617,33 @@ struct pv {
> >  				elt->buf = NULL;
> >  				break;
> >  			}
> > -			mlx4_fill_tx_data_seg(dseg, lkey,
> > +			mlx4_fill_tx_data_seg(dseg++, lkey,
> >  					      rte_pktmbuf_mtod(buf,
> uintptr_t),
> >  					      rte_cpu_to_be_32(buf-
> >data_len));
> > -			nr_txbbs = 1;
> > +			/* Set WQE size in 16-byte units. */
> > +			ctrl->fence_size = 0x2;
> > +			sq->remain_size -= MLX4_TXBB_SIZE;
> > +			/* Align next WQE address to the next TXBB. */
> > +			ctrl_next = ctrl + 0x4;
> >  		} else {
> > -			nr_txbbs = mlx4_tx_burst_segs(buf, txq, &ctrl);
> > -			if (nr_txbbs < 0) {
> > +			ctrl_next = mlx4_tx_burst_segs(buf, txq, ctrl);
> > +			if (!ctrl_next) {
> >  				elt->buf = NULL;
> >  				break;
> >  			}
> >  		}
> > +		/* Hold SQ ring wrap around. */
> > +		if ((volatile uint8_t *)ctrl_next >= sq->eob) {
> > +			ctrl_next = (volatile struct mlx4_wqe_ctrl_seg *)
> > +				((volatile uint8_t *)ctrl_next - sq->size);
> > +			/* Flip HW valid ownership. */
> > +			sq->owner_opcode ^= 0x1 <<
> MLX4_SQ_OWNER_BIT;
> > +		}
> >  		/*
> >  		 * For raw Ethernet, the SOLICIT flag is used to indicate
> >  		 * that no ICRC should be calculated.
> >  		 */
> > -		txq->elts_comp_cd -= nr_txbbs;
> > -		if (unlikely(txq->elts_comp_cd <= 0)) {
> > +		if (--txq->elts_comp_cd == 0) {
> >  			txq->elts_comp_cd = txq->elts_comp_cd_init;
> >  			srcrb.flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT |
> >  					       MLX4_WQE_CTRL_CQ_UPDATE);
> @@ -720,13 +689,13 @@ struct pv
> > {
> >  		 * executing as soon as we do).
> >  		 */
> >  		rte_io_wmb();
> > -		ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode |
> > -					      ((sq->head & sq->txbb_cnt) ?
> > -						       MLX4_BIT_WQE_OWN :
> 0));
> > -		sq->head += nr_txbbs;
> > +		ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode);
> >  		elt->buf = buf;
> >  		bytes_sent += buf->pkt_len;
> >  		elts_head = elts_head_next;
> > +		elt_next->wqe = ctrl_next;
> > +		ctrl = ctrl_next;
> > +		elt = elt_next;
> >  	}
> >  	/* Take a shortcut if nothing must be sent. */
> >  	if (unlikely(i == 0))
> > diff --git a/drivers/net/mlx4/mlx4_rxtx.h
> > b/drivers/net/mlx4/mlx4_rxtx.h index 8207232..c092afa 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.h
> > +++ b/drivers/net/mlx4/mlx4_rxtx.h
> > @@ -105,6 +105,7 @@ struct mlx4_rss {
> >  /** Tx element. */
> >  struct txq_elt {
> >  	struct rte_mbuf *buf; /**< Buffer. */
> > +	volatile struct mlx4_wqe_ctrl_seg *wqe; /**< SQ WQE. */
> >  };
> >
> >  /** Rx queue counters. */
> > diff --git a/drivers/net/mlx4/mlx4_txq.c b/drivers/net/mlx4/mlx4_txq.c
> > index 7882a4d..4c7b62a 100644
> > --- a/drivers/net/mlx4/mlx4_txq.c
> > +++ b/drivers/net/mlx4/mlx4_txq.c
> > @@ -84,6 +84,7 @@
> >  		assert(elt->buf != NULL);
> >  		rte_pktmbuf_free(elt->buf);
> >  		elt->buf = NULL;
> > +		elt->wqe = NULL;
> >  		if (++elts_tail == RTE_DIM(*elts))
> >  			elts_tail = 0;
> >  	}
> > @@ -163,20 +164,19 @@ struct txq_mp2mr_mbuf_check_data {
> >  	struct mlx4_cq *cq = &txq->mcq;
> >  	struct mlx4dv_qp *dqp = mlxdv->qp.out;
> >  	struct mlx4dv_cq *dcq = mlxdv->cq.out;
> > -	uint32_t sq_size = (uint32_t)dqp->rq.offset - (uint32_t)dqp-
> >sq.offset;
> >
> > -	sq->buf = (uint8_t *)dqp->buf.buf + dqp->sq.offset;
> >  	/* Total length, including headroom and spare WQEs. */
> > -	sq->eob = sq->buf + sq_size;
> > -	sq->head = 0;
> > -	sq->tail = 0;
> > -	sq->txbb_cnt =
> > -		(dqp->sq.wqe_cnt << dqp->sq.wqe_shift) >>
> MLX4_TXBB_SHIFT;
> > -	sq->txbb_cnt_mask = sq->txbb_cnt - 1;
> > +	sq->size = (uint32_t)dqp->rq.offset - (uint32_t)dqp->sq.offset;
> > +	sq->buf = (uint8_t *)dqp->buf.buf + dqp->sq.offset;
> > +	sq->eob = sq->buf + sq->size;
> > +	uint32_t headroom_size = 2048 + (1 << dqp->sq.wqe_shift);
> > +	/* Continuous headroom size bytes must always stay free. */
> > +	sq->remain_size = sq->size - headroom_size;
> > +	sq->owner_opcode = MLX4_OPCODE_SEND | (0 <<
> MLX4_SQ_OWNER_BIT);
> > +	sq->stamp = rte_cpu_to_be_32(MLX4_SQ_STAMP_VAL |
> > +				     (0 << MLX4_SQ_OWNER_BIT));
> >  	sq->db = dqp->sdb;
> >  	sq->doorbell_qpn = dqp->doorbell_qpn;
> > -	sq->headroom_txbbs =
> > -		(2048 + (1 << dqp->sq.wqe_shift)) >> MLX4_TXBB_SHIFT;
> >  	cq->buf = dcq->buf.buf;
> >  	cq->cqe_cnt = dcq->cqe_cnt;
> >  	cq->set_ci_db = dcq->set_ci_db;
> > @@ -362,6 +362,9 @@ struct txq_mp2mr_mbuf_check_data {
> >  		goto error;
> >  	}
> >  	mlx4_txq_fill_dv_obj_info(txq, &mlxdv);
> > +	/* Save first wqe pointer in the first element. */
> > +	(&(*txq->elts)[0])->wqe =
> > +		(volatile struct mlx4_wqe_ctrl_seg *)txq->msq.buf;
> >  	/* Pre-register known mempools. */
> >  	rte_mempool_walk(mlx4_txq_mp2mr_iter, txq);
> >  	DEBUG("%p: adding Tx queue %p to list", (void *)dev, (void *)txq);
> > --
> > 1.8.3.1
> >
> 
> Otherwise this patch looks OK.
> 
> --
> Adrien Mazarguil
> 6WIND
  
Adrien Mazarguil Dec. 6, 2017, 12:09 p.m. UTC | #3
On Wed, Dec 06, 2017 at 11:43:25AM +0000, Matan Azrad wrote:
> Hi Adrien
> 
> > -----Original Message-----
> > From: Adrien Mazarguil [mailto:adrien.mazarguil@6wind.com]
> > Sent: Wednesday, December 6, 2017 12:59 PM
> > To: Matan Azrad <matan@mellanox.com>
> > Cc: dev@dpdk.org
> > Subject: Re: [PATCH 5/8] net/mlx4: merge Tx queue rings management
> > 
> > On Tue, Nov 28, 2017 at 12:19:27PM +0000, Matan Azrad wrote:
> > > The Tx queue send ring was managed by Tx block head,tail,count and
> > > mask management variables which were used for managing the send
> > queue
> > > remain space and next places of empty or completted work queue entries.
> > 
> > completted => completed
> > 
> > >
> > > This method suffered from an actual addresses recalculation per
> > > packet, an unnecessary Tx block based calculations and an expensive
> > > dual management of Tx rings.
> > >
> > > Move send queue ring calculation to be based on actual addresses while
> > > managing it by descriptors ring indexes.
> > >
> > > Add new work queue entry pointer to the descriptor element to hold the
> > > appropriate entry in the send queue.
> > >
> > > Signed-off-by: Matan Azrad <matan@mellanox.com>
> > > ---
> > >  drivers/net/mlx4/mlx4_prm.h  |  20 ++--  drivers/net/mlx4/mlx4_rxtx.c
> > > | 241 +++++++++++++++++++------------------------
> > >  drivers/net/mlx4/mlx4_rxtx.h |   1 +
> > >  drivers/net/mlx4/mlx4_txq.c  |  23 +++--
> > >  4 files changed, 126 insertions(+), 159 deletions(-)
> > >
> > > diff --git a/drivers/net/mlx4/mlx4_prm.h b/drivers/net/mlx4/mlx4_prm.h
> > > index fcc7c12..2ca303a 100644
> > > --- a/drivers/net/mlx4/mlx4_prm.h
> > > +++ b/drivers/net/mlx4/mlx4_prm.h
<snip>
> > > {  struct mlx4_sq {
> > >  	volatile uint8_t *buf; /**< SQ buffer. */
> > >  	volatile uint8_t *eob; /**< End of SQ buffer */
> > > -	uint32_t head; /**< SQ head counter in units of TXBBS. */
> > > -	uint32_t tail; /**< SQ tail counter in units of TXBBS. */
> > > -	uint32_t txbb_cnt; /**< Num of WQEBB in the Q (should be ^2). */
> > > -	uint32_t txbb_cnt_mask; /**< txbbs_cnt mask (txbb_cnt is ^2). */
> > > -	uint32_t headroom_txbbs; /**< Num of txbbs that should be kept free. */
> > > +	uint32_t size; /**< SQ size includes headroom. */
> > > +	int32_t remain_size; /**< Remain WQE size in SQ. */
> > 
> > Remain => Remaining?
> > 
> OK
> > By "size", do you mean "room" as there could be several WQEs in there?
> > 
> Size in bytes.
> remaining size | remaining space | remaining room | remaining bytes, which one do you prefer?

I think this should fully clarify:

 Remaining WQE room in SQ (bytes).
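
Applied to the field declaration:

 	int32_t remain_size; /**< Remaining WQE room in SQ (bytes). */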

> 
> > Note before reviewing the rest of this patch, the fact it's a signed integer
> > bothers me; it's probably a mistake.
> 
> There are places in the code where this variable can be used for signed calculations.

My point is these signed calculations shouldn't be signed in the first
place. A buffer size cannot be negative.

> 
> > You should standardize on unsigned values everywhere.
> 
> Why?
> Each field with the most appropriate type.

Because you used the wrong type everywhere else. The root cause seems to be
with:

 #define MLX4_MAX_WQE_SIZE 512

Which must either be cast when used or redefined like:

 #define MLX4_MAX_WQE_SIZE 512u

Or even:

 #define MLX4_MAX_WQE_SIZE UINT32_C(512)

<snip>
> > > a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c index
> > > b9cb2fc..0a8ef93 100644
> > > --- a/drivers/net/mlx4/mlx4_rxtx.c
> > > +++ b/drivers/net/mlx4/mlx4_rxtx.c
<snip>
> > > @@ -421,41 +403,27 @@ struct pv {
> > >  	return buf->pool;
> > >  }
> > >
> > > -static int
> > > +static volatile struct mlx4_wqe_ctrl_seg *
> > >  mlx4_tx_burst_segs(struct rte_mbuf *buf, struct txq *txq,
> > > -		   volatile struct mlx4_wqe_ctrl_seg **pctrl)
> > > +		   volatile struct mlx4_wqe_ctrl_seg *ctrl)
> > 
> > Can you use this opportunity to document this function?
> > 
> Sure, new patch for this?

You can make it part of this one if you prefer, no problem either way.
  

Patch

diff --git a/drivers/net/mlx4/mlx4_prm.h b/drivers/net/mlx4/mlx4_prm.h
index fcc7c12..2ca303a 100644
--- a/drivers/net/mlx4/mlx4_prm.h
+++ b/drivers/net/mlx4/mlx4_prm.h
@@ -54,22 +54,18 @@ 
 
 /* Typical TSO descriptor with 16 gather entries is 352 bytes. */
 #define MLX4_MAX_WQE_SIZE 512
-#define MLX4_MAX_WQE_TXBBS (MLX4_MAX_WQE_SIZE / MLX4_TXBB_SIZE)
+#define MLX4_SEG_SHIFT 4
 
 /* Send queue stamping/invalidating information. */
 #define MLX4_SQ_STAMP_STRIDE 64
 #define MLX4_SQ_STAMP_DWORDS (MLX4_SQ_STAMP_STRIDE / 4)
-#define MLX4_SQ_STAMP_SHIFT 31
+#define MLX4_SQ_OWNER_BIT 31
 #define MLX4_SQ_STAMP_VAL 0x7fffffff
 
 /* Work queue element (WQE) flags. */
-#define MLX4_BIT_WQE_OWN 0x80000000
 #define MLX4_WQE_CTRL_IIP_HDR_CSUM (1 << 28)
 #define MLX4_WQE_CTRL_IL4_HDR_CSUM (1 << 27)
 
-#define MLX4_SIZE_TO_TXBBS(size) \
-	(RTE_ALIGN((size), (MLX4_TXBB_SIZE)) >> (MLX4_TXBB_SHIFT))
-
 /* CQE checksum flags. */
 enum {
 	MLX4_CQE_L2_TUNNEL_IPV4 = (int)(1u << 25),
@@ -98,17 +94,15 @@  enum {
 struct mlx4_sq {
 	volatile uint8_t *buf; /**< SQ buffer. */
 	volatile uint8_t *eob; /**< End of SQ buffer */
-	uint32_t head; /**< SQ head counter in units of TXBBS. */
-	uint32_t tail; /**< SQ tail counter in units of TXBBS. */
-	uint32_t txbb_cnt; /**< Num of WQEBB in the Q (should be ^2). */
-	uint32_t txbb_cnt_mask; /**< txbbs_cnt mask (txbb_cnt is ^2). */
-	uint32_t headroom_txbbs; /**< Num of txbbs that should be kept free. */
+	uint32_t size; /**< SQ size includes headroom. */
+	int32_t remain_size; /**< Remain WQE size in SQ. */
+	/**< Default owner opcode with HW valid owner bit. */
+	uint32_t owner_opcode;
+	uint32_t stamp; /**< Stamp value with an invalid HW owner bit. */
 	volatile uint32_t *db; /**< Pointer to the doorbell. */
 	uint32_t doorbell_qpn; /**< qp number to write to the doorbell. */
 };
 
-#define mlx4_get_send_wqe(sq, n) ((sq)->buf + ((n) * (MLX4_TXBB_SIZE)))
-
 /* Completion queue events, numbers and masks. */
 #define MLX4_CQ_DB_GEQ_N_MASK 0x3
 #define MLX4_CQ_DOORBELL 0x20
diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
index b9cb2fc..0a8ef93 100644
--- a/drivers/net/mlx4/mlx4_rxtx.c
+++ b/drivers/net/mlx4/mlx4_rxtx.c
@@ -61,9 +61,6 @@ 
 #include "mlx4_rxtx.h"
 #include "mlx4_utils.h"
 
-#define WQE_ONE_DATA_SEG_SIZE \
-	(sizeof(struct mlx4_wqe_ctrl_seg) + sizeof(struct mlx4_wqe_data_seg))
-
 /**
  * Pointer-value pair structure used in tx_post_send for saving the first
  * DWORD (32 byte) of a TXBB.
@@ -268,52 +265,48 @@  struct pv {
  *
  * @param sq
  *   Pointer to the SQ structure.
- * @param index
- *   Index of the freed WQE.
- * @param num_txbbs
- *   Number of blocks to stamp.
- *   If < 0 the routine will use the size written in the WQ entry.
- * @param owner
- *   The value of the WQE owner bit to use in the stamp.
+ * @param wqe
+ *   Pointer of WQE to stamp.
  *
  * @return
- *   The number of Tx basic blocs (TXBB) the WQE contained.
+ *   WQE size.
  */
-static int
-mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, uint16_t index, uint8_t owner)
+static uint32_t
+mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, volatile uint32_t **wqe)
 {
-	uint32_t stamp = rte_cpu_to_be_32(MLX4_SQ_STAMP_VAL |
-					  (!!owner << MLX4_SQ_STAMP_SHIFT));
-	volatile uint8_t *wqe = mlx4_get_send_wqe(sq,
-						(index & sq->txbb_cnt_mask));
-	volatile uint32_t *ptr = (volatile uint32_t *)wqe;
-	int i;
-	int txbbs_size;
-	int num_txbbs;
-
+	uint32_t stamp = sq->stamp;
+	volatile uint32_t *next_txbb = *wqe;
 	/* Extract the size from the control segment of the WQE. */
-	num_txbbs = MLX4_SIZE_TO_TXBBS((((volatile struct mlx4_wqe_ctrl_seg *)
-					 wqe)->fence_size & 0x3f) << 4);
-	txbbs_size = num_txbbs * MLX4_TXBB_SIZE;
+	uint32_t size = RTE_ALIGN((uint32_t)
+				  ((((volatile struct mlx4_wqe_ctrl_seg *)
+				     next_txbb)->fence_size & 0x3f) << 4),
+				  MLX4_TXBB_SIZE);
+	uint32_t size_cd = size;
+
 	/* Optimize the common case when there is no wrap-around. */
-	if (wqe + txbbs_size <= sq->eob) {
+	if ((uintptr_t)next_txbb + size < (uintptr_t)sq->eob) {
 		/* Stamp the freed descriptor. */
-		for (i = 0; i < txbbs_size; i += MLX4_SQ_STAMP_STRIDE) {
-			*ptr = stamp;
-			ptr += MLX4_SQ_STAMP_DWORDS;
-		}
+		do {
+			*next_txbb = stamp;
+			next_txbb += MLX4_SQ_STAMP_DWORDS;
+			size_cd -= MLX4_TXBB_SIZE;
+		} while (size_cd);
 	} else {
 		/* Stamp the freed descriptor. */
-		for (i = 0; i < txbbs_size; i += MLX4_SQ_STAMP_STRIDE) {
-			*ptr = stamp;
-			ptr += MLX4_SQ_STAMP_DWORDS;
-			if ((volatile uint8_t *)ptr >= sq->eob) {
-				ptr = (volatile uint32_t *)sq->buf;
-				stamp ^= RTE_BE32(0x80000000);
+		do {
+			*next_txbb = stamp;
+			next_txbb += MLX4_SQ_STAMP_DWORDS;
+			if ((volatile uint8_t *)next_txbb >= sq->eob) {
+				next_txbb = (volatile uint32_t *)sq->buf;
+				/* Flip invalid stamping ownership. */
+				stamp ^= RTE_BE32(0x1 << MLX4_SQ_OWNER_BIT);
+				sq->stamp = stamp;
 			}
-		}
+			size_cd -= MLX4_TXBB_SIZE;
+		} while (size_cd);
 	}
-	return num_txbbs;
+	*wqe = next_txbb;
+	return size;
 }
 
 /**
@@ -326,24 +319,22 @@  struct pv {
  *
  * @param txq
  *   Pointer to Tx queue structure.
- *
- * @return
- *   0 on success, -1 on failure.
  */
-static int
+static void
 mlx4_txq_complete(struct txq *txq, const unsigned int elts_n,
 				  struct mlx4_sq *sq)
 {
-	unsigned int elts_comp = txq->elts_comp;
 	unsigned int elts_tail = txq->elts_tail;
-	unsigned int sq_tail = sq->tail;
 	struct mlx4_cq *cq = &txq->mcq;
 	volatile struct mlx4_cqe *cqe;
 	uint32_t cons_index = cq->cons_index;
-	uint16_t new_index;
-	uint16_t nr_txbbs = 0;
-	int pkts = 0;
-
+	volatile uint32_t *first_wqe;
+	volatile uint32_t *next_wqe = (volatile uint32_t *)
+			((&(*txq->elts)[elts_tail])->wqe);
+	volatile uint32_t *last_wqe;
+	uint16_t mask = (((uintptr_t)sq->eob - (uintptr_t)sq->buf) >>
+			 MLX4_TXBB_SHIFT) - 1;
+	uint32_t pkts = 0;
 	/*
 	 * Traverse over all CQ entries reported and handle each WQ entry
 	 * reported by them.
@@ -353,11 +344,11 @@  struct pv {
 		if (unlikely(!!(cqe->owner_sr_opcode & MLX4_CQE_OWNER_MASK) ^
 		    !!(cons_index & cq->cqe_cnt)))
 			break;
+#ifndef NDEBUG
 		/*
 		 * Make sure we read the CQE after we read the ownership bit.
 		 */
 		rte_io_rmb();
-#ifndef NDEBUG
 		if (unlikely((cqe->owner_sr_opcode & MLX4_CQE_OPCODE_MASK) ==
 			     MLX4_CQE_OPCODE_ERROR)) {
 			volatile struct mlx4_err_cqe *cqe_err =
@@ -366,41 +357,32 @@  struct pv {
 			      " syndrome: 0x%x\n",
 			      (void *)txq, cqe_err->vendor_err,
 			      cqe_err->syndrome);
+			break;
 		}
 #endif /* NDEBUG */
-		/* Get WQE index reported in the CQE. */
-		new_index =
-			rte_be_to_cpu_16(cqe->wqe_index) & sq->txbb_cnt_mask;
+		/* Get WQE address by index from the CQE. */
+		last_wqe = (volatile uint32_t *)((uintptr_t)sq->buf +
+			((rte_be_to_cpu_16(cqe->wqe_index) & mask) <<
+			 MLX4_TXBB_SHIFT));
 		do {
 			/* Free next descriptor. */
-			sq_tail += nr_txbbs;
-			nr_txbbs =
-				mlx4_txq_stamp_freed_wqe(sq,
-				     sq_tail & sq->txbb_cnt_mask,
-				     !!(sq_tail & sq->txbb_cnt));
+			first_wqe = next_wqe;
+			sq->remain_size +=
+				mlx4_txq_stamp_freed_wqe(sq, &next_wqe);
 			pkts++;
-		} while ((sq_tail & sq->txbb_cnt_mask) != new_index);
+		} while (first_wqe != last_wqe);
 		cons_index++;
 	} while (1);
 	if (unlikely(pkts == 0))
-		return 0;
-	/* Update CQ. */
+		return;
+	/* Update CQ consumer index. */
 	cq->cons_index = cons_index;
-	*cq->set_ci_db = rte_cpu_to_be_32(cq->cons_index & MLX4_CQ_DB_CI_MASK);
-	sq->tail = sq_tail + nr_txbbs;
-	/* Update the list of packets posted for transmission. */
-	elts_comp -= pkts;
-	assert(elts_comp <= txq->elts_comp);
-	/*
-	 * Assume completion status is successful as nothing can be done about
-	 * it anyway.
-	 */
+	*cq->set_ci_db = rte_cpu_to_be_32(cons_index & MLX4_CQ_DB_CI_MASK);
+	txq->elts_comp -= pkts;
 	elts_tail += pkts;
 	if (elts_tail >= elts_n)
 		elts_tail -= elts_n;
 	txq->elts_tail = elts_tail;
-	txq->elts_comp = elts_comp;
-	return 0;
 }
 
 /**
@@ -421,41 +403,27 @@  struct pv {
 	return buf->pool;
 }
 
-static int
+static volatile struct mlx4_wqe_ctrl_seg *
 mlx4_tx_burst_segs(struct rte_mbuf *buf, struct txq *txq,
-		   volatile struct mlx4_wqe_ctrl_seg **pctrl)
+		   volatile struct mlx4_wqe_ctrl_seg *ctrl)
 {
-	int wqe_real_size;
-	int nr_txbbs;
 	struct pv *pv = (struct pv *)txq->bounce_buf;
 	struct mlx4_sq *sq = &txq->msq;
-	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
-	volatile struct mlx4_wqe_ctrl_seg *ctrl;
-	volatile struct mlx4_wqe_data_seg *dseg;
 	struct rte_mbuf *sbuf = buf;
 	uint32_t lkey;
 	int pv_counter = 0;
 	int nb_segs = buf->nb_segs;
+	int32_t wqe_size;
+	volatile struct mlx4_wqe_data_seg *dseg =
+		(volatile struct mlx4_wqe_data_seg *)(ctrl + 1);
 
-	/* Calculate the needed work queue entry size for this packet. */
-	wqe_real_size = sizeof(volatile struct mlx4_wqe_ctrl_seg) +
-		nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
-	nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
-	/*
-	 * Check that there is room for this WQE in the send queue and that
-	 * the WQE size is legal.
-	 */
-	if (((sq->head - sq->tail) + nr_txbbs +
-				sq->headroom_txbbs) >= sq->txbb_cnt ||
-			nr_txbbs > MLX4_MAX_WQE_TXBBS) {
-		return -1;
-	}
-	/* Get the control and data entries of the WQE. */
-	ctrl = (volatile struct mlx4_wqe_ctrl_seg *)
-			mlx4_get_send_wqe(sq, head_idx);
-	dseg = (volatile struct mlx4_wqe_data_seg *)
-			((uintptr_t)ctrl + sizeof(struct mlx4_wqe_ctrl_seg));
-	*pctrl = ctrl;
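+	/*
+	 * fence_size is in 16-byte units: one control segment plus one
+	 * data segment per mbuf segment.
+	 */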
+	ctrl->fence_size = 1 + nb_segs;
+	wqe_size = RTE_ALIGN((int32_t)(ctrl->fence_size << MLX4_SEG_SHIFT),
+			     MLX4_TXBB_SIZE);
+	/* Validate WQE size and WQE space in the send queue. */
+	if (sq->remain_size < wqe_size ||
+	    wqe_size > MLX4_MAX_WQE_SIZE)
+		return NULL;
 	/*
 	 * Fill the data segments with buffer information.
 	 * First WQE TXBB head segment is always control segment,
@@ -469,7 +437,7 @@  struct pv {
 	if (unlikely(lkey == (uint32_t)-1)) {
 		DEBUG("%p: unable to get MP <-> MR association",
 		      (void *)txq);
-		return -1;
+		return NULL;
 	}
 	/* Handle WQE wraparound. */
 	if (dseg >=
@@ -501,7 +469,7 @@  struct pv {
 		if (unlikely(lkey == (uint32_t)-1)) {
 			DEBUG("%p: unable to get MP <-> MR association",
 			      (void *)txq);
-			return -1;
+			return NULL;
 		}
 		mlx4_fill_tx_data_seg(dseg, lkey,
 				      rte_pktmbuf_mtod(sbuf, uintptr_t),
@@ -517,7 +485,7 @@  struct pv {
 		if (unlikely(lkey == (uint32_t)-1)) {
 			DEBUG("%p: unable to get MP <-> MR association",
 			      (void *)txq);
-			return -1;
+			return NULL;
 		}
 		mlx4_fill_tx_data_seg(dseg, lkey,
 				      rte_pktmbuf_mtod(sbuf, uintptr_t),
@@ -533,7 +501,7 @@  struct pv {
 		if (unlikely(lkey == (uint32_t)-1)) {
 			DEBUG("%p: unable to get MP <-> MR association",
 			      (void *)txq);
-			return -1;
+			return NULL;
 		}
 		mlx4_fill_tx_data_seg(dseg, lkey,
 				      rte_pktmbuf_mtod(sbuf, uintptr_t),
@@ -557,9 +525,10 @@  struct pv {
 		for (--pv_counter; pv_counter  >= 0; pv_counter--)
 			pv[pv_counter].dseg->byte_count = pv[pv_counter].val;
 	}
-	/* Fill the control parameters for this packet. */
-	ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
-	return nr_txbbs;
+	sq->remain_size -= wqe_size;
+	/* Return the next WQE address; wqe_size is already TXBB-aligned. */
+	return (volatile struct mlx4_wqe_ctrl_seg *)
+		((volatile uint8_t *)ctrl + wqe_size);
 }
 
 /**
@@ -585,7 +554,8 @@  struct pv {
 	unsigned int i;
 	unsigned int max;
 	struct mlx4_sq *sq = &txq->msq;
-	int nr_txbbs;
+	volatile struct mlx4_wqe_ctrl_seg *ctrl;
+	struct txq_elt *elt;
 
 	assert(txq->elts_comp_cd != 0);
 	if (likely(txq->elts_comp != 0))
@@ -599,29 +569,30 @@  struct pv {
 	--max;
 	if (max > pkts_n)
 		max = pkts_n;
+	elt = &(*txq->elts)[elts_head];
+	/* Each element stores the address of its own WQE. */
+	ctrl = elt->wqe;
 	for (i = 0; (i != max); ++i) {
 		struct rte_mbuf *buf = pkts[i];
 		unsigned int elts_head_next =
 			(((elts_head + 1) == elts_n) ? 0 : elts_head + 1);
 		struct txq_elt *elt_next = &(*txq->elts)[elts_head_next];
-		struct txq_elt *elt = &(*txq->elts)[elts_head];
-		uint32_t owner_opcode = MLX4_OPCODE_SEND;
-		volatile struct mlx4_wqe_ctrl_seg *ctrl;
-		volatile struct mlx4_wqe_data_seg *dseg;
+		uint32_t owner_opcode = sq->owner_opcode;
+		volatile struct mlx4_wqe_data_seg *dseg =
+				(volatile struct mlx4_wqe_data_seg *)(ctrl + 1);
+		volatile struct mlx4_wqe_ctrl_seg *ctrl_next;
 		union {
 			uint32_t flags;
 			uint16_t flags16[2];
 		} srcrb;
-		uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
 		uint32_t lkey;
 
 		/* Clean up old buffer. */
 		if (likely(elt->buf != NULL)) {
 			struct rte_mbuf *tmp = elt->buf;
-
 #ifndef NDEBUG
 			/* Poisoning. */
-			memset(elt, 0x66, sizeof(*elt));
+			elt->buf = (struct rte_mbuf *)0x6666666666666666;
 #endif
 			/* Faster than rte_pktmbuf_free(). */
 			do {
@@ -633,23 +604,11 @@  struct pv {
 		}
 		RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
 		if (buf->nb_segs == 1) {
-			/*
-			 * Check that there is room for this WQE in the send
-			 * queue and that the WQE size is legal
-			 */
-			if (((sq->head - sq->tail) + 1 + sq->headroom_txbbs) >=
-			     sq->txbb_cnt || 1 > MLX4_MAX_WQE_TXBBS) {
+			/* Validate WQE space in the send queue. */
+			if (sq->remain_size < MLX4_TXBB_SIZE) {
 				elt->buf = NULL;
 				break;
 			}
-			/* Get the control and data entries of the WQE. */
-			ctrl = (volatile struct mlx4_wqe_ctrl_seg *)
-					mlx4_get_send_wqe(sq, head_idx);
-			dseg = (volatile struct mlx4_wqe_data_seg *)
-					((uintptr_t)ctrl +
-					sizeof(struct mlx4_wqe_ctrl_seg));
-
-			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
 			lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
 			if (unlikely(lkey == (uint32_t)-1)) {
 				/* MR does not exist. */
@@ -658,23 +617,33 @@  struct pv {
 				elt->buf = NULL;
 				break;
 			}
-			mlx4_fill_tx_data_seg(dseg, lkey,
+			mlx4_fill_tx_data_seg(dseg++, lkey,
 					      rte_pktmbuf_mtod(buf, uintptr_t),
 					      rte_cpu_to_be_32(buf->data_len));
-			nr_txbbs = 1;
+			/* Set WQE size in 16-byte units. */
+			ctrl->fence_size = 0x2;
+			sq->remain_size -= MLX4_TXBB_SIZE;
+			/* Advance to the next TXBB (4 x 16-byte segments). */
+			ctrl_next = ctrl + 0x4;
 		} else {
-			nr_txbbs = mlx4_tx_burst_segs(buf, txq, &ctrl);
-			if (nr_txbbs < 0) {
+			ctrl_next = mlx4_tx_burst_segs(buf, txq, ctrl);
+			if (!ctrl_next) {
 				elt->buf = NULL;
 				break;
 			}
 		}
+		/* Handle SQ ring wraparound. */
+		if ((volatile uint8_t *)ctrl_next >= sq->eob) {
+			ctrl_next = (volatile struct mlx4_wqe_ctrl_seg *)
+				((volatile uint8_t *)ctrl_next - sq->size);
+			/* Flip the HW ownership bit. */
+			sq->owner_opcode ^= 0x1 << MLX4_SQ_OWNER_BIT;
+		}
 		/*
 		 * For raw Ethernet, the SOLICIT flag is used to indicate
 		 * that no ICRC should be calculated.
 		 */
-		txq->elts_comp_cd -= nr_txbbs;
-		if (unlikely(txq->elts_comp_cd <= 0)) {
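+		/* Request a completion once every elts_comp_cd_init packets. */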
+		if (--txq->elts_comp_cd == 0) {
 			txq->elts_comp_cd = txq->elts_comp_cd_init;
 			srcrb.flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT |
 					       MLX4_WQE_CTRL_CQ_UPDATE);
@@ -720,13 +689,13 @@  struct pv {
 		 * executing as soon as we do).
 		 */
 		rte_io_wmb();
-		ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode |
-					      ((sq->head & sq->txbb_cnt) ?
-						       MLX4_BIT_WQE_OWN : 0));
-		sq->head += nr_txbbs;
+		ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode);
 		elt->buf = buf;
 		bytes_sent += buf->pkt_len;
 		elts_head = elts_head_next;
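+		/* Save the next WQE address and advance to the next element. */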
+		elt_next->wqe = ctrl_next;
+		ctrl = ctrl_next;
+		elt = elt_next;
 	}
 	/* Take a shortcut if nothing must be sent. */
 	if (unlikely(i == 0))
diff --git a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h
index 8207232..c092afa 100644
--- a/drivers/net/mlx4/mlx4_rxtx.h
+++ b/drivers/net/mlx4/mlx4_rxtx.h
@@ -105,6 +105,7 @@  struct mlx4_rss {
 /** Tx element. */
 struct txq_elt {
 	struct rte_mbuf *buf; /**< Buffer. */
+	volatile struct mlx4_wqe_ctrl_seg *wqe; /**< SQ WQE. */
 };
 
 /** Rx queue counters. */
diff --git a/drivers/net/mlx4/mlx4_txq.c b/drivers/net/mlx4/mlx4_txq.c
index 7882a4d..4c7b62a 100644
--- a/drivers/net/mlx4/mlx4_txq.c
+++ b/drivers/net/mlx4/mlx4_txq.c
@@ -84,6 +84,7 @@ 
 		assert(elt->buf != NULL);
 		rte_pktmbuf_free(elt->buf);
 		elt->buf = NULL;
+		elt->wqe = NULL;
 		if (++elts_tail == RTE_DIM(*elts))
 			elts_tail = 0;
 	}
@@ -163,20 +164,19 @@  struct txq_mp2mr_mbuf_check_data {
 	struct mlx4_cq *cq = &txq->mcq;
 	struct mlx4dv_qp *dqp = mlxdv->qp.out;
 	struct mlx4dv_cq *dcq = mlxdv->cq.out;
-	uint32_t sq_size = (uint32_t)dqp->rq.offset - (uint32_t)dqp->sq.offset;
 
-	sq->buf = (uint8_t *)dqp->buf.buf + dqp->sq.offset;
 	/* Total length, including headroom and spare WQEs. */
-	sq->eob = sq->buf + sq_size;
-	sq->head = 0;
-	sq->tail = 0;
-	sq->txbb_cnt =
-		(dqp->sq.wqe_cnt << dqp->sq.wqe_shift) >> MLX4_TXBB_SHIFT;
-	sq->txbb_cnt_mask = sq->txbb_cnt - 1;
+	sq->size = (uint32_t)dqp->rq.offset - (uint32_t)dqp->sq.offset;
+	sq->buf = (uint8_t *)dqp->buf.buf + dqp->sq.offset;
+	sq->eob = sq->buf + sq->size;
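+	/* Headroom: 2048 bytes plus one spare WQE stride. */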
+	uint32_t headroom_size = 2048 + (1 << dqp->sq.wqe_shift);
+	/* This many contiguous bytes of headroom must always remain free. */
+	sq->remain_size = sq->size - headroom_size;
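+	/* The ownership bit starts at 0 and flips on each SQ wraparound. */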
+	sq->owner_opcode = MLX4_OPCODE_SEND | (0 << MLX4_SQ_OWNER_BIT);
+	sq->stamp = rte_cpu_to_be_32(MLX4_SQ_STAMP_VAL |
+				     (0 << MLX4_SQ_OWNER_BIT));
 	sq->db = dqp->sdb;
 	sq->doorbell_qpn = dqp->doorbell_qpn;
-	sq->headroom_txbbs =
-		(2048 + (1 << dqp->sq.wqe_shift)) >> MLX4_TXBB_SHIFT;
 	cq->buf = dcq->buf.buf;
 	cq->cqe_cnt = dcq->cqe_cnt;
 	cq->set_ci_db = dcq->set_ci_db;
@@ -362,6 +362,9 @@  struct txq_mp2mr_mbuf_check_data {
 		goto error;
 	}
 	mlx4_txq_fill_dv_obj_info(txq, &mlxdv);
+	/* Save the first WQE address in the first element. */
+	(&(*txq->elts)[0])->wqe =
+		(volatile struct mlx4_wqe_ctrl_seg *)txq->msq.buf;
 	/* Pre-register known mempools. */
 	rte_mempool_walk(mlx4_txq_mp2mr_iter, txq);
 	DEBUG("%p: adding Tx queue %p to list", (void *)dev, (void *)txq);