[dpdk-dev] [PATCH v7 5/7] vhost: shadow used ring update

Yuanhan Liu yuanhan.liu at linux.intel.com
Fri Oct 14 11:34:36 CEST 2016


From: Zhihong Wang <zhihong.wang at intel.com>

The basic idea is to shadow the used ring updates: write them into a
local buffer first, and then flush them all to the virtio used vring
at once at the end.

And since we do avail ring reservation before enqueuing data, we
know which and how many descs will be used, which means we can update
the shadow used ring at reservation time. This also introduces another
slight advantage: we no longer need to access desc->flags inside
copy_mbuf_to_desc_mergeable().

Signed-off-by: Zhihong Wang <zhihong.wang at intel.com>
Signed-off-by: Yuanhan Liu <yuanhan.liu at linux.intel.com>
---
 lib/librte_vhost/vhost.c      |  13 +++-
 lib/librte_vhost/vhost.h      |   3 +
 lib/librte_vhost/vhost_user.c |  23 +++++--
 lib/librte_vhost/virtio_net.c | 138 +++++++++++++++++++++++++-----------------
 4 files changed, 113 insertions(+), 64 deletions(-)

diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index 469117a..d8116ff 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -121,9 +121,18 @@ static void
 free_device(struct virtio_net *dev)
 {
 	uint32_t i;
+	struct vhost_virtqueue *rxq, *txq;
 
-	for (i = 0; i < dev->virt_qp_nb; i++)
-		rte_free(dev->virtqueue[i * VIRTIO_QNUM]);
+	for (i = 0; i < dev->virt_qp_nb; i++) {
+		rxq = dev->virtqueue[i * VIRTIO_QNUM + VIRTIO_RXQ];
+		txq = dev->virtqueue[i * VIRTIO_QNUM + VIRTIO_TXQ];
+
+		rte_free(rxq->shadow_used_ring);
+		rte_free(txq->shadow_used_ring);
+
+		/* rxq and txq are allocated together as queue-pair */
+		rte_free(rxq);
+	}
 
 	rte_free(dev);
 }
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 17c557f..acec772 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -105,6 +105,9 @@ struct vhost_virtqueue {
 	uint16_t		last_zmbuf_idx;
 	struct zcopy_mbuf	*zmbufs;
 	struct zcopy_mbuf_list	zmbuf_list;
+
+	struct vring_used_elem  *shadow_used_ring;
+	uint16_t                shadow_used_idx;
 } __rte_cache_aligned;
 
 /* Old kernels have no such macro defined */
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 3074227..6b83c15 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -198,6 +198,15 @@ vhost_user_set_vring_num(struct virtio_net *dev,
 		}
 	}
 
+	vq->shadow_used_ring = rte_malloc(NULL,
+				vq->size * sizeof(struct vring_used_elem),
+				RTE_CACHE_LINE_SIZE);
+	if (!vq->shadow_used_ring) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to allocate memory for shadow used ring.\n");
+		return -1;
+	}
+
 	return 0;
 }
 
@@ -711,6 +720,8 @@ static int
 vhost_user_get_vring_base(struct virtio_net *dev,
 			  struct vhost_vring_state *state)
 {
+	struct vhost_virtqueue *vq = dev->virtqueue[state->index];
+
 	/* We have to stop the queue (virtio) if it is running. */
 	if (dev->flags & VIRTIO_DEV_RUNNING) {
 		dev->flags &= ~VIRTIO_DEV_RUNNING;
@@ -718,7 +729,7 @@ vhost_user_get_vring_base(struct virtio_net *dev,
 	}
 
 	/* Here we are safe to get the last used index */
-	state->num = dev->virtqueue[state->index]->last_used_idx;
+	state->num = vq->last_used_idx;
 
 	RTE_LOG(INFO, VHOST_CONFIG,
 		"vring base idx:%d file:%d\n", state->index, state->num);
@@ -727,13 +738,15 @@ vhost_user_get_vring_base(struct virtio_net *dev,
 	 * sent and only sent in vhost_vring_stop.
 	 * TODO: cleanup the vring, it isn't usable since here.
 	 */
-	if (dev->virtqueue[state->index]->kickfd >= 0)
-		close(dev->virtqueue[state->index]->kickfd);
+	if (vq->kickfd >= 0)
+		close(vq->kickfd);
 
-	dev->virtqueue[state->index]->kickfd = VIRTIO_UNINITIALIZED_EVENTFD;
+	vq->kickfd = VIRTIO_UNINITIALIZED_EVENTFD;
 
 	if (dev->dequeue_zero_copy)
-		free_zmbufs(dev->virtqueue[state->index]);
+		free_zmbufs(vq);
+	rte_free(vq->shadow_used_ring);
+	vq->shadow_used_ring = NULL;
 
 	return 0;
 }
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index b5ba633..2bdc2fe 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -91,6 +91,56 @@ is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t qp_nb)
 	return (is_tx ^ (idx & 1)) == 0 && idx < qp_nb * VIRTIO_QNUM;
 }
 
+static inline void __attribute__((always_inline))
+do_flush_shadow_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq,
+			  uint16_t to, uint16_t from, uint16_t size)
+{
+	rte_memcpy(&vq->used->ring[to],
+			&vq->shadow_used_ring[from],
+			size * sizeof(struct vring_used_elem));
+	vhost_log_used_vring(dev, vq,
+			offsetof(struct vring_used, ring[to]),
+			size * sizeof(struct vring_used_elem));
+}
+
+static inline void __attribute__((always_inline))
+flush_shadow_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq)
+{
+	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
+
+	if (used_idx + vq->shadow_used_idx <= vq->size) {
+		do_flush_shadow_used_ring(dev, vq, used_idx, 0,
+					  vq->shadow_used_idx);
+	} else {
+		uint16_t size;
+
+		/* update used ring interval [used_idx, vq->size] */
+		size = vq->size - used_idx;
+		do_flush_shadow_used_ring(dev, vq, used_idx, 0, size);
+
+		/* update the left half used ring interval [0, left_size] */
+		do_flush_shadow_used_ring(dev, vq, 0, size,
+					  vq->shadow_used_idx - size);
+	}
+	vq->last_used_idx += vq->shadow_used_idx;
+
+	rte_smp_wmb();
+
+	*(volatile uint16_t *)&vq->used->idx += vq->shadow_used_idx;
+	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
+		sizeof(vq->used->idx));
+}
+
+static inline void __attribute__((always_inline))
+update_shadow_used_ring(struct vhost_virtqueue *vq,
+			 uint16_t desc_idx, uint16_t len)
+{
+	uint16_t i = vq->shadow_used_idx++;
+
+	vq->shadow_used_ring[i].id  = desc_idx;
+	vq->shadow_used_ring[i].len = len;
+}
+
 static void
 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
 {
@@ -300,15 +350,16 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	return count;
 }
 
-static inline int
+static inline int __attribute__((always_inline))
 fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
-	     uint32_t *allocated, uint32_t *vec_idx,
-	     struct buf_vector *buf_vec)
+	     uint32_t *vec_idx, struct buf_vector *buf_vec,
+	     uint16_t *desc_chain_head, uint16_t *desc_chain_len)
 {
 	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
 	uint32_t vec_id = *vec_idx;
-	uint32_t len    = *allocated;
+	uint32_t len    = 0;
 
+	*desc_chain_head = idx;
 	while (1) {
 		if (unlikely(vec_id >= BUF_VECTOR_MAX || idx >= vq->size))
 			return -1;
@@ -325,8 +376,8 @@ fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
 		idx = vq->desc[idx].next;
 	}
 
-	*allocated = len;
-	*vec_idx   = vec_id;
+	*desc_chain_len = len;
+	*vec_idx = vec_id;
 
 	return 0;
 }
@@ -340,26 +391,30 @@ reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
 {
 	uint16_t cur_idx;
 	uint16_t avail_idx;
-	uint32_t allocated = 0;
 	uint32_t vec_idx = 0;
 	uint16_t tries = 0;
 
-	cur_idx = vq->last_avail_idx;
+	uint16_t head_idx = 0;
+	uint16_t len = 0;
 
-	while (1) {
+	*num_buffers = 0;
+	cur_idx  = vq->last_avail_idx;
+
+	while (size > 0) {
 		avail_idx = *((volatile uint16_t *)&vq->avail->idx);
 		if (unlikely(cur_idx == avail_idx))
 			return -1;
 
-		if (unlikely(fill_vec_buf(vq, cur_idx, &allocated,
-					  &vec_idx, buf_vec) < 0))
+		if (unlikely(fill_vec_buf(vq, cur_idx, &vec_idx, buf_vec,
+					  &head_idx, &len) < 0))
 			return -1;
+		len = RTE_MIN(len, size);
+		update_shadow_used_ring(vq, head_idx, len);
+		size -= len;
 
 		cur_idx++;
 		tries++;
-
-		if (allocated >= size)
-			break;
+		*num_buffers += 1;
 
 		/*
 		 * if we tried all available ring items, and still
@@ -370,25 +425,19 @@ reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
 			return -1;
 	}
 
-	*num_buffers = cur_idx - vq->last_avail_idx;
 	return 0;
 }
 
 static inline int __attribute__((always_inline))
-copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
-			    struct rte_mbuf *m, struct buf_vector *buf_vec,
-			    uint16_t num_buffers)
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct rte_mbuf *m,
+			    struct buf_vector *buf_vec, uint16_t num_buffers)
 {
 	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
 	uint32_t vec_idx = 0;
-	uint16_t cur_idx = vq->last_used_idx;
 	uint64_t desc_addr;
-	uint32_t desc_chain_head;
-	uint32_t desc_chain_len;
 	uint32_t mbuf_offset, mbuf_avail;
 	uint32_t desc_offset, desc_avail;
 	uint32_t cpy_len;
-	uint16_t desc_idx, used_idx;
 	uint64_t hdr_addr, hdr_phys_addr;
 	struct rte_mbuf *hdr_mbuf;
 
@@ -409,34 +458,17 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
 
 	virtio_hdr.num_buffers = num_buffers;
 	LOG_DEBUG(VHOST_DATA, "(%d) RX: num merge buffers %d\n",
-		dev->vid, virtio_hdr.num_buffers);
+		dev->vid, num_buffers);
 
 	desc_avail  = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
 	desc_offset = dev->vhost_hlen;
-	desc_chain_head = buf_vec[vec_idx].desc_idx;
-	desc_chain_len = desc_offset;
 
 	mbuf_avail  = rte_pktmbuf_data_len(m);
 	mbuf_offset = 0;
 	while (mbuf_avail != 0 || m->next != NULL) {
 		/* done with current desc buf, get the next one */
 		if (desc_avail == 0) {
-			desc_idx = buf_vec[vec_idx].desc_idx;
 			vec_idx++;
-
-			if (!(vq->desc[desc_idx].flags & VRING_DESC_F_NEXT)) {
-				/* Update used ring with desc information */
-				used_idx = cur_idx++ & (vq->size - 1);
-				vq->used->ring[used_idx].id = desc_chain_head;
-				vq->used->ring[used_idx].len = desc_chain_len;
-				vhost_log_used_vring(dev, vq,
-					offsetof(struct vring_used,
-						 ring[used_idx]),
-					sizeof(vq->used->ring[used_idx]));
-				desc_chain_head = buf_vec[vec_idx].desc_idx;
-				desc_chain_len = 0;
-			}
-
 			desc_addr = gpa_to_vva(dev, buf_vec[vec_idx].buf_addr);
 			if (unlikely(!desc_addr))
 				return -1;
@@ -478,16 +510,8 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
 		mbuf_offset += cpy_len;
 		desc_avail  -= cpy_len;
 		desc_offset += cpy_len;
-		desc_chain_len += cpy_len;
 	}
 
-	used_idx = cur_idx & (vq->size - 1);
-	vq->used->ring[used_idx].id = desc_chain_head;
-	vq->used->ring[used_idx].len = desc_chain_len;
-	vhost_log_used_vring(dev, vq,
-		offsetof(struct vring_used, ring[used_idx]),
-		sizeof(vq->used->ring[used_idx]));
-
 	return 0;
 }
 
@@ -515,6 +539,7 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 	if (count == 0)
 		return 0;
 
+	vq->shadow_used_idx = 0;
 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
 
@@ -523,23 +548,22 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 			LOG_DEBUG(VHOST_DATA,
 				"(%d) failed to get enough desc from vring\n",
 				dev->vid);
+			vq->shadow_used_idx -= num_buffers;
 			break;
 		}
 
-		if (copy_mbuf_to_desc_mergeable(dev, vq, pkts[pkt_idx],
-						buf_vec, num_buffers) < 0)
+		if (copy_mbuf_to_desc_mergeable(dev, pkts[pkt_idx],
+						buf_vec, num_buffers) < 0) {
+			vq->shadow_used_idx -= num_buffers;
 			break;
+		}
 
-		rte_smp_wmb();
-
-		*(volatile uint16_t *)&vq->used->idx += num_buffers;
-		vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
-			sizeof(vq->used->idx));
-		vq->last_used_idx += num_buffers;
 		vq->last_avail_idx += num_buffers;
 	}
 
-	if (likely(pkt_idx)) {
+	if (likely(vq->shadow_used_idx)) {
+		flush_shadow_used_ring(dev, vq);
+
 		/* flush used->idx update before we read avail->flags. */
 		rte_mb();
 
-- 
1.9.0



More information about the dev mailing list