[v3,1/4] vhost: simplify async copy completion

Message ID 20200929092955.2848419-2-patrick.fu@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Headers
Series optimize async data path |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Patrick Fu Sept. 29, 2020, 9:29 a.m. UTC
  Current async ops allows check_completed_copies() callback to return
arbitrary number of async iov segments finished from backend async
devices. This design creates complexity for vhost to handle breaking
transfer of a single packet (i.e. transfer completes in the middle
of a async descriptor) and prevents application callbacks from
leveraging hardware capability to offload the work. Thus, this patch
enforces the check_completed_copies() callback to return the number
of async memory descriptors, which is aligned with async transfer
data ops callbacks. vHost async data path are revised to work with
new ops define, which provides a clean and simplified processing.

Signed-off-by: Patrick Fu <patrick.fu@intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |  15 ++-
 lib/librte_vhost/vhost.c           |  20 ++--
 lib/librte_vhost/vhost.h           |   8 +-
 lib/librte_vhost/vhost_user.c      |   6 +-
 lib/librte_vhost/virtio_net.c      | 151 ++++++++++++-----------------
 5 files changed, 90 insertions(+), 110 deletions(-)
  

Comments

Maxime Coquelin Oct. 5, 2020, 1:46 p.m. UTC | #1
On 9/29/20 11:29 AM, Patrick Fu wrote:
> Current async ops allows check_completed_copies() callback to return
> arbitrary number of async iov segments finished from backend async
> devices. This design creates complexity for vhost to handle breaking
> transfer of a single packet (i.e. transfer completes in the middle
> of a async descriptor) and prevents application callbacks from
> leveraging hardware capability to offload the work. Thus, this patch
> enforces the check_completed_copies() callback to return the number
> of async memory descriptors, which is aligned with async transfer
> data ops callbacks. vHost async data path are revised to work with
> new ops define, which provides a clean and simplified processing.
> 
> Signed-off-by: Patrick Fu <patrick.fu@intel.com>
> ---
>  lib/librte_vhost/rte_vhost_async.h |  15 ++-
>  lib/librte_vhost/vhost.c           |  20 ++--
>  lib/librte_vhost/vhost.h           |   8 +-
>  lib/librte_vhost/vhost_user.c      |   6 +-
>  lib/librte_vhost/virtio_net.c      | 151 ++++++++++++-----------------
>  5 files changed, 90 insertions(+), 110 deletions(-)
> 
> diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
> index cb6284539..c73bd7c99 100644
> --- a/lib/librte_vhost/rte_vhost_async.h
> +++ b/lib/librte_vhost/rte_vhost_async.h
> @@ -76,13 +76,26 @@ struct rte_vhost_async_channel_ops {
>  	 * @param max_packets
>  	 *  max number of packets could be completed
>  	 * @return
> -	 *  number of iov segments completed
> +	 *  number of async descs completed
>  	 */
>  	uint32_t (*check_completed_copies)(int vid, uint16_t queue_id,
>  		struct rte_vhost_async_status *opaque_data,
>  		uint16_t max_packets);
>  };
>  
> +/**
> + * inflight async packet information
> + */
> +struct async_inflight_info {
> +	union {
> +		uint32_t info;
> +		struct {
> +			uint16_t descs; /* num of descs inflight */
> +			uint16_t segs; /* iov segs inflight */
> +		};
> +	};

I think you can removing the union.

> +};
> +
>  /**
>   *  dma channel feature bit definition
>   */
> diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
> index 8f20a0818..eca507836 100644
> --- a/lib/librte_vhost/vhost.c
> +++ b/lib/librte_vhost/vhost.c
> @@ -333,8 +333,8 @@ free_vq(struct virtio_net *dev, struct vhost_virtqueue *vq)
>  		rte_free(vq->shadow_used_split);
>  		if (vq->async_pkts_pending)
>  			rte_free(vq->async_pkts_pending);
> -		if (vq->async_pending_info)
> -			rte_free(vq->async_pending_info);
> +		if (vq->async_pkts_info)
> +			rte_free(vq->async_pkts_info);
>  	}
>  	rte_free(vq->batch_copy_elems);
>  	rte_mempool_free(vq->iotlb_pool);
> @@ -1573,15 +1573,15 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
>  	vq->async_pkts_pending = rte_malloc(NULL,
>  			vq->size * sizeof(uintptr_t),
>  			RTE_CACHE_LINE_SIZE);
> -	vq->async_pending_info = rte_malloc(NULL,
> -			vq->size * sizeof(uint64_t),
> +	vq->async_pkts_info = rte_malloc(NULL,
> +			vq->size * sizeof(struct async_inflight_info),
>  			RTE_CACHE_LINE_SIZE);
> -	if (!vq->async_pkts_pending || !vq->async_pending_info) {
> +	if (!vq->async_pkts_pending || !vq->async_pkts_info) {
>  		if (vq->async_pkts_pending)
>  			rte_free(vq->async_pkts_pending);
>  
> -		if (vq->async_pending_info)
> -			rte_free(vq->async_pending_info);
> +		if (vq->async_pkts_info)
> +			rte_free(vq->async_pkts_info);
>  
>  		VHOST_LOG_CONFIG(ERR,
>  				"async register failed: cannot allocate memory for vq data "
> @@ -1635,9 +1635,9 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id)
>  		vq->async_pkts_pending = NULL;
>  	}
>  
> -	if (vq->async_pending_info) {
> -		rte_free(vq->async_pending_info);
> -		vq->async_pending_info = NULL;
> +	if (vq->async_pkts_info) {
> +		rte_free(vq->async_pkts_info);
> +		vq->async_pkts_info = NULL;
>  	}
>  
>  	vq->async_ops.transfer_data = NULL;
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 73a1ed889..155a832c1 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -46,8 +46,6 @@
>  
>  #define MAX_PKT_BURST 32
>  
> -#define ASYNC_MAX_POLL_SEG 255
> -
>  #define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
>  #define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 2)
>  
> @@ -225,12 +223,10 @@ struct vhost_virtqueue {
>  
>  	/* async data transfer status */
>  	uintptr_t	**async_pkts_pending;
> -	#define		ASYNC_PENDING_INFO_N_MSK 0xFFFF
> -	#define		ASYNC_PENDING_INFO_N_SFT 16
> -	uint64_t	*async_pending_info;
> +	struct async_inflight_info *async_pkts_info;
>  	uint16_t	async_pkts_idx;
>  	uint16_t	async_pkts_inflight_n;
> -	uint16_t	async_last_seg_n;
> +	uint16_t	async_last_pkts_n;
>  
>  	/* vq async features */
>  	bool		async_inorder;
> diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
> index 501218e19..67d2a2d43 100644
> --- a/lib/librte_vhost/vhost_user.c
> +++ b/lib/librte_vhost/vhost_user.c
> @@ -2006,10 +2006,10 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
>  		vq->shadow_used_split = NULL;
>  		if (vq->async_pkts_pending)
>  			rte_free(vq->async_pkts_pending);
> -		if (vq->async_pending_info)
> -			rte_free(vq->async_pending_info);
> +		if (vq->async_pkts_info)
> +			rte_free(vq->async_pkts_info);
>  		vq->async_pkts_pending = NULL;
> -		vq->async_pending_info = NULL;
> +		vq->async_pkts_info = NULL;
>  	}
>  
>  	rte_free(vq->batch_copy_elems);
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index bd9303c8a..68ead9a71 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -1473,37 +1473,6 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
>  		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
>  }
>  
> -static __rte_always_inline void
> -virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
> -	struct vhost_virtqueue *vq, uint16_t queue_id,
> -	uint16_t last_idx, uint16_t shadow_idx)
> -{
> -	uint16_t start_idx, pkts_idx, vq_size;
> -	uint64_t *async_pending_info;
> -
> -	pkts_idx = vq->async_pkts_idx;
> -	async_pending_info = vq->async_pending_info;
> -	vq_size = vq->size;
> -	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
> -		vq_size, vq->async_pkts_inflight_n);
> -
> -	while (likely((start_idx & (vq_size - 1)) != pkts_idx)) {
> -		uint64_t n_seg =
> -			async_pending_info[(start_idx) & (vq_size - 1)] >>
> -			ASYNC_PENDING_INFO_N_SFT;
> -
> -		while (n_seg)
> -			n_seg -= vq->async_ops.check_completed_copies(dev->vid,
> -				queue_id, 0, 1);
> -	}
> -
> -	vq->async_pkts_inflight_n = 0;
> -	vq->batch_copy_nb_elems = 0;
> -
> -	vq->shadow_used_idx = shadow_idx;
> -	vq->last_avail_idx = last_idx;
> -}
> -
>  static __rte_noinline uint32_t
>  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	struct vhost_virtqueue *vq, uint16_t queue_id,
> @@ -1512,7 +1481,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
>  	uint16_t num_buffers;
>  	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> -	uint16_t avail_head, last_idx, shadow_idx;
> +	uint16_t avail_head;
>  
>  	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
>  	struct iovec *vec_pool = vq->vec_pool;
> @@ -1522,11 +1491,11 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	struct rte_vhost_iov_iter *src_it = it_pool;
>  	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
>  	uint16_t n_free_slot, slot_idx;
> +	uint16_t pkt_err = 0;
> +	struct async_inflight_info *pkts_info = vq->async_pkts_info;
>  	int n_pkts = 0;
>  
>  	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
> -	last_idx = vq->last_avail_idx;
> -	shadow_idx = vq->shadow_used_idx;
>  
>  	/*
>  	 * The ordering between avail index and
> @@ -1565,14 +1534,14 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  		if (src_it->count) {
>  			async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
>  			pkt_burst_idx++;
> -			vq->async_pending_info[slot_idx] =
> -				num_buffers | (src_it->nr_segs << 16);
> +			pkts_info[slot_idx].descs = num_buffers;
> +			pkts_info[slot_idx].segs = src_it->nr_segs;
>  			src_iovec += src_it->nr_segs;
>  			dst_iovec += dst_it->nr_segs;
>  			src_it += 2;
>  			dst_it += 2;
>  		} else {
> -			vq->async_pending_info[slot_idx] = num_buffers;
> +			pkts_info[slot_idx].info = num_buffers;
>  			vq->async_pkts_inflight_n++;
>  		}
>  
> @@ -1586,35 +1555,46 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
>  			src_it = it_pool;
>  			dst_it = it_pool + 1;
> +			vq->async_pkts_inflight_n += n_pkts;
>  
>  			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> -				vq->async_pkts_inflight_n +=
> -					n_pkts > 0 ? n_pkts : 0;
> -				virtio_dev_rx_async_submit_split_err(dev,
> -					vq, queue_id, last_idx, shadow_idx);
> -				return 0;
> +				/*
> +				 * log error packets number here and do actual
> +				 * error processing when applications poll
> +				 * completion
> +				 */
> +				pkt_err = pkt_burst_idx - n_pkts;
> +				pkt_burst_idx = 0;
> +				break;
>  			}
>  
>  			pkt_burst_idx = 0;
> -			vq->async_pkts_inflight_n += n_pkts;
>  		}
>  	}
>  
>  	if (pkt_burst_idx) {
>  		n_pkts = vq->async_ops.transfer_data(dev->vid,
>  				queue_id, tdes, 0, pkt_burst_idx);
> -		if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> -			vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
> -			virtio_dev_rx_async_submit_split_err(dev, vq, queue_id,
> -				last_idx, shadow_idx);
> -			return 0;
> -		}
> -
>  		vq->async_pkts_inflight_n += n_pkts;
> +
> +		if (unlikely(n_pkts < (int)pkt_burst_idx))
> +			pkt_err = pkt_burst_idx - n_pkts;
>  	}
>  
>  	do_data_copy_enqueue(dev, vq);
>  
> +	if (unlikely(pkt_err)) {
> +		do {
> +			if (pkts_info[slot_idx].segs)
> +				pkt_err--;
> +			vq->last_avail_idx -= pkts_info[slot_idx].descs;
> +			vq->shadow_used_idx -= pkts_info[slot_idx].descs;
> +			vq->async_pkts_inflight_n--;
> +			slot_idx--;
> +			pkt_idx--;
> +		} while (pkt_err);

That does not sound really robust.
Maybe you could exit also on slot_idx < 0 to avoid out of range
accesses?

} while (pkt_err && slot_idx >= 0)

> +	}
> +
>  	n_free_slot = vq->size - vq->async_pkts_idx;
>  	if (n_free_slot > pkt_idx) {
>  		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
> @@ -1640,10 +1620,10 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  {
>  	struct virtio_net *dev = get_device(vid);
>  	struct vhost_virtqueue *vq;
> -	uint16_t n_segs_cpl, n_pkts_put = 0, n_descs = 0;
> +	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
>  	uint16_t start_idx, pkts_idx, vq_size;
>  	uint16_t n_inflight;
> -	uint64_t *async_pending_info;
> +	struct async_inflight_info *pkts_info;
>  
>  	if (!dev)
>  		return 0;
> @@ -1657,51 +1637,40 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  
>  	vq = dev->virtqueue[queue_id];
>  
> +	if (unlikely(!vq->async_registered)) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
>  	rte_spinlock_lock(&vq->access_lock);
>  
>  	n_inflight = vq->async_pkts_inflight_n;
>  	pkts_idx = vq->async_pkts_idx;
> -	async_pending_info = vq->async_pending_info;
> +	pkts_info = vq->async_pkts_info;
>  	vq_size = vq->size;
>  	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
>  		vq_size, vq->async_pkts_inflight_n);
>  
> -	n_segs_cpl = vq->async_ops.check_completed_copies(vid, queue_id,
> -		0, ASYNC_MAX_POLL_SEG - vq->async_last_seg_n) +
> -		vq->async_last_seg_n;
> +	if (count > vq->async_last_pkts_n)
> +		n_pkts_cpl = vq->async_ops.check_completed_copies(vid,
> +			queue_id, 0, count - vq->async_last_pkts_n);
> +	n_pkts_cpl += vq->async_last_pkts_n;
>  
>  	rte_smp_wmb();
>  
>  	while (likely((n_pkts_put < count) && n_inflight)) {
> -		uint64_t info = async_pending_info[
> -			(start_idx + n_pkts_put) & (vq_size - 1)];
> -		uint64_t n_segs;
> +		uint16_t info_idx = (start_idx + n_pkts_put) & (vq_size - 1);
> +		if (n_pkts_cpl && pkts_info[info_idx].segs)
> +			n_pkts_cpl--;
> +		else if (!n_pkts_cpl && pkts_info[info_idx].segs)
> +			break;
>  		n_pkts_put++;
>  		n_inflight--;
> -		n_descs += info & ASYNC_PENDING_INFO_N_MSK;
> -		n_segs = info >> ASYNC_PENDING_INFO_N_SFT;
> -
> -		if (n_segs) {
> -			if (unlikely(n_segs_cpl < n_segs)) {
> -				n_pkts_put--;
> -				n_inflight++;
> -				n_descs -= info & ASYNC_PENDING_INFO_N_MSK;
> -				if (n_segs_cpl) {
> -					async_pending_info[
> -						(start_idx + n_pkts_put) &
> -						(vq_size - 1)] =
> -					((n_segs - n_segs_cpl) <<
> -					 ASYNC_PENDING_INFO_N_SFT) |
> -					(info & ASYNC_PENDING_INFO_N_MSK);
> -					n_segs_cpl = 0;
> -				}
> -				break;
> -			}
> -			n_segs_cpl -= n_segs;
> -		}
> +		n_descs += pkts_info[info_idx].descs;
>  	}
>  
> -	vq->async_last_seg_n = n_segs_cpl;
> +	vq->async_last_pkts_n = n_pkts_cpl;
>  
>  	if (n_pkts_put) {
>  		vq->async_pkts_inflight_n = n_inflight;
> @@ -1710,16 +1679,18 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  					n_descs, __ATOMIC_RELEASE);
>  			vhost_vring_call_split(dev, vq);
>  		}
> -	}
>  
> -	if (start_idx + n_pkts_put <= vq_size) {
> -		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> -			n_pkts_put * sizeof(uintptr_t));
> -	} else {
> -		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> -			(vq_size - start_idx) * sizeof(uintptr_t));
> -		rte_memcpy(&pkts[vq_size - start_idx], vq->async_pkts_pending,
> -			(n_pkts_put - vq_size + start_idx) * sizeof(uintptr_t));
> +		if (start_idx + n_pkts_put <= vq_size) {
> +			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> +				n_pkts_put * sizeof(uintptr_t));
> +		} else {
> +			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> +				(vq_size - start_idx) * sizeof(uintptr_t));
> +			rte_memcpy(&pkts[vq_size - start_idx],
> +				vq->async_pkts_pending,
> +				(n_pkts_put + start_idx - vq_size) *
> +				sizeof(uintptr_t));
> +		}
>  	}
>  
>  	rte_spinlock_unlock(&vq->access_lock);
>
  
Patrick Fu Oct. 9, 2020, 11:16 a.m. UTC | #2
> > On 9/29/20 11:29 AM, Patrick Fu wrote:
> > Current async ops allows check_completed_copies() callback to return
> > arbitrary number of async iov segments finished from backend async
> > devices. This design creates complexity for vhost to handle breaking
> > transfer of a single packet (i.e. transfer completes in the middle
> > of a async descriptor) and prevents application callbacks from
> > leveraging hardware capability to offload the work. Thus, this patch
> > enforces the check_completed_copies() callback to return the number
> > of async memory descriptors, which is aligned with async transfer
> > data ops callbacks. vHost async data path are revised to work with
> > new ops define, which provides a clean and simplified processing.
> >
> > Signed-off-by: Patrick Fu <patrick.fu@intel.com>
> > ---
> >  lib/librte_vhost/rte_vhost_async.h |  15 ++-
> >  lib/librte_vhost/vhost.c           |  20 ++--
> >  lib/librte_vhost/vhost.h           |   8 +-
> >  lib/librte_vhost/vhost_user.c      |   6 +-
> >  lib/librte_vhost/virtio_net.c      | 151 ++++++++++++-----------------
> >  5 files changed, 90 insertions(+), 110 deletions(-)
> >
> > diff --git a/lib/librte_vhost/rte_vhost_async.h
> b/lib/librte_vhost/rte_vhost_async.h
> > index cb6284539..c73bd7c99 100644
> > --- a/lib/librte_vhost/rte_vhost_async.h
> > +++ b/lib/librte_vhost/rte_vhost_async.h
> > @@ -76,13 +76,26 @@ struct rte_vhost_async_channel_ops {
> >  	 * @param max_packets
> >  	 *  max number of packets could be completed
> >  	 * @return
> > -	 *  number of iov segments completed
> > +	 *  number of async descs completed
> >  	 */
> >  	uint32_t (*check_completed_copies)(int vid, uint16_t queue_id,
> >  		struct rte_vhost_async_status *opaque_data,
> >  		uint16_t max_packets);
> >  };
> >
> > +/**
> > + * inflight async packet information
> > + */
> > +struct async_inflight_info {
> > +	union {
> > +		uint32_t info;
> > +		struct {
> > +			uint16_t descs; /* num of descs inflight */
> > +			uint16_t segs; /* iov segs inflight */
> > +		};
> > +	};
> 
> I think you can removing the union.
> 
There are some cases in the code that we want to set descs=N and set segs=0. In such cases, reference info=N directly will be faster than setting descs & segs separately.  So I would prefer to keep the union.

> > +};
> > +
> >  /**
> >   *  dma channel feature bit definition
> >   */
> > diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
> > index 8f20a0818..eca507836 100644
> > --- a/lib/librte_vhost/vhost.c
> > +++ b/lib/librte_vhost/vhost.c
> > @@ -333,8 +333,8 @@ free_vq(struct virtio_net *dev, struct
> vhost_virtqueue *vq)
> >  		rte_free(vq->shadow_used_split);
> >  		if (vq->async_pkts_pending)
> >  			rte_free(vq->async_pkts_pending);
> > -		if (vq->async_pending_info)
> > -			rte_free(vq->async_pending_info);
> > +		if (vq->async_pkts_info)
> > +			rte_free(vq->async_pkts_info);
> >  	}
> >  	rte_free(vq->batch_copy_elems);
> >  	rte_mempool_free(vq->iotlb_pool);
> > @@ -1573,15 +1573,15 @@ int rte_vhost_async_channel_register(int vid,
> uint16_t queue_id,
> >  	vq->async_pkts_pending = rte_malloc(NULL,
> >  			vq->size * sizeof(uintptr_t),
> >  			RTE_CACHE_LINE_SIZE);
> > -	vq->async_pending_info = rte_malloc(NULL,
> > -			vq->size * sizeof(uint64_t),
> > +	vq->async_pkts_info = rte_malloc(NULL,
> > +			vq->size * sizeof(struct async_inflight_info),
> >  			RTE_CACHE_LINE_SIZE);
> > -	if (!vq->async_pkts_pending || !vq->async_pending_info) {
> > +	if (!vq->async_pkts_pending || !vq->async_pkts_info) {
> >  		if (vq->async_pkts_pending)
> >  			rte_free(vq->async_pkts_pending);
> >
> > -		if (vq->async_pending_info)
> > -			rte_free(vq->async_pending_info);
> > +		if (vq->async_pkts_info)
> > +			rte_free(vq->async_pkts_info);
> >
> >  		VHOST_LOG_CONFIG(ERR,
> >  				"async register failed: cannot allocate
> memory for vq data "
> > @@ -1635,9 +1635,9 @@ int rte_vhost_async_channel_unregister(int vid,
> uint16_t queue_id)
> >  		vq->async_pkts_pending = NULL;
> >  	}
> >
> > -	if (vq->async_pending_info) {
> > -		rte_free(vq->async_pending_info);
> > -		vq->async_pending_info = NULL;
> > +	if (vq->async_pkts_info) {
> > +		rte_free(vq->async_pkts_info);
> > +		vq->async_pkts_info = NULL;
> >  	}
> >
> >  	vq->async_ops.transfer_data = NULL;
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > index 73a1ed889..155a832c1 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -46,8 +46,6 @@
> >
> >  #define MAX_PKT_BURST 32
> >
> > -#define ASYNC_MAX_POLL_SEG 255
> > -
> >  #define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
> >  #define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 2)
> >
> > @@ -225,12 +223,10 @@ struct vhost_virtqueue {
> >
> >  	/* async data transfer status */
> >  	uintptr_t	**async_pkts_pending;
> > -	#define		ASYNC_PENDING_INFO_N_MSK 0xFFFF
> > -	#define		ASYNC_PENDING_INFO_N_SFT 16
> > -	uint64_t	*async_pending_info;
> > +	struct async_inflight_info *async_pkts_info;
> >  	uint16_t	async_pkts_idx;
> >  	uint16_t	async_pkts_inflight_n;
> > -	uint16_t	async_last_seg_n;
> > +	uint16_t	async_last_pkts_n;
> >
> >  	/* vq async features */
> >  	bool		async_inorder;
> > diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
> > index 501218e19..67d2a2d43 100644
> > --- a/lib/librte_vhost/vhost_user.c
> > +++ b/lib/librte_vhost/vhost_user.c
> > @@ -2006,10 +2006,10 @@ vhost_user_get_vring_base(struct virtio_net
> **pdev,
> >  		vq->shadow_used_split = NULL;
> >  		if (vq->async_pkts_pending)
> >  			rte_free(vq->async_pkts_pending);
> > -		if (vq->async_pending_info)
> > -			rte_free(vq->async_pending_info);
> > +		if (vq->async_pkts_info)
> > +			rte_free(vq->async_pkts_info);
> >  		vq->async_pkts_pending = NULL;
> > -		vq->async_pending_info = NULL;
> > +		vq->async_pkts_info = NULL;
> >  	}
> >
> >  	rte_free(vq->batch_copy_elems);
> > diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> > index bd9303c8a..68ead9a71 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -1473,37 +1473,6 @@ virtio_dev_rx_async_get_info_idx(uint16_t
> pkts_idx,
> >  		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
> >  }
> >
> > -static __rte_always_inline void
> > -virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
> > -	struct vhost_virtqueue *vq, uint16_t queue_id,
> > -	uint16_t last_idx, uint16_t shadow_idx)
> > -{
> > -	uint16_t start_idx, pkts_idx, vq_size;
> > -	uint64_t *async_pending_info;
> > -
> > -	pkts_idx = vq->async_pkts_idx;
> > -	async_pending_info = vq->async_pending_info;
> > -	vq_size = vq->size;
> > -	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
> > -		vq_size, vq->async_pkts_inflight_n);
> > -
> > -	while (likely((start_idx & (vq_size - 1)) != pkts_idx)) {
> > -		uint64_t n_seg =
> > -			async_pending_info[(start_idx) & (vq_size - 1)] >>
> > -			ASYNC_PENDING_INFO_N_SFT;
> > -
> > -		while (n_seg)
> > -			n_seg -= vq-
> >async_ops.check_completed_copies(dev->vid,
> > -				queue_id, 0, 1);
> > -	}
> > -
> > -	vq->async_pkts_inflight_n = 0;
> > -	vq->batch_copy_nb_elems = 0;
> > -
> > -	vq->shadow_used_idx = shadow_idx;
> > -	vq->last_avail_idx = last_idx;
> > -}
> > -
> >  static __rte_noinline uint32_t
> >  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	struct vhost_virtqueue *vq, uint16_t queue_id,
> > @@ -1512,7 +1481,7 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> >  	uint16_t num_buffers;
> >  	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> > -	uint16_t avail_head, last_idx, shadow_idx;
> > +	uint16_t avail_head;
> >
> >  	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> >  	struct iovec *vec_pool = vq->vec_pool;
> > @@ -1522,11 +1491,11 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  	struct rte_vhost_iov_iter *src_it = it_pool;
> >  	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> >  	uint16_t n_free_slot, slot_idx;
> > +	uint16_t pkt_err = 0;
> > +	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> >  	int n_pkts = 0;
> >
> >  	avail_head = __atomic_load_n(&vq->avail->idx,
> __ATOMIC_ACQUIRE);
> > -	last_idx = vq->last_avail_idx;
> > -	shadow_idx = vq->shadow_used_idx;
> >
> >  	/*
> >  	 * The ordering between avail index and
> > @@ -1565,14 +1534,14 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  		if (src_it->count) {
> >  			async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
> >  			pkt_burst_idx++;
> > -			vq->async_pending_info[slot_idx] =
> > -				num_buffers | (src_it->nr_segs << 16);
> > +			pkts_info[slot_idx].descs = num_buffers;
> > +			pkts_info[slot_idx].segs = src_it->nr_segs;
> >  			src_iovec += src_it->nr_segs;
> >  			dst_iovec += dst_it->nr_segs;
> >  			src_it += 2;
> >  			dst_it += 2;
> >  		} else {
> > -			vq->async_pending_info[slot_idx] = num_buffers;
> > +			pkts_info[slot_idx].info = num_buffers;
> >  			vq->async_pkts_inflight_n++;
> >  		}
> >
> > @@ -1586,35 +1555,46 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> 1);
> >  			src_it = it_pool;
> >  			dst_it = it_pool + 1;
> > +			vq->async_pkts_inflight_n += n_pkts;
> >
> >  			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> > -				vq->async_pkts_inflight_n +=
> > -					n_pkts > 0 ? n_pkts : 0;
> > -				virtio_dev_rx_async_submit_split_err(dev,
> > -					vq, queue_id, last_idx, shadow_idx);
> > -				return 0;
> > +				/*
> > +				 * log error packets number here and do
> actual
> > +				 * error processing when applications poll
> > +				 * completion
> > +				 */
> > +				pkt_err = pkt_burst_idx - n_pkts;
> > +				pkt_burst_idx = 0;
> > +				break;
> >  			}
> >
> >  			pkt_burst_idx = 0;
> > -			vq->async_pkts_inflight_n += n_pkts;
> >  		}
> >  	}
> >
> >  	if (pkt_burst_idx) {
> >  		n_pkts = vq->async_ops.transfer_data(dev->vid,
> >  				queue_id, tdes, 0, pkt_burst_idx);
> > -		if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> > -			vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
> > -			virtio_dev_rx_async_submit_split_err(dev, vq,
> queue_id,
> > -				last_idx, shadow_idx);
> > -			return 0;
> > -		}
> > -
> >  		vq->async_pkts_inflight_n += n_pkts;
> > +
> > +		if (unlikely(n_pkts < (int)pkt_burst_idx))
> > +			pkt_err = pkt_burst_idx - n_pkts;
> >  	}
> >
> >  	do_data_copy_enqueue(dev, vq);
> >
> > +	if (unlikely(pkt_err)) {
> > +		do {
> > +			if (pkts_info[slot_idx].segs)
> > +				pkt_err--;
> > +			vq->last_avail_idx -= pkts_info[slot_idx].descs;
> > +			vq->shadow_used_idx -= pkts_info[slot_idx].descs;
> > +			vq->async_pkts_inflight_n--;
> > +			slot_idx--;
> > +			pkt_idx--;
> > +		} while (pkt_err);
> 
> That does not sound really robust.
> Maybe you could exit also on slot_idx < 0 to avoid out of range
> accesses?
> 
> } while (pkt_err && slot_idx >= 0)
>
slot_idx is unsigned type and it's design to be working like a ring index. But I do need to send a new patch to handle the case that "slot_idx==0".
For the robust, I can add a check: "pkt_idx > 0".

Thanks,

Patrick
  

Patch

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index cb6284539..c73bd7c99 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -76,13 +76,26 @@  struct rte_vhost_async_channel_ops {
 	 * @param max_packets
 	 *  max number of packets could be completed
 	 * @return
-	 *  number of iov segments completed
+	 *  number of async descs completed
 	 */
 	uint32_t (*check_completed_copies)(int vid, uint16_t queue_id,
 		struct rte_vhost_async_status *opaque_data,
 		uint16_t max_packets);
 };
 
+/**
+ * inflight async packet information
+ */
+struct async_inflight_info {
+	union {
+		uint32_t info;
+		struct {
+			uint16_t descs; /* num of descs inflight */
+			uint16_t segs; /* iov segs inflight */
+		};
+	};
+};
+
 /**
  *  dma channel feature bit definition
  */
diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index 8f20a0818..eca507836 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -333,8 +333,8 @@  free_vq(struct virtio_net *dev, struct vhost_virtqueue *vq)
 		rte_free(vq->shadow_used_split);
 		if (vq->async_pkts_pending)
 			rte_free(vq->async_pkts_pending);
-		if (vq->async_pending_info)
-			rte_free(vq->async_pending_info);
+		if (vq->async_pkts_info)
+			rte_free(vq->async_pkts_info);
 	}
 	rte_free(vq->batch_copy_elems);
 	rte_mempool_free(vq->iotlb_pool);
@@ -1573,15 +1573,15 @@  int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	vq->async_pkts_pending = rte_malloc(NULL,
 			vq->size * sizeof(uintptr_t),
 			RTE_CACHE_LINE_SIZE);
-	vq->async_pending_info = rte_malloc(NULL,
-			vq->size * sizeof(uint64_t),
+	vq->async_pkts_info = rte_malloc(NULL,
+			vq->size * sizeof(struct async_inflight_info),
 			RTE_CACHE_LINE_SIZE);
-	if (!vq->async_pkts_pending || !vq->async_pending_info) {
+	if (!vq->async_pkts_pending || !vq->async_pkts_info) {
 		if (vq->async_pkts_pending)
 			rte_free(vq->async_pkts_pending);
 
-		if (vq->async_pending_info)
-			rte_free(vq->async_pending_info);
+		if (vq->async_pkts_info)
+			rte_free(vq->async_pkts_info);
 
 		VHOST_LOG_CONFIG(ERR,
 				"async register failed: cannot allocate memory for vq data "
@@ -1635,9 +1635,9 @@  int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id)
 		vq->async_pkts_pending = NULL;
 	}
 
-	if (vq->async_pending_info) {
-		rte_free(vq->async_pending_info);
-		vq->async_pending_info = NULL;
+	if (vq->async_pkts_info) {
+		rte_free(vq->async_pkts_info);
+		vq->async_pkts_info = NULL;
 	}
 
 	vq->async_ops.transfer_data = NULL;
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 73a1ed889..155a832c1 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -46,8 +46,6 @@ 
 
 #define MAX_PKT_BURST 32
 
-#define ASYNC_MAX_POLL_SEG 255
-
 #define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
 #define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 2)
 
@@ -225,12 +223,10 @@  struct vhost_virtqueue {
 
 	/* async data transfer status */
 	uintptr_t	**async_pkts_pending;
-	#define		ASYNC_PENDING_INFO_N_MSK 0xFFFF
-	#define		ASYNC_PENDING_INFO_N_SFT 16
-	uint64_t	*async_pending_info;
+	struct async_inflight_info *async_pkts_info;
 	uint16_t	async_pkts_idx;
 	uint16_t	async_pkts_inflight_n;
-	uint16_t	async_last_seg_n;
+	uint16_t	async_last_pkts_n;
 
 	/* vq async features */
 	bool		async_inorder;
diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
index 501218e19..67d2a2d43 100644
--- a/lib/librte_vhost/vhost_user.c
+++ b/lib/librte_vhost/vhost_user.c
@@ -2006,10 +2006,10 @@  vhost_user_get_vring_base(struct virtio_net **pdev,
 		vq->shadow_used_split = NULL;
 		if (vq->async_pkts_pending)
 			rte_free(vq->async_pkts_pending);
-		if (vq->async_pending_info)
-			rte_free(vq->async_pending_info);
+		if (vq->async_pkts_info)
+			rte_free(vq->async_pkts_info);
 		vq->async_pkts_pending = NULL;
-		vq->async_pending_info = NULL;
+		vq->async_pkts_info = NULL;
 	}
 
 	rte_free(vq->batch_copy_elems);
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index bd9303c8a..68ead9a71 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -1473,37 +1473,6 @@  virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
 		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
 }
 
-static __rte_always_inline void
-virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
-	struct vhost_virtqueue *vq, uint16_t queue_id,
-	uint16_t last_idx, uint16_t shadow_idx)
-{
-	uint16_t start_idx, pkts_idx, vq_size;
-	uint64_t *async_pending_info;
-
-	pkts_idx = vq->async_pkts_idx;
-	async_pending_info = vq->async_pending_info;
-	vq_size = vq->size;
-	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
-		vq_size, vq->async_pkts_inflight_n);
-
-	while (likely((start_idx & (vq_size - 1)) != pkts_idx)) {
-		uint64_t n_seg =
-			async_pending_info[(start_idx) & (vq_size - 1)] >>
-			ASYNC_PENDING_INFO_N_SFT;
-
-		while (n_seg)
-			n_seg -= vq->async_ops.check_completed_copies(dev->vid,
-				queue_id, 0, 1);
-	}
-
-	vq->async_pkts_inflight_n = 0;
-	vq->batch_copy_nb_elems = 0;
-
-	vq->shadow_used_idx = shadow_idx;
-	vq->last_avail_idx = last_idx;
-}
-
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, uint16_t queue_id,
@@ -1512,7 +1481,7 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
 	uint16_t num_buffers;
 	struct buf_vector buf_vec[BUF_VECTOR_MAX];
-	uint16_t avail_head, last_idx, shadow_idx;
+	uint16_t avail_head;
 
 	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
 	struct iovec *vec_pool = vq->vec_pool;
@@ -1522,11 +1491,11 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct rte_vhost_iov_iter *src_it = it_pool;
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
 	uint16_t n_free_slot, slot_idx;
+	uint16_t pkt_err = 0;
+	struct async_inflight_info *pkts_info = vq->async_pkts_info;
 	int n_pkts = 0;
 
 	avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
-	last_idx = vq->last_avail_idx;
-	shadow_idx = vq->shadow_used_idx;
 
 	/*
 	 * The ordering between avail index and
@@ -1565,14 +1534,14 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 		if (src_it->count) {
 			async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
 			pkt_burst_idx++;
-			vq->async_pending_info[slot_idx] =
-				num_buffers | (src_it->nr_segs << 16);
+			pkts_info[slot_idx].descs = num_buffers;
+			pkts_info[slot_idx].segs = src_it->nr_segs;
 			src_iovec += src_it->nr_segs;
 			dst_iovec += dst_it->nr_segs;
 			src_it += 2;
 			dst_it += 2;
 		} else {
-			vq->async_pending_info[slot_idx] = num_buffers;
+			pkts_info[slot_idx].info = num_buffers;
 			vq->async_pkts_inflight_n++;
 		}
 
@@ -1586,35 +1555,46 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
 			src_it = it_pool;
 			dst_it = it_pool + 1;
+			vq->async_pkts_inflight_n += n_pkts;
 
 			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
-				vq->async_pkts_inflight_n +=
-					n_pkts > 0 ? n_pkts : 0;
-				virtio_dev_rx_async_submit_split_err(dev,
-					vq, queue_id, last_idx, shadow_idx);
-				return 0;
+				/*
+				 * log error packets number here and do actual
+				 * error processing when applications poll
+				 * completion
+				 */
+				pkt_err = pkt_burst_idx - n_pkts;
+				pkt_burst_idx = 0;
+				break;
 			}
 
 			pkt_burst_idx = 0;
-			vq->async_pkts_inflight_n += n_pkts;
 		}
 	}
 
 	if (pkt_burst_idx) {
 		n_pkts = vq->async_ops.transfer_data(dev->vid,
 				queue_id, tdes, 0, pkt_burst_idx);
-		if (unlikely(n_pkts < (int)pkt_burst_idx)) {
-			vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
-			virtio_dev_rx_async_submit_split_err(dev, vq, queue_id,
-				last_idx, shadow_idx);
-			return 0;
-		}
-
 		vq->async_pkts_inflight_n += n_pkts;
+
+		if (unlikely(n_pkts < (int)pkt_burst_idx))
+			pkt_err = pkt_burst_idx - n_pkts;
 	}
 
 	do_data_copy_enqueue(dev, vq);
 
+	if (unlikely(pkt_err)) {
+		do {
+			if (pkts_info[slot_idx].segs)
+				pkt_err--;
+			vq->last_avail_idx -= pkts_info[slot_idx].descs;
+			vq->shadow_used_idx -= pkts_info[slot_idx].descs;
+			vq->async_pkts_inflight_n--;
+			slot_idx--;
+			pkt_idx--;
+		} while (pkt_err);
+	}
+
 	n_free_slot = vq->size - vq->async_pkts_idx;
 	if (n_free_slot > pkt_idx) {
 		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
@@ -1640,10 +1620,10 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 {
 	struct virtio_net *dev = get_device(vid);
 	struct vhost_virtqueue *vq;
-	uint16_t n_segs_cpl, n_pkts_put = 0, n_descs = 0;
+	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
 	uint16_t start_idx, pkts_idx, vq_size;
 	uint16_t n_inflight;
-	uint64_t *async_pending_info;
+	struct async_inflight_info *pkts_info;
 
 	if (!dev)
 		return 0;
@@ -1657,51 +1637,40 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 	vq = dev->virtqueue[queue_id];
 
+	if (unlikely(!vq->async_registered)) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
 	rte_spinlock_lock(&vq->access_lock);
 
 	n_inflight = vq->async_pkts_inflight_n;
 	pkts_idx = vq->async_pkts_idx;
-	async_pending_info = vq->async_pending_info;
+	pkts_info = vq->async_pkts_info;
 	vq_size = vq->size;
 	start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
 		vq_size, vq->async_pkts_inflight_n);
 
-	n_segs_cpl = vq->async_ops.check_completed_copies(vid, queue_id,
-		0, ASYNC_MAX_POLL_SEG - vq->async_last_seg_n) +
-		vq->async_last_seg_n;
+	if (count > vq->async_last_pkts_n)
+		n_pkts_cpl = vq->async_ops.check_completed_copies(vid,
+			queue_id, 0, count - vq->async_last_pkts_n);
+	n_pkts_cpl += vq->async_last_pkts_n;
 
 	rte_smp_wmb();
 
 	while (likely((n_pkts_put < count) && n_inflight)) {
-		uint64_t info = async_pending_info[
-			(start_idx + n_pkts_put) & (vq_size - 1)];
-		uint64_t n_segs;
+		uint16_t info_idx = (start_idx + n_pkts_put) & (vq_size - 1);
+		if (n_pkts_cpl && pkts_info[info_idx].segs)
+			n_pkts_cpl--;
+		else if (!n_pkts_cpl && pkts_info[info_idx].segs)
+			break;
 		n_pkts_put++;
 		n_inflight--;
-		n_descs += info & ASYNC_PENDING_INFO_N_MSK;
-		n_segs = info >> ASYNC_PENDING_INFO_N_SFT;
-
-		if (n_segs) {
-			if (unlikely(n_segs_cpl < n_segs)) {
-				n_pkts_put--;
-				n_inflight++;
-				n_descs -= info & ASYNC_PENDING_INFO_N_MSK;
-				if (n_segs_cpl) {
-					async_pending_info[
-						(start_idx + n_pkts_put) &
-						(vq_size - 1)] =
-					((n_segs - n_segs_cpl) <<
-					 ASYNC_PENDING_INFO_N_SFT) |
-					(info & ASYNC_PENDING_INFO_N_MSK);
-					n_segs_cpl = 0;
-				}
-				break;
-			}
-			n_segs_cpl -= n_segs;
-		}
+		n_descs += pkts_info[info_idx].descs;
 	}
 
-	vq->async_last_seg_n = n_segs_cpl;
+	vq->async_last_pkts_n = n_pkts_cpl;
 
 	if (n_pkts_put) {
 		vq->async_pkts_inflight_n = n_inflight;
@@ -1710,16 +1679,18 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 					n_descs, __ATOMIC_RELEASE);
 			vhost_vring_call_split(dev, vq);
 		}
-	}
 
-	if (start_idx + n_pkts_put <= vq_size) {
-		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-			n_pkts_put * sizeof(uintptr_t));
-	} else {
-		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-			(vq_size - start_idx) * sizeof(uintptr_t));
-		rte_memcpy(&pkts[vq_size - start_idx], vq->async_pkts_pending,
-			(n_pkts_put - vq_size + start_idx) * sizeof(uintptr_t));
+		if (start_idx + n_pkts_put <= vq_size) {
+			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
+				n_pkts_put * sizeof(uintptr_t));
+		} else {
+			rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
+				(vq_size - start_idx) * sizeof(uintptr_t));
+			rte_memcpy(&pkts[vq_size - start_idx],
+				vq->async_pkts_pending,
+				(n_pkts_put + start_idx - vq_size) *
+				sizeof(uintptr_t));
+		}
 	}
 
 	rte_spinlock_unlock(&vq->access_lock);