[dpdk-dev] [PATCH] vhost: add back support for concurrent enqueue
Rich Lane
rich.lane at bigswitch.com
Mon Aug 15 22:00:24 CEST 2016
Concurrent enqueue is an important performance optimization when the number
of cores used for switching differs from the number of vhost queues.
I've observed a 20% performance improvement compared to a strategy that
binds queues to cores.
The atomic cmpset is only executed when the application calls
rte_vhost_enqueue_burst_mp. Benchmarks show no performance impact
when not using concurrent enqueue.
To minimize code complexity, concurrent enqueue does not support mergeable
RX buffers.
Partially reverts 39449e74 ("vhost: remove concurrent enqueue") and
includes a fix from "vhost: avoid reordering of used->idx and last_used_idx
updating".
Signed-off-by: Rich Lane <rich.lane at bigswitch.com>
---
lib/librte_vhost/rte_vhost_version.map | 6 +++
lib/librte_vhost/rte_virtio_net.h | 19 +++++++++
lib/librte_vhost/vhost-net.h | 2 +
lib/librte_vhost/vhost_rxtx.c | 77 ++++++++++++++++++++++++++++++----
lib/librte_vhost/virtio-net.c | 2 +
5 files changed, 97 insertions(+), 9 deletions(-)
diff --git a/lib/librte_vhost/rte_vhost_version.map b/lib/librte_vhost/rte_vhost_version.map
index 5ceaa8a..ca9d49e 100644
--- a/lib/librte_vhost/rte_vhost_version.map
+++ b/lib/librte_vhost/rte_vhost_version.map
@@ -30,3 +30,9 @@ DPDK_16.07 {
rte_vhost_get_queue_num;
} DPDK_2.1;
+
+DPDK_16.11 {
+ global:
+
+ rte_vhost_enqueue_burst_mp
+} DPDK_16.07;
diff --git a/lib/librte_vhost/rte_virtio_net.h b/lib/librte_vhost/rte_virtio_net.h
index 9caa622..0f05917 100644
--- a/lib/librte_vhost/rte_virtio_net.h
+++ b/lib/librte_vhost/rte_virtio_net.h
@@ -175,6 +175,25 @@ uint16_t rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
struct rte_mbuf **pkts, uint16_t count);
/**
+ * This function adds buffers to the virtio device's RX virtqueue. Buffers can
+ * be received from the physical port or from another virtual device. A packet
+ * count is returned to indicate the number of packets that were successfully
+ * added to the RX queue. This version is multi-producer safe.
+ * @param vid
+ * virtio-net device ID
+ * @param queue_id
+ * virtio queue index in mq case
+ * @param pkts
+ * array to contain packets to be enqueued
+ * @param count
+ * number of packets to be enqueued
+ * @return
+ * number of packets enqueued
+ */
+uint16_t rte_vhost_enqueue_burst_mp(int vid, uint16_t queue_id,
+ struct rte_mbuf **pkts, uint16_t count);
+
+/**
* This function gets guest buffers from the virtio device TX virtqueue,
* construct host mbufs, copies guest buffer content to host mbufs and
* store them in pkts to be processed.
diff --git a/lib/librte_vhost/vhost-net.h b/lib/librte_vhost/vhost-net.h
index 38593a2..ea9c377 100644
--- a/lib/librte_vhost/vhost-net.h
+++ b/lib/librte_vhost/vhost-net.h
@@ -72,6 +72,8 @@ struct vhost_virtqueue {
/* Last index used on the available ring */
volatile uint16_t last_used_idx;
+ /* Used by multiple cores to reserve buffers concurrently */
+ volatile uint16_t last_used_idx_res;
#define VIRTIO_INVALID_EVENTFD (-1)
#define VIRTIO_UNINITIALIZED_EVENTFD (-2)
diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 08a73fd..eea810e 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -212,6 +212,47 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
return 0;
}
+/*
+ * Many data cores may want to access the available buffers at once, so
+ * each core must reserve its buffers before using them.
+ */
+static inline uint32_t
+reserve_avail_buf(struct vhost_virtqueue *vq, uint32_t count, bool mp,
+ uint16_t *start)
+{
+ uint16_t res_start_idx;
+ uint16_t avail_idx;
+ uint16_t free_entries;
+ int success;
+
+ count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+
+again:
+ res_start_idx = mp ? vq->last_used_idx_res : vq->last_used_idx;
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+
+ free_entries = avail_idx - res_start_idx;
+ count = RTE_MIN(count, free_entries);
+ if (count == 0)
+ return 0;
+
+ if (mp) {
+ uint16_t res_end_idx = res_start_idx + count;
+
+ /*
+ * update vq->last_used_idx_res atomically; try again if failed.
+ */
+ success = rte_atomic16_cmpset(&vq->last_used_idx_res,
+ res_start_idx, res_end_idx);
+ if (unlikely(!success))
+ goto again;
+ }
+
+ *start = res_start_idx;
+
+ return count;
+}
+
/**
* This function adds buffers to the virtio devices RX virtqueue. Buffers can
* be received from the physical port or from another virtio device. A packet
@@ -221,10 +262,10 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
*/
static inline uint32_t __attribute__((always_inline))
virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
- struct rte_mbuf **pkts, uint32_t count)
+ struct rte_mbuf **pkts, uint32_t count, bool mp)
{
struct vhost_virtqueue *vq;
- uint16_t avail_idx, free_entries, start_idx;
+ uint16_t start_idx;
uint16_t desc_indexes[MAX_PKT_BURST];
uint16_t used_idx;
uint32_t i;
@@ -240,11 +281,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
if (unlikely(vq->enabled == 0))
return 0;
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
- start_idx = vq->last_used_idx;
- free_entries = avail_idx - start_idx;
- count = RTE_MIN(count, free_entries);
- count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+ count = reserve_avail_buf(vq, count, mp, &start_idx);
if (count == 0)
return 0;
@@ -284,8 +321,16 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
rte_smp_wmb();
+ if (mp) {
+ /*
+ * Wait until it's our turn to add our buffer to the
+ * used ring.
+ */
+ while (unlikely(vq->last_used_idx != start_idx))
+ rte_pause();
+ }
+
*(volatile uint16_t *)&vq->used->idx += count;
- vq->last_used_idx += count;
vhost_log_used_vring(dev, vq,
offsetof(struct vring_used, idx),
sizeof(vq->used->idx));
@@ -293,6 +338,8 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
/* flush used->idx update before we read avail->flags. */
rte_mb();
+ vq->last_used_idx = start_idx + count;
+
/* Kick the guest if necessary. */
if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
&& (vq->callfd >= 0))
@@ -545,7 +592,19 @@ rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
return virtio_dev_merge_rx(dev, queue_id, pkts, count);
else
- return virtio_dev_rx(dev, queue_id, pkts, count);
+ return virtio_dev_rx(dev, queue_id, pkts, count, false);
+}
+
+uint16_t
+rte_vhost_enqueue_burst_mp(int vid, uint16_t queue_id,
+ struct rte_mbuf **pkts, uint16_t count)
+{
+ struct virtio_net *dev = get_device(vid);
+
+ if (!dev)
+ return 0;
+
+ return virtio_dev_rx(dev, queue_id, pkts, count, true);
}
static void
diff --git a/lib/librte_vhost/virtio-net.c b/lib/librte_vhost/virtio-net.c
index 1785695..9fc04ff 100644
--- a/lib/librte_vhost/virtio-net.c
+++ b/lib/librte_vhost/virtio-net.c
@@ -567,6 +567,7 @@ vhost_set_vring_addr(int vid, struct vhost_vring_addr *addr)
"some packets maybe resent for Tx and dropped for Rx\n",
vq->last_used_idx, vq->used->idx);
vq->last_used_idx = vq->used->idx;
+ vq->last_used_idx_res = vq->used->idx;
}
vq->log_guest_addr = addr->log_guest_addr;
@@ -598,6 +599,7 @@ vhost_set_vring_base(int vid, struct vhost_vring_state *state)
/* State->index refers to the queue index. The txq is 1, rxq is 0. */
dev->virtqueue[state->index]->last_used_idx = state->num;
+ dev->virtqueue[state->index]->last_used_idx_res = state->num;
return 0;
}
--
1.9.1
More information about the dev
mailing list