[dpdk-dev] [PATCH] vhost: remove lockless enqueue to the virtio ring

Huawei Xie huawei.xie at intel.com
Mon Jan 4 15:46:27 CET 2016


This patch removes the internal lockless enqueue implementation.
DPDK does not support receiving/transmitting packets from/to the same
queue from multiple threads concurrently. The vhost PMD wraps a vhost
device as a normal DPDK port, and DPDK applications normally have
their own lock implementation when they enqueue packets to the same
queue of a port.
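
For illustration, a minimal sketch of that application-side locking,
assuming two or more lcores share one tx queue of a vhost PMD port
(the lock, helper and port/queue ids below are hypothetical, not part
of this patch):

#include <rte_ethdev.h>
#include <rte_mbuf.h>
#include <rte_spinlock.h>

/* One lock per shared tx queue; serializes enqueue from multiple lcores. */
static rte_spinlock_t vhost_txq_lock = RTE_SPINLOCK_INITIALIZER;

/* Called from any lcore that transmits on queue 0 of the vhost port. */
static inline uint16_t
locked_tx_burst(uint16_t vhost_port_id, struct rte_mbuf **pkts, uint16_t nb_pkts)
{
	uint16_t sent;

	rte_spinlock_lock(&vhost_txq_lock);
	sent = rte_eth_tx_burst(vhost_port_id, 0, pkts, nb_pkts);
	rte_spinlock_unlock(&vhost_txq_lock);

	return sent;
}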

The atomic cmpset is a costly operation; removing it should help
performance a bit.
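
Conceptually (a simplified sketch, not the actual vhost code; the toy
struct and helper names are made up for illustration), the reservation
of available-ring entries goes from a CAS retry loop that tolerates
multiple enqueue threads to a plain update that assumes a single
enqueue thread per queue:

#include <stdint.h>
#include <rte_atomic.h>
#include <rte_common.h>

/* Toy queue state with only the fields needed for the sketch. */
struct toy_vq {
	volatile uint16_t avail_idx;          /* written by the guest */
	volatile uint16_t last_used_idx_res;  /* host-side reservation cursor */
};

/* Old scheme: several threads may enqueue, so a slice of entries is
 * claimed with rte_atomic16_cmpset and retried on contention. */
static uint16_t
reserve_multi_writer(struct toy_vq *vq, uint16_t count)
{
	uint16_t base, end;

	do {
		base = vq->last_used_idx_res;
		count = RTE_MIN(count, (uint16_t)(vq->avail_idx - base));
		if (count == 0)
			return 0;
		end = (uint16_t)(base + count);
	} while (rte_atomic16_cmpset(&vq->last_used_idx_res, base, end) == 0);

	return count;
}

/* New scheme: one enqueue thread per queue, so a plain read-modify-write
 * is enough and the costly atomic cmpset disappears from the fast path. */
static uint16_t
reserve_single_writer(struct toy_vq *vq, uint16_t count)
{
	count = RTE_MIN(count, (uint16_t)(vq->avail_idx - vq->last_used_idx_res));
	vq->last_used_idx_res = (uint16_t)(vq->last_used_idx_res + count);

	return count;
}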

Signed-off-by: Huawei Xie <huawei.xie at intel.com>
---
 lib/librte_vhost/vhost_rxtx.c | 86 +++++++++++++------------------------------
 1 file changed, 25 insertions(+), 61 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index bbf3fac..26a1b9c 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -69,10 +69,8 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	uint64_t buff_hdr_addr = 0;
 	uint32_t head[MAX_PKT_BURST];
 	uint32_t head_idx, packet_success = 0;
-	uint16_t avail_idx, res_cur_idx;
-	uint16_t res_base_idx, res_end_idx;
+	uint16_t avail_idx, res_cur_idx, res_end_idx;
 	uint16_t free_entries;
-	uint8_t success = 0;
 
 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
 	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
@@ -88,29 +86,18 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 
 	count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;
 
-	/*
-	 * As many data cores may want access to available buffers,
-	 * they need to be reserved.
-	 */
-	do {
-		res_base_idx = vq->last_used_idx_res;
-		avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
-		free_entries = (avail_idx - res_base_idx);
-		/*check that we have enough buffers*/
-		if (unlikely(count > free_entries))
-			count = free_entries;
-
-		if (count == 0)
-			return 0;
-
-		res_end_idx = res_base_idx + count;
-		/* vq->last_used_idx_res is atomically updated. */
-		/* TODO: Allow to disable cmpset if no concurrency in application. */
-		success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-				res_base_idx, res_end_idx);
-	} while (unlikely(success == 0));
-	res_cur_idx = res_base_idx;
+	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+	free_entries = (avail_idx - vq->last_used_idx_res);
+	/*check that we have enough buffers*/
+	if (unlikely(count > free_entries))
+		count = free_entries;
+	if (count == 0)
+		return 0;
+
+	res_cur_idx = vq->last_used_idx_res;
+	res_end_idx = res_cur_idx + count;
+	vq->last_used_idx_res = res_end_idx;
+
 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n",
 			dev->device_fh, res_cur_idx, res_end_idx);
 
@@ -230,10 +217,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 
 	rte_compiler_barrier();
 
-	/* Wait until it's our turn to add our buffer to the used ring. */
-	while (unlikely(vq->last_used_idx != res_base_idx))
-		rte_pause();
-
 	*(volatile uint16_t *)&vq->used->idx += count;
 	vq->last_used_idx = res_end_idx;
 
@@ -474,7 +457,6 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 	uint32_t pkt_idx = 0, entry_success = 0;
 	uint16_t avail_idx;
 	uint16_t res_base_idx, res_cur_idx;
-	uint8_t success = 0;
 
 	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
 		dev->device_fh);
@@ -496,46 +478,28 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 
 	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
 		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;
+		uint32_t secure_len = 0;
+		uint32_t vec_idx = 0;
 
-		do {
-			/*
-			 * As many data cores may want access to available
-			 * buffers, they need to be reserved.
-			 */
-			uint32_t secure_len = 0;
-			uint32_t vec_idx = 0;
-
-			res_base_idx = vq->last_used_idx_res;
-			res_cur_idx = res_base_idx;
+		res_base_idx = res_cur_idx = vq->last_used_idx_res;
 
-			do {
-				avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-				if (unlikely(res_cur_idx == avail_idx))
-					goto merge_rx_exit;
+		do {
+			avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+			if (unlikely(res_cur_idx == avail_idx))
+				goto merge_rx_exit;
 
-				update_secure_len(vq, res_cur_idx,
-						  &secure_len, &vec_idx);
-				res_cur_idx++;
-			} while (pkt_len > secure_len);
+			update_secure_len(vq, res_cur_idx,
+					&secure_len, &vec_idx);
+			res_cur_idx++;
+		} while (pkt_len > secure_len);
 
-			/* vq->last_used_idx_res is atomically updated. */
-			success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-							res_base_idx,
-							res_cur_idx);
-		} while (success == 0);
+		vq->last_used_idx_res = res_cur_idx;
 
 		entry_success = copy_from_mbuf_to_vring(dev, queue_id,
 			res_base_idx, res_cur_idx, pkts[pkt_idx]);
 
 		rte_compiler_barrier();
 
-		/*
-		 * Wait until it's our turn to add our buffer
-		 * to the used ring.
-		 */
-		while (unlikely(vq->last_used_idx != res_base_idx))
-			rte_pause();
-
 		*(volatile uint16_t *)&vq->used->idx += entry_success;
 		vq->last_used_idx = res_cur_idx;
 	}
-- 
1.8.1.4