[2/9] vhost: provide helpers for virtio ring relay

Message ID 20181128094607.106173-3-xiao.w.wang@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Series: support SW assisted VDPA live migration

Checks

Context               Check     Description
ci/checkpatch         warning   coding style issues
ci/Intel-compilation  fail      Compilation issues

Commit Message

Xiao Wang Nov. 28, 2018, 9:46 a.m. UTC
This patch provides two helpers for vdpa device drivers to perform a
relay between the guest virtio ring and a mediate virtio ring.

The available ring relay synchronizes the available entries and helps
to check desc validity.

The used ring relay synchronizes the used entries from the mediate ring
to the guest ring and helps to log dirty pages for live migration.

The next patch will leverage these two helpers.
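
For illustration, a vdpa driver would use the helpers roughly as below
(hypothetical driver code, not part of this series; everything except
the two new helpers is a placeholder):

	/* relay guest avail entries into the mediate ring, then kick HW */
	static void
	example_notify_relay(int vid, int qid, struct vring *m_vring)
	{
		if (rte_vdpa_relay_avail_ring(vid, qid, m_vring) < 0)
			return;
		/* notify the hardware queue here */
	}

	/* on device interrupt, relay used entries back to the guest;
	 * dirty pages are logged inside the helper */
	static void
	example_intr_relay(int vid, int qid, struct vring *m_vring)
	{
		if (rte_vdpa_relay_used_ring(vid, qid, m_vring) < 0)
			return;
		/* signal the guest through the callfd here */
	}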

Signed-off-by: Xiao Wang <xiao.w.wang@intel.com>
---
 lib/librte_vhost/rte_vdpa.h            |  38 ++++++++
 lib/librte_vhost/rte_vhost_version.map |   2 +
 lib/librte_vhost/vdpa.c                | 173 +++++++++++++++++++++++++++++++++
 lib/librte_vhost/vhost.h               |  40 ++++++++
 lib/librte_vhost/virtio_net.c          |  39 --------
 5 files changed, 253 insertions(+), 39 deletions(-)
  

Comments

Tiwei Bie Dec. 4, 2018, 6:22 a.m. UTC | #1
On Wed, Nov 28, 2018 at 05:46:00PM +0800, Xiao Wang wrote:
[...]
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Synchronize the available ring from guest to mediate ring, help to
> + * check desc validity to protect against malicious guest driver.
> + *
> + * @param vid
> + *  vhost device id
> + * @param qid
> + *  vhost queue id
> + * @param m_vring
> + *  mediate virtio ring pointer
> + * @return
> + *  number of synced available entries on success, -1 on failure
> + */
> +int __rte_experimental
> +rte_vdpa_relay_avail_ring(int vid, int qid, struct vring *m_vring);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Synchronize the used ring from mediate ring to guest, log dirty
> + * page for each Rx buffer used.
> + *
> + * @param vid
> + *  vhost device id
> + * @param qid
> + *  vhost queue id
> + * @param m_vring
> + *  mediate virtio ring pointer
> + * @return
> + *  number of synced used entries on success, -1 on failure
> + */
> +int __rte_experimental
> +rte_vdpa_relay_used_ring(int vid, int qid, struct vring *m_vring);

Above APIs are split ring specific. We also need to take
packed ring into consideration.

>  #endif /* _RTE_VDPA_H_ */
[...]
> diff --git a/lib/librte_vhost/vdpa.c b/lib/librte_vhost/vdpa.c
> index e7d849ee0..e41117776 100644
> --- a/lib/librte_vhost/vdpa.c
> +++ b/lib/librte_vhost/vdpa.c
> @@ -122,3 +122,176 @@ rte_vdpa_get_device_num(void)
>  {
>  	return vdpa_device_num;
>  }
> +
> +static int
> +invalid_desc_check(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
> +{
> +	uint64_t desc_addr, desc_chunck_len;
> +
> +	while (desc_len) {
> +		desc_chunck_len = desc_len;
> +		desc_addr = vhost_iova_to_vva(dev, vq,
> +				desc_iova,
> +				&desc_chunck_len,
> +				perm);
> +
> +		if (!desc_addr)
> +			return -1;
> +
> +		desc_len -= desc_chunck_len;
> +		desc_iova += desc_chunck_len;
> +	}
> +
> +	return 0;
> +}
> +
> +int
> +rte_vdpa_relay_avail_ring(int vid, int qid, struct vring *m_vring)
> +{
> +	struct virtio_net *dev = get_device(vid);
> +	uint16_t idx, idx_m, desc_id;
> +	struct vring_desc desc;
> +	struct vhost_virtqueue *vq;
> +	struct vring_desc *desc_ring;
> +	struct vring_desc *idesc = NULL;
> +	uint64_t dlen;
> +	int ret;
> +
> +	if (!dev)
> +		return -1;
> +
> +	vq = dev->virtqueue[qid];

Better to also validate qid.

> +	idx = vq->avail->idx;
> +	idx_m = m_vring->avail->idx;
> +	ret = idx - idx_m;

Need to cast (idx - idx_m) to uint16_t.
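
Something along these lines (untested sketch, reusing the patch's
variables):

	/* ring indices are free-running 16-bit counters that wrap;
	 * truncate the difference before it is widened to int, so a
	 * wrapped avail index doesn't produce a negative count */
	ret = (uint16_t)(idx - idx_m);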

> +
> +	while (idx_m != idx) {
> +		/* avail entry copy */
> +		desc_id = vq->avail->ring[idx_m % vq->size];

idx_m & (vq->size - 1) should be faster.
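
Since vq->size isn't a compile-time constant, the compiler emits a real
division for the modulo; the split ring size is a power of two per the
virtio spec, so a mask computed once outside the loop works (sketch):

	uint16_t mask = vq->size - 1;

	/* equivalent to idx_m % vq->size when size is a power of two */
	desc_id = vq->avail->ring[idx_m & mask];
	m_vring->avail->ring[idx_m & mask] = desc_id;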

> +		m_vring->avail->ring[idx_m % vq->size] = desc_id;
> +		desc_ring = vq->desc;
> +
> +		if (vq->desc[desc_id].flags & VRING_DESC_F_INDIRECT) {
> +			dlen = vq->desc[desc_id].len;
> +			desc_ring = (struct vring_desc *)(uintptr_t)
> +			vhost_iova_to_vva(dev, vq, vq->desc[desc_id].addr,

The indent needs to be fixed.

> +						&dlen,
> +						VHOST_ACCESS_RO);
> +			if (unlikely(!desc_ring))
> +				return -1;
> +
> +			if (unlikely(dlen < vq->desc[idx].len)) {
> +				idesc = alloc_copy_ind_table(dev, vq,
> +					vq->desc[idx].addr, vq->desc[idx].len);
> +				if (unlikely(!idesc))
> +					return -1;
> +
> +				desc_ring = idesc;
> +			}
> +
> +			desc_id = 0;
> +		}
> +
> +		/* check if the buf addr is within the guest memory */
> +		do {
> +			desc = desc_ring[desc_id];
> +			if (invalid_desc_check(dev, vq, desc.addr, desc.len,
> +						VHOST_ACCESS_RW))

Should check with < 0, otherwise should return bool.

We may just have RO access.

> +				return -1;

The memory allocated for idesc if any will leak in this case.
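
A leak-free error path could look like this (sketch, keeping the
patch's permission argument and folding in the < 0 check):

	if (invalid_desc_check(dev, vq, desc.addr, desc.len,
				VHOST_ACCESS_RW) < 0) {
		/* idesc may be NULL here; rte_free() tolerates that */
		free_ind_table(idesc);
		return -1;
	}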

> +			desc_id = desc.next;
> +		} while (desc.flags & VRING_DESC_F_NEXT);
> +
> +		if (unlikely(!!idesc)) {

The !! isn't needed.

> +			free_ind_table(idesc);
> +			idesc = NULL;
> +		}
> +
> +		idx_m++;
> +	}
> +

Barrier is needed here.

> +	m_vring->avail->idx = idx;
> +
> +	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))
> +		vhost_avail_event(vq) = vq->avail->idx;

Need to use idx instead of vq->avail->idx which may
have already been changed by driver.
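
I.e. something like (sketch, assuming rte_smp_wmb() from rte_atomic.h):

	/* make the copied avail entries visible to the device before
	 * publishing the new avail index */
	rte_smp_wmb();
	m_vring->avail->idx = idx;

	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))
		vhost_avail_event(vq) = idx;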

> +
> +	return ret;
> +}
> +
> +int
> +rte_vdpa_relay_used_ring(int vid, int qid, struct vring *m_vring)
> +{
> +	struct virtio_net *dev = get_device(vid);
> +	uint16_t idx, idx_m, desc_id;
> +	struct vhost_virtqueue *vq;
> +	struct vring_desc desc;
> +	struct vring_desc *desc_ring;
> +	struct vring_desc *idesc = NULL;
> +	uint64_t dlen;
> +	int ret;
> +
> +	if (!dev)
> +		return -1;
> +
> +	vq = dev->virtqueue[qid];

Better to also validate qid.

> +	idx = vq->used->idx;
> +	idx_m = m_vring->used->idx;
> +	ret = idx_m - idx;

Need to cast (idx_m - idx) to uint16_t.

> +
> +	while (idx != idx_m) {
> +		/* copy used entry, used ring logging is not covered here */

The used ring logging has been covered here by the following call
to vhost_log_used_vring() after used ring is changed.

> +		vq->used->ring[idx % vq->size] =

idx & (vq->size - 1) should be faster.

> +			m_vring->used->ring[idx % vq->size];
> +
> +		/* dirty page logging for used ring */
> +		vhost_log_used_vring(dev, vq,
> +			offsetof(struct vring_used, ring[idx % vq->size]),
> +			sizeof(struct vring_used_elem));
> +
> +		desc_id = vq->used->ring[idx % vq->size].id;
> +		desc_ring = vq->desc;
> +
> +		if (vq->desc[desc_id].flags & VRING_DESC_F_INDIRECT) {
> +			dlen = vq->desc[desc_id].len;
> +			desc_ring = (struct vring_desc *)(uintptr_t)
> +			vhost_iova_to_vva(dev, vq, vq->desc[desc_id].addr,

The indent needs to be fixed.

> +						&dlen,
> +						VHOST_ACCESS_RO);
> +			if (unlikely(!desc_ring))
> +				return -1;
> +
> +			if (unlikely(dlen < vq->desc[idx].len)) {
> +				idesc = alloc_copy_ind_table(dev, vq,
> +					vq->desc[idx].addr, vq->desc[idx].len);
> +				if (unlikely(!idesc))
> +					return -1;
> +
> +				desc_ring = idesc;
> +			}
> +
> +			desc_id = 0;
> +		}
> +
> +		/* dirty page logging for Rx buffer */

Rx is for net; this API isn't net-specific.

> +		do {
> +			desc = desc_ring[desc_id];
> +			if (desc.flags & VRING_DESC_F_WRITE)
> +				vhost_log_write(dev, desc.addr, desc.len);
> +			desc_id = desc.next;
> +		} while (desc.flags & VRING_DESC_F_NEXT);
> +
> +		if (unlikely(!!idesc)) {

The !! isn't needed.

> +			free_ind_table(idesc);
> +			idesc = NULL;
> +		}
> +
> +		idx++;
> +	}
> +

Barrier is needed here.
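
Same as on the avail side (sketch): the relayed used entries must be
visible to the guest before the used index is published:

	rte_smp_wmb();
	vq->used->idx = idx_m;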

> +	vq->used->idx = idx_m;
> +
> +	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))
> +		vring_used_event(m_vring) = m_vring->used->idx;
> +
> +	return ret;
> +}
[...]
  
Xiao Wang Dec. 12, 2018, 6:51 a.m. UTC | #2
Hi,

> -----Original Message-----
> From: Bie, Tiwei
> Sent: Monday, December 3, 2018 10:23 PM
> To: Wang, Xiao W <xiao.w.wang@intel.com>
> Cc: maxime.coquelin@redhat.com; dev@dpdk.org; Wang, Zhihong
> <zhihong.wang@intel.com>; Ye, Xiaolong <xiaolong.ye@intel.com>
> Subject: Re: [PATCH 2/9] vhost: provide helpers for virtio ring relay
> 
> On Wed, Nov 28, 2018 at 05:46:00PM +0800, Xiao Wang wrote:
> [...]
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Synchronize the available ring from guest to mediate ring, help to
> > + * check desc validity to protect against malicious guest driver.
> > + *
> > + * @param vid
> > + *  vhost device id
> > + * @param qid
> > + *  vhost queue id
> > + * @param m_vring
> > + *  mediate virtio ring pointer
> > + * @return
> > + *  number of synced available entries on success, -1 on failure
> > + */
> > +int __rte_experimental
> > +rte_vdpa_relay_avail_ring(int vid, int qid, struct vring *m_vring);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Synchronize the used ring from mediate ring to guest, log dirty
> > + * page for each Rx buffer used.
> > + *
> > + * @param vid
> > + *  vhost device id
> > + * @param qid
> > + *  vhost queue id
> > + * @param m_vring
> > + *  mediate virtio ring pointer
> > + * @return
> > + *  number of synced used entries on success, -1 on failure
> > + */
> > +int __rte_experimental
> > +rte_vdpa_relay_used_ring(int vid, int qid, struct vring *m_vring);
> 
> Above APIs are split ring specific. We also need to take
> packed ring into consideration.

After some study of the current packed ring description, several thoughts:

1. These APIs are helpers for setting up a mediate relay layer that does dirty page logging, and we may not
need this kind of ring relay for packed ring at all. The goal of the mediate SW layer is to help the device do
dirty page logging, so this SW-assisted VDPA looks for a way to intercept the frontend-backend communication:
as you can see in this patch set, SW captures the device interrupt, then parses the vring and logs dirty pages
afterwards. We set up this mediate vring to make sure the relay SW can intercept the device interrupt, since
this way we can control the mediate vring's interrupt suppression structure.

2. One new point about the packed ring is that it separates the event suppression structure from the
descriptor ring. So in this case, we can just set up a mediate event suppression structure to intercept event
notifications.

BTW, one troublesome point I find about the packed ring is that it's hard for the mediate SW to quickly handle
the "buffer id": the guest virtio driver understands this id well and keeps some internal info about each id
(e.g. chain length), but the relay SW has to parse the packed ring again, which is not efficient.

3. In the split vring, the relay SW reuses the guest desc ring, and descs are not written by DMA, so there is
no logging for the desc ring. But in the packed vring, descs are written by DMA, so desc ring logging is a new
thing. Packed ring is quite different; it could call for a very different mechanism rather than following this
vring relay API. Also, from a testing point of view, if we come up with a new, efficient implementation for
packed ring VDPA, it's hard to test it with HW. Testing needs HW that supports packed ring DMA and the
get_vring_base/set_vring_base interface.
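
For reference, this is the packed ring layout under discussion, as
defined in the virtio 1.1 spec; the buffer id lives in each descriptor
and event suppression sits in separate structures, which is what would
let us interpose a mediate suppression structure:

	struct vring_packed_desc {
		uint64_t addr;	/* buffer address */
		uint32_t len;	/* buffer length */
		uint16_t id;	/* buffer id, opaque to the device */
		uint16_t flags;	/* AVAIL/USED wrap bits, NEXT, WRITE, ... */
	};

	struct vring_packed_desc_event {
		uint16_t off_wrap;	/* descriptor event offset and wrap counter */
		uint16_t flags;		/* ENABLE / DISABLE / DESC */
	};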

> 
> >  #endif /* _RTE_VDPA_H_ */
> [...]
> > diff --git a/lib/librte_vhost/vdpa.c b/lib/librte_vhost/vdpa.c
> > index e7d849ee0..e41117776 100644
> > --- a/lib/librte_vhost/vdpa.c
> > +++ b/lib/librte_vhost/vdpa.c
> > @@ -122,3 +122,176 @@ rte_vdpa_get_device_num(void)
> >  {
> >  	return vdpa_device_num;
> >  }
> > +
> > +static int
> > +invalid_desc_check(struct virtio_net *dev, struct vhost_virtqueue *vq,
> > +		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
> > +{
> > +	uint64_t desc_addr, desc_chunck_len;
> > +
> > +	while (desc_len) {
> > +		desc_chunck_len = desc_len;
> > +		desc_addr = vhost_iova_to_vva(dev, vq,
> > +				desc_iova,
> > +				&desc_chunck_len,
> > +				perm);
> > +
> > +		if (!desc_addr)
> > +			return -1;
> > +
> > +		desc_len -= desc_chunck_len;
> > +		desc_iova += desc_chunck_len;
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> > +int
> > +rte_vdpa_relay_avail_ring(int vid, int qid, struct vring *m_vring)
> > +{
> > +	struct virtio_net *dev = get_device(vid);
> > +	uint16_t idx, idx_m, desc_id;
> > +	struct vring_desc desc;
> > +	struct vhost_virtqueue *vq;
> > +	struct vring_desc *desc_ring;
> > +	struct vring_desc *idesc = NULL;
> > +	uint64_t dlen;
> > +	int ret;
> > +
> > +	if (!dev)
> > +		return -1;
> > +
> > +	vq = dev->virtqueue[qid];
> 
> Better to also validate qid.
> 
> > +	idx = vq->avail->idx;
> > +	idx_m = m_vring->avail->idx;
> > +	ret = idx - idx_m;
> 
> Need to cast (idx - idx_m) to uint16_t.
> 
> > +
> > +	while (idx_m != idx) {
> > +		/* avail entry copy */
> > +		desc_id = vq->avail->ring[idx_m % vq->size];
> 
> idx_m & (vq->size - 1) should be faster.
> 
> > +		m_vring->avail->ring[idx_m % vq->size] = desc_id;
> > +		desc_ring = vq->desc;
> > +
> > +		if (vq->desc[desc_id].flags & VRING_DESC_F_INDIRECT) {
> > +			dlen = vq->desc[desc_id].len;
> > +			desc_ring = (struct vring_desc *)(uintptr_t)
> > +			vhost_iova_to_vva(dev, vq, vq->desc[desc_id].addr,
> 
> The indent needs to be fixed.
> 
> > +						&dlen,
> > +						VHOST_ACCESS_RO);
> > +			if (unlikely(!desc_ring))
> > +				return -1;
> > +
> > +			if (unlikely(dlen < vq->desc[idx].len)) {
> > +				idesc = alloc_copy_ind_table(dev, vq,
> > +					vq->desc[idx].addr, vq->desc[idx].len);
> > +				if (unlikely(!idesc))
> > +					return -1;
> > +
> > +				desc_ring = idesc;
> > +			}
> > +
> > +			desc_id = 0;
> > +		}
> > +
> > +		/* check if the buf addr is within the guest memory */
> > +		do {
> > +			desc = desc_ring[desc_id];
> > +			if (invalid_desc_check(dev, vq, desc.addr, desc.len,
> > +						VHOST_ACCESS_RW))
> 
> Should check with < 0, otherwise should return bool.
> 
> We may just have RO access.

The desc may refer to a transmit buffer as well as a receive buffer. Agreed on the comments and nice catches
elsewhere above; will send a new version.
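
One way to reconcile this is to derive the permission per descriptor
(sketch, assuming the VHOST_ACCESS_WO value from vhost.h):

	uint8_t perm = (desc.flags & VRING_DESC_F_WRITE) ?
			VHOST_ACCESS_WO : VHOST_ACCESS_RO;

	if (invalid_desc_check(dev, vq, desc.addr, desc.len, perm) < 0)
		return -1;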

[...]

BRs,
Xiao
  

Patch

diff --git a/lib/librte_vhost/rte_vdpa.h b/lib/librte_vhost/rte_vdpa.h
index 89c5bb6b3..0c44b9080 100644
--- a/lib/librte_vhost/rte_vdpa.h
+++ b/lib/librte_vhost/rte_vdpa.h
@@ -173,4 +173,42 @@  rte_vdpa_get_device_num(void);
  */
 int __rte_experimental
 rte_vhost_host_notifier_ctrl(int vid, bool enable);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Synchronize the available ring from guest to mediate ring, help to
+ * check desc validity to protect against malicious guest driver.
+ *
+ * @param vid
+ *  vhost device id
+ * @param qid
+ *  vhost queue id
+ * @param m_vring
+ *  mediate virtio ring pointer
+ * @return
+ *  number of synced available entries on success, -1 on failure
+ */
+int __rte_experimental
+rte_vdpa_relay_avail_ring(int vid, int qid, struct vring *m_vring);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Synchronize the used ring from mediate ring to guest, log dirty
+ * page for each Rx buffer used.
+ *
+ * @param vid
+ *  vhost device id
+ * @param qid
+ *  vhost queue id
+ * @param m_vring
+ *  mediate virtio ring pointer
+ * @return
+ *  number of synced used entries on success, -1 on failure
+ */
+int __rte_experimental
+rte_vdpa_relay_used_ring(int vid, int qid, struct vring *m_vring);
 #endif /* _RTE_VDPA_H_ */
diff --git a/lib/librte_vhost/rte_vhost_version.map b/lib/librte_vhost/rte_vhost_version.map
index 22302e972..0ad0fbea2 100644
--- a/lib/librte_vhost/rte_vhost_version.map
+++ b/lib/librte_vhost/rte_vhost_version.map
@@ -84,4 +84,6 @@  EXPERIMENTAL {
 	rte_vhost_crypto_set_zero_copy;
 	rte_vhost_va_from_guest_pa;
 	rte_vhost_host_notifier_ctrl;
+	rte_vdpa_relay_avail_ring;
+	rte_vdpa_relay_used_ring;
 };
diff --git a/lib/librte_vhost/vdpa.c b/lib/librte_vhost/vdpa.c
index e7d849ee0..e41117776 100644
--- a/lib/librte_vhost/vdpa.c
+++ b/lib/librte_vhost/vdpa.c
@@ -122,3 +122,176 @@  rte_vdpa_get_device_num(void)
 {
 	return vdpa_device_num;
 }
+
+static int
+invalid_desc_check(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		uint64_t desc_iova, uint64_t desc_len, uint8_t perm)
+{
+	uint64_t desc_addr, desc_chunck_len;
+
+	while (desc_len) {
+		desc_chunck_len = desc_len;
+		desc_addr = vhost_iova_to_vva(dev, vq,
+				desc_iova,
+				&desc_chunck_len,
+				perm);
+
+		if (!desc_addr)
+			return -1;
+
+		desc_len -= desc_chunck_len;
+		desc_iova += desc_chunck_len;
+	}
+
+	return 0;
+}
+
+int
+rte_vdpa_relay_avail_ring(int vid, int qid, struct vring *m_vring)
+{
+	struct virtio_net *dev = get_device(vid);
+	uint16_t idx, idx_m, desc_id;
+	struct vring_desc desc;
+	struct vhost_virtqueue *vq;
+	struct vring_desc *desc_ring;
+	struct vring_desc *idesc = NULL;
+	uint64_t dlen;
+	int ret;
+
+	if (!dev)
+		return -1;
+
+	vq = dev->virtqueue[qid];
+	idx = vq->avail->idx;
+	idx_m = m_vring->avail->idx;
+	ret = idx - idx_m;
+
+	while (idx_m != idx) {
+		/* avail entry copy */
+		desc_id = vq->avail->ring[idx_m % vq->size];
+		m_vring->avail->ring[idx_m % vq->size] = desc_id;
+		desc_ring = vq->desc;
+
+		if (vq->desc[desc_id].flags & VRING_DESC_F_INDIRECT) {
+			dlen = vq->desc[desc_id].len;
+			desc_ring = (struct vring_desc *)(uintptr_t)
+			vhost_iova_to_vva(dev, vq, vq->desc[desc_id].addr,
+						&dlen,
+						VHOST_ACCESS_RO);
+			if (unlikely(!desc_ring))
+				return -1;
+
+			if (unlikely(dlen < vq->desc[idx].len)) {
+				idesc = alloc_copy_ind_table(dev, vq,
+					vq->desc[idx].addr, vq->desc[idx].len);
+				if (unlikely(!idesc))
+					return -1;
+
+				desc_ring = idesc;
+			}
+
+			desc_id = 0;
+		}
+
+		/* check if the buf addr is within the guest memory */
+		do {
+			desc = desc_ring[desc_id];
+			if (invalid_desc_check(dev, vq, desc.addr, desc.len,
+						VHOST_ACCESS_RW))
+				return -1;
+			desc_id = desc.next;
+		} while (desc.flags & VRING_DESC_F_NEXT);
+
+		if (unlikely(!!idesc)) {
+			free_ind_table(idesc);
+			idesc = NULL;
+		}
+
+		idx_m++;
+	}
+
+	m_vring->avail->idx = idx;
+
+	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))
+		vhost_avail_event(vq) = vq->avail->idx;
+
+	return ret;
+}
+
+int
+rte_vdpa_relay_used_ring(int vid, int qid, struct vring *m_vring)
+{
+	struct virtio_net *dev = get_device(vid);
+	uint16_t idx, idx_m, desc_id;
+	struct vhost_virtqueue *vq;
+	struct vring_desc desc;
+	struct vring_desc *desc_ring;
+	struct vring_desc *idesc = NULL;
+	uint64_t dlen;
+	int ret;
+
+	if (!dev)
+		return -1;
+
+	vq = dev->virtqueue[qid];
+	idx = vq->used->idx;
+	idx_m = m_vring->used->idx;
+	ret = idx_m - idx;
+
+	while (idx != idx_m) {
+		/* copy used entry, used ring logging is not covered here */
+		vq->used->ring[idx % vq->size] =
+			m_vring->used->ring[idx % vq->size];
+
+		/* dirty page logging for used ring */
+		vhost_log_used_vring(dev, vq,
+			offsetof(struct vring_used, ring[idx % vq->size]),
+			sizeof(struct vring_used_elem));
+
+		desc_id = vq->used->ring[idx % vq->size].id;
+		desc_ring = vq->desc;
+
+		if (vq->desc[desc_id].flags & VRING_DESC_F_INDIRECT) {
+			dlen = vq->desc[desc_id].len;
+			desc_ring = (struct vring_desc *)(uintptr_t)
+			vhost_iova_to_vva(dev, vq, vq->desc[desc_id].addr,
+						&dlen,
+						VHOST_ACCESS_RO);
+			if (unlikely(!desc_ring))
+				return -1;
+
+			if (unlikely(dlen < vq->desc[idx].len)) {
+				idesc = alloc_copy_ind_table(dev, vq,
+					vq->desc[idx].addr, vq->desc[idx].len);
+				if (unlikely(!idesc))
+					return -1;
+
+				desc_ring = idesc;
+			}
+
+			desc_id = 0;
+		}
+
+		/* dirty page logging for Rx buffer */
+		do {
+			desc = desc_ring[desc_id];
+			if (desc.flags & VRING_DESC_F_WRITE)
+				vhost_log_write(dev, desc.addr, desc.len);
+			desc_id = desc.next;
+		} while (desc.flags & VRING_DESC_F_NEXT);
+
+		if (unlikely(!!idesc)) {
+			free_ind_table(idesc);
+			idesc = NULL;
+		}
+
+		idx++;
+	}
+
+	vq->used->idx = idx_m;
+
+	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))
+		vring_used_event(m_vring) = m_vring->used->idx;
+
+	return ret;
+}
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 5218f1b12..2164cd6d9 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -18,6 +18,7 @@ 
 #include <rte_log.h>
 #include <rte_ether.h>
 #include <rte_rwlock.h>
+#include <rte_malloc.h>
 
 #include "rte_vhost.h"
 #include "rte_vdpa.h"
@@ -753,4 +754,43 @@  vhost_vring_call_packed(struct virtio_net *dev, struct vhost_virtqueue *vq)
 		eventfd_write(vq->callfd, (eventfd_t)1);
 }
 
+static __rte_always_inline void *
+alloc_copy_ind_table(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		uint64_t desc_addr, uint64_t desc_len)
+{
+	void *idesc;
+	uint64_t src, dst;
+	uint64_t len, remain = desc_len;
+
+	idesc = rte_malloc(__func__, desc_len, 0);
+	if (unlikely(!idesc))
+		return 0;
+
+	dst = (uint64_t)(uintptr_t)idesc;
+
+	while (remain) {
+		len = remain;
+		src = vhost_iova_to_vva(dev, vq, desc_addr, &len,
+				VHOST_ACCESS_RO);
+		if (unlikely(!src || !len)) {
+			rte_free(idesc);
+			return 0;
+		}
+
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src, len);
+
+		remain -= len;
+		dst += len;
+		desc_addr += len;
+	}
+
+	return idesc;
+}
+
+static __rte_always_inline void
+free_ind_table(void *idesc)
+{
+	rte_free(idesc);
+}
+
 #endif /* _VHOST_NET_CDEV_H_ */
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 5e1a1a727..8c657a101 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -37,45 +37,6 @@  is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
 	return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
 }
 
-static __rte_always_inline void *
-alloc_copy_ind_table(struct virtio_net *dev, struct vhost_virtqueue *vq,
-		uint64_t desc_addr, uint64_t desc_len)
-{
-	void *idesc;
-	uint64_t src, dst;
-	uint64_t len, remain = desc_len;
-
-	idesc = rte_malloc(__func__, desc_len, 0);
-	if (unlikely(!idesc))
-		return 0;
-
-	dst = (uint64_t)(uintptr_t)idesc;
-
-	while (remain) {
-		len = remain;
-		src = vhost_iova_to_vva(dev, vq, desc_addr, &len,
-				VHOST_ACCESS_RO);
-		if (unlikely(!src || !len)) {
-			rte_free(idesc);
-			return 0;
-		}
-
-		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src, len);
-
-		remain -= len;
-		dst += len;
-		desc_addr += len;
-	}
-
-	return idesc;
-}
-
-static __rte_always_inline void
-free_ind_table(void *idesc)
-{
-	rte_free(idesc);
-}
-
 static __rte_always_inline void
 do_flush_shadow_used_ring_split(struct virtio_net *dev,
 			struct vhost_virtqueue *vq,