[dpdk-dev,v3,5/7] net/mlx4: separate Tx segment cases

Message ID 1509358049-18854-6-git-send-email-matan@mellanox.com (mailing list archive)
State Superseded, archived
Delegated to: Ferruh Yigit
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Matan Azrad Oct. 30, 2017, 10:07 a.m. UTC
  Since single segment packets shouldn't use additional memory to
save segments byte count we can prevent additional memory
unnecessary usage in this case; Prevent loop management.

Call a dedicated function for handling multi segments case.

Signed-off-by: Matan Azrad <matan@mellanox.com>
Signed-off-by: Ophir Munk <ophirmu@mellanox.com>
---
 drivers/net/mlx4/mlx4_rxtx.c | 247 +++++++++++++++++++++++++++----------------
 1 file changed, 158 insertions(+), 89 deletions(-)
  

Comments

Adrien Mazarguil Oct. 30, 2017, 2:23 p.m. UTC | #1
On Mon, Oct 30, 2017 at 10:07:27AM +0000, Matan Azrad wrote:
> Since single segment packets shouldn't use additional memory to
> save segments byte count we can prevent additional memory
> unnecessary usage in this case; Prevent loop management.
> 

Sorry for asking but I really don't understand the above, can you
reformulate the problem addressed by this patch?

> Call a dedicated function for handling multi segments case.

This sentence is clearer, I'll base my review on what this patch does, not
the reasons behind it.

> Signed-off-by: Matan Azrad <matan@mellanox.com>
> Signed-off-by: Ophir Munk <ophirmu@mellanox.com>
> ---
>  drivers/net/mlx4/mlx4_rxtx.c | 247 +++++++++++++++++++++++++++----------------
>  1 file changed, 158 insertions(+), 89 deletions(-)
> 
> diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
> index 8ce70d6..8ea8851 100644
> --- a/drivers/net/mlx4/mlx4_rxtx.c
> +++ b/drivers/net/mlx4/mlx4_rxtx.c
> @@ -62,6 +62,9 @@
>  #include "mlx4_rxtx.h"
>  #include "mlx4_utils.h"
>  
> +#define WQE_ONE_DATA_SEG_SIZE \
> +	(sizeof(struct mlx4_wqe_ctrl_seg) + sizeof(struct mlx4_wqe_data_seg))
> +
>  /**
>   * Pointer-value pair structure used in tx_post_send for saving the first
>   * DWORD (32 byte) of a TXBB.
> @@ -140,22 +143,19 @@ struct pv {
>   * @return
>   *   0 on success, -1 on failure.
>   */
> -static int
> -mlx4_txq_complete(struct txq *txq)
> +static inline int

While likely harmless, I think the addition of this inline keyword is not
related to this patch.

> +mlx4_txq_complete(struct txq *txq, const unsigned int elts_n,
> +				struct mlx4_sq *sq)

Looks like an indentation issue, you should align it to the contents of the
opening "(" to match the coding style of this file.

>  {
>  	unsigned int elts_comp = txq->elts_comp;
>  	unsigned int elts_tail = txq->elts_tail;
> -	const unsigned int elts_n = txq->elts_n;
>  	struct mlx4_cq *cq = &txq->mcq;
> -	struct mlx4_sq *sq = &txq->msq;
>  	struct mlx4_cqe *cqe;
>  	uint32_t cons_index = cq->cons_index;
>  	uint16_t new_index;
>  	uint16_t nr_txbbs = 0;
>  	int pkts = 0;
>  
> -	if (unlikely(elts_comp == 0))
> -		return 0;
>  	/*
>  	 * Traverse over all CQ entries reported and handle each WQ entry
>  	 * reported by them.
> @@ -238,6 +238,122 @@ struct pv {
>  	return buf->pool;
>  }
>  
> +static int handle_multi_segs(struct rte_mbuf *buf,
> +			    struct txq *txq,
> +			    struct mlx4_wqe_ctrl_seg **pctrl)

How about naming this function in a way that follows the mlx4_something()
convention?

Here's a suggestion based on how this function remains tied to
mlx4_tx_burst():

 mlx4_tx_burst_seg()

> +{
> +	int wqe_real_size;
> +	int nr_txbbs;
> +	struct pv *pv = (struct pv *)txq->bounce_buf;
> +	struct mlx4_sq *sq = &txq->msq;
> +	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> +	struct mlx4_wqe_ctrl_seg *ctrl;
> +	struct mlx4_wqe_data_seg *dseg;
> +	uint32_t lkey;
> +	uintptr_t addr;
> +	uint32_t byte_count;
> +	int pv_counter = 0;
> +
> +	/* Calculate the needed work queue entry size for this packet. */
> +	wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
> +		buf->nb_segs * sizeof(struct mlx4_wqe_data_seg);
> +	nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> +	/*
> +	 * Check that there is room for this WQE in the send queue and that
> +	 * the WQE size is legal.
> +	 */
> +	if (((sq->head - sq->tail) + nr_txbbs +
> +				sq->headroom_txbbs) >= sq->txbb_cnt ||
> +			nr_txbbs > MLX4_MAX_WQE_TXBBS) {
> +		return -1;
> +	}
> +

Extra empty line.

> +	/* Get the control and data entries of the WQE. */
> +	ctrl = (struct mlx4_wqe_ctrl_seg *)mlx4_get_send_wqe(sq, head_idx);
> +	dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
> +			sizeof(struct mlx4_wqe_ctrl_seg));
> +	*pctrl = ctrl;
> +	/* Fill the data segments with buffer information. */
> +	struct rte_mbuf *sbuf;

I'm usually fine with mixing declarations and code when there's a good
reason, however in this case there's no point. sbuf could have been defined
with the rest at the beginning of the function.

> +

Extra empty line here as well.

> +	for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> +		addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> +		rte_prefetch0((volatile void *)addr);
> +		/* Handle WQE wraparound. */
> +		if (dseg >= (struct mlx4_wqe_data_seg *)sq->eob)
> +			dseg = (struct mlx4_wqe_data_seg *)sq->buf;
> +		dseg->addr = rte_cpu_to_be_64(addr);
> +		/* Memory region key (big endian) for this memory pool. */
> +		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> +		dseg->lkey = rte_cpu_to_be_32(lkey);
> +#ifndef NDEBUG
> +		/* Calculate the needed work queue entry size for this packet */
> +		if (unlikely(dseg->lkey == rte_cpu_to_be_32((uint32_t)-1))) {
> +			/* MR does not exist. */
> +			DEBUG("%p: unable to get MP <-> MR association",
> +					(void *)txq);
> +			/*
> +			 * Restamp entry in case of failure.
> +			 * Make sure that size is written correctly
> +			 * Note that we give ownership to the SW, not the HW.
> +			 */
> +			wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
> +				buf->nb_segs * sizeof(struct mlx4_wqe_data_seg);
> +			ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> +			mlx4_txq_stamp_freed_wqe(sq, head_idx,
> +					(sq->head & sq->txbb_cnt) ? 0 : 1);
> +			return -1;
> +		}
> +#endif /* NDEBUG */
> +		if (likely(sbuf->data_len)) {
> +			byte_count = rte_cpu_to_be_32(sbuf->data_len);
> +		} else {
> +			/*
> +			 * Zero length segment is treated as inline segment
> +			 * with zero data.
> +			 */
> +			byte_count = RTE_BE32(0x80000000);
> +		}
> +		/*
> +		 * If the data segment is not at the beginning of a
> +		 * Tx basic block (TXBB) then write the byte count,
> +		 * else postpone the writing to just before updating the
> +		 * control segment.
> +		 */
> +		if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> +			/*
> +			 * Need a barrier here before writing the byte_count
> +			 * fields to make sure that all the data is visible
> +			 * before the byte_count field is set.
> +			 * Otherwise, if the segment begins a new cacheline,
> +			 * the HCA prefetcher could grab the 64-byte chunk and
> +			 * get a valid (!= 0xffffffff) byte count but stale
> +			 * data, and end up sending the wrong data.
> +			 */
> +			rte_io_wmb();
> +			dseg->byte_count = byte_count;
> +		} else {
> +			/*
> +			 * This data segment starts at the beginning of a new
> +			 * TXBB, so we need to postpone its byte_count writing
> +			 * for later.
> +			 */
> +			pv[pv_counter].dseg = dseg;
> +			pv[pv_counter++].val = byte_count;
> +		}
> +	}
> +	/* Write the first DWORD of each TXBB save earlier. */
> +	if (pv_counter) {
> +		/* Need a barrier here before writing the byte_count. */
> +		rte_io_wmb();
> +		for (--pv_counter; pv_counter  >= 0; pv_counter--)
> +			pv[pv_counter].dseg->byte_count = pv[pv_counter].val;
> +	}
> +	/* Fill the control parameters for this packet. */
> +	ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> +

Extra empty line.

> +	return nr_txbbs;
> +}
>  /**
>   * DPDK callback for Tx.
>   *
> @@ -261,10 +377,11 @@ struct pv {
>  	unsigned int i;
>  	unsigned int max;
>  	struct mlx4_sq *sq = &txq->msq;
> -	struct pv *pv = (struct pv *)txq->bounce_buf;
> +	int nr_txbbs;
>  
>  	assert(txq->elts_comp_cd != 0);
> -	mlx4_txq_complete(txq);
> +	if (likely(txq->elts_comp != 0))
> +		mlx4_txq_complete(txq, elts_n, sq);
>  	max = (elts_n - (elts_head - txq->elts_tail));
>  	if (max > elts_n)
>  		max -= elts_n;
> @@ -283,7 +400,6 @@ struct pv {
>  		uint32_t owner_opcode = MLX4_OPCODE_SEND;
>  		struct mlx4_wqe_ctrl_seg *ctrl;
>  		struct mlx4_wqe_data_seg *dseg;
> -		struct rte_mbuf *sbuf;
>  		union {
>  			uint32_t flags;
>  			uint16_t flags16[2];
> @@ -291,10 +407,6 @@ struct pv {
>  		uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
>  		uint32_t lkey;
>  		uintptr_t addr;
> -		uint32_t byte_count;
> -		int wqe_real_size;
> -		int nr_txbbs;
> -		int pv_counter = 0;
>  
>  		/* Clean up old buffer. */
>  		if (likely(elt->buf != NULL)) {
> @@ -313,40 +425,29 @@ struct pv {
>  			} while (tmp != NULL);
>  		}
>  		RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
> -
> -		/*
> -		 * Calculate the needed work queue entry size
> -		 * for this packet.
> -		 */
> -		wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
> -				buf->nb_segs * sizeof(struct mlx4_wqe_data_seg);
> -		nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> -		/*
> -		 * Check that there is room for this WQE in the send
> -		 * queue and that the WQE size is legal.
> -		 */
> -		if (((sq->head - sq->tail) + nr_txbbs +
> -		     sq->headroom_txbbs) >= sq->txbb_cnt ||
> -		    nr_txbbs > MLX4_MAX_WQE_TXBBS) {
> -			elt->buf = NULL;
> -			break;
> -		}
> -		/* Get the control and data entries of the WQE. */
> -		ctrl = (struct mlx4_wqe_ctrl_seg *)
> -				mlx4_get_send_wqe(sq, head_idx);
> -		dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
> -				sizeof(struct mlx4_wqe_ctrl_seg));
> -		/* Fill the data segments with buffer information. */
> -		for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> -			addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> +		if (buf->nb_segs == 1) {
> +			/*
> +			 * Check that there is room for this WQE in the send
> +			 * queue and that the WQE size is legal
> +			 */
> +			if (((sq->head - sq->tail) + 1 + sq->headroom_txbbs) >=
> +			     sq->txbb_cnt || 1 > MLX4_MAX_WQE_TXBBS) {
> +				elt->buf = NULL;
> +				break;
> +			}
> +			/* Get the control and data entries of the WQE. */
> +			ctrl = (struct mlx4_wqe_ctrl_seg *)
> +					mlx4_get_send_wqe(sq, head_idx);
> +			dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
> +					sizeof(struct mlx4_wqe_ctrl_seg));
> +			addr = rte_pktmbuf_mtod(buf, uintptr_t);
>  			rte_prefetch0((volatile void *)addr);
>  			/* Handle WQE wraparound. */
> -			if (unlikely(dseg >=
> -			    (struct mlx4_wqe_data_seg *)sq->eob))
> +			if (dseg >= (struct mlx4_wqe_data_seg *)sq->eob)

Ideally this change should have been on its own in a fix commit.

>  				dseg = (struct mlx4_wqe_data_seg *)sq->buf;
>  			dseg->addr = rte_cpu_to_be_64(addr);
>  			/* Memory region key (big endian). */
> -			lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> +			lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
>  			dseg->lkey = rte_cpu_to_be_32(lkey);
>  #ifndef NDEBUG
>  			if (unlikely(dseg->lkey ==
> @@ -360,61 +461,28 @@ struct pv {
>  				 * Note that we give ownership to the SW,
>  				 * not the HW.
>  				 */
> -				ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> +				ctrl->fence_size =
> +					(WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
>  				mlx4_txq_stamp_freed_wqe(sq, head_idx,
>  					     (sq->head & sq->txbb_cnt) ? 0 : 1);
>  				elt->buf = NULL;
>  				break;
>  			}
>  #endif /* NDEBUG */
> -			if (likely(sbuf->data_len)) {
> -				byte_count = rte_cpu_to_be_32(sbuf->data_len);
> -			} else {
> -				/*
> -				 * Zero length segment is treated as inline
> -				 * segment with zero data.
> -				 */
> -				byte_count = RTE_BE32(0x80000000);
> -			}
> -			/*
> -			 * If the data segment is not at the beginning
> -			 * of a Tx basic block (TXBB) then write the
> -			 * byte count, else postpone the writing to
> -			 * just before updating the control segment.
> -			 */
> -			if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> -				/*
> -				 * Need a barrier here before writing the
> -				 * byte_count fields to make sure that all the
> -				 * data is visible before the byte_count field
> -				 * is set. otherwise, if the segment begins a
> -				 * new cacheline, the HCA prefetcher could grab
> -				 * the 64-byte chunk and get a valid
> -				 * (!= 0xffffffff) byte count but stale data,
> -				 * and end up sending the wrong data.
> -				 */
> -				rte_io_wmb();
> -				dseg->byte_count = byte_count;
> -			} else {
> -				/*
> -				 * This data segment starts at the beginning of
> -				 * a new TXBB, so we need to postpone its
> -				 * byte_count writing for later.
> -				 */
> -				pv[pv_counter].dseg = dseg;
> -				pv[pv_counter++].val = byte_count;
> -			}
> -		}
> -		/* Write the first DWORD of each TXBB save earlier. */
> -		if (pv_counter) {
> -			/* Need a barrier before writing the byte_count. */
> +			/* Need a barrier here before byte count store. */
>  			rte_io_wmb();
> -			for (--pv_counter; pv_counter  >= 0; pv_counter--)
> -				pv[pv_counter].dseg->byte_count =
> -						pv[pv_counter].val;
> +			dseg->byte_count = rte_cpu_to_be_32(buf->data_len);
> +

Extra empty line.

> +			/* Fill the control parameters for this packet. */
> +			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
> +			nr_txbbs = 1;
> +		} else {
> +			nr_txbbs = handle_multi_segs(buf, txq, &ctrl);
> +			if (nr_txbbs < 0) {
> +				elt->buf = NULL;
> +				break;
> +			}
>  		}
> -		/* Fill the control parameters for this packet. */
> -		ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
>  		/*
>  		 * For raw Ethernet, the SOLICIT flag is used to indicate
>  		 * that no ICRC should be calculated.
> @@ -469,6 +537,7 @@ struct pv {
>  		ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode |
>  					      ((sq->head & sq->txbb_cnt) ?
>  						       MLX4_BIT_WQE_OWN : 0));
> +

Extra empty line.

>  		sq->head += nr_txbbs;
>  		elt->buf = buf;
>  		bytes_sent += buf->pkt_len;
> -- 
> 1.8.3.1
>
  
Matan Azrad Oct. 30, 2017, 6:23 p.m. UTC | #2
Hi Adrien

> -----Original Message-----
> From: Adrien Mazarguil [mailto:adrien.mazarguil@6wind.com]
> Sent: Monday, October 30, 2017 4:24 PM
> To: Matan Azrad <matan@mellanox.com>
> Cc: dev@dpdk.org; Ophir Munk <ophirmu@mellanox.com>
> Subject: Re: [PATCH v3 5/7] net/mlx4: separate Tx segment cases
> 
> On Mon, Oct 30, 2017 at 10:07:27AM +0000, Matan Azrad wrote:
> > Since single segment packets shouldn't use additional memory to save
> > segments byte count we can prevent additional memory unnecessary
> usage
> > in this case; Prevent loop management.
> >
> 
> Sorry for asking but I really don't understand the above, can you reformulate
> the problem addressed by this patch?
> 
What's about next?
Optimize single segment case by processing it in different code which prevents checks and calculations relevant only to multi segment case. 

> > Call a dedicated function for handling multi segments case.
> 
> This sentence is clearer, I'll base my review on what this patch does, not the
> reasons behind it.
> 
> > Signed-off-by: Matan Azrad <matan@mellanox.com>
> > Signed-off-by: Ophir Munk <ophirmu@mellanox.com>
> > ---
> >  drivers/net/mlx4/mlx4_rxtx.c | 247
> > +++++++++++++++++++++++++++----------------
> >  1 file changed, 158 insertions(+), 89 deletions(-)
> >
> > diff --git a/drivers/net/mlx4/mlx4_rxtx.c
> > b/drivers/net/mlx4/mlx4_rxtx.c index 8ce70d6..8ea8851 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.c
> > +++ b/drivers/net/mlx4/mlx4_rxtx.c
> > @@ -62,6 +62,9 @@
> >  #include "mlx4_rxtx.h"
> >  #include "mlx4_utils.h"
> >
> > +#define WQE_ONE_DATA_SEG_SIZE \
> > +	(sizeof(struct mlx4_wqe_ctrl_seg) + sizeof(struct
> > +mlx4_wqe_data_seg))
> > +
> >  /**
> >   * Pointer-value pair structure used in tx_post_send for saving the first
> >   * DWORD (32 byte) of a TXBB.
> > @@ -140,22 +143,19 @@ struct pv {
> >   * @return
> >   *   0 on success, -1 on failure.
> >   */
> > -static int
> > -mlx4_txq_complete(struct txq *txq)
> > +static inline int
> 
> While likely harmless, I think the addition of this inline keyword is not related
> to this patch.
> 
Yes, you right, will be fixed in next version.

> > +mlx4_txq_complete(struct txq *txq, const unsigned int elts_n,
> > +				struct mlx4_sq *sq)
> 
> Looks like an indentation issue, you should align it to the contents of the
> opening "(" to match the coding style of this file.
> 
OK.
> >  {
> >  	unsigned int elts_comp = txq->elts_comp;
> >  	unsigned int elts_tail = txq->elts_tail;
> > -	const unsigned int elts_n = txq->elts_n;
> >  	struct mlx4_cq *cq = &txq->mcq;
> > -	struct mlx4_sq *sq = &txq->msq;
> >  	struct mlx4_cqe *cqe;
> >  	uint32_t cons_index = cq->cons_index;
> >  	uint16_t new_index;
> >  	uint16_t nr_txbbs = 0;
> >  	int pkts = 0;
> >
> > -	if (unlikely(elts_comp == 0))
> > -		return 0;
> >  	/*
> >  	 * Traverse over all CQ entries reported and handle each WQ entry
> >  	 * reported by them.
> > @@ -238,6 +238,122 @@ struct pv {
> >  	return buf->pool;
> >  }
> >
> > +static int handle_multi_segs(struct rte_mbuf *buf,
> > +			    struct txq *txq,
> > +			    struct mlx4_wqe_ctrl_seg **pctrl)
> 
> How about naming this function in a way that follows the mlx4_something()
> convention?
> 
> Here's a suggestion based on how this function remains tied to
> mlx4_tx_burst():
> 
>  mlx4_tx_burst_seg()
> 
Good, thanks!

> > +{
> > +	int wqe_real_size;
> > +	int nr_txbbs;
> > +	struct pv *pv = (struct pv *)txq->bounce_buf;
> > +	struct mlx4_sq *sq = &txq->msq;
> > +	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> > +	struct mlx4_wqe_ctrl_seg *ctrl;
> > +	struct mlx4_wqe_data_seg *dseg;
> > +	uint32_t lkey;
> > +	uintptr_t addr;
> > +	uint32_t byte_count;
> > +	int pv_counter = 0;
> > +
> > +	/* Calculate the needed work queue entry size for this packet. */
> > +	wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
> > +		buf->nb_segs * sizeof(struct mlx4_wqe_data_seg);
> > +	nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> > +	/*
> > +	 * Check that there is room for this WQE in the send queue and that
> > +	 * the WQE size is legal.
> > +	 */
> > +	if (((sq->head - sq->tail) + nr_txbbs +
> > +				sq->headroom_txbbs) >= sq->txbb_cnt ||
> > +			nr_txbbs > MLX4_MAX_WQE_TXBBS) {
> > +		return -1;
> > +	}
> > +
> 
> Extra empty line.
> 
> > +	/* Get the control and data entries of the WQE. */
> > +	ctrl = (struct mlx4_wqe_ctrl_seg *)mlx4_get_send_wqe(sq,
> head_idx);
> > +	dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
> > +			sizeof(struct mlx4_wqe_ctrl_seg));
> > +	*pctrl = ctrl;
> > +	/* Fill the data segments with buffer information. */
> > +	struct rte_mbuf *sbuf;
> 
> I'm usually fine with mixing declarations and code when there's a good
> reason, however in this case there's no point. sbuf could have been defined
> with the rest at the beginning of the function.
>

OK.
 
> > +
> 
> Extra empty line here as well.
> 
> > +	for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> > +		addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> > +		rte_prefetch0((volatile void *)addr);
> > +		/* Handle WQE wraparound. */
> > +		if (dseg >= (struct mlx4_wqe_data_seg *)sq->eob)
> > +			dseg = (struct mlx4_wqe_data_seg *)sq->buf;
> > +		dseg->addr = rte_cpu_to_be_64(addr);
> > +		/* Memory region key (big endian) for this memory pool. */
> > +		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > +		dseg->lkey = rte_cpu_to_be_32(lkey); #ifndef NDEBUG
> > +		/* Calculate the needed work queue entry size for this
> packet */
> > +		if (unlikely(dseg->lkey == rte_cpu_to_be_32((uint32_t)-1))) {
> > +			/* MR does not exist. */
> > +			DEBUG("%p: unable to get MP <-> MR association",
> > +					(void *)txq);
> > +			/*
> > +			 * Restamp entry in case of failure.
> > +			 * Make sure that size is written correctly
> > +			 * Note that we give ownership to the SW, not the
> HW.
> > +			 */
> > +			wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
> > +				buf->nb_segs * sizeof(struct
> mlx4_wqe_data_seg);
> > +			ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> > +			mlx4_txq_stamp_freed_wqe(sq, head_idx,
> > +					(sq->head & sq->txbb_cnt) ? 0 : 1);
> > +			return -1;
> > +		}
> > +#endif /* NDEBUG */
> > +		if (likely(sbuf->data_len)) {
> > +			byte_count = rte_cpu_to_be_32(sbuf->data_len);
> > +		} else {
> > +			/*
> > +			 * Zero length segment is treated as inline segment
> > +			 * with zero data.
> > +			 */
> > +			byte_count = RTE_BE32(0x80000000);
> > +		}
> > +		/*
> > +		 * If the data segment is not at the beginning of a
> > +		 * Tx basic block (TXBB) then write the byte count,
> > +		 * else postpone the writing to just before updating the
> > +		 * control segment.
> > +		 */
> > +		if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> > +			/*
> > +			 * Need a barrier here before writing the byte_count
> > +			 * fields to make sure that all the data is visible
> > +			 * before the byte_count field is set.
> > +			 * Otherwise, if the segment begins a new cacheline,
> > +			 * the HCA prefetcher could grab the 64-byte chunk
> and
> > +			 * get a valid (!= 0xffffffff) byte count but stale
> > +			 * data, and end up sending the wrong data.
> > +			 */
> > +			rte_io_wmb();
> > +			dseg->byte_count = byte_count;
> > +		} else {
> > +			/*
> > +			 * This data segment starts at the beginning of a new
> > +			 * TXBB, so we need to postpone its byte_count
> writing
> > +			 * for later.
> > +			 */
> > +			pv[pv_counter].dseg = dseg;
> > +			pv[pv_counter++].val = byte_count;
> > +		}
> > +	}
> > +	/* Write the first DWORD of each TXBB save earlier. */
> > +	if (pv_counter) {
> > +		/* Need a barrier here before writing the byte_count. */
> > +		rte_io_wmb();
> > +		for (--pv_counter; pv_counter  >= 0; pv_counter--)
> > +			pv[pv_counter].dseg->byte_count =
> pv[pv_counter].val;
> > +	}
> > +	/* Fill the control parameters for this packet. */
> > +	ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> > +
> 
> Extra empty line.
> 
> > +	return nr_txbbs;
> > +}
> >  /**
> >   * DPDK callback for Tx.
> >   *
> > @@ -261,10 +377,11 @@ struct pv {
> >  	unsigned int i;
> >  	unsigned int max;
> >  	struct mlx4_sq *sq = &txq->msq;
> > -	struct pv *pv = (struct pv *)txq->bounce_buf;
> > +	int nr_txbbs;
> >
> >  	assert(txq->elts_comp_cd != 0);
> > -	mlx4_txq_complete(txq);
> > +	if (likely(txq->elts_comp != 0))
> > +		mlx4_txq_complete(txq, elts_n, sq);
> >  	max = (elts_n - (elts_head - txq->elts_tail));
> >  	if (max > elts_n)
> >  		max -= elts_n;
> > @@ -283,7 +400,6 @@ struct pv {
> >  		uint32_t owner_opcode = MLX4_OPCODE_SEND;
> >  		struct mlx4_wqe_ctrl_seg *ctrl;
> >  		struct mlx4_wqe_data_seg *dseg;
> > -		struct rte_mbuf *sbuf;
> >  		union {
> >  			uint32_t flags;
> >  			uint16_t flags16[2];
> > @@ -291,10 +407,6 @@ struct pv {
> >  		uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> >  		uint32_t lkey;
> >  		uintptr_t addr;
> > -		uint32_t byte_count;
> > -		int wqe_real_size;
> > -		int nr_txbbs;
> > -		int pv_counter = 0;
> >
> >  		/* Clean up old buffer. */
> >  		if (likely(elt->buf != NULL)) {
> > @@ -313,40 +425,29 @@ struct pv {
> >  			} while (tmp != NULL);
> >  		}
> >  		RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
> > -
> > -		/*
> > -		 * Calculate the needed work queue entry size
> > -		 * for this packet.
> > -		 */
> > -		wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
> > -				buf->nb_segs * sizeof(struct
> mlx4_wqe_data_seg);
> > -		nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> > -		/*
> > -		 * Check that there is room for this WQE in the send
> > -		 * queue and that the WQE size is legal.
> > -		 */
> > -		if (((sq->head - sq->tail) + nr_txbbs +
> > -		     sq->headroom_txbbs) >= sq->txbb_cnt ||
> > -		    nr_txbbs > MLX4_MAX_WQE_TXBBS) {
> > -			elt->buf = NULL;
> > -			break;
> > -		}
> > -		/* Get the control and data entries of the WQE. */
> > -		ctrl = (struct mlx4_wqe_ctrl_seg *)
> > -				mlx4_get_send_wqe(sq, head_idx);
> > -		dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
> > -				sizeof(struct mlx4_wqe_ctrl_seg));
> > -		/* Fill the data segments with buffer information. */
> > -		for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> > -			addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> > +		if (buf->nb_segs == 1) {
> > +			/*
> > +			 * Check that there is room for this WQE in the send
> > +			 * queue and that the WQE size is legal
> > +			 */
> > +			if (((sq->head - sq->tail) + 1 + sq->headroom_txbbs)
> >=
> > +			     sq->txbb_cnt || 1 > MLX4_MAX_WQE_TXBBS) {
> > +				elt->buf = NULL;
> > +				break;
> > +			}
> > +			/* Get the control and data entries of the WQE. */
> > +			ctrl = (struct mlx4_wqe_ctrl_seg *)
> > +					mlx4_get_send_wqe(sq, head_idx);
> > +			dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl
> +
> > +					sizeof(struct mlx4_wqe_ctrl_seg));
> > +			addr = rte_pktmbuf_mtod(buf, uintptr_t);
> >  			rte_prefetch0((volatile void *)addr);
> >  			/* Handle WQE wraparound. */
> > -			if (unlikely(dseg >=
> > -			    (struct mlx4_wqe_data_seg *)sq->eob))
> > +			if (dseg >= (struct mlx4_wqe_data_seg *)sq->eob)
> 
> Ideally this change should have been on its own in a fix commit.
> 
> >  				dseg = (struct mlx4_wqe_data_seg *)sq-
> >buf;
> >  			dseg->addr = rte_cpu_to_be_64(addr);
> >  			/* Memory region key (big endian). */
> > -			lkey = mlx4_txq_mp2mr(txq,
> mlx4_txq_mb2mp(sbuf));
> > +			lkey = mlx4_txq_mp2mr(txq,
> mlx4_txq_mb2mp(buf));
> >  			dseg->lkey = rte_cpu_to_be_32(lkey);  #ifndef
> NDEBUG
> >  			if (unlikely(dseg->lkey ==
> > @@ -360,61 +461,28 @@ struct pv {
> >  				 * Note that we give ownership to the SW,
> >  				 * not the HW.
> >  				 */
> > -				ctrl->fence_size = (wqe_real_size >> 4) &
> 0x3f;
> > +				ctrl->fence_size =
> > +					(WQE_ONE_DATA_SEG_SIZE >> 4) &
> 0x3f;
> >  				mlx4_txq_stamp_freed_wqe(sq, head_idx,
> >  					     (sq->head & sq->txbb_cnt) ? 0 : 1);
> >  				elt->buf = NULL;
> >  				break;
> >  			}
> >  #endif /* NDEBUG */
> > -			if (likely(sbuf->data_len)) {
> > -				byte_count = rte_cpu_to_be_32(sbuf-
> >data_len);
> > -			} else {
> > -				/*
> > -				 * Zero length segment is treated as inline
> > -				 * segment with zero data.
> > -				 */
> > -				byte_count = RTE_BE32(0x80000000);
> > -			}
> > -			/*
> > -			 * If the data segment is not at the beginning
> > -			 * of a Tx basic block (TXBB) then write the
> > -			 * byte count, else postpone the writing to
> > -			 * just before updating the control segment.
> > -			 */
> > -			if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE -
> 1)) {
> > -				/*
> > -				 * Need a barrier here before writing the
> > -				 * byte_count fields to make sure that all the
> > -				 * data is visible before the byte_count field
> > -				 * is set. otherwise, if the segment begins a
> > -				 * new cacheline, the HCA prefetcher could
> grab
> > -				 * the 64-byte chunk and get a valid
> > -				 * (!= 0xffffffff) byte count but stale data,
> > -				 * and end up sending the wrong data.
> > -				 */
> > -				rte_io_wmb();
> > -				dseg->byte_count = byte_count;
> > -			} else {
> > -				/*
> > -				 * This data segment starts at the beginning
> of
> > -				 * a new TXBB, so we need to postpone its
> > -				 * byte_count writing for later.
> > -				 */
> > -				pv[pv_counter].dseg = dseg;
> > -				pv[pv_counter++].val = byte_count;
> > -			}
> > -		}
> > -		/* Write the first DWORD of each TXBB save earlier. */
> > -		if (pv_counter) {
> > -			/* Need a barrier before writing the byte_count. */
> > +			/* Need a barrier here before byte count store. */
> >  			rte_io_wmb();
> > -			for (--pv_counter; pv_counter  >= 0; pv_counter--)
> > -				pv[pv_counter].dseg->byte_count =
> > -						pv[pv_counter].val;
> > +			dseg->byte_count = rte_cpu_to_be_32(buf-
> >data_len);
> > +
> 
> Extra empty line.
> 
> > +			/* Fill the control parameters for this packet. */
> > +			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4)
> & 0x3f;
> > +			nr_txbbs = 1;
> > +		} else {
> > +			nr_txbbs = handle_multi_segs(buf, txq, &ctrl);
> > +			if (nr_txbbs < 0) {
> > +				elt->buf = NULL;
> > +				break;
> > +			}
> >  		}
> > -		/* Fill the control parameters for this packet. */
> > -		ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> >  		/*
> >  		 * For raw Ethernet, the SOLICIT flag is used to indicate
> >  		 * that no ICRC should be calculated.
> > @@ -469,6 +537,7 @@ struct pv {
> >  		ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode |
> >  					      ((sq->head & sq->txbb_cnt) ?
> >  						       MLX4_BIT_WQE_OWN :
> 0));
> > +
> 
> Extra empty line.
> 
> >  		sq->head += nr_txbbs;
> >  		elt->buf = buf;
> >  		bytes_sent += buf->pkt_len;
> > --
> > 1.8.3.1
> >
> 
> --
> Adrien Mazarguil
> 6WIND

Thanks!
  
Adrien Mazarguil Oct. 31, 2017, 10:17 a.m. UTC | #3
On Mon, Oct 30, 2017 at 06:23:31PM +0000, Matan Azrad wrote:
> Hi Adrien
> 
> > -----Original Message-----
> > From: Adrien Mazarguil [mailto:adrien.mazarguil@6wind.com]
> > Sent: Monday, October 30, 2017 4:24 PM
> > To: Matan Azrad <matan@mellanox.com>
> > Cc: dev@dpdk.org; Ophir Munk <ophirmu@mellanox.com>
> > Subject: Re: [PATCH v3 5/7] net/mlx4: separate Tx segment cases
> > 
> > On Mon, Oct 30, 2017 at 10:07:27AM +0000, Matan Azrad wrote:
> > > Since single segment packets shouldn't use additional memory to save
> > > segments byte count we can prevent additional memory unnecessary
> > usage
> > > in this case; Prevent loop management.
> > >
> > 
> > Sorry for asking but I really don't understand the above, can you reformulate
> > the problem addressed by this patch?
> > 
> What's about next?
> Optimize single segment case by processing it in different code which prevents checks and calculations relevant only to multi segment case. 

All right, just add "Tx" somewhere and it should be OK.
  

Patch

diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
index 8ce70d6..8ea8851 100644
--- a/drivers/net/mlx4/mlx4_rxtx.c
+++ b/drivers/net/mlx4/mlx4_rxtx.c
@@ -62,6 +62,9 @@ 
 #include "mlx4_rxtx.h"
 #include "mlx4_utils.h"
 
+#define WQE_ONE_DATA_SEG_SIZE \
+	(sizeof(struct mlx4_wqe_ctrl_seg) + sizeof(struct mlx4_wqe_data_seg))
+
 /**
  * Pointer-value pair structure used in tx_post_send for saving the first
  * DWORD (32 byte) of a TXBB.
@@ -140,22 +143,19 @@  struct pv {
  * @return
  *   0 on success, -1 on failure.
  */
-static int
-mlx4_txq_complete(struct txq *txq)
+static inline int
+mlx4_txq_complete(struct txq *txq, const unsigned int elts_n,
+				struct mlx4_sq *sq)
 {
 	unsigned int elts_comp = txq->elts_comp;
 	unsigned int elts_tail = txq->elts_tail;
-	const unsigned int elts_n = txq->elts_n;
 	struct mlx4_cq *cq = &txq->mcq;
-	struct mlx4_sq *sq = &txq->msq;
 	struct mlx4_cqe *cqe;
 	uint32_t cons_index = cq->cons_index;
 	uint16_t new_index;
 	uint16_t nr_txbbs = 0;
 	int pkts = 0;
 
-	if (unlikely(elts_comp == 0))
-		return 0;
 	/*
 	 * Traverse over all CQ entries reported and handle each WQ entry
 	 * reported by them.
@@ -238,6 +238,122 @@  struct pv {
 	return buf->pool;
 }
 
+static int handle_multi_segs(struct rte_mbuf *buf,
+			    struct txq *txq,
+			    struct mlx4_wqe_ctrl_seg **pctrl)
+{
+	int wqe_real_size;
+	int nr_txbbs;
+	struct pv *pv = (struct pv *)txq->bounce_buf;
+	struct mlx4_sq *sq = &txq->msq;
+	uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
+	struct mlx4_wqe_ctrl_seg *ctrl;
+	struct mlx4_wqe_data_seg *dseg;
+	uint32_t lkey;
+	uintptr_t addr;
+	uint32_t byte_count;
+	int pv_counter = 0;
+
+	/* Calculate the needed work queue entry size for this packet. */
+	wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
+		buf->nb_segs * sizeof(struct mlx4_wqe_data_seg);
+	nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
+	/*
+	 * Check that there is room for this WQE in the send queue and that
+	 * the WQE size is legal.
+	 */
+	if (((sq->head - sq->tail) + nr_txbbs +
+				sq->headroom_txbbs) >= sq->txbb_cnt ||
+			nr_txbbs > MLX4_MAX_WQE_TXBBS) {
+		return -1;
+	}
+
+	/* Get the control and data entries of the WQE. */
+	ctrl = (struct mlx4_wqe_ctrl_seg *)mlx4_get_send_wqe(sq, head_idx);
+	dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
+			sizeof(struct mlx4_wqe_ctrl_seg));
+	*pctrl = ctrl;
+	/* Fill the data segments with buffer information. */
+	struct rte_mbuf *sbuf;
+
+	for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
+		addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
+		rte_prefetch0((volatile void *)addr);
+		/* Handle WQE wraparound. */
+		if (dseg >= (struct mlx4_wqe_data_seg *)sq->eob)
+			dseg = (struct mlx4_wqe_data_seg *)sq->buf;
+		dseg->addr = rte_cpu_to_be_64(addr);
+		/* Memory region key (big endian) for this memory pool. */
+		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
+		dseg->lkey = rte_cpu_to_be_32(lkey);
+#ifndef NDEBUG
+		/* Calculate the needed work queue entry size for this packet */
+		if (unlikely(dseg->lkey == rte_cpu_to_be_32((uint32_t)-1))) {
+			/* MR does not exist. */
+			DEBUG("%p: unable to get MP <-> MR association",
+					(void *)txq);
+			/*
+			 * Restamp entry in case of failure.
+			 * Make sure that size is written correctly
+			 * Note that we give ownership to the SW, not the HW.
+			 */
+			wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
+				buf->nb_segs * sizeof(struct mlx4_wqe_data_seg);
+			ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
+			mlx4_txq_stamp_freed_wqe(sq, head_idx,
+					(sq->head & sq->txbb_cnt) ? 0 : 1);
+			return -1;
+		}
+#endif /* NDEBUG */
+		if (likely(sbuf->data_len)) {
+			byte_count = rte_cpu_to_be_32(sbuf->data_len);
+		} else {
+			/*
+			 * Zero length segment is treated as inline segment
+			 * with zero data.
+			 */
+			byte_count = RTE_BE32(0x80000000);
+		}
+		/*
+		 * If the data segment is not at the beginning of a
+		 * Tx basic block (TXBB) then write the byte count,
+		 * else postpone the writing to just before updating the
+		 * control segment.
+		 */
+		if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
+			/*
+			 * Need a barrier here before writing the byte_count
+			 * fields to make sure that all the data is visible
+			 * before the byte_count field is set.
+			 * Otherwise, if the segment begins a new cacheline,
+			 * the HCA prefetcher could grab the 64-byte chunk and
+			 * get a valid (!= 0xffffffff) byte count but stale
+			 * data, and end up sending the wrong data.
+			 */
+			rte_io_wmb();
+			dseg->byte_count = byte_count;
+		} else {
+			/*
+			 * This data segment starts at the beginning of a new
+			 * TXBB, so we need to postpone its byte_count writing
+			 * for later.
+			 */
+			pv[pv_counter].dseg = dseg;
+			pv[pv_counter++].val = byte_count;
+		}
+	}
+	/* Write the first DWORD of each TXBB save earlier. */
+	if (pv_counter) {
+		/* Need a barrier here before writing the byte_count. */
+		rte_io_wmb();
+		for (--pv_counter; pv_counter  >= 0; pv_counter--)
+			pv[pv_counter].dseg->byte_count = pv[pv_counter].val;
+	}
+	/* Fill the control parameters for this packet. */
+	ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
+
+	return nr_txbbs;
+}
 /**
  * DPDK callback for Tx.
  *
@@ -261,10 +377,11 @@  struct pv {
 	unsigned int i;
 	unsigned int max;
 	struct mlx4_sq *sq = &txq->msq;
-	struct pv *pv = (struct pv *)txq->bounce_buf;
+	int nr_txbbs;
 
 	assert(txq->elts_comp_cd != 0);
-	mlx4_txq_complete(txq);
+	if (likely(txq->elts_comp != 0))
+		mlx4_txq_complete(txq, elts_n, sq);
 	max = (elts_n - (elts_head - txq->elts_tail));
 	if (max > elts_n)
 		max -= elts_n;
@@ -283,7 +400,6 @@  struct pv {
 		uint32_t owner_opcode = MLX4_OPCODE_SEND;
 		struct mlx4_wqe_ctrl_seg *ctrl;
 		struct mlx4_wqe_data_seg *dseg;
-		struct rte_mbuf *sbuf;
 		union {
 			uint32_t flags;
 			uint16_t flags16[2];
@@ -291,10 +407,6 @@  struct pv {
 		uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
 		uint32_t lkey;
 		uintptr_t addr;
-		uint32_t byte_count;
-		int wqe_real_size;
-		int nr_txbbs;
-		int pv_counter = 0;
 
 		/* Clean up old buffer. */
 		if (likely(elt->buf != NULL)) {
@@ -313,40 +425,29 @@  struct pv {
 			} while (tmp != NULL);
 		}
 		RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
-
-		/*
-		 * Calculate the needed work queue entry size
-		 * for this packet.
-		 */
-		wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
-				buf->nb_segs * sizeof(struct mlx4_wqe_data_seg);
-		nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
-		/*
-		 * Check that there is room for this WQE in the send
-		 * queue and that the WQE size is legal.
-		 */
-		if (((sq->head - sq->tail) + nr_txbbs +
-		     sq->headroom_txbbs) >= sq->txbb_cnt ||
-		    nr_txbbs > MLX4_MAX_WQE_TXBBS) {
-			elt->buf = NULL;
-			break;
-		}
-		/* Get the control and data entries of the WQE. */
-		ctrl = (struct mlx4_wqe_ctrl_seg *)
-				mlx4_get_send_wqe(sq, head_idx);
-		dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
-				sizeof(struct mlx4_wqe_ctrl_seg));
-		/* Fill the data segments with buffer information. */
-		for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
-			addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
+		if (buf->nb_segs == 1) {
+			/*
+			 * Check that there is room for this WQE in the send
+			 * queue and that the WQE size is legal
+			 */
+			if (((sq->head - sq->tail) + 1 + sq->headroom_txbbs) >=
+			     sq->txbb_cnt || 1 > MLX4_MAX_WQE_TXBBS) {
+				elt->buf = NULL;
+				break;
+			}
+			/* Get the control and data entries of the WQE. */
+			ctrl = (struct mlx4_wqe_ctrl_seg *)
+					mlx4_get_send_wqe(sq, head_idx);
+			dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
+					sizeof(struct mlx4_wqe_ctrl_seg));
+			addr = rte_pktmbuf_mtod(buf, uintptr_t);
 			rte_prefetch0((volatile void *)addr);
 			/* Handle WQE wraparound. */
-			if (unlikely(dseg >=
-			    (struct mlx4_wqe_data_seg *)sq->eob))
+			if (dseg >= (struct mlx4_wqe_data_seg *)sq->eob)
 				dseg = (struct mlx4_wqe_data_seg *)sq->buf;
 			dseg->addr = rte_cpu_to_be_64(addr);
 			/* Memory region key (big endian). */
-			lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
+			lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
 			dseg->lkey = rte_cpu_to_be_32(lkey);
 #ifndef NDEBUG
 			if (unlikely(dseg->lkey ==
@@ -360,61 +461,28 @@  struct pv {
 				 * Note that we give ownership to the SW,
 				 * not the HW.
 				 */
-				ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
+				ctrl->fence_size =
+					(WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
 				mlx4_txq_stamp_freed_wqe(sq, head_idx,
 					     (sq->head & sq->txbb_cnt) ? 0 : 1);
 				elt->buf = NULL;
 				break;
 			}
 #endif /* NDEBUG */
-			if (likely(sbuf->data_len)) {
-				byte_count = rte_cpu_to_be_32(sbuf->data_len);
-			} else {
-				/*
-				 * Zero length segment is treated as inline
-				 * segment with zero data.
-				 */
-				byte_count = RTE_BE32(0x80000000);
-			}
-			/*
-			 * If the data segment is not at the beginning
-			 * of a Tx basic block (TXBB) then write the
-			 * byte count, else postpone the writing to
-			 * just before updating the control segment.
-			 */
-			if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
-				/*
-				 * Need a barrier here before writing the
-				 * byte_count fields to make sure that all the
-				 * data is visible before the byte_count field
-				 * is set. otherwise, if the segment begins a
-				 * new cacheline, the HCA prefetcher could grab
-				 * the 64-byte chunk and get a valid
-				 * (!= 0xffffffff) byte count but stale data,
-				 * and end up sending the wrong data.
-				 */
-				rte_io_wmb();
-				dseg->byte_count = byte_count;
-			} else {
-				/*
-				 * This data segment starts at the beginning of
-				 * a new TXBB, so we need to postpone its
-				 * byte_count writing for later.
-				 */
-				pv[pv_counter].dseg = dseg;
-				pv[pv_counter++].val = byte_count;
-			}
-		}
-		/* Write the first DWORD of each TXBB save earlier. */
-		if (pv_counter) {
-			/* Need a barrier before writing the byte_count. */
+			/* Need a barrier here before byte count store. */
 			rte_io_wmb();
-			for (--pv_counter; pv_counter  >= 0; pv_counter--)
-				pv[pv_counter].dseg->byte_count =
-						pv[pv_counter].val;
+			dseg->byte_count = rte_cpu_to_be_32(buf->data_len);
+
+			/* Fill the control parameters for this packet. */
+			ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
+			nr_txbbs = 1;
+		} else {
+			nr_txbbs = handle_multi_segs(buf, txq, &ctrl);
+			if (nr_txbbs < 0) {
+				elt->buf = NULL;
+				break;
+			}
 		}
-		/* Fill the control parameters for this packet. */
-		ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
 		/*
 		 * For raw Ethernet, the SOLICIT flag is used to indicate
 		 * that no ICRC should be calculated.
@@ -469,6 +537,7 @@  struct pv {
 		ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode |
 					      ((sq->head & sq->txbb_cnt) ?
 						       MLX4_BIT_WQE_OWN : 0));
+
 		sq->head += nr_txbbs;
 		elt->buf = buf;
 		bytes_sent += buf->pkt_len;