[v2,13/16] vhost: optimize dequeue function of packed ring

Message ID 20190919163643.24130-14-yong.liu@intel.com (mailing list archive)
State Superseded, archived
Headers
Series vhost packed ring performance optimization |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues

Commit Message

Marvin Liu Sept. 19, 2019, 4:36 p.m. UTC
  Optimize vhost device Rx datapath by separate functions. No-chained
and direct descriptors will be handled by burst and other will be
handled one by one as before.

Signed-off-by: Marvin Liu <yong.liu@intel.com>
  

Patch

diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index a8df74f87..066514e43 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -182,17 +182,6 @@  flush_dequeue_shadow_packed(struct virtio_net *dev, struct vhost_virtqueue *vq)
 	vhost_log_cache_sync(dev, vq);
 }
 
-static __rte_always_inline void
-update_shadow_packed(struct vhost_virtqueue *vq,
-			 uint16_t desc_idx, uint32_t len, uint16_t count)
-{
-	uint16_t i = vq->shadow_used_idx++;
-
-	vq->shadow_used_packed[i].id  = desc_idx;
-	vq->shadow_used_packed[i].len = len;
-	vq->shadow_used_packed[i].count = count;
-}
-
 static __rte_always_inline void
 flush_burst_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	uint64_t *lens, uint16_t *ids, uint16_t flags)
@@ -383,7 +372,7 @@  flush_enqueue_packed(struct virtio_net *dev,
 	}
 }
 
-static __rte_unused __rte_always_inline void
+static __rte_always_inline void
 flush_dequeue_packed(struct virtio_net *dev, struct vhost_virtqueue *vq)
 {
 	if (!vq->shadow_used_idx)
@@ -1809,7 +1798,7 @@  vhost_dequeue_burst_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return -1;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_burst_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts)
 {
@@ -1887,7 +1876,7 @@  vhost_dequeue_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts)
 {
@@ -1909,7 +1898,7 @@  virtio_dev_tx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
-static __rte_unused __rte_always_inline int
+static __rte_always_inline int
 virtio_dev_tx_burst_packed_zmbuf(struct virtio_net *dev,
 					struct vhost_virtqueue *vq,
 					struct rte_mempool *mbuf_pool,
@@ -1971,7 +1960,7 @@  virtio_dev_tx_burst_packed_zmbuf(struct virtio_net *dev,
 	return -1;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
 	struct rte_mbuf **pkts)
@@ -2006,7 +1995,7 @@  virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
 	return 0;
 }
 
-static __rte_unused void
+static __rte_always_inline void
 free_zmbuf(struct vhost_virtqueue *vq)
 {
 	struct zcopy_mbuf *next = NULL;
@@ -2048,120 +2037,97 @@  free_zmbuf(struct vhost_virtqueue *vq)
 }
 
 static __rte_noinline uint16_t
-virtio_dev_tx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
-	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
+virtio_dev_tx_packed_zmbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint32_t count)
 {
-	uint16_t i;
-
-	if (unlikely(dev->dequeue_zero_copy)) {
-		struct zcopy_mbuf *zmbuf, *next;
+	uint32_t pkt_idx = 0;
+	uint32_t remained = count;
+	int ret;
 
-		for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
-		     zmbuf != NULL; zmbuf = next) {
-			next = TAILQ_NEXT(zmbuf, next);
+	free_zmbuf(vq);
 
-			if (mbuf_is_consumed(zmbuf->mbuf)) {
-				update_shadow_packed(vq,
-						zmbuf->desc_idx,
-						0,
-						zmbuf->desc_count);
+	do {
+		if (remained >= PACKED_DESCS_BURST) {
+			ret = virtio_dev_tx_burst_packed_zmbuf(dev, vq,
+							       mbuf_pool,
+							       &pkts[pkt_idx]);
 
-				TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
-				restore_mbuf(zmbuf->mbuf);
-				rte_pktmbuf_free(zmbuf->mbuf);
-				put_zmbuf(zmbuf);
-				vq->nr_zmbuf -= 1;
+			if (!ret) {
+				pkt_idx += PACKED_DESCS_BURST;
+				remained -= PACKED_DESCS_BURST;
+				continue;
 			}
 		}
 
-		if (likely(vq->shadow_used_idx)) {
-			flush_dequeue_shadow_packed(dev, vq);
-			vhost_vring_call_packed(dev, vq);
-		}
-	}
-
-	VHOST_LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
-
-	count = RTE_MIN(count, MAX_PKT_BURST);
-	VHOST_LOG_DEBUG(VHOST_DATA, "(%d) about to dequeue %u buffers\n",
-			dev->vid, count);
+		if (virtio_dev_tx_single_packed_zmbuf(dev, vq, mbuf_pool,
+						      &pkts[pkt_idx]))
+			break;
 
-	for (i = 0; i < count; i++) {
-		struct buf_vector buf_vec[BUF_VECTOR_MAX];
-		uint16_t buf_id;
-		uint32_t dummy_len;
-		uint16_t desc_count, nr_vec = 0;
-		int err;
+		pkt_idx++;
+		remained--;
+	} while (remained);
 
-		if (unlikely(fill_vec_buf_packed(dev, vq,
-						vq->last_avail_idx, &desc_count,
-						buf_vec, &nr_vec,
-						&buf_id, &dummy_len,
-						VHOST_ACCESS_RO) < 0))
-			break;
+	if (pkt_idx)
+		vhost_vring_call_packed(dev, vq);
 
-		if (likely(dev->dequeue_zero_copy == 0))
-			update_shadow_packed(vq, buf_id, 0,
-					desc_count);
+	return pkt_idx;
+}
 
-		pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
-		if (unlikely(pkts[i] == NULL)) {
-			RTE_LOG(ERR, VHOST_DATA,
-				"Failed to allocate memory for mbuf.\n");
-			break;
-		}
+static __rte_noinline uint16_t
+virtio_dev_tx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint32_t count)
+{
+	uint32_t pkt_idx = 0;
+	uint32_t remained = count;
+	uint16_t fetch_idx;
+	int ret;
+	struct vring_packed_desc *descs = vq->desc_packed;
 
-		err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
-				mbuf_pool);
-		if (unlikely(err)) {
-			rte_pktmbuf_free(pkts[i]);
-			break;
+	do {
+		if ((vq->last_avail_idx & 0x7) == 0) {
+			fetch_idx = vq->last_avail_idx + 8;
+			rte_prefetch0((void *)(uintptr_t)&descs[fetch_idx]);
 		}
 
-		if (unlikely(dev->dequeue_zero_copy)) {
-			struct zcopy_mbuf *zmbuf;
+		if (remained >= PACKED_DESCS_BURST) {
+			ret = virtio_dev_tx_burst_packed(dev, vq, mbuf_pool,
+							 &pkts[pkt_idx]);
 
-			zmbuf = get_zmbuf(vq);
-			if (!zmbuf) {
-				rte_pktmbuf_free(pkts[i]);
-				break;
+			if (!ret) {
+				flush_dequeue_packed(dev, vq);
+				pkt_idx += PACKED_DESCS_BURST;
+				remained -= PACKED_DESCS_BURST;
+				continue;
 			}
-			zmbuf->mbuf = pkts[i];
-			zmbuf->desc_idx = buf_id;
-			zmbuf->desc_count = desc_count;
+		}
 
-			/*
-			 * Pin lock the mbuf; we will check later to see
-			 * whether the mbuf is freed (when we are the last
-			 * user) or not. If that's the case, we then could
-			 * update the used ring safely.
-			 */
-			rte_mbuf_refcnt_update(pkts[i], 1);
+		/*
+		 * If remained descs can't bundled into one burst, just skip to
+		 * next round.
+		 */
+		if (((vq->last_avail_idx & PACKED_BURST_MASK) + remained) <
+			PACKED_DESCS_BURST)
+			break;
 
-			vq->nr_zmbuf += 1;
-			TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
-		}
+		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
+						&pkts[pkt_idx]))
+			break;
 
-		vq->last_avail_idx += desc_count;
-		if (vq->last_avail_idx >= vq->size) {
-			vq->last_avail_idx -= vq->size;
-			vq->avail_wrap_counter ^= 1;
-		}
-	}
+		pkt_idx++;
+		remained--;
+		flush_dequeue_packed(dev, vq);
 
-	if (likely(dev->dequeue_zero_copy == 0)) {
-		do_data_copy_dequeue(vq);
-		if (unlikely(i < count))
-			vq->shadow_used_idx = i;
-		if (likely(vq->shadow_used_idx)) {
-			flush_dequeue_shadow_packed(dev, vq);
-			vhost_vring_call_packed(dev, vq);
-		}
+	} while (remained);
+
+	if (pkt_idx) {
+		if (vq->shadow_used_idx)
+			do_data_copy_dequeue(vq);
 	}
 
-	return i;
+	return pkt_idx;
 }
 
+
 uint16_t
 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
@@ -2235,9 +2201,14 @@  rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 		count -= 1;
 	}
 
-	if (vq_is_packed(dev))
-		count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count);
-	else
+	if (vq_is_packed(dev)) {
+		if (unlikely(dev->dequeue_zero_copy))
+			count = virtio_dev_tx_packed_zmbuf(dev, vq, mbuf_pool,
+							   pkts, count);
+		else
+			count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts,
+						     count);
+	} else
 		count = virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count);
 
 out: