[dpdk-dev] [PATCH] vhost: remove lockless enqueue to the virtio ring
Huawei Xie
huawei.xie at intel.com
Mon Jan 4 15:46:27 CET 2016
This patch removes the internal lockless enqueue implementation.
DPDK doesn't support receiving/transmitting packets from/to the same
queue concurrently. The vhost PMD wraps a vhost device as a normal DPDK
port, and DPDK applications normally have their own lock implementation
when enqueueing packets to the same queue of a port.
The atomic cmpset is a costly operation; removing it should help
performance a bit.
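
For illustration only (not part of this patch), below is a minimal
sketch of the application-side locking the commit message assumes:
lcores that share a tx queue serialize their enqueues with the
application's own lock. The locked_tx_burst wrapper is hypothetical;
rte_spinlock and rte_eth_tx_burst are standard DPDK APIs.

#include <rte_ethdev.h>
#include <rte_spinlock.h>

static rte_spinlock_t tx_lock = RTE_SPINLOCK_INITIALIZER;

/* Serialize enqueues from multiple lcores to one port/queue. */
static uint16_t
locked_tx_burst(uint8_t port_id, uint16_t queue_id,
		struct rte_mbuf **pkts, uint16_t nb_pkts)
{
	uint16_t nb_tx;

	rte_spinlock_lock(&tx_lock);
	nb_tx = rte_eth_tx_burst(port_id, queue_id, pkts, nb_pkts);
	rte_spinlock_unlock(&tx_lock);

	return nb_tx;
}

With enqueues serialized at the application level like this, the
cmpset-based reservation inside vhost guards against contention that
cannot occur, which is why it can be dropped.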
Signed-off-by: Huawei Xie <huawei.xie at intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 86 +++++++++++++------------------------------
1 file changed, 25 insertions(+), 61 deletions(-)
diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index bbf3fac..26a1b9c 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -69,10 +69,8 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
uint64_t buff_hdr_addr = 0;
uint32_t head[MAX_PKT_BURST];
uint32_t head_idx, packet_success = 0;
- uint16_t avail_idx, res_cur_idx;
- uint16_t res_base_idx, res_end_idx;
+ uint16_t avail_idx, res_cur_idx, res_end_idx;
uint16_t free_entries;
- uint8_t success = 0;
LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
@@ -88,29 +86,18 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;
- /*
- * As many data cores may want access to available buffers,
- * they need to be reserved.
- */
- do {
- res_base_idx = vq->last_used_idx_res;
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- free_entries = (avail_idx - res_base_idx);
- /*check that we have enough buffers*/
- if (unlikely(count > free_entries))
- count = free_entries;
-
- if (count == 0)
- return 0;
-
- res_end_idx = res_base_idx + count;
- /* vq->last_used_idx_res is atomically updated. */
- /* TODO: Allow to disable cmpset if no concurrency in application. */
- success = rte_atomic16_cmpset(&vq->last_used_idx_res,
- res_base_idx, res_end_idx);
- } while (unlikely(success == 0));
- res_cur_idx = res_base_idx;
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+ free_entries = (avail_idx - vq->last_used_idx_res);
+ /*check that we have enough buffers*/
+ if (unlikely(count > free_entries))
+ count = free_entries;
+ if (count == 0)
+ return 0;
+
+ res_cur_idx = vq->last_used_idx_res;
+ res_end_idx = res_cur_idx + count;
+ vq->last_used_idx_res = res_end_idx;
+
LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n",
dev->device_fh, res_cur_idx, res_end_idx);
@@ -230,10 +217,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
rte_compiler_barrier();
- /* Wait until it's our turn to add our buffer to the used ring. */
- while (unlikely(vq->last_used_idx != res_base_idx))
- rte_pause();
-
*(volatile uint16_t *)&vq->used->idx += count;
vq->last_used_idx = res_end_idx;
@@ -474,7 +457,6 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
uint32_t pkt_idx = 0, entry_success = 0;
uint16_t avail_idx;
uint16_t res_base_idx, res_cur_idx;
- uint8_t success = 0;
LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
dev->device_fh);
@@ -496,46 +478,28 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;
+ uint32_t secure_len = 0;
+ uint32_t vec_idx = 0;
- do {
- /*
- * As many data cores may want access to available
- * buffers, they need to be reserved.
- */
- uint32_t secure_len = 0;
- uint32_t vec_idx = 0;
-
- res_base_idx = vq->last_used_idx_res;
- res_cur_idx = res_base_idx;
+ res_base_idx = res_cur_idx = vq->last_used_idx_res;
- do {
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
- if (unlikely(res_cur_idx == avail_idx))
- goto merge_rx_exit;
+ do {
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+ if (unlikely(res_cur_idx == avail_idx))
+ goto merge_rx_exit;
- update_secure_len(vq, res_cur_idx,
- &secure_len, &vec_idx);
- res_cur_idx++;
- } while (pkt_len > secure_len);
+ update_secure_len(vq, res_cur_idx,
+ &secure_len, &vec_idx);
+ res_cur_idx++;
+ } while (pkt_len > secure_len);
- /* vq->last_used_idx_res is atomically updated. */
- success = rte_atomic16_cmpset(&vq->last_used_idx_res,
- res_base_idx,
- res_cur_idx);
- } while (success == 0);
+ vq->last_used_idx_res = res_cur_idx;
entry_success = copy_from_mbuf_to_vring(dev, queue_id,
res_base_idx, res_cur_idx, pkts[pkt_idx]);
rte_compiler_barrier();
- /*
- * Wait until it's our turn to add our buffer
- * to the used ring.
- */
- while (unlikely(vq->last_used_idx != res_base_idx))
- rte_pause();
-
*(volatile uint16_t *)&vq->used->idx += entry_success;
vq->last_used_idx = res_cur_idx;
}
--
1.8.1.4