[v3,03/15] vhost: add batch enqueue function for packed ring

Message ID 20190925171329.63734-4-yong.liu@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Series vhost packed ring performance optimization

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues

Commit Message

Marvin Liu Sept. 25, 2019, 5:13 p.m. UTC
  The batch enqueue function first checks whether the descriptors are cache
aligned and verifies the remaining prerequisites up front. The batch enqueue
function does not support chained mbufs; those are handled by the single
packet enqueue function.

Signed-off-by: Marvin Liu <yong.liu@intel.com>
  
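For context, later patches in the series wire these helpers into the packed-ring enqueue path. Below is a minimal sketch of that dispatch pattern, not part of this patch: it assumes the virtio_net.c context introduced above, that virtio_dev_rx_batch_packed() returns 0 on success and -1 otherwise (as in this patch), and that virtio_dev_rx_single_packed() returns non-zero on failure (an assumption; its body is not shown here).

/* Sketch only: how a caller could combine the batch and single-packet
 * helpers. Names and the batch return convention come from this patch;
 * the single-packet return convention is assumed.
 */
static uint32_t
virtio_dev_rx_packed_sketch(struct virtio_net *dev,
			    struct vhost_virtqueue *vq,
			    struct rte_mbuf **pkts, uint32_t count)
{
	uint32_t pkt_idx = 0;

	while (pkt_idx < count) {
		/* Try the fast path while a full batch of packets is left. */
		if (count - pkt_idx >= PACKED_BATCH_SIZE &&
		    !virtio_dev_rx_batch_packed(dev, vq, &pkts[pkt_idx])) {
			pkt_idx += PACKED_BATCH_SIZE;
			continue;
		}

		/* Fall back to the single-packet path (chained mbufs,
		 * unaligned slot, not enough free descriptors, ...). */
		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
			break;
		pkt_idx++;
	}

	return pkt_idx;
}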

Comments

Mattias Rönnblom Sept. 25, 2019, 7:31 p.m. UTC | #1
On 2019-09-25 19:13, Marvin Liu wrote:
> Batch enqueue function will first check whether descriptors are cache
> aligned. It will also check prerequisites in the beginning. Batch
> enqueue function not support chained mbufs, single packet enqueue
> function will handle it.
> 
> Signed-off-by: Marvin Liu <yong.liu@intel.com>
> 
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 4cba8c5ef..e241436c7 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -39,6 +39,10 @@
>   
>   #define VHOST_LOG_CACHE_NR 32
>   
> +#define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
> +			    sizeof(struct vring_packed_desc))
> +#define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
> +
>   #ifdef SUPPORT_GCC_UNROLL_PRAGMA
>   #define UNROLL_PRAGMA_PARAM "GCC unroll 4"
>   #endif
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 520c4c6a8..5e08f7d9b 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -883,6 +883,86 @@ virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
>   	return pkt_idx;
>   }
>   
> +static __rte_unused int
> +virtio_dev_rx_batch_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +	 struct rte_mbuf **pkts)
> +{
> +	bool wrap_counter = vq->avail_wrap_counter;
> +	struct vring_packed_desc *descs = vq->desc_packed;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +	uint64_t desc_addrs[PACKED_BATCH_SIZE];
> +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
> +	uint32_t buf_offset = dev->vhost_hlen;
> +	uint64_t lens[PACKED_BATCH_SIZE];
> +	uint16_t i;
> +
> +	if (unlikely(avail_idx & PACKED_BATCH_MASK))
> +		return -1;

Does this really generate better code than just "avail_idx < 
PACKED_BATCH_SIZE"? and+jne vs cmp+jbe.

> +
> +	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
> +		return -1;
> +
> +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> +		if (unlikely(pkts[i]->next != NULL))
> +			return -1;
> +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> +					    wrap_counter)))
> +			return -1;
> +	}
> +
> +	rte_smp_rmb();
> +
> +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_BATCH_SIZE; i++)
> +		lens[i] = descs[avail_idx + i].len;
> +
> +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> +			return -1;
> +	}
> +
> +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_BATCH_SIZE; i++)
> +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> +						  descs[avail_idx + i].addr,
> +						  &lens[i],
> +						  VHOST_ACCESS_RW);
> +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> +			return -1;
> +	}
> +
> +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
> +					(uintptr_t)desc_addrs[i];
> +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> +	}
> +
> +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_BATCH_SIZE; i++)
> +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> +
> +	vq->last_avail_idx += PACKED_BATCH_SIZE;
> +	if (vq->last_avail_idx >= vq->size) {
> +		vq->last_avail_idx -= vq->size;
> +		vq->avail_wrap_counter ^= 1;
> +	}
> +
> +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> +			   pkts[i]->pkt_len);
> +	}
> +
> +	return 0;
> +}
> +
>   static __rte_unused int16_t
>   virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
>   	struct rte_mbuf *pkt)
>
  
Marvin Liu Oct. 9, 2019, 2:45 a.m. UTC | #2
> -----Original Message-----
> From: Mattias Rönnblom [mailto:hofors@lysator.liu.se]
> Sent: Thursday, September 26, 2019 3:31 AM
> To: Liu, Yong <yong.liu@intel.com>; maxime.coquelin@redhat.com; Bie, Tiwei
> <tiwei.bie@intel.com>; Wang, Zhihong <zhihong.wang@intel.com>;
> stephen@networkplumber.org; gavin.hu@arm.com
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v3 03/15] vhost: add batch enqueue function
> for packed ring
> 
> On 2019-09-25 19:13, Marvin Liu wrote:
> > Batch enqueue function will first check whether descriptors are cache
> > aligned. It will also check prerequisites in the beginning. Batch
> > enqueue function not support chained mbufs, single packet enqueue
> > function will handle it.
> >
> > Signed-off-by: Marvin Liu <yong.liu@intel.com>
> >
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > index 4cba8c5ef..e241436c7 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -39,6 +39,10 @@
> >
> >   #define VHOST_LOG_CACHE_NR 32
> >
> > +#define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
> > +			    sizeof(struct vring_packed_desc))
> > +#define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
> > +
> >   #ifdef SUPPORT_GCC_UNROLL_PRAGMA
> >   #define UNROLL_PRAGMA_PARAM "GCC unroll 4"
> >   #endif
> > diff --git a/lib/librte_vhost/virtio_net.c
> b/lib/librte_vhost/virtio_net.c
> > index 520c4c6a8..5e08f7d9b 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -883,6 +883,86 @@ virtio_dev_rx_split(struct virtio_net *dev, struct
> vhost_virtqueue *vq,
> >   	return pkt_idx;
> >   }
> >
> > +static __rte_unused int
> > +virtio_dev_rx_batch_packed(struct virtio_net *dev, struct
> vhost_virtqueue *vq,
> > +	 struct rte_mbuf **pkts)
> > +{
> > +	bool wrap_counter = vq->avail_wrap_counter;
> > +	struct vring_packed_desc *descs = vq->desc_packed;
> > +	uint16_t avail_idx = vq->last_avail_idx;
> > +	uint64_t desc_addrs[PACKED_BATCH_SIZE];
> > +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
> > +	uint32_t buf_offset = dev->vhost_hlen;
> > +	uint64_t lens[PACKED_BATCH_SIZE];
> > +	uint16_t i;
> > +
> > +	if (unlikely(avail_idx & PACKED_BATCH_MASK))
> > +		return -1;
> 
> Does this really generate better code than just "avail_idx <
> PACKED_BATCH_SIZE"? and+jne vs cmp+jbe.

Hi Mattias,
This comparison checks whether the descriptor location is cache aligned.
On x86 the cache line size is 64 bytes, so the mask here is 0x3. The check compiles down to and + test + je, which is very simple.
Most of the time its cost is eliminated, as the branch outcome is easily predicted.
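To make the arithmetic concrete, here is a small standalone illustration (assuming the 64-byte x86 cache line and the 16-byte vring_packed_desc, so PACKED_BATCH_SIZE = 4 and PACKED_BATCH_MASK = 0x3):

/* Illustration only, not part of the patch: the test passes only when
 * the four descriptors starting at avail_idx share one cache line.
 */
#include <stdint.h>
#include <stdio.h>

#define BATCH_SIZE 4u               /* 64 / sizeof(struct vring_packed_desc) */
#define BATCH_MASK (BATCH_SIZE - 1) /* 0x3 */

int main(void)
{
	for (uint16_t avail_idx = 0; avail_idx < 8; avail_idx++) {
		int aligned = !(avail_idx & BATCH_MASK);
		printf("avail_idx=%u -> %s\n", (unsigned)avail_idx,
		       aligned ? "batch path" : "single-packet fallback");
	}
	return 0;
}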

Thanks,
Marvin

> 
> > +
> > +	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
> > +		return -1;
> > +
> > +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +		if (unlikely(pkts[i]->next != NULL))
> > +			return -1;
> > +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> > +					    wrap_counter)))
> > +			return -1;
> > +	}
> > +
> > +	rte_smp_rmb();
> > +
> > +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_BATCH_SIZE; i++)
> > +		lens[i] = descs[avail_idx + i].len;
> > +
> > +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> > +			return -1;
> > +	}
> > +
> > +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_BATCH_SIZE; i++)
> > +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> > +						  descs[avail_idx + i].addr,
> > +						  &lens[i],
> > +						  VHOST_ACCESS_RW);
> > +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> > +			return -1;
> > +	}
> > +
> > +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> > +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
> > +					(uintptr_t)desc_addrs[i];
> > +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> > +	}
> > +
> > +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_BATCH_SIZE; i++)
> > +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> > +
> > +	vq->last_avail_idx += PACKED_BATCH_SIZE;
> > +	if (vq->last_avail_idx >= vq->size) {
> > +		vq->last_avail_idx -= vq->size;
> > +		vq->avail_wrap_counter ^= 1;
> > +	}
> > +
> > +	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> > +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> > +			   pkts[i]->pkt_len);
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> >   static __rte_unused int16_t
> >   virtio_dev_rx_single_packed(struct virtio_net *dev, struct
> vhost_virtqueue *vq,
> >   	struct rte_mbuf *pkt)
> >
  

Patch

diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 4cba8c5ef..e241436c7 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -39,6 +39,10 @@ 
 
 #define VHOST_LOG_CACHE_NR 32
 
+#define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
+			    sizeof(struct vring_packed_desc))
+#define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
+
 #ifdef SUPPORT_GCC_UNROLL_PRAGMA
 #define UNROLL_PRAGMA_PARAM "GCC unroll 4"
 #endif
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 520c4c6a8..5e08f7d9b 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -883,6 +883,86 @@  virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return pkt_idx;
 }
 
+static __rte_unused int
+virtio_dev_rx_batch_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
+	 struct rte_mbuf **pkts)
+{
+	bool wrap_counter = vq->avail_wrap_counter;
+	struct vring_packed_desc *descs = vq->desc_packed;
+	uint16_t avail_idx = vq->last_avail_idx;
+	uint64_t desc_addrs[PACKED_BATCH_SIZE];
+	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
+	uint32_t buf_offset = dev->vhost_hlen;
+	uint64_t lens[PACKED_BATCH_SIZE];
+	uint16_t i;
+
+	if (unlikely(avail_idx & PACKED_BATCH_MASK))
+		return -1;
+
+	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
+		return -1;
+
+	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
+	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
+		if (unlikely(pkts[i]->next != NULL))
+			return -1;
+		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
+					    wrap_counter)))
+			return -1;
+	}
+
+	rte_smp_rmb();
+
+	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
+	for (i = 0; i < PACKED_BATCH_SIZE; i++)
+		lens[i] = descs[avail_idx + i].len;
+
+	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
+	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
+		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
+			return -1;
+	}
+
+	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
+	for (i = 0; i < PACKED_BATCH_SIZE; i++)
+		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
+						  descs[avail_idx + i].addr,
+						  &lens[i],
+						  VHOST_ACCESS_RW);
+	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
+	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
+		if (unlikely(lens[i] != descs[avail_idx + i].len))
+			return -1;
+	}
+
+	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
+	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
+		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
+		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
+					(uintptr_t)desc_addrs[i];
+		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
+	}
+
+	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
+	for (i = 0; i < PACKED_BATCH_SIZE; i++)
+		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
+
+	vq->last_avail_idx += PACKED_BATCH_SIZE;
+	if (vq->last_avail_idx >= vq->size) {
+		vq->last_avail_idx -= vq->size;
+		vq->avail_wrap_counter ^= 1;
+	}
+
+	UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
+	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
+		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
+			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
+			   pkts[i]->pkt_len);
+	}
+
+	return 0;
+}
+
 static __rte_unused int16_t
 virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	struct rte_mbuf *pkt)