[dpdk-dev] [PATCH 08/11] net/vhostpci: add RX function

Zhiyong Yang zhiyong.yang at intel.com
Thu Nov 30 10:46:54 CET 2017


Add the functions to support receiving packets: the burst RX callback eth_vhostpci_rx() and the helpers that dequeue packets from the peer guest's TX virtqueue.

Signed-off-by: Zhiyong Yang <zhiyong.yang at intel.com>
---
 drivers/net/vhostpci/vhostpci_ethdev.c | 311 +++++++++++++++++++++++++++++++++
 1 file changed, 311 insertions(+)
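
Usage sketch for reviewers (not part of the patch): once eth_vhostpci_rx is
registered as the port's rx_pkt_burst callback, applications receive packets
through the normal ethdev burst API. The snippet below is a minimal polling
example, assuming a vhostpci port has already been configured and started;
port_id and the poll_vhostpci_port() helper are hypothetical and for
illustration only.

#include <rte_ethdev.h>
#include <rte_mbuf.h>

#define BURST_SIZE 32

/* Poll RX queue 0 of a started vhostpci port; rte_eth_rx_burst()
 * dispatches to eth_vhostpci_rx() for ports bound to this PMD.
 */
static void
poll_vhostpci_port(uint16_t port_id)
{
	struct rte_mbuf *pkts[BURST_SIZE];
	uint16_t nb_rx, i;

	nb_rx = rte_eth_rx_burst(port_id, 0, pkts, BURST_SIZE);

	for (i = 0; i < nb_rx; i++) {
		/* Application-specific processing would go here. */
		rte_pktmbuf_free(pkts[i]);
	}
}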

diff --git a/drivers/net/vhostpci/vhostpci_ethdev.c b/drivers/net/vhostpci/vhostpci_ethdev.c
index 0582f73b7..06e3f5c50 100644
--- a/drivers/net/vhostpci/vhostpci_ethdev.c
+++ b/drivers/net/vhostpci/vhostpci_ethdev.c
@@ -49,6 +49,10 @@
 #include "vhostpci_logs.h"
 #include "vhostpci_ethdev.h"
 
+#define MAX_BATCH_LEN 256
+#define VHOSTPCI_MAX_PKT_BURST 32
+#define VHOSTPCI_BUF_VECTOR_MAX 256
+
 static void
 vhostpci_dev_info_get(struct rte_eth_dev *dev,
 		struct rte_eth_dev_info *dev_info);
@@ -92,6 +96,10 @@ vhostpci_dev_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
 static int
 vhostpci_init_device(struct rte_eth_dev *eth_dev, uint64_t req_features);
 
+static uint16_t
+vhostpci_dequeue_burst(struct vhostpci_net *dev, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count);
+
 static int
 vhostpci_dev_start(struct rte_eth_dev *dev);
 
@@ -313,6 +321,308 @@ vhostpci_dev_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
 	return 0;
 }
 
+static __rte_always_inline void
+update_used_ring(struct vhostpci_virtqueue *vq,
+		 uint32_t used_idx, uint32_t desc_idx)
+{
+	vq->used->ring[used_idx].id  = desc_idx;
+	vq->used->ring[used_idx].len = 0;
+}
+
+static __rte_always_inline int
+copy_desc_to_mbuf(struct vhostpci_net *dev, struct vhostpci_virtqueue *vq,
+		  struct vring_desc *descs, uint16_t max_desc,
+		  struct rte_mbuf *m, uint16_t desc_idx,
+		  struct rte_mempool *mbuf_pool)
+{
+	struct vring_desc *desc;
+	uint64_t desc_addr;
+	uint32_t desc_avail, desc_offset;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	struct rte_mbuf *cur = m, *prev = m;
+	/* A counter to guard against a dead loop in the desc chain */
+	uint32_t nr_desc = 1;
+	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
+	uint16_t copy_nb = vq->batch_copy_nb_elems;
+	int error = 0;
+
+	desc = &descs[desc_idx];
+	if (unlikely(desc->len < dev->vhost_hlen)) {
+		error = -1;
+		goto out;
+	}
+
+	desc_addr = remote_gpa_to_vva(dev, desc->addr);
+
+	if (unlikely(!desc_addr)) {
+		error = -1;
+		goto out;
+	}
+
+	/**
+	 * A virtio driver normally uses at least 2 desc buffers
+	 * for Tx: the first for storing the header, and the others
+	 * for storing the data.
+	 */
+	if (likely((desc->len == dev->vhost_hlen) &&
+		   (desc->flags & VRING_DESC_F_NEXT) != 0)) {
+		desc = &descs[desc->next];
+		if (unlikely(desc->flags & VRING_DESC_F_INDIRECT)) {
+			error = -1;
+			goto out;
+		}
+
+		desc_addr = remote_gpa_to_vva(dev, desc->addr);
+		if (unlikely(!desc_addr)) {
+			error = -1;
+			goto out;
+		}
+
+		desc_offset = 0;
+		desc_avail  = desc->len;
+		nr_desc    += 1;
+	} else {
+		desc_avail  = desc->len - dev->vhost_hlen;
+		desc_offset = dev->vhost_hlen;
+	}
+
+	rte_prefetch0((void *)(uintptr_t)(desc_addr + desc_offset));
+
+	mbuf_offset = 0;
+	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
+	while (1) {
+		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+		if (likely(cpy_len > MAX_BATCH_LEN ||
+			   copy_nb >= vq->size ||
+			   (cur == m))) {
+			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
+				mbuf_offset), (void *)((uintptr_t)(desc_addr +
+				desc_offset)), cpy_len);
+		} else {
+			batch_copy[copy_nb].dst =
+				rte_pktmbuf_mtod_offset(cur, void *,
+								mbuf_offset);
+			batch_copy[copy_nb].src =
+				(void *)((uintptr_t)(desc_addr +
+						     desc_offset));
+			batch_copy[copy_nb].len = cpy_len;
+			copy_nb++;
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		desc_avail  -= cpy_len;
+		desc_offset += cpy_len;
+
+		/* This desc reaches its end; get the next one */
+		if (desc_avail == 0) {
+			if ((desc->flags & VRING_DESC_F_NEXT) == 0)
+				break;
+
+			if (unlikely(desc->next >= max_desc ||
+				     ++nr_desc > max_desc)) {
+				error = -1;
+				goto out;
+			}
+			desc = &descs[desc->next];
+			if (unlikely(desc->flags & VRING_DESC_F_INDIRECT)) {
+				error = -1;
+				goto out;
+			}
+
+			desc_addr = remote_gpa_to_vva(dev, desc->addr);
+			if (unlikely(!desc_addr)) {
+				error = -1;
+				goto out;
+			}
+
+			rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+			desc_offset = 0;
+			desc_avail  = desc->len;
+
+		}
+
+		/**
+		 * This mbuf reaches its end; get a new one
+		 * to hold more data.
+		 */
+		if (mbuf_avail == 0) {
+			cur = rte_pktmbuf_alloc(mbuf_pool);
+			if (unlikely(cur == NULL)) {
+				error = -1;
+				goto out;
+			}
+
+			prev->next = cur;
+			prev->data_len = mbuf_offset;
+			m->nb_segs += 1;
+			m->pkt_len += mbuf_offset;
+			prev = cur;
+
+			mbuf_offset = 0;
+			mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+		}
+	}
+
+	prev->data_len = mbuf_offset;
+	m->pkt_len    += mbuf_offset;
+
+out:
+	vq->batch_copy_nb_elems = copy_nb;
+
+	return error;
+}
+
+static inline void
+do_data_copy_dequeue(struct vhostpci_virtqueue *vq)
+{
+	struct batch_copy_elem *elem = vq->batch_copy_elems;
+	uint16_t count = vq->batch_copy_nb_elems;
+	int i;
+
+	for (i = 0; i < count; i++)
+		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
+}
+
+static __rte_always_inline void
+update_used_idx(struct vhostpci_virtqueue *vq, uint32_t count)
+{
+	if (unlikely(count == 0))
+		return;
+
+	rte_smp_wmb();
+	rte_smp_rmb();
+
+	vq->used->idx += count;
+}
+
+static uint16_t
+vhostpci_dequeue_burst(struct vhostpci_net *dev, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count)
+{
+	struct vhostpci_virtqueue *vq;
+	uint32_t desc_indexes[VHOSTPCI_MAX_PKT_BURST];
+	uint32_t used_idx;
+	uint32_t i = 0;
+	uint16_t free_entries;
+	uint16_t avail_idx;
+
+	if (!dev)
+		return 0;
+
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring)))
+		return 0;
+
+	vq = dev->virtqueue[queue_id];
+	if (unlikely(vq->enabled == 0))
+		return 0;
+
+	vq->batch_copy_nb_elems = 0;
+
+	free_entries = *((volatile uint16_t *)&vq->avail->idx) -
+			vq->last_avail_idx;
+	if (free_entries == 0)
+		return 0;
+
+	/* Prefetch the available and used rings */
+	avail_idx = vq->last_avail_idx & (vq->size - 1);
+	used_idx  = vq->last_used_idx  & (vq->size - 1);
+	rte_prefetch0(&vq->avail->ring[avail_idx]);
+	rte_prefetch0(&vq->used->ring[used_idx]);
+
+	count = RTE_MIN(count, VHOSTPCI_MAX_PKT_BURST);
+	count = RTE_MIN(count, free_entries);
+
+	/* Retrieve all of the head indexes first to avoid caching issues. */
+	for (i = 0; i < count; i++) {
+		avail_idx = (vq->last_avail_idx + i) & (vq->size - 1);
+		used_idx  = (vq->last_used_idx  + i) & (vq->size - 1);
+		desc_indexes[i] = vq->avail->ring[avail_idx];
+		update_used_ring(vq, used_idx, desc_indexes[i]);
+	}
+
+	/* Prefetch descriptor index. */
+	rte_prefetch0(&vq->desc[desc_indexes[0]]);
+	for (i = 0; i < count; i++) {
+		struct vring_desc *desc;
+		uint16_t sz, idx;
+		int err;
+
+		if (likely(i + 1 < count))
+			rte_prefetch0(&vq->desc[desc_indexes[i + 1]]);
+
+		desc = vq->desc;
+		sz = vq->size;
+		idx = desc_indexes[i];
+
+		pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
+		if (unlikely(pkts[i] == NULL))
+			break;
+
+		err = copy_desc_to_mbuf(dev, vq, desc, sz, pkts[i], idx,
+					mbuf_pool);
+		if (unlikely(err)) {
+			rte_pktmbuf_free(pkts[i]);
+			break;
+		}
+
+	}
+	vq->last_avail_idx += i;
+
+	do_data_copy_dequeue(vq);
+	vq->last_used_idx += i;
+	update_used_idx(vq, i);
+
+	return i;
+}
+
+static uint16_t
+eth_vhostpci_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	struct vhostpci_queue *r = q;
+	uint16_t i, nb_rx = 0;
+	uint16_t nb_receive = nb_bufs;
+
+	if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+		return 0;
+
+	rte_atomic32_set(&r->while_queuing, 1);
+
+	if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+		goto out;
+
+	/* Dequeue packets from the TX queue of the other guest */
+	while (nb_receive) {
+		uint16_t nb_pkts;
+		uint16_t num = (uint16_t)RTE_MIN(nb_receive,
+						 VHOSTPCI_MAX_PKT_BURST);
+
+		nb_pkts = vhostpci_dequeue_burst(r->vpnet, r->virtqueue_id,
+						 r->mb_pool, &bufs[nb_rx],
+						 num);
+
+		nb_rx += nb_pkts;
+		nb_receive -= nb_pkts;
+		if (nb_pkts < num)
+			break;
+	}
+
+	r->stats.pkts += nb_rx;
+
+	for (i = 0; likely(i < nb_rx); i++) {
+		bufs[i]->port = r->port_id;
+		r->stats.bytes += bufs[i]->pkt_len;
+	}
+
+out:
+	rte_atomic32_set(&r->while_queuing, 0);
+
+	return nb_rx;
+}
+
 static int
 vhostpci_dev_atomic_read_link_status(struct rte_eth_dev *dev,
 		struct rte_eth_link *link)
@@ -716,6 +1026,7 @@ eth_vhostpci_dev_init(struct rte_eth_dev *eth_dev)
 		rte_intr_callback_register(eth_dev->intr_handle,
 			vhostpci_interrupt_handler, eth_dev);
 
+	eth_dev->rx_pkt_burst = &eth_vhostpci_rx;
 	return 0;
 }
 
-- 
2.13.3


