[dpdk-dev,v5,09/20] event/sw: add worker core functions

Message ID 1490374395-149320-10-git-send-email-harry.van.haaren@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Jerin Jacob
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Van Haaren, Harry March 24, 2017, 4:53 p.m. UTC
  From: Bruce Richardson <bruce.richardson@intel.com>

add the event enqueue, dequeue and release functions to the eventdev.
These also include tracking of stats for observability in the load of
the scheduler.
Internally in the enqueue function, the various types of enqueue
operations, to forward an existing event, to send a new event, to
drop a previous event, are converted to a series of flags which will
be used by the scheduler code to perform the needed actions for that
event.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Gage Eads <gage.eads@intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
---
 drivers/event/sw/Makefile          |   1 +
 drivers/event/sw/sw_evdev.c        |   5 +
 drivers/event/sw/sw_evdev.h        |  32 +++++++
 drivers/event/sw/sw_evdev_worker.c | 188 +++++++++++++++++++++++++++++++++++++
 4 files changed, 226 insertions(+)
 create mode 100644 drivers/event/sw/sw_evdev_worker.c
  

Comments

Jerin Jacob March 27, 2017, 1:50 p.m. UTC | #1
On Fri, Mar 24, 2017 at 04:53:04PM +0000, Harry van Haaren wrote:
> From: Bruce Richardson <bruce.richardson@intel.com>
> 
> add the event enqueue, dequeue and release functions to the eventdev.
> These also include tracking of stats for observability in the load of
> the scheduler.
> Internally in the enqueue function, the various types of enqueue
> operations, to forward an existing event, to send a new event, to
> drop a previous event, are converted to a series of flags which will
> be used by the scheduler code to perform the needed actions for that
> event.
> 
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> ---
>  drivers/event/sw/Makefile          |   1 +
>  drivers/event/sw/sw_evdev.c        |   5 +
>  drivers/event/sw/sw_evdev.h        |  32 +++++++
>  drivers/event/sw/sw_evdev_worker.c | 188 +++++++++++++++++++++++++++++++++++++
>  4 files changed, 226 insertions(+)
>  create mode 100644 drivers/event/sw/sw_evdev_worker.c
> 
> diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
> index d6836e3..b6ecd91 100644
> --- a/drivers/event/sw/Makefile
> +++ b/drivers/event/sw/Makefile
> @@ -53,6 +53,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
>  
>  # library source files
>  SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
>  
>  # export include files
>  SYMLINK-y-include +=
> diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
> index 82ac3bd..9b2816d 100644
> --- a/drivers/event/sw/sw_evdev.c
> +++ b/drivers/event/sw/sw_evdev.c
> @@ -412,6 +412,7 @@ sw_dev_configure(const struct rte_eventdev *dev)
>  	sw->qid_count = conf->nb_event_queues;
>  	sw->port_count = conf->nb_event_ports;
>  	sw->nb_events_limit = conf->nb_events_limit;
> +	rte_atomic32_set(&sw->inflights, 0);
>  
>  	return 0;
>  }
> @@ -550,6 +551,10 @@ sw_probe(const char *name, const char *params)
>  		return -EFAULT;
>  	}
>  	dev->dev_ops = &evdev_sw_ops;
> +	dev->enqueue = sw_event_enqueue;
> +	dev->enqueue_burst = sw_event_enqueue_burst;
> +	dev->dequeue = sw_event_dequeue;
> +	dev->dequeue_burst = sw_event_dequeue_burst;

Is all the code in the sw_probe() valid for multi process? If not, after
function pointer assignment it can return[1] from sw_probe. Just like
another PMD's, we will support configuration API and fastpath API in primary
process and secondary process will be limited to fast path functions.

[1]
        if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;

>  
>  	sw = dev->data->dev_private;
>  	sw->data = dev->data;
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index f5515e1..ab372fd 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -55,12 +55,36 @@
>  #define SCHED_DEQUEUE_BURST_SIZE 32
>  
> +
> +static inline void
> +sw_event_release(struct sw_port *p, uint8_t index)
> +{
> +	/*
> +	 * Drops the next outstanding event in our history. Used on dequeue
> +	 * to clear any history before dequeuing more events.
> +	 */
> +	RTE_SET_USED(index);
> +
> +	/* create drop message */
> +	struct rte_event ev = {
> +		.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE],
> +	};
> +
> +	uint16_t free_count;
> +	qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
> +
> +	/* each release returns one credit */
> +	p->outstanding_releases--;
> +	p->inflight_credits++;
> +}
> +
> +uint16_t
> +sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
> +{
> +	int32_t i;
> +	uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE];
> +	struct sw_port *p = port;
> +	struct sw_evdev *sw = (void *)p->sw;
> +	uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
> +
> +	if (p->inflight_max < sw_inflights)
> +		return 0;

likely and unlikely attributes are missing in fastpath functions.
Worth to consider in using those in worker file.

> +	if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
> +		num = PORT_ENQUEUE_MAX_BURST_SIZE;
> +
> +	if (p->inflight_credits < num) {
> +		/* Check if sending events would bring instance over the
> +		 * max events threshold
> +		 */
> +		uint32_t credit_update_quanta = sw->credit_update_quanta;
> +		if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
> +			return 0;
> +
> +		rte_atomic32_add(&sw->inflights, credit_update_quanta);
> +		p->inflight_credits += (credit_update_quanta);
> +
> +		if (p->inflight_credits < num)
> +			return 0;
> +	}
> +
> +	for (i = 0; i < num; i++) {
> +		int op = ev[i].op;
> +		int outstanding = p->outstanding_releases > 0;
> +		const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
> +
> +		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
> +		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
> +					outstanding;
> +
> +		new_ops[i] = sw_qe_flag_map[op];
> +		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
> +
> +		/* FWD and RELEASE packets will both resolve to taken (assuming
> +		 * correct usage of the API), providing very high correct
> +		 * prediction rate.
> +		 */
> +		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
> +			p->outstanding_releases--;
> +		/* Branch to avoid touching p->stats except error case */
> +		if (invalid_qid)
> +			p->stats.rx_dropped++;
> +	}
> +
> +	/* returns number of events actually enqueued */
> +	uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
> +					     new_ops);
> +	if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
> +		uint64_t burst_ticks = rte_get_timer_cycles() -
> +				p->last_dequeue_ticks;
> +		uint64_t burst_pkt_ticks =
> +			burst_ticks / p->last_dequeue_burst_sz;
> +		p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES;
> +		p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
> +		p->last_dequeue_ticks = 0;
> +	}
> +	return enq;
> +}
> +
> +uint16_t
> +sw_event_enqueue(void *port, const struct rte_event *ev)
> +{
> +	return sw_event_enqueue_burst(port, ev, 1);
> +}
> +
> +uint16_t
> +sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
> +		uint64_t wait)
> +{
> +	RTE_SET_USED(wait);
> +	struct sw_port *p = (void *)port;
> +	struct sw_evdev *sw = (void *)p->sw;
> +	struct qe_ring *ring = p->cq_worker_ring;
> +	uint32_t credit_update_quanta = sw->credit_update_quanta;
> +
> +	/* check that all previous dequeues have been released */
> +	if (!p->is_directed) {
> +		uint16_t out_rels = p->outstanding_releases;
> +		uint16_t i;
> +		for (i = 0; i < out_rels; i++)
> +			sw_event_release(p, i);
> +	}
> +
> +	/* Intel modification: may not be in final API */
> +	if (ev == 0)
> +		return 0;

May be we can remove this one in fastpath. Maybe under DEBUG in common code
we can add this.

> +
> +	/* returns number of events actually dequeued */
> +	uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
> +	if (ndeq == 0) {
> +		p->outstanding_releases = 0;
> +		p->zero_polls++;
> +		p->total_polls++;
> +		goto end;
> +	}
> +
> +	/* only add credits for directed ports - LB ports send RELEASEs */
> +	p->inflight_credits += ndeq * p->is_directed;
> +	p->outstanding_releases = ndeq;
> +	p->last_dequeue_burst_sz = ndeq;
> +	p->last_dequeue_ticks = rte_get_timer_cycles();
> +	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
> +	p->total_polls++;
> +
> +end:
> +	if (p->inflight_credits >= credit_update_quanta * 2 &&
> +			p->inflight_credits > credit_update_quanta + ndeq) {
> +		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
> +		p->inflight_credits -= credit_update_quanta;
> +	}
> +	return ndeq;
> +}
> +
> +uint16_t
> +sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait)
> +{
> +	return sw_event_dequeue_burst(port, ev, 1, wait);
> +}
> -- 
> 2.7.4
>
  
Van Haaren, Harry March 28, 2017, 4:17 p.m. UTC | #2
> From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
> Sent: Monday, March 27, 2017 2:51 PM
> To: Van Haaren, Harry <harry.van.haaren@intel.com>
> Cc: dev@dpdk.org; Richardson, Bruce <bruce.richardson@intel.com>; Eads, Gage
> <gage.eads@intel.com>
> Subject: Re: [PATCH v5 09/20] event/sw: add worker core functions
> 
> On Fri, Mar 24, 2017 at 04:53:04PM +0000, Harry van Haaren wrote:
> > From: Bruce Richardson <bruce.richardson@intel.com>
> >
> > add the event enqueue, dequeue and release functions to the eventdev.
> > These also include tracking of stats for observability in the load of
> > the scheduler.
> > Internally in the enqueue function, the various types of enqueue
> > operations, to forward an existing event, to send a new event, to
> > drop a previous event, are converted to a series of flags which will
> > be used by the scheduler code to perform the needed actions for that
> > event.
> >
> > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > Signed-off-by: Gage Eads <gage.eads@intel.com>
> > Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> > ---
> >  drivers/event/sw/Makefile          |   1 +
> >  drivers/event/sw/sw_evdev.c        |   5 +
> >  drivers/event/sw/sw_evdev.h        |  32 +++++++
> >  drivers/event/sw/sw_evdev_worker.c | 188 +++++++++++++++++++++++++++++++++++++
> >  4 files changed, 226 insertions(+)
> >  create mode 100644 drivers/event/sw/sw_evdev_worker.c
> >

<snip>

> > @@ -550,6 +551,10 @@ sw_probe(const char *name, const char *params)
> >  		return -EFAULT;
> >  	}
> >  	dev->dev_ops = &evdev_sw_ops;
> > +	dev->enqueue = sw_event_enqueue;
> > +	dev->enqueue_burst = sw_event_enqueue_burst;
> > +	dev->dequeue = sw_event_dequeue;
> > +	dev->dequeue_burst = sw_event_dequeue_burst;
> 
> Is all the code in the sw_probe() valid for multi process? If not, after
> function pointer assignment it can return[1] from sw_probe. Just like
> another PMD's, we will support configuration API and fastpath API in primary
> process and secondary process will be limited to fast path functions.
> 
> [1]
>         if (rte_eal_process_type() != RTE_PROC_PRIMARY)
> 		return 0;


Yes, will be fixed in v6.


> >  	sw = dev->data->dev_private;
> >  	sw->data = dev->data;
> > diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> > index f5515e1..ab372fd 100644
> > --- a/drivers/event/sw/sw_evdev.h
> > +++ b/drivers/event/sw/sw_evdev.h
> > @@ -55,12 +55,36 @@
> >  #define SCHED_DEQUEUE_BURST_SIZE 32
> >
> > +
> > +static inline void
> > +sw_event_release(struct sw_port *p, uint8_t index)
> > +{
> > +	/*
> > +	 * Drops the next outstanding event in our history. Used on dequeue
> > +	 * to clear any history before dequeuing more events.
> > +	 */
> > +	RTE_SET_USED(index);
> > +
> > +	/* create drop message */
> > +	struct rte_event ev = {
> > +		.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE],
> > +	};
> > +
> > +	uint16_t free_count;
> > +	qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
> > +
> > +	/* each release returns one credit */
> > +	p->outstanding_releases--;
> > +	p->inflight_credits++;
> > +}
> > +
> > +uint16_t
> > +sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
> > +{
> > +	int32_t i;
> > +	uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE];
> > +	struct sw_port *p = port;
> > +	struct sw_evdev *sw = (void *)p->sw;
> > +	uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
> > +
> > +	if (p->inflight_max < sw_inflights)
> > +		return 0;
> 
> likely and unlikely attributes are missing in fastpath functions.
> Worth to consider in using those in worker file.


Initial (obvious ones) done in v6. Perhaps a candidate for future patches after initial merge, as these are only performance improvements, not functional.


> > +uint16_t
> > +sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
> > +		uint64_t wait)
> > +{
> > +	RTE_SET_USED(wait);
> > +	struct sw_port *p = (void *)port;
> > +	struct sw_evdev *sw = (void *)p->sw;
> > +	struct qe_ring *ring = p->cq_worker_ring;
> > +	uint32_t credit_update_quanta = sw->credit_update_quanta;
> > +
> > +	/* check that all previous dequeues have been released */
> > +	if (!p->is_directed) {
> > +		uint16_t out_rels = p->outstanding_releases;
> > +		uint16_t i;
> > +		for (i = 0; i < out_rels; i++)
> > +			sw_event_release(p, i);
> > +	}
> > +
> > +	/* Intel modification: may not be in final API */
> > +	if (ev == 0)
> > +		return 0;
> 
> May be we can remove this one in fastpath. Maybe under DEBUG in common code
> we can add this.


Done, removed in v6.


<snip to end>
  

Patch

diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
index d6836e3..b6ecd91 100644
--- a/drivers/event/sw/Makefile
+++ b/drivers/event/sw/Makefile
@@ -53,6 +53,7 @@  EXPORT_MAP := rte_pmd_evdev_sw_version.map
 
 # library source files
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
 
 # export include files
 SYMLINK-y-include +=
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index 82ac3bd..9b2816d 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -412,6 +412,7 @@  sw_dev_configure(const struct rte_eventdev *dev)
 	sw->qid_count = conf->nb_event_queues;
 	sw->port_count = conf->nb_event_ports;
 	sw->nb_events_limit = conf->nb_events_limit;
+	rte_atomic32_set(&sw->inflights, 0);
 
 	return 0;
 }
@@ -550,6 +551,10 @@  sw_probe(const char *name, const char *params)
 		return -EFAULT;
 	}
 	dev->dev_ops = &evdev_sw_ops;
+	dev->enqueue = sw_event_enqueue;
+	dev->enqueue_burst = sw_event_enqueue_burst;
+	dev->dequeue = sw_event_dequeue;
+	dev->dequeue_burst = sw_event_dequeue_burst;
 
 	sw = dev->data->dev_private;
 	sw->data = dev->data;
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index f5515e1..ab372fd 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -55,12 +55,36 @@ 
 #define SCHED_DEQUEUE_BURST_SIZE 32
 
 #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
+#define NUM_SAMPLES 64 /* how many data points use for average stats */
 
 #define EVENTDEV_NAME_SW_PMD event_sw
 #define SW_PMD_NAME RTE_STR(event_sw)
 
 #define SW_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
 
+enum {
+	QE_FLAG_VALID_SHIFT = 0,
+	QE_FLAG_COMPLETE_SHIFT,
+	QE_FLAG_NOT_EOP_SHIFT,
+	_QE_FLAG_COUNT
+};
+
+#define QE_FLAG_VALID    (1 << QE_FLAG_VALID_SHIFT)    /* for NEW FWD, FRAG */
+#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP  */
+#define QE_FLAG_NOT_EOP  (1 << QE_FLAG_NOT_EOP_SHIFT)  /* set for FRAG only  */
+
+static const uint8_t sw_qe_flag_map[] = {
+		QE_FLAG_VALID /* NEW Event */,
+		QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */,
+		QE_FLAG_COMPLETE /* RELEASE Event */,
+
+		/* Values which can be used for future support for partial
+		 * events, i.e. where one event comes back to the scheduler
+		 * as multiple which need to be tracked together
+		 */
+		QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP,
+};
+
 #ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG
 #define SW_LOG_INFO(fmt, args...) \
 	RTE_LOG(INFO, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \
@@ -241,4 +265,12 @@  sw_pmd_priv_const(const struct rte_eventdev *eventdev)
 	return eventdev->data->dev_private;
 }
 
+uint16_t sw_event_enqueue(void *port, const struct rte_event *ev);
+uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[],
+		uint16_t num);
+
+uint16_t sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
+			uint64_t wait);
+
 #endif /* _SW_EVDEV_H_ */
diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
new file mode 100644
index 0000000..aed1597
--- /dev/null
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -0,0 +1,188 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <rte_atomic.h>
+#include <rte_cycles.h>
+
+#include "sw_evdev.h"
+#include "event_ring.h"
+
+#define PORT_ENQUEUE_MAX_BURST_SIZE 64
+
+static inline void
+sw_event_release(struct sw_port *p, uint8_t index)
+{
+	/*
+	 * Drops the next outstanding event in our history. Used on dequeue
+	 * to clear any history before dequeuing more events.
+	 */
+	RTE_SET_USED(index);
+
+	/* create drop message */
+	struct rte_event ev = {
+		.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE],
+	};
+
+	uint16_t free_count;
+	qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
+
+	/* each release returns one credit */
+	p->outstanding_releases--;
+	p->inflight_credits++;
+}
+
+uint16_t
+sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
+{
+	int32_t i;
+	uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE];
+	struct sw_port *p = port;
+	struct sw_evdev *sw = (void *)p->sw;
+	uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
+
+	if (p->inflight_max < sw_inflights)
+		return 0;
+	if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
+		num = PORT_ENQUEUE_MAX_BURST_SIZE;
+
+	if (p->inflight_credits < num) {
+		/* Check if sending events would bring instance over the
+		 * max events threshold
+		 */
+		uint32_t credit_update_quanta = sw->credit_update_quanta;
+		if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
+			return 0;
+
+		rte_atomic32_add(&sw->inflights, credit_update_quanta);
+		p->inflight_credits += (credit_update_quanta);
+
+		if (p->inflight_credits < num)
+			return 0;
+	}
+
+	for (i = 0; i < num; i++) {
+		int op = ev[i].op;
+		int outstanding = p->outstanding_releases > 0;
+		const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
+
+		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
+		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
+					outstanding;
+
+		new_ops[i] = sw_qe_flag_map[op];
+		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
+
+		/* FWD and RELEASE packets will both resolve to taken (assuming
+		 * correct usage of the API), providing very high correct
+		 * prediction rate.
+		 */
+		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
+			p->outstanding_releases--;
+		/* Branch to avoid touching p->stats except error case */
+		if (invalid_qid)
+			p->stats.rx_dropped++;
+	}
+
+	/* returns number of events actually enqueued */
+	uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
+					     new_ops);
+	if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
+		uint64_t burst_ticks = rte_get_timer_cycles() -
+				p->last_dequeue_ticks;
+		uint64_t burst_pkt_ticks =
+			burst_ticks / p->last_dequeue_burst_sz;
+		p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES;
+		p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
+		p->last_dequeue_ticks = 0;
+	}
+	return enq;
+}
+
+uint16_t
+sw_event_enqueue(void *port, const struct rte_event *ev)
+{
+	return sw_event_enqueue_burst(port, ev, 1);
+}
+
+uint16_t
+sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
+		uint64_t wait)
+{
+	RTE_SET_USED(wait);
+	struct sw_port *p = (void *)port;
+	struct sw_evdev *sw = (void *)p->sw;
+	struct qe_ring *ring = p->cq_worker_ring;
+	uint32_t credit_update_quanta = sw->credit_update_quanta;
+
+	/* check that all previous dequeues have been released */
+	if (!p->is_directed) {
+		uint16_t out_rels = p->outstanding_releases;
+		uint16_t i;
+		for (i = 0; i < out_rels; i++)
+			sw_event_release(p, i);
+	}
+
+	/* Intel modification: may not be in final API */
+	if (ev == 0)
+		return 0;
+
+	/* returns number of events actually dequeued */
+	uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
+	if (ndeq == 0) {
+		p->outstanding_releases = 0;
+		p->zero_polls++;
+		p->total_polls++;
+		goto end;
+	}
+
+	/* only add credits for directed ports - LB ports send RELEASEs */
+	p->inflight_credits += ndeq * p->is_directed;
+	p->outstanding_releases = ndeq;
+	p->last_dequeue_burst_sz = ndeq;
+	p->last_dequeue_ticks = rte_get_timer_cycles();
+	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
+	p->total_polls++;
+
+end:
+	if (p->inflight_credits >= credit_update_quanta * 2 &&
+			p->inflight_credits > credit_update_quanta + ndeq) {
+		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
+		p->inflight_credits -= credit_update_quanta;
+	}
+	return ndeq;
+}
+
+uint16_t
+sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait)
+{
+	return sw_event_dequeue_burst(port, ev, 1, wait);
+}