[dpdk-dev] [PATCH] vhost: add back support for concurrent enqueue

Rich Lane rich.lane at bigswitch.com
Mon Aug 15 22:00:24 CEST 2016


Concurrent enqueue is an important performance optimization when the number
of cores used for switching is different than the number of vhost queues.
I've observed a 20% performance improvement compared to a strategy that
binds queues to cores.

The atomic cmpset is only executed when the application calls
rte_vhost_enqueue_burst_mp. Benchmarks show no performance impact
when not using concurrent enqueue.

Mergeable RX buffers aren't supported by concurrent enqueue to minimize
code complexity.

Partially reverts 39449e74 ("vhost: remove concurrent enqueue") and
includes a fix from "vhost: avoid reordering of used->idx and last_used_idx
updating".

Signed-off-by: Rich Lane <rich.lane at bigswitch.com>
---
 lib/librte_vhost/rte_vhost_version.map |  6 +++
 lib/librte_vhost/rte_virtio_net.h      | 19 +++++++++
 lib/librte_vhost/vhost-net.h           |  2 +
 lib/librte_vhost/vhost_rxtx.c          | 77 ++++++++++++++++++++++++++++++----
 lib/librte_vhost/virtio-net.c          |  2 +
 5 files changed, 97 insertions(+), 9 deletions(-)

diff --git a/lib/librte_vhost/rte_vhost_version.map b/lib/librte_vhost/rte_vhost_version.map
index 5ceaa8a..ca9d49e 100644
--- a/lib/librte_vhost/rte_vhost_version.map
+++ b/lib/librte_vhost/rte_vhost_version.map
@@ -30,3 +30,9 @@ DPDK_16.07 {
 	rte_vhost_get_queue_num;
 
 } DPDK_2.1;
+
+DPDK_16.11 {
+	global:
+
+	rte_vhost_enqueue_burst_mp
+} DPDK_16.07;
diff --git a/lib/librte_vhost/rte_virtio_net.h b/lib/librte_vhost/rte_virtio_net.h
index 9caa622..0f05917 100644
--- a/lib/librte_vhost/rte_virtio_net.h
+++ b/lib/librte_vhost/rte_virtio_net.h
@@ -175,6 +175,25 @@ uint16_t rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
 	struct rte_mbuf **pkts, uint16_t count);
 
 /**
+ * This function adds buffers to the virtio devices RX virtqueue. Buffers can
+ * be received from the physical port or from another virtual device. A packet
+ * count is returned to indicate the number of packets that were successfully
+ * added to the RX queue. This version is multi-producer safe.
+ * @param vid
+ *  virtio-net device ID
+ * @param queue_id
+ *  virtio queue index in mq case
+ * @param pkts
+ *  array to contain packets to be enqueued
+ * @param count
+ *  packets num to be enqueued
+ * @return
+ *  num of packets enqueued
+ */
+uint16_t rte_vhost_enqueue_burst_mp(int vid, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint16_t count);
+
+/**
  * This function gets guest buffers from the virtio device TX virtqueue,
  * construct host mbufs, copies guest buffer content to host mbufs and
  * store them in pkts to be processed.
diff --git a/lib/librte_vhost/vhost-net.h b/lib/librte_vhost/vhost-net.h
index 38593a2..ea9c377 100644
--- a/lib/librte_vhost/vhost-net.h
+++ b/lib/librte_vhost/vhost-net.h
@@ -72,6 +72,8 @@ struct vhost_virtqueue {
 
 	/* Last index used on the available ring */
 	volatile uint16_t	last_used_idx;
+	/* Used for multiple devices reserving buffers */
+	volatile uint16_t       last_used_idx_res;
 #define VIRTIO_INVALID_EVENTFD		(-1)
 #define VIRTIO_UNINITIALIZED_EVENTFD	(-2)
 
diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 08a73fd..eea810e 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -212,6 +212,47 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
+/*
+ * As many data cores may want to access available buffers
+ * they need to be reserved.
+ */
+static inline uint32_t
+reserve_avail_buf(struct vhost_virtqueue *vq, uint32_t count, bool mp,
+		  uint16_t *start)
+{
+	uint16_t res_start_idx;
+	uint16_t avail_idx;
+	uint16_t free_entries;
+	int success;
+
+	count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+
+again:
+	res_start_idx = mp ? vq->last_used_idx_res : vq->last_used_idx;
+	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+
+	free_entries = avail_idx - res_start_idx;
+	count = RTE_MIN(count, free_entries);
+	if (count == 0)
+		return 0;
+
+	if (mp) {
+		uint16_t res_end_idx = res_start_idx + count;
+
+		/*
+		* update vq->last_used_idx_res atomically; try again if failed.
+		*/
+		success = rte_atomic16_cmpset(&vq->last_used_idx_res,
+					res_start_idx, res_end_idx);
+		if (unlikely(!success))
+			goto again;
+	}
+
+	*start = res_start_idx;
+
+	return count;
+}
+
 /**
  * This function adds buffers to the virtio devices RX virtqueue. Buffers can
  * be received from the physical port or from another virtio device. A packet
@@ -221,10 +262,10 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
  */
 static inline uint32_t __attribute__((always_inline))
 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
-	      struct rte_mbuf **pkts, uint32_t count)
+	      struct rte_mbuf **pkts, uint32_t count, bool mp)
 {
 	struct vhost_virtqueue *vq;
-	uint16_t avail_idx, free_entries, start_idx;
+	uint16_t start_idx;
 	uint16_t desc_indexes[MAX_PKT_BURST];
 	uint16_t used_idx;
 	uint32_t i;
@@ -240,11 +281,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	if (unlikely(vq->enabled == 0))
 		return 0;
 
-	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-	start_idx = vq->last_used_idx;
-	free_entries = avail_idx - start_idx;
-	count = RTE_MIN(count, free_entries);
-	count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+	count = reserve_avail_buf(vq, count, mp, &start_idx);
 	if (count == 0)
 		return 0;
 
@@ -284,8 +321,16 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 
 	rte_smp_wmb();
 
+	if (mp) {
+		/*
+		 * Wait until it's our turn to add our buffer to the
+		 * used ring.
+		 */
+		while (unlikely(vq->last_used_idx != start_idx))
+			rte_pause();
+	}
+
 	*(volatile uint16_t *)&vq->used->idx += count;
-	vq->last_used_idx += count;
 	vhost_log_used_vring(dev, vq,
 		offsetof(struct vring_used, idx),
 		sizeof(vq->used->idx));
@@ -293,6 +338,8 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	/* flush used->idx update before we read avail->flags. */
 	rte_mb();
 
+	vq->last_used_idx = start_idx + count;
+
 	/* Kick the guest if necessary. */
 	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
 			&& (vq->callfd >= 0))
@@ -545,7 +592,19 @@ rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
 	if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
 		return virtio_dev_merge_rx(dev, queue_id, pkts, count);
 	else
-		return virtio_dev_rx(dev, queue_id, pkts, count);
+		return virtio_dev_rx(dev, queue_id, pkts, count, false);
+}
+
+uint16_t
+rte_vhost_enqueue_burst_mp(int vid, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint16_t count)
+{
+	struct virtio_net *dev = get_device(vid);
+
+	if (!dev)
+		return 0;
+
+	return virtio_dev_rx(dev, queue_id, pkts, count, true);
 }
 
 static void
diff --git a/lib/librte_vhost/virtio-net.c b/lib/librte_vhost/virtio-net.c
index 1785695..9fc04ff 100644
--- a/lib/librte_vhost/virtio-net.c
+++ b/lib/librte_vhost/virtio-net.c
@@ -567,6 +567,7 @@ vhost_set_vring_addr(int vid, struct vhost_vring_addr *addr)
 			"some packets maybe resent for Tx and dropped for Rx\n",
 			vq->last_used_idx, vq->used->idx);
 		vq->last_used_idx     = vq->used->idx;
+		vq->last_used_idx_res = vq->used->idx;
 	}
 
 	vq->log_guest_addr = addr->log_guest_addr;
@@ -598,6 +599,7 @@ vhost_set_vring_base(int vid, struct vhost_vring_state *state)
 
 	/* State->index refers to the queue index. The txq is 1, rxq is 0. */
 	dev->virtqueue[state->index]->last_used_idx = state->num;
+	dev->virtqueue[state->index]->last_used_idx_res = state->num;
 
 	return 0;
 }
-- 
1.9.1



More information about the dev mailing list