[dpdk-dev] Making space in mbuf data-structure

Ananyev, Konstantin konstantin.ananyev at intel.com
Tue Jul 15 11:31:20 CEST 2014


Hi Bruce,

> -----Original Message-----
> From: dev [mailto:dev-bounces at dpdk.org] On Behalf Of Richardson, Bruce
> Sent: Friday, July 04, 2014 12:39 AM
> To: dev at dpdk.org
> Subject: [dpdk-dev] Making space in mbuf data-structure
> 
> Hi all,
> 
> At this stage it's been well recognised that we need to make more space in the mbuf data structure for new fields. We in Intel have
> had some discussion on this and this email is to outline what our current thinking and approach on this issue is, and look for additional
> suggestions and feedback.
> 
> Firstly, we believe that there is no possible way that we can ever fit all the fields we need to fit into a 64-byte mbuf, and so we need to
> start looking at a 128-byte mbuf instead. Examples of new fields that need to fit in, include - 32-64 bits for additional offload flags, 32-
> bits more for filter information for support for the new filters in the i40e driver, an additional 2-4 bytes for storing info on a second
> vlan tag, 4-bytes for storing a sequence number to enable packet reordering in future, as well as potentially a number of other fields
> or splitting out fields that are superimposed over each other right now, e.g. for the qos scheduler. We also want to allow space for use
> by other non-Intel NIC drivers that may be open-sourced to dpdk.org in the future too, where they support fields and offloads that
> our hardware doesn't.
> 
> If we accept the fact of a 2-cache-line mbuf, then the issue becomes how to rework things so that we spread our fields over the two
> cache lines while causing the lowest slow-down possible. The general approach that we are looking to take is to focus the first cache
> line on fields that are updated on RX, so that receive only deals with one cache line. The second cache line can be used for application
> data and information that will only be used on the TX leg. This would allow us to work on the first cache line in RX as now, and have the
> second cache line being prefetched in the background so that it is available when necessary. Hardware prefetches should help us out
> here. We may also move rarely used or slow-path RX fields, e.g. those for chained mbufs with jumbo frames, to the second
> cache line, depending upon the performance impact and byte savings achieved.
> 
> With this approach, we still need to make space in the first cache line for information for the new or expanded receive offloads. First
> off the blocks is to look at moving the mempool pointer into the second cache line. This will free up 8 bytes in cache line 0, with a field
> that is only used when cleaning up after TX. A prototype patch for this change is given below, for your review and comment. Initial
> quick tests with testpmd (testpmd -c 600 -n 4 -- --mbcache=250 --txqflags=0xf01 --burst=32 --txrst=32 --txfreet=32 --rxfreet=32
> --total-num-mbufs=65536), and l3fwd (l3fwd -c 400 -n 4 -- -p F -P --config="(0,0,10),(1,0,10),(2,0,10),(3,0,10)") showed only a slight
> performance decrease with testpmd and equal or slightly better performance with l3fwd. These would both have been using the
> vector PMD - I haven't looked to change the fallback TX implementation yet for this change, since it's not as performance optimized
> and hence cycle-count sensitive.

Regarding the code changes themselves:
If I understand your code changes correctly:
ixgbe_tx_free_bufs() will use rte_mempool_put_bulk() *only* if all mbufs in the bulk belong to the same mempool.
If we have, let's say, 2 groups of mbufs from 2 different mempools, it wouldn't be able to use put_bulk() for both groups.
The current implementation, as I remember, is able to use put_bulk() in such a case.
I think it is possible to change your code to do better grouping.
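
One possible way to do such grouping is sketched below: collect consecutive mbufs from the same pool into a run and issue one bulk put each time the pool changes. This is only an illustration under stated assumptions, not the actual ixgbe code: struct pool, struct buf, pool_put_bulk() and free_bufs_grouped() are hypothetical stand-ins for rte_mempool, rte_mbuf and rte_mempool_put_bulk(), and the burst limit of 64 is assumed.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for rte_mempool and rte_mbuf; not the DPDK types. */
struct pool {
	size_t nb_free;      /* objects returned to the pool so far */
	unsigned bulk_calls; /* number of bulk puts issued */
};

struct buf {
	struct pool *pool;   /* owning pool, playing the role of mbuf->pool */
};

/* Stand-in for rte_mempool_put_bulk(): return n objects in one call. */
static void
pool_put_bulk(struct pool *p, struct buf **objs, size_t n)
{
	(void)objs;
	p->nb_free += n;
	p->bulk_calls++;
}

/*
 * Free n buffers, issuing one bulk put per run of consecutive buffers
 * that share a pool. NULL entries (buffers that must not be freed yet)
 * are skipped without ending the current run.
 */
static void
free_bufs_grouped(struct buf **bufs, size_t n)
{
	struct buf *run[64];   /* assumed upper bound on a burst */
	size_t nb_run = 0;
	size_t i;

	assert(n <= 64);
	for (i = 0; i < n; i++) {
		struct buf *m = bufs[i];

		if (m == NULL)
			continue;
		if (nb_run != 0 && m->pool != run[0]->pool) {
			/* pool changed: flush the run collected so far */
			pool_put_bulk(run[0]->pool, run, nb_run);
			nb_run = 0;
		}
		run[nb_run++] = m;
	}
	if (nb_run != 0)
		pool_put_bulk(run[0]->pool, run, nb_run);
}
```

With 2 groups of mbufs from 2 different mempools this issues two bulk puts, instead of one bulk put plus a single put per mbuf of the second group. The price is reading the pool pointer of every mbuf, which the prototype patch already does.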

> 
> Beyond this change, I'm also investigating potentially moving the "next" pointer to the second cache line, but it's looking harder to
> move without serious impact, so we'll have to see there what can be done. Other fields I'll examine in due course too, and send on
> any findings I may have.

Could you explain it a bit?
I always had the impression that moving the next pointer to the second cache line would have less impact than moving the mempool pointer.
My thinking was: next is used only in the scatter versions of RX/TX, which are slower than the optimised RX/TX anyway.
So a few extra cycles wouldn't be that noticeable.
Though I admit I never did any measurements for that case.
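
To make the layout under discussion concrete, here is a minimal sketch of a two-cache-line structure with both next and pool placed in the second line. It is not the real rte_mbuf: struct mbuf_sketch, its reduced field list, the CACHE_LINE value of 64 and mbuf_is_chained() are assumptions for illustration only, and it says nothing about the actual cycle cost.

```c
#include <assert.h>
#include <stdalign.h>
#include <stddef.h>
#include <stdint.h>

#define CACHE_LINE 64 /* assumed cache-line size in bytes */

struct pool; /* opaque stand-in for rte_mempool */

/* Hypothetical, heavily reduced mbuf; field names are illustrative. */
struct mbuf_sketch {
	/* first cache line: fields written on the fast RX path */
	void *buf_addr;
	uint64_t buf_physaddr;
	uint16_t buf_len;
	uint16_t data_len;
	uint32_t pkt_len;

	/* second cache line: scatter-only and free-time fields */
	alignas(CACHE_LINE) struct mbuf_sketch *next;
	struct pool *pool;
};

/* Compile-time checks that the split is where we expect it to be. */
static_assert(sizeof(struct mbuf_sketch) == 2 * CACHE_LINE,
	      "mbuf sketch must span exactly two cache lines");
static_assert(offsetof(struct mbuf_sketch, next) == CACHE_LINE,
	      "next must start the second cache line");

/* Only the scatter paths need this, so only they touch the second line. */
static inline int
mbuf_is_chained(const struct mbuf_sketch *m)
{
	return m->next != NULL;
}
```

In such a layout a non-scatter RX burst never has to load the second cache line, while a scatter path pays for one extra line per segment.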

About space savings for the first cache line:
- I still think that Oliver's suggestion of replacing the data pointer (8 bytes) with a data offset (2 bytes) makes a lot of sense.
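
A rough sketch of what the data-offset idea could look like follows. The names struct seg, data_off, seg_data() and seg_prepend() are hypothetical and only illustrate the technique: the data address is recomputed from buf_addr plus a 16-bit offset instead of being stored as a pointer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical segment header; field names are illustrative only. */
struct seg {
	void *buf_addr;    /* start of the segment buffer */
	uint16_t buf_len;  /* size of the segment buffer in bytes */
	uint16_t data_off; /* offset of the first data byte in the buffer */
};

/* Recompute the data address on demand instead of storing a pointer. */
static inline void *
seg_data(const struct seg *s)
{
	return (char *)s->buf_addr + s->data_off;
}

/*
 * Prepend len bytes by moving data_off back into the headroom.
 * Returns the new data address, or NULL if the headroom is too small.
 */
static inline void *
seg_prepend(struct seg *s, uint16_t len)
{
	if (len > s->data_off)
		return NULL;
	s->data_off = (uint16_t)(s->data_off - len);
	return seg_data(s);
}
```

Since buf_len is already a 16-bit field, a 16-bit offset can address any byte of the buffer. The cost is one extra add on each data access, in exchange for 6 bytes saved in the first cache line.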

> 
> Once we have freed up space, then we can start looking at what fields get to use that space and what way we shuffle the existing
> fields about, but that's a discussion for another day!
> 
> Please see patch below for moving pool pointer to second cache line of mbuf. All feedback welcome, naturally.
> 
> Regards,
> /Bruce
> 
> diff --git a/app/test/test_mbuf.c b/app/test/test_mbuf.c
> index 2b87521..e0cf30c 100644
> --- a/app/test/test_mbuf.c
> +++ b/app/test/test_mbuf.c
> @@ -832,7 +832,7 @@ test_failing_mbuf_sanity_check(void)
>  int
>  test_mbuf(void)
>  {
> -	RTE_BUILD_BUG_ON(sizeof(struct rte_mbuf) != 64);
> +	RTE_BUILD_BUG_ON(sizeof(struct rte_mbuf) != CACHE_LINE_SIZE*2);
> 
>  	/* create pktmbuf pool if it does not exist */
>  	if (pktmbuf_pool == NULL) {
> diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
> index 2735f37..aa3ec32 100644
> --- a/lib/librte_mbuf/rte_mbuf.h
> +++ b/lib/librte_mbuf/rte_mbuf.h
> @@ -181,7 +181,8 @@ enum rte_mbuf_type {
>   * The generic rte_mbuf, containing a packet mbuf or a control mbuf.
>   */
>  struct rte_mbuf {
> -	struct rte_mempool *pool; /**< Pool from which mbuf was allocated. */
> +	uint64_t this_space_for_rent; // 8-bytes free for reuse.
> +
>  	void *buf_addr;           /**< Virtual address of segment buffer. */
>  	phys_addr_t buf_physaddr; /**< Physical address of segment buffer. */
>  	uint16_t buf_len;         /**< Length of segment buffer. */
> @@ -210,12 +211,16 @@ struct rte_mbuf {
>  		struct rte_pktmbuf pkt;
>  	};
> 
> +// second cache line starts here
> +	struct rte_mempool *pool __rte_cache_aligned;
> +			/**< Pool from which mbuf was allocated. */
> +
>  	union {
>  		uint8_t metadata[0];
>  		uint16_t metadata16[0];
>  		uint32_t metadata32[0];
>  		uint64_t metadata64[0];
> -	};
> +	} __rte_cache_aligned;
>  } __rte_cache_aligned;
> 
>  #define RTE_MBUF_METADATA_UINT8(mbuf, offset)              \
> diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
> index 64c0695..7374e0d 100644
> --- a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
> +++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
> @@ -171,10 +171,6 @@ struct igb_tx_queue {
>  	volatile union ixgbe_adv_tx_desc *tx_ring;
>  	uint64_t            tx_ring_phys_addr; /**< TX ring DMA address. */
>  	struct igb_tx_entry *sw_ring;      /**< virtual address of SW ring. */
> -#ifdef RTE_IXGBE_INC_VECTOR
> -	/** continuous tx entry sequence within the same mempool */
> -	struct igb_tx_entry_seq *sw_ring_seq;
> -#endif
>  	volatile uint32_t   *tdt_reg_addr; /**< Address of TDT register. */
>  	uint16_t            nb_tx_desc;    /**< number of TX descriptors. */
>  	uint16_t            tx_tail;       /**< current value of TDT reg. */
> diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
> index 09e19a3..e9fd739 100644
> --- a/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
> +++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
> @@ -401,9 +401,8 @@ static inline int __attribute__((always_inline))
>  ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
>  {
>  	struct igb_tx_entry_v *txep;
> -	struct igb_tx_entry_seq *txsp;
>  	uint32_t status;
> -	uint32_t n, k;
> +	uint32_t n;
>  #ifdef RTE_MBUF_SCATTER_GATHER
>  	uint32_t i;
>  	int nb_free = 0;
> @@ -423,25 +422,36 @@ ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
>  	 */
>  	txep = &((struct igb_tx_entry_v *)txq->sw_ring)[txq->tx_next_dd -
>  			(n - 1)];
> -	txsp = &txq->sw_ring_seq[txq->tx_next_dd - (n - 1)];
> 
> -	while (n > 0) {
> -		k = RTE_MIN(n, txsp[n-1].same_pool);
>  #ifdef RTE_MBUF_SCATTER_GATHER
> -		for (i = 0; i < k; i++) {
> -			m = __rte_pktmbuf_prefree_seg((txep+n-k+i)->mbuf);
> +	m = __rte_pktmbuf_prefree_seg(txep[0].mbuf);
> +	if (likely(m != NULL)) {
> +		free[0] = m;
> +		nb_free = 1;
> +		for (i = 1; i < n; i++) {
> +			m = __rte_pktmbuf_prefree_seg(txep[i].mbuf);
> +			if (likely(m != NULL)) {
> +				if (likely(m->pool == free[0]->pool))
> +					free[nb_free++] = m;
> +				else
> +					rte_mempool_put(m->pool, m);
> +			}
> +		}
> +		rte_mempool_put_bulk(free[0]->pool, (void **)free, nb_free);
> +	}
> +	else {
> +		for (i = 1; i < n; i++) {
> +			m = __rte_pktmbuf_prefree_seg(txep[i].mbuf);
>  			if (m != NULL)
> -				free[nb_free++] = m;
> +				rte_mempool_put(m->pool, m);
>  		}
> -		rte_mempool_put_bulk((void *)txsp[n-1].pool,
> -				(void **)free, nb_free);
> -#else
> -		rte_mempool_put_bulk((void *)txsp[n-1].pool,
> -				(void **)(txep+n-k), k);
> -#endif
> -		n -= k;
>  	}
> 
> +#else /* no scatter_gather */
> +	for (i = 0; i < n; i++)
> +		rte_mempool_put(txep[i].mbuf->pool, txep[i].mbuf);
> +#endif /* scatter_gather */
> +
>  	/* buffers were freed, update counters */
>  	txq->nb_tx_free = (uint16_t)(txq->nb_tx_free + txq->tx_rs_thresh);
>  	txq->tx_next_dd = (uint16_t)(txq->tx_next_dd + txq->tx_rs_thresh);
> @@ -453,19 +463,11 @@ ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
> 
>  static inline void __attribute__((always_inline))
>  tx_backlog_entry(struct igb_tx_entry_v *txep,
> -		 struct igb_tx_entry_seq *txsp,
>  		 struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>  {
>  	int i;
> -	for (i = 0; i < (int)nb_pkts; ++i) {
> +	for (i = 0; i < (int)nb_pkts; ++i)
>  		txep[i].mbuf = tx_pkts[i];
> -		/* check and update sequence number */
> -		txsp[i].pool = tx_pkts[i]->pool;
> -		if (txsp[i-1].pool == tx_pkts[i]->pool)
> -			txsp[i].same_pool = txsp[i-1].same_pool + 1;
> -		else
> -			txsp[i].same_pool = 1;
> -	}
>  }
> 
>  uint16_t
> @@ -475,7 +477,6 @@ ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
>  	struct igb_tx_queue *txq = (struct igb_tx_queue *)tx_queue;
>  	volatile union ixgbe_adv_tx_desc *txdp;
>  	struct igb_tx_entry_v *txep;
> -	struct igb_tx_entry_seq *txsp;
>  	uint16_t n, nb_commit, tx_id;
>  	__m128i flags = _mm_set_epi32(DCMD_DTYP_FLAGS, 0, 0, 0);
>  	__m128i rs = _mm_set_epi32(IXGBE_ADVTXD_DCMD_RS|DCMD_DTYP_FLAGS,
> @@ -495,14 +496,13 @@ ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
>  	tx_id = txq->tx_tail;
>  	txdp = &txq->tx_ring[tx_id];
>  	txep = &((struct igb_tx_entry_v *)txq->sw_ring)[tx_id];
> -	txsp = &txq->sw_ring_seq[tx_id];
> 
>  	txq->nb_tx_free = (uint16_t)(txq->nb_tx_free - nb_pkts);
> 
>  	n = (uint16_t)(txq->nb_tx_desc - tx_id);
>  	if (nb_commit >= n) {
> 
> -		tx_backlog_entry(txep, txsp, tx_pkts, n);
> +		tx_backlog_entry(txep, tx_pkts, n);
> 
>  		for (i = 0; i < n - 1; ++i, ++tx_pkts, ++txdp)
>  			vtx1(txdp, *tx_pkts, flags);
> @@ -517,10 +517,9 @@ ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
>  		/* avoid reach the end of ring */
>  		txdp = &(txq->tx_ring[tx_id]);
>  		txep = &(((struct igb_tx_entry_v *)txq->sw_ring)[tx_id]);
> -		txsp = &(txq->sw_ring_seq[tx_id]);
>  	}
> 
> -	tx_backlog_entry(txep, txsp, tx_pkts, nb_commit);
> +	tx_backlog_entry(txep, tx_pkts, nb_commit);
> 
>  	vtx(txdp, tx_pkts, nb_commit, flags);
> 
> @@ -544,7 +543,6 @@ ixgbe_tx_queue_release_mbufs(struct igb_tx_queue *txq)
>  {
>  	unsigned i;
>  	struct igb_tx_entry_v *txe;
> -	struct igb_tx_entry_seq *txs;
>  	uint16_t nb_free, max_desc;
> 
>  	if (txq->sw_ring != NULL) {
> @@ -562,10 +560,6 @@ ixgbe_tx_queue_release_mbufs(struct igb_tx_queue *txq)
>  		for (i = 0; i < txq->nb_tx_desc; i++) {
>  			txe = (struct igb_tx_entry_v *)&txq->sw_ring[i];
>  			txe->mbuf = NULL;
> -
> -			txs = &txq->sw_ring_seq[i];
> -			txs->pool = NULL;
> -			txs->same_pool = 0;
>  		}
>  	}
>  }
> @@ -580,11 +574,6 @@ ixgbe_tx_free_swring(struct igb_tx_queue *txq)
>  		rte_free((struct igb_rx_entry *)txq->sw_ring - 1);
>  		txq->sw_ring = NULL;
>  	}
> -
> -	if (txq->sw_ring_seq != NULL) {
> -		rte_free(txq->sw_ring_seq - 1);
> -		txq->sw_ring_seq = NULL;
> -	}
>  }
> 
>  static void
> @@ -593,7 +582,6 @@ ixgbe_reset_tx_queue(struct igb_tx_queue *txq)
>  	static const union ixgbe_adv_tx_desc zeroed_desc = { .read = {
>  			.buffer_addr = 0} };
>  	struct igb_tx_entry_v *txe = (struct igb_tx_entry_v *)txq->sw_ring;
> -	struct igb_tx_entry_seq *txs = txq->sw_ring_seq;
>  	uint16_t i;
> 
>  	/* Zero out HW ring memory */
> @@ -605,8 +593,6 @@ ixgbe_reset_tx_queue(struct igb_tx_queue *txq)
>  		volatile union ixgbe_adv_tx_desc *txd = &txq->tx_ring[i];
>  		txd->wb.status = IXGBE_TXD_STAT_DD;
>  		txe[i].mbuf = NULL;
> -		txs[i].pool = NULL;
> -		txs[i].same_pool = 0;
>  	}
> 
>  	txq->tx_next_dd = (uint16_t)(txq->tx_rs_thresh - 1);
> @@ -632,27 +618,14 @@ static struct ixgbe_txq_ops vec_txq_ops = {
>  };
> 
>  int ixgbe_txq_vec_setup(struct igb_tx_queue *txq,
> -			unsigned int socket_id)
> +			__rte_unused unsigned int socket_id)
>  {
> -	uint16_t nb_desc;
> -
>  	if (txq->sw_ring == NULL)
>  		return -1;
> 
> -	/* request addtional one entry for continous sequence check */
> -	nb_desc = (uint16_t)(txq->nb_tx_desc + 1);
> -
> -	txq->sw_ring_seq = rte_zmalloc_socket("txq->sw_ring_seq",
> -				sizeof(struct igb_tx_entry_seq) * nb_desc,
> -				CACHE_LINE_SIZE, socket_id);
> -	if (txq->sw_ring_seq == NULL)
> -		return -1;
> -
> -
>  	/* leave the first one for overflow */
>  	txq->sw_ring = (struct igb_tx_entry *)
>  		((struct igb_tx_entry_v *)txq->sw_ring + 1);
> -	txq->sw_ring_seq += 1;
>  	txq->ops = &vec_txq_ops;
> 
>  	return 0;

