[dpdk-dev,3/4] eventdev: Add eventdev ethernet Rx adapter

Message ID 1505328781-23456-1-git-send-email-nikhil.rao@intel.com
State New
Headers show

Checks

Context Check Description
checkpatch warning coding style issues
Intel-compilation fail apply patch file failure

Commit Message

Nikhil Rao Sept. 13, 2017, 6:53 p.m.
Add common APIs for configuring packet transfer from ethernet Rx
queues to event devices across HW & SW packet transfer mechanisms.
A detailed description of the adapter is contained in the header's
comments.

The adapter implementation uses eventdev PMDs to configure the packet
transfer if HW support is available and if not, it uses an EAL service
function that reads packets from ethernet Rx queues and injects these
as events into the event device.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
Signed-off-by: Gage Eads <gage.eads@intel.com>
Signed-off-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.h |  386 ++++++++
 lib/librte_eventdev/rte_event_eth_rx_adapter.c | 1239 ++++++++++++++++++++++++
 lib/Makefile                                   |    2 +-
 lib/librte_eventdev/Makefile                   |    2 +
 lib/librte_eventdev/rte_eventdev_version.map   |   11 +-
 5 files changed, 1638 insertions(+), 2 deletions(-)
 create mode 100644 lib/librte_eventdev/rte_event_eth_rx_adapter.h
 create mode 100644 lib/librte_eventdev/rte_event_eth_rx_adapter.c

Comments

Nipun Gupta Sept. 15, 2017, 6:07 a.m.
> -----Original Message-----
> From: Nikhil Rao [mailto:nikhil.rao@intel.com]
> Sent: Thursday, September 14, 2017 0:23
> To: jerin.jacob@caviumnetworks.com; bruce.richardson@intel.com
> Cc: gage.eads@intel.com; dev@dpdk.org; thomas@monjalon.net;
> harry.van.haaren@intel.com; Hemant Agrawal <hemant.agrawal@nxp.com>;
> Nipun Gupta <nipun.gupta@nxp.com>; narender.vangati@intel.com;
> erik.g.carrillo@intel.com; abhinandan.gujjar@intel.com
> Subject: [PATCH 3/4] eventdev: Add eventdev ethernet Rx adapter
> 
> Add common APIs for configuring packet transfer from ethernet Rx
> queues to event devices across HW & SW packet transfer mechanisms.
> A detailed description of the adapter is contained in the header's
> comments.
> 
> The adapter implementation uses eventdev PMDs to configure the packet
> transfer if HW support is available and if not, it uses an EAL service
> function that reads packets from ethernet Rx queues and injects these
> as events into the event device.
> 
> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> Signed-off-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>
> ---
>  lib/librte_eventdev/rte_event_eth_rx_adapter.h |  386 ++++++++
>  lib/librte_eventdev/rte_event_eth_rx_adapter.c | 1239
> ++++++++++++++++++++++++
>  lib/Makefile                                   |    2 +-
>  lib/librte_eventdev/Makefile                   |    2 +
>  lib/librte_eventdev/rte_eventdev_version.map   |   11 +-
>  5 files changed, 1638 insertions(+), 2 deletions(-)
>  create mode 100644 lib/librte_eventdev/rte_event_eth_rx_adapter.h
>  create mode 100644 lib/librte_eventdev/rte_event_eth_rx_adapter.c
> 
> +
> +static inline void
> +fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
> +	uint8_t dev_id,
> +	uint16_t rx_queue_id,
> +	struct rte_mbuf **mbufs,
> +	uint16_t num)
> +{
> +	uint32_t i;
> +	struct eth_device_info *eth_device_info =
> +					&rx_adapter->eth_devices[dev_id];
> +	struct eth_rx_queue_info *eth_rx_queue_info =
> +					&eth_device_info-
> >rx_queue[rx_queue_id];
> +
> +	int32_t qid = eth_rx_queue_info->event_queue_id;
> +	uint8_t sched_type = eth_rx_queue_info->sched_type;
> +	uint8_t priority = eth_rx_queue_info->priority;
> +	uint32_t flow_id;
> +	struct rte_event events[BATCH_SIZE];
> +	struct rte_mbuf *m = mbufs[0];
> +	uint32_t rss_mask;
> +	uint32_t rss;
> +	int do_rss;
> +
> +	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
> +	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
> +	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
> +
> +	for (i = 0; i < num; i++) {
> +		m = mbufs[i];
> +		struct rte_event *ev = &events[i];
> +
> +		rss = do_rss ? do_softrss(m) : m->hash.rss;
> +		flow_id =
> +		    eth_rx_queue_info->flow_id &
> +				eth_rx_queue_info->flow_id_mask;
> +		flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
> +
> +		ev->flow_id = flow_id;
> +		ev->op = RTE_EVENT_OP_NEW;
> +		ev->sched_type = sched_type;
> +		ev->queue_id = qid;
> +		ev->event_type = RTE_EVENT_TYPE_ETHDEV;
> +		ev->sub_event_type = 0;
> +		ev->priority = priority;
> +		ev->mbuf = m;

How will the application get to know about the queue and eth_port from where the
packet is received by the eventdev? Is it the user responsibility to have that information
in the flow_id?

> +
> +		buf_event_enqueue(rx_adapter, ev);
> +	}
> +}
> +

<snip>

> +int
> +rte_event_eth_rx_adapter_queue_add(uint8_t id,
> +		uint8_t eth_dev_id,
> +		int32_t rx_queue_id,
> +		const struct rte_event_eth_rx_adapter_queue_conf
> *queue_conf)
> +{
> +	int ret;
> +	uint32_t rx_adapter_cap;
> +	struct rte_event_eth_rx_adapter *rx_adapter;
> +	struct rte_eventdev *dev;
> +	struct eth_device_info *dev_info;
> +	int start_service = 0;
> +
> +	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
> +
> +	rx_adapter = id_to_rx_adapter(id);
> +	if (!rx_adapter || !queue_conf)
> +		return -EINVAL;
> +
> +	dev = &rte_eventdevs[rx_adapter->eventdev_id];
> +	ret = (*dev->dev_ops->eth_rx_adapter_caps_get)(dev, eth_dev_id,
> +						&rx_adapter_cap);
> +	if (ret) {
> +		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
> +			"eth port %" PRIu8, id, eth_dev_id);
> +		return ret;
> +	}
> +
> +	if (!(rx_adapter_cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_FLOW_ID)
> &&
> +		!(queue_conf->rx_queue_flags &
> +
> 	RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
> +		RTE_EDEV_LOG_ERR("Flow ID required for configuration,"
> +				" eth port: %" PRIu8 " adapter id: %" PRIu8,
> +				eth_dev_id, id);
> +		return -EINVAL;
> +	}
> +
> +	if ((rx_adapter_cap &
> RTE_EVENT_ETH_RX_ADAPTER_CAP_SINGLE_EVENTQ) &&
> +		(rx_queue_id != -1)) {
> +		RTE_EDEV_LOG_ERR("Rx queues can only be connected to
> single "
> +			"event queue id %u eth port %u", id, eth_dev_id);
> +		return -EINVAL;
> +	}
> +
> +	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
> +			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
> +		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
> +			 (uint16_t)rx_queue_id);
> +		return -EINVAL;
> +	}
> +
> +	start_service = 0;
> +	dev_info = &rx_adapter->eth_devices[eth_dev_id];
> +
> +	if (rx_adapter_cap &
> RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
> +		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops-
> >eth_rx_adapter_queue_add,
> +					-ENOTSUP);
> +		if (!dev_info->rx_queue) {
> +			dev_info->rx_queue =
> +			    rte_zmalloc_socket(rx_adapter->mem_name,
> +					dev_info->dev->data->nb_rx_queues *
> +					sizeof(struct eth_rx_queue_info), 0,
> +					rx_adapter->socket_id);
> +			if (!dev_info->rx_queue)
> +				return -ENOMEM;
> +		}
> +
> +		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
> eth_dev_id,
> +				rx_queue_id, queue_conf);

IMO, Instead of eth_dev_id rte_eth_dev parameter should be passed to the underlying driver.
Otherwise the underlying driver needs to use rte_eth_devices[] global variable to fetch the device.
This does not seems very right to me.

> +		if (!ret) {
> +			update_queue_info(rx_adapter,
> +					&rx_adapter-
> >eth_devices[eth_dev_id],
> +					rx_queue_id,
> +					1);
> +		}
> +	} else {
> +		rte_spinlock_lock(&rx_adapter->rx_lock);
> +		ret = init_service(rx_adapter, id);
> +		if (!ret)
> +			ret = add_rx_queue(rx_adapter, eth_dev_id,
> rx_queue_id,
> +					queue_conf);
> +		rte_spinlock_unlock(&rx_adapter->rx_lock);
> +		if (!ret)
> +			start_service =
> !!sw_rx_adapter_queue_count(rx_adapter);
> +	}
> +
> +	if (ret)
> +		return ret;
> +
> +	if (start_service)
> +		rte_service_component_runstate_set(rx_adapter->service_id,
> 1);
> +
> +	return 0;
> +}
> +
> +int
> +rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
> +				int32_t rx_queue_id)
> +{
> +	int ret = 0;
> +	struct rte_eventdev *dev;
> +	struct rte_event_eth_rx_adapter *rx_adapter;
> +	struct eth_device_info *dev_info;
> +	uint32_t rx_adapter_cap;
> +	uint16_t i;
> +
> +	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
> +
> +	rx_adapter = id_to_rx_adapter(id);
> +	if (!rx_adapter)
> +		return -EINVAL;
> +
> +	dev = &rte_eventdevs[rx_adapter->eventdev_id];
> +	ret = dev->dev_ops->eth_rx_adapter_caps_get(dev, eth_dev_id,
> +						&rx_adapter_cap);
> +	if (ret)
> +		return ret;
> +
> +	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
> +		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
> +		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
> +			 (uint16_t)rx_queue_id);
> +		return -EINVAL;
> +	}
> +
> +	dev_info = &rx_adapter->eth_devices[eth_dev_id];
> +
> +	if (rx_adapter_cap &
> RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
> +		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops-
> >eth_rx_adapter_queue_del,
> +				 -ENOTSUP);
> +		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
> eth_dev_id,
> +							rx_queue_id);

Here too rte_eth_device shall be passed.

> +		if (!ret) {
> +			update_queue_info(rx_adapter,
> +					&rx_adapter-
> >eth_devices[eth_dev_id],
> +					rx_queue_id,
> +					0);
> +			if (!dev_info->nb_dev_queues) {
> +				rte_free(dev_info->rx_queue);
> +				dev_info->rx_queue = NULL;
> +			}
> +		}
> +	} else {
> +		int rc;
> +		rte_spinlock_lock(&rx_adapter->rx_lock);
> +		if (rx_queue_id == -1) {
> +			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
> +
> 	_rte_event_eth_rx_adapter_queue_del(rx_adapter,
> +								dev_info,
> +								i);
> +		} else {
> +			_rte_event_eth_rx_adapter_queue_del(rx_adapter,
> +							dev_info,
> +
> 	(uint16_t)rx_queue_id);
> +		}
> +
> +		rc = eth_poll_wrr_calc(rx_adapter);
> +		if (rc)
> +			RTE_EDEV_LOG_ERR("WRR recalculation failed %"
> PRId32,
> +					rc);
> +
> +		if (!dev_info->nb_dev_queues) {
> +			rte_free(dev_info->rx_queue);
> +			dev_info->rx_queue = NULL;
> +		}
> +
> +		rte_spinlock_unlock(&rx_adapter->rx_lock);
> +		rte_service_component_runstate_set(rx_adapter->service_id,
> +				sw_rx_adapter_queue_count(rx_adapter));
> +	}
> +
> +	return ret;
> +}
> +
> +
> +int
> +rte_event_eth_rx_adapter_start(uint8_t id)
> +{
> +	return rx_adapter_ctrl(id, 1);
> +}
> +
> +int
> +rte_event_eth_rx_adapter_stop(uint8_t id)
> +{
> +	return rx_adapter_ctrl(id, 0);
> +}
> +
> +int
> +rte_event_eth_rx_adapter_stats_get(uint8_t id,
> +			       struct rte_event_eth_rx_adapter_stats *stats)
> +{
> +	struct rte_event_eth_rx_adapter *rx_adapter;
> +	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
> +	struct rte_event_eth_rx_adapter_stats dev_stats;
> +	struct rte_eventdev *dev;
> +	struct eth_device_info *dev_info;
> +	uint32_t i;
> +	int ret;
> +
> +	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
> +
> +	rx_adapter = id_to_rx_adapter(id);
> +	if (!rx_adapter || !stats)
> +		return -EINVAL;
> +
> +	dev = &rte_eventdevs[rx_adapter->eventdev_id];
> +	memset(stats, 0, sizeof(*stats));
> +	for (i = 0; i < rte_eth_dev_count(); i++) {
> +		dev_info = &rx_adapter->eth_devices[i];
> +		if (!dev_info->internal_event_port)
> +			continue;
> +		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev, i,
> +						&dev_stats);

Preferable to check if the adapter stats get and stats set API are registered from the driver.

Regards,
Nipun
Santosh Shukla Sept. 15, 2017, 6:10 a.m.
Hi Nikhil,


On Thursday 14 September 2017 12:23 AM, Nikhil Rao wrote:
> Add common APIs for configuring packet transfer from ethernet Rx
> queues to event devices across HW & SW packet transfer mechanisms.
> A detailed description of the adapter is contained in the header's
> comments.
>
> The adapter implementation uses eventdev PMDs to configure the packet
> transfer if HW support is available and if not, it uses an EAL service
> function that reads packets from ethernet Rx queues and injects these
> as events into the event device.
>
> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> Signed-off-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>
> ---

Can you pl. send whole series as v4.
Thanks.
Nikhil Rao Sept. 15, 2017, 11:10 a.m.
On 9/15/2017 11:40 AM, santosh wrote:
> Hi Nikhil,
> 
> 
> On Thursday 14 September 2017 12:23 AM, Nikhil Rao wrote:
>> Add common APIs for configuring packet transfer from ethernet Rx
>> queues to event devices across HW & SW packet transfer mechanisms.
>> A detailed description of the adapter is contained in the header's
>> comments.
>>
>> The adapter implementation uses eventdev PMDs to configure the packet
>> transfer if HW support is available and if not, it uses an EAL service
>> function that reads packets from ethernet Rx queues and injects these
>> as events into the event device.
>>
>> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
>> Signed-off-by: Gage Eads <gage.eads@intel.com>
>> Signed-off-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>
>> ---
> 
> Can you pl. send whole series as v4.
Is that because the patch fails to compile ?

As I mentioned in the cover letter, this patch does have a dependency on 
the latest service core API
http://dpdk.org/ml/archives/dev/2017-August/073102.html

Nikhil

> Thanks.
> 
>
Santosh Shukla Sept. 15, 2017, 2:26 p.m.
On Friday 15 September 2017 04:40 PM, Rao, Nikhil wrote:
> On 9/15/2017 11:40 AM, santosh wrote:
>> Hi Nikhil,
>>
>>
>> On Thursday 14 September 2017 12:23 AM, Nikhil Rao wrote:
>>> Add common APIs for configuring packet transfer from ethernet Rx
>>> queues to event devices across HW & SW packet transfer mechanisms.
>>> A detailed description of the adapter is contained in the header's
>>> comments.
>>>
>>> The adapter implementation uses eventdev PMDs to configure the packet
>>> transfer if HW support is available and if not, it uses an EAL service
>>> function that reads packets from ethernet Rx queues and injects these
>>> as events into the event device.
>>>
>>> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
>>> Signed-off-by: Gage Eads <gage.eads@intel.com>
>>> Signed-off-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>
>>> ---
>>
>> Can you pl. send whole series as v4.
> Is that because the patch fails to compile ?
>
> As I mentioned in the cover letter, this patch does have a dependency on the latest service core API
> http://dpdk.org/ml/archives/dev/2017-August/073102.html
>
while reviewing I'm not sure [3/4] belongs to v1 or lartest v3,
patch by date seems latest if so then deserve v4. Its upto you to consider suggestion!

> Nikhil
>
>> Thanks.
>>
>>
>
Nikhil Rao Sept. 18, 2017, 4:54 a.m.
On 9/15/2017 11:37 AM, Nipun Gupta wrote:
> 
> 
>> -----Original Message-----
>> From: Nikhil Rao [mailto:nikhil.rao@intel.com]
<snip>
>> +static inline void
>> +fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
>> +	uint8_t dev_id,
>> +	uint16_t rx_queue_id,
>> +	struct rte_mbuf **mbufs,
>> +	uint16_t num)
>> +{
>> +	uint32_t i;
>> +	struct eth_device_info *eth_device_info =
>> +					&rx_adapter->eth_devices[dev_id];
>> +	struct eth_rx_queue_info *eth_rx_queue_info =
>> +					&eth_device_info-
>>> rx_queue[rx_queue_id];
>> +
>> +	int32_t qid = eth_rx_queue_info->event_queue_id;
>> +	uint8_t sched_type = eth_rx_queue_info->sched_type;
>> +	uint8_t priority = eth_rx_queue_info->priority;
>> +	uint32_t flow_id;
>> +	struct rte_event events[BATCH_SIZE];
>> +	struct rte_mbuf *m = mbufs[0];
>> +	uint32_t rss_mask;
>> +	uint32_t rss;
>> +	int do_rss;
>> +
>> +	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
>> +	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
>> +	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
>> +
>> +	for (i = 0; i < num; i++) {
>> +		m = mbufs[i];
>> +		struct rte_event *ev = &events[i];
>> +
>> +		rss = do_rss ? do_softrss(m) : m->hash.rss;
>> +		flow_id =
>> +		    eth_rx_queue_info->flow_id &
>> +				eth_rx_queue_info->flow_id_mask;
>> +		flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
>> +
>> +		ev->flow_id = flow_id;
>> +		ev->op = RTE_EVENT_OP_NEW;
>> +		ev->sched_type = sched_type;
>> +		ev->queue_id = qid;
>> +		ev->event_type = RTE_EVENT_TYPE_ETHDEV;
>> +		ev->sub_event_type = 0;
>> +		ev->priority = priority;
>> +		ev->mbuf = m;
> 
> How will the application get to know about the queue and eth_port from where the
> packet is received by the eventdev? Is it the user responsibility to have that information
> in the flow_id?
queue: If the application needs to know the queue ID, it would have to 
set the flow ID to the
queue ID at the time of adding the queue.

eth port: input port contained in in the mbuf.

>> +
>> +		buf_event_enqueue(rx_adapter, ev);
>> +	}
>> +}
>> +
> 
> <snip>
> 
>> +int
>> +rte_event_eth_rx_adapter_queue_add(uint8_t id,
>> +		uint8_t eth_dev_id,
>> +		int32_t rx_queue_id,
>> +		const struct rte_event_eth_rx_adapter_queue_conf
>> *queue_conf)
>> +{
>> +	int ret;
>> +	uint32_t rx_adapter_cap;
>> +	struct rte_event_eth_rx_adapter *rx_adapter;
>> +	struct rte_eventdev *dev;
>> +	struct eth_device_info *dev_info;
>> +	int start_service = 0;
>> +
>> +	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
>> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
>> +
>> +	rx_adapter = id_to_rx_adapter(id);
>> +	if (!rx_adapter || !queue_conf)
>> +		return -EINVAL;
>> +
>> +	dev = &rte_eventdevs[rx_adapter->eventdev_id];
>> +	ret = (*dev->dev_ops->eth_rx_adapter_caps_get)(dev, eth_dev_id,
>> +						&rx_adapter_cap);
>> +	if (ret) {
>> +		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
>> +			"eth port %" PRIu8, id, eth_dev_id);
>> +		return ret;
>> +	}
>> +
>> +	if (!(rx_adapter_cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_FLOW_ID)
>> &&
>> +		!(queue_conf->rx_queue_flags &
>> +
>> 	RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
>> +		RTE_EDEV_LOG_ERR("Flow ID required for configuration,"
>> +				" eth port: %" PRIu8 " adapter id: %" PRIu8,
>> +				eth_dev_id, id);
>> +		return -EINVAL;
>> +	}
>> +
>> +	if ((rx_adapter_cap &
>> RTE_EVENT_ETH_RX_ADAPTER_CAP_SINGLE_EVENTQ) &&
>> +		(rx_queue_id != -1)) {
>> +		RTE_EDEV_LOG_ERR("Rx queues can only be connected to
>> single "
>> +			"event queue id %u eth port %u", id, eth_dev_id);
>> +		return -EINVAL;
>> +	}
>> +
>> +	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
>> +			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
>> +		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
>> +			 (uint16_t)rx_queue_id);
>> +		return -EINVAL;
>> +	}
>> +
>> +	start_service = 0;
>> +	dev_info = &rx_adapter->eth_devices[eth_dev_id];
>> +
>> +	if (rx_adapter_cap &
>> RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
>> +		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops-
>>> eth_rx_adapter_queue_add,
>> +					-ENOTSUP);
>> +		if (!dev_info->rx_queue) {
>> +			dev_info->rx_queue =
>> +			    rte_zmalloc_socket(rx_adapter->mem_name,
>> +					dev_info->dev->data->nb_rx_queues *
>> +					sizeof(struct eth_rx_queue_info), 0,
>> +					rx_adapter->socket_id);
>> +			if (!dev_info->rx_queue)
>> +				return -ENOMEM;
>> +		}
>> +
>> +		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
>> eth_dev_id,
>> +				rx_queue_id, queue_conf);
> 
> IMO, Instead of eth_dev_id rte_eth_dev parameter should be passed to the underlying driver.
> Otherwise the underlying driver needs to use rte_eth_devices[] global variable to fetch the device.
> This does not seems very right to me.

Agree, passing struct rte_eth_dev to the PMD is a cleaner approach.

> 
>> +		if (!ret) {
>> +			update_queue_info(rx_adapter,
>> +					&rx_adapter-
>>> eth_devices[eth_dev_id],
>> +					rx_queue_id,
>> +					1);
>> +		}
>> +	} else {
>> +		rte_spinlock_lock(&rx_adapter->rx_lock);
>> +		ret = init_service(rx_adapter, id);
>> +		if (!ret)
>> +			ret = add_rx_queue(rx_adapter, eth_dev_id,
>> rx_queue_id,
>> +					queue_conf);
>> +		rte_spinlock_unlock(&rx_adapter->rx_lock);
>> +		if (!ret)
>> +			start_service =
>> !!sw_rx_adapter_queue_count(rx_adapter);
>> +	}
>> +
>> +	if (ret)
>> +		return ret;
>> +
>> +	if (start_service)
>> +		rte_service_component_runstate_set(rx_adapter->service_id,
>> 1);
>> +
>> +	return 0;
>> +}
>> +
>> +int
>> +rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
>> +				int32_t rx_queue_id)
>> +{
>> +	int ret = 0;
>> +	struct rte_eventdev *dev;
>> +	struct rte_event_eth_rx_adapter *rx_adapter;
>> +	struct eth_device_info *dev_info;
>> +	uint32_t rx_adapter_cap;
>> +	uint16_t i;
>> +
>> +	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
>> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
>> +
>> +	rx_adapter = id_to_rx_adapter(id);
>> +	if (!rx_adapter)
>> +		return -EINVAL;
>> +
>> +	dev = &rte_eventdevs[rx_adapter->eventdev_id];
>> +	ret = dev->dev_ops->eth_rx_adapter_caps_get(dev, eth_dev_id,
>> +						&rx_adapter_cap);
>> +	if (ret)
>> +		return ret;
>> +
>> +	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
>> +		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
>> +		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
>> +			 (uint16_t)rx_queue_id);
>> +		return -EINVAL;
>> +	}
>> +
>> +	dev_info = &rx_adapter->eth_devices[eth_dev_id];
>> +
>> +	if (rx_adapter_cap &
>> RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
>> +		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops-
>>> eth_rx_adapter_queue_del,
>> +				 -ENOTSUP);
>> +		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
>> eth_dev_id,
>> +							rx_queue_id);
> 
> Here too rte_eth_device shall be passed.
OK.

> 
>> +		if (!ret) {
>> +			update_queue_info(rx_adapter,
>> +					&rx_adapter-
>>> eth_devices[eth_dev_id],
>> +					rx_queue_id,
>> +					0);
>> +			if (!dev_info->nb_dev_queues) {
>> +				rte_free(dev_info->rx_queue);
>> +				dev_info->rx_queue = NULL;
>> +			}
>> +		}
>> +	} else {
>> +		int rc;
>> +		rte_spinlock_lock(&rx_adapter->rx_lock);
>> +		if (rx_queue_id == -1) {
>> +			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
>> +
>> 	_rte_event_eth_rx_adapter_queue_del(rx_adapter,
>> +								dev_info,
>> +								i);
>> +		} else {
>> +			_rte_event_eth_rx_adapter_queue_del(rx_adapter,
>> +							dev_info,
>> +
>> 	(uint16_t)rx_queue_id);
>> +		}
>> +
>> +		rc = eth_poll_wrr_calc(rx_adapter);
>> +		if (rc)
>> +			RTE_EDEV_LOG_ERR("WRR recalculation failed %"
>> PRId32,
>> +					rc);
>> +
>> +		if (!dev_info->nb_dev_queues) {
>> +			rte_free(dev_info->rx_queue);
>> +			dev_info->rx_queue = NULL;
>> +		}
>> +
>> +		rte_spinlock_unlock(&rx_adapter->rx_lock);
>> +		rte_service_component_runstate_set(rx_adapter->service_id,
>> +				sw_rx_adapter_queue_count(rx_adapter));
>> +	}
>> +
>> +	return ret;
>> +}
>> +
>> +
>> +int
>> +rte_event_eth_rx_adapter_start(uint8_t id)
>> +{
>> +	return rx_adapter_ctrl(id, 1);
>> +}
>> +
>> +int
>> +rte_event_eth_rx_adapter_stop(uint8_t id)
>> +{
>> +	return rx_adapter_ctrl(id, 0);
>> +}
>> +
>> +int
>> +rte_event_eth_rx_adapter_stats_get(uint8_t id,
>> +			       struct rte_event_eth_rx_adapter_stats *stats)
>> +{
>> +	struct rte_event_eth_rx_adapter *rx_adapter;
>> +	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
>> +	struct rte_event_eth_rx_adapter_stats dev_stats;
>> +	struct rte_eventdev *dev;
>> +	struct eth_device_info *dev_info;
>> +	uint32_t i;
>> +	int ret;
>> +
>> +	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
>> +
>> +	rx_adapter = id_to_rx_adapter(id);
>> +	if (!rx_adapter || !stats)
>> +		return -EINVAL;
>> +
>> +	dev = &rte_eventdevs[rx_adapter->eventdev_id];
>> +	memset(stats, 0, sizeof(*stats));
>> +	for (i = 0; i < rte_eth_dev_count(); i++) {
>> +		dev_info = &rx_adapter->eth_devices[i];
>> +		if (!dev_info->internal_event_port)
>> +			continue;
>> +		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev, i,
>> +						&dev_stats);
> 
> Preferable to check if the adapter stats get and stats set API are registered from the driver.
>

OK.

Thanks for the review, I will make the changes and post a v4.

Nikhil

> Regards,
> Nipun
>
Harry van Haaren Sept. 18, 2017, 3:36 p.m.
> From: Rao, Nikhil
> Sent: Wednesday, September 13, 2017 7:53 PM
> To: jerin.jacob@caviumnetworks.com; Richardson, Bruce
> <bruce.richardson@intel.com>
> Cc: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org; thomas@monjalon.net; Van
> Haaren, Harry <harry.van.haaren@intel.com>; hemant.agrawal@nxp.com;
> nipun.gupta@nxp.com; Vangati, Narender <narender.vangati@intel.com>; Carrillo,
> Erik G <erik.g.carrillo@intel.com>; Gujjar, Abhinandan S
> <abhinandan.gujjar@intel.com>
> Subject: [PATCH 3/4] eventdev: Add eventdev ethernet Rx adapter
> 
> Add common APIs for configuring packet transfer from ethernet Rx
> queues to event devices across HW & SW packet transfer mechanisms.
> A detailed description of the adapter is contained in the header's
> comments.
> 
> The adapter implementation uses eventdev PMDs to configure the packet
> transfer if HW support is available and if not, it uses an EAL service
> function that reads packets from ethernet Rx queues and injects these
> as events into the event device.
> 
> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> Signed-off-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>

Generally looks good to me - I'll leave Acking and such to Jerin / others who were more involved in the design process.

<snip header, some implementation review below>

> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> @@ -0,0 +1,1239 @@
> +#include <rte_cycles.h>
> +#include <rte_common.h>
> +#include <rte_dev.h>
> +#include <rte_errno.h>
> +#include <rte_ethdev.h>
> +#include <rte_log.h>
> +#include <rte_malloc.h>
> +#include <rte_service_component.h>
> +#include <rte_thash.h>
> +
> +#include "rte_eventdev.h"
> +#include "rte_eventdev_pmd.h"
> +#include "rte_event_eth_rx_adapter.h"
> +
> +#define BATCH_SIZE		32
> +#define BLOCK_CNT_THRESHOLD	10
> +#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
> +
> +static const char adapter_mem_name[] = "rx_adapter_mem_";

This bit isn't really DPDK style - usually we allocate a specific size for a buffer,
and then print into it using a safe method such as snprintf()

<snip>

> +struct rte_event_eth_rx_adapter {
> +	/* event device identifier */
> +	uint8_t eventdev_id;
> +	/* per ethernet device structure */
> +	struct eth_device_info *eth_devices;
> +	/* malloc name */
> +	char mem_name[sizeof(adapter_mem_name) + 4];

See above comment, and why the magic + 4?
If we had a statically sized buffer, then this wouldn't be an issue.
There are multiple other instances of adapter_mem_name being accessed,
they have this strange "+ 4" throughout. Please refactor.

Assuming this code is all initialization and configuration only, lets
not save a few bytes at the expense of a few bugs :D

> +
> +#if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
> +#define BE_16(x)	(uint16_t)((x) >> 8 | (x) << 8)
> +#else
> +#define BE_16(x)	(x)
> +#endif
> +
> +#define NETWORK_ORDER(x) BE_16(x)

There is a rte_byteorder.h header file, which handles details such as the above.
I don't have much experience with it - but it looks like there's some duplication here.


> +static int
> +_rte_event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter
> *rx_adapter,
> +				    struct eth_device_info *dev_info,
> +				    uint16_t rx_queue_id)
> +{

Why the _ prefix before the function? This is already a static function, so
it won't be available outside this translation unit. If it is not meant to be
externally visible, don't use the "rte" prefix and voila, you have an internal
visibility function..? Perhaps I missed something.

A few more _functions() like this below.

> +static int
> +_rx_adapter_ctrl(struct rte_event_eth_rx_adapter *rx_adapter, int start)
> +{

This function is only called from one place?
It can probably be placed in that function itself, given it is the non _prefixed version?



> +int
> +rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
> +				rx_adapter_conf_cb conf_cb, void *conf_arg)
> +{
> +	struct rte_event_eth_rx_adapter *rx_adapter;
> +	int ret;
> +	int socket_id;
> +	uint8_t i;
> +	char mem_name[sizeof(adapter_mem_name) + 4];

Same as before.

> +	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
> +	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
> +	if (!conf_cb)
> +		return -EINVAL;
> +
> +	if (rte_event_eth_rx_adapter == NULL) {
> +		ret = rte_event_eth_rx_adapter_init();
> +		if (ret)
> +			return ret;
> +	}
> +
> +	rx_adapter = id_to_rx_adapter(id);
> +	if (rx_adapter != NULL) {
> +		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
> +		return -EEXIST;
> +	}
> +
> +	socket_id = rte_event_dev_socket_id(dev_id);
> +	snprintf(mem_name, sizeof(adapter_mem_name) + 4, "%s%d",
> +		adapter_mem_name, id);
> +
> +	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
> +			RTE_CACHE_LINE_SIZE, socket_id);
> +	if (rx_adapter == NULL) {
> +		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
> +		return -ENOMEM;
> +	}
> +
> +	rx_adapter->eventdev_id = dev_id;
> +	rx_adapter->socket_id = socket_id;
> +	rx_adapter->conf_cb = conf_cb;
> +	rx_adapter->conf_arg = conf_arg;
> +	strcpy(rx_adapter->mem_name, mem_name);
> +	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
> +					rte_eth_dev_count() *
> +					sizeof(struct eth_device_info), 0,
> +					socket_id);
> +	if (rx_adapter->eth_devices == NULL) {
> +		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
> +		rte_free(rx_adapter);
> +		return -ENOMEM;
> +	}

This (and the previous other -ERROR returns) don't free the resources
that have been taken by this function up to this point. Improved tidying
up a after an error would be good.


There's also some checkpatch warnings on the patch page:
http://dpdk.org/ml/archives/test-report/2017-July/024634.html


In general - looks like really good work - these are just improvements, nothing major discovered.
Nikhil Rao Sept. 19, 2017, 3:25 p.m.
On 9/18/2017 9:06 PM, Van Haaren, Harry wrote:
>> From: Rao, Nikhil
> 
>> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> @@ -0,0 +1,1239 @@
>> +#include <rte_cycles.h>
>> +#include <rte_common.h>
>> +#include <rte_dev.h>
>> +#include <rte_errno.h>
>> +#include <rte_ethdev.h>
>> +#include <rte_log.h>
>> +#include <rte_malloc.h>
>> +#include <rte_service_component.h>
>> +#include <rte_thash.h>
>> +
>> +#include "rte_eventdev.h"
>> +#include "rte_eventdev_pmd.h"
>> +#include "rte_event_eth_rx_adapter.h"
>> +
>> +#define BATCH_SIZE		32
>> +#define BLOCK_CNT_THRESHOLD	10
>> +#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
>> +
>> +static const char adapter_mem_name[] = "rx_adapter_mem_";
> 
> This bit isn't really DPDK style - usually we allocate a specific size for a buffer,
> and then print into it using a safe method such as snprintf()
> 
> <snip>
> 
>> +struct rte_event_eth_rx_adapter {
>> +	/* event device identifier */
>> +	uint8_t eventdev_id;
>> +	/* per ethernet device structure */
>> +	struct eth_device_info *eth_devices;
>> +	/* malloc name */
>> +	char mem_name[sizeof(adapter_mem_name) + 4];
> 
> See above comment, and why the magic + 4?
> If we had a statically sized buffer, then this wouldn't be an issue.
> There are multiple other instances of adapter_mem_name being accessed,
> they have this strange "+ 4" throughout. Please refactor.
> 
> Assuming this code is all initialization and configuration only, lets
> not save a few bytes at the expense of a few bugs :D
> 

OK.

>> +
>> +#if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
>> +#define BE_16(x)	(uint16_t)((x) >> 8 | (x) << 8)
>> +#else
>> +#define BE_16(x)	(x)
>> +#endif
>> +
>> +#define NETWORK_ORDER(x) BE_16(x)
> 
> There is a rte_byteorder.h header file, which handles details such as the above.
> I don't have much experience with it - but it looks like there's some duplication here.
> 

Agreed, looks redundant. These are compile time conversions, and I see 
that rte_byteorder does handle that case.

>> +static int
>> +_rte_event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter
>> *rx_adapter,
>> +				    struct eth_device_info *dev_info,
>> +				    uint16_t rx_queue_id)
>> +{
> 
> Why the _ prefix before the function? This is already a static function, so
> it won't be available outside this translation unit. If it is not meant to be
> externally visible, don't use the "rte" prefix and voila, you have an internal
> visibility function..? Perhaps I missed something.
> 
> A few more _functions() like this below.
> No particular reason for the _ prefix, as I was refactoring common code, 
it looks like I may have cut & pasted the queue_add/del function names. 
Will implement your suggestion.

>> +static int
>> +_rx_adapter_ctrl(struct rte_event_eth_rx_adapter *rx_adapter, int start)
>> +{
> 
> This function is only called from one place?
> It can probably be placed in that function itself, given it is the non _prefixed version?
> 
The idea here was to separate out the error checking from the core 
logic. I will put it back.

> 
>> +int
>> +rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
>> +				rx_adapter_conf_cb conf_cb, void *conf_arg)
>> +{
>> +	struct rte_event_eth_rx_adapter *rx_adapter;
>> +	int ret;
>> +	int socket_id;
>> +	uint8_t i;
>> +	char mem_name[sizeof(adapter_mem_name) + 4];
> 
> Same as before.
> 
>> +	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
>> +	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
>> +	if (!conf_cb)
>> +		return -EINVAL;
>> +
>> +	if (rte_event_eth_rx_adapter == NULL) {
>> +		ret = rte_event_eth_rx_adapter_init();
>> +		if (ret)
>> +			return ret;
>> +	}
>> +
>> +	rx_adapter = id_to_rx_adapter(id);
>> +	if (rx_adapter != NULL) {
>> +		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
>> +		return -EEXIST;
>> +	}
>> +
>> +	socket_id = rte_event_dev_socket_id(dev_id);
>> +	snprintf(mem_name, sizeof(adapter_mem_name) + 4, "%s%d",
>> +		adapter_mem_name, id);
>> +
>> +	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
>> +			RTE_CACHE_LINE_SIZE, socket_id);
>> +	if (rx_adapter == NULL) {
>> +		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
>> +		return -ENOMEM;
>> +	}
>> +
>> +	rx_adapter->eventdev_id = dev_id;
>> +	rx_adapter->socket_id = socket_id;
>> +	rx_adapter->conf_cb = conf_cb;
>> +	rx_adapter->conf_arg = conf_arg;
>> +	strcpy(rx_adapter->mem_name, mem_name);
>> +	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
>> +					rte_eth_dev_count() *
>> +					sizeof(struct eth_device_info), 0,
>> +					socket_id);
>> +	if (rx_adapter->eth_devices == NULL) {
>> +		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
>> +		rte_free(rx_adapter);
>> +		return -ENOMEM;
>> +	}
> 
> This (and the previous other -ERROR returns) don't free the resources
> that have been taken by this function up to this point. Improved tidying
> up a after an error would be good.
> 
The rte_free() for rx_adapter is the only cleanup needed, unless you are
talking about the cleanup for rte_event_eth_rx_adapter_init() function.

> 
> There's also some checkpatch warnings on the patch page:
> http://dpdk.org/ml/archives/test-report/2017-July/024634.html
> 

These warnings are for the previous version.

> In general - looks like really good work - these are just improvements, nothing major discovered.
> 

Thanks for the review.

Nikhil

Patch hide | download patch | download mbox

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
new file mode 100644
index 0000000..678c0ae
--- /dev/null
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
@@ -0,0 +1,386 @@ 
+/*
+ *   Copyright(c) 2017 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_EVENT_ETH_RX_ADAPTER_
+#define _RTE_EVENT_ETH_RX_ADAPTER_
+
+/**
+ * @file
+ *
+ * RTE Event Ethernet Rx Adapter
+ *
+ * An eventdev-based packet processing application enqueues/dequeues mbufs
+ * to/from the event device. The application uses the adapter APIs to configure
+ * the packet flow between the ethernet devices and event devices. Depending on
+ * on the capabilties of the eventdev PMD, the adapter may use a EAL service
+ * core function for packet transfer or use internal PMD functions to configure
+ * the packet transfer between the ethernet device and the event device.
+ *
+ * The ethernet Rx event adapter's functions are:
+ *  - rte_event_eth_rx_adapter_create_ext()
+ *  - rte_event_eth_rx_adapter_create()/free()
+ *  - rte_event_eth_rx_adapter_queue_add()/del()
+ *  - rte_event_eth_rx_adapter_start()/stop()
+ *  - rte_event_eth_rx_adapter_stats_get()/reset()
+ *
+ * The applicaton creates an event to ethernet adapter using
+ * rte_event_eth_rx_adapter_create_ext() or rte_event_eth_rx_adapter_create()
+ * functions.
+ * The adapter needs to know which ethernet rx queues to poll for mbufs as well
+ * as event device parameters such as the event queue identifier, event
+ * priority and scheduling type that the adapter should use when constructing
+ * events. The rte_event_eth_rx_adapter_queue_add() function is provided for
+ * this purpose.
+ * The servicing weight parameter in the rte_event_eth_rx_adapter_queue_conf
+ * is applicable when the Rx adapter uses a service core function and is
+ * intended to provide application control of the polling frequency of ethernet
+ * device receive queues, for example, the application may want to poll higher
+ * priority queues with a higher frequency but at the same time not starve
+ * lower priority queues completely. If this parameter is zero and the receive
+ * interrupt is enabled when configuring the device, the receive queue is
+ * interrupt driven; else, the queue is assigned a servicing weight of one.
+ *
+ * If the adapter uses a rte_service function, then the application is also
+ * required to assign a core to the service function and control the service
+ * core using the rte_service APIs. The rte_event_eth_rx_adapter_service_id_get
+ * function can be used to retrieve the service function ID of the adapter in
+ * this case.
+ *
+ * Note: Interrupt driven receive queues are currentely unimplemented.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+#include <rte_service.h>
+
+#include "rte_eventdev.h"
+
+#define RTE_EVENT_ETH_RX_ADAPTER_NAME_FORMAT	"rte_event_eth_rx_adapter_%d"
+
+#define RTE_MAX_EVENT_ETH_RX_ADAPTER_INSTANCE 32
+
+/* struct rte_event_eth_rx_adapter_queue_conf flags definitions */
+#define RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID	0x1
+/**< This flag indicates the flow identifier is valid
+ * @see rte_event_eth_rx_adapter_queue_conf
+ */
+
+struct rte_event_eth_rx_adapter_conf {
+	uint8_t event_port_id;
+	/**< Event port identifier, the adapter enqueues mbuf events to this
+	 * port
+	 */
+	uint32_t max_nb_rx;
+	/**< The adapter can return early if it has processed at least
+	 * max_nb_rx mbufs. This isn't treated as a requirement; batching may
+	 * cause the adapter to process more than max_nb_rx mbufs
+	 */
+};
+
+/**
+ * Function type used for adapter configuration callback. The callback is
+ * used to fill in members of the struct rte_event_eth_rx_adapter_conf, this
+ * callback is invoked when creating a SW service for packet transfer from
+ * ethdev queues to the event device. The SW service is created within the
+ * rte_event_eth_rx_adapter_queue_add() function if packet required.
+ *
+ * @param id
+ *  Adapter identifier.
+ *
+ * @param dev_id
+ *  Event device identifier.
+ *
+ * @conf
+ *  Structure that needs to be populated by this callback.
+ *
+ * @arg
+ *  Argument to the callback. This is the same as the conf_arg passed to the
+ *  rte_event_eth_rx_adapter_create_ext()
+ */
+typedef int (*rx_adapter_conf_cb) (uint8_t id, uint8_t dev_id,
+			struct rte_event_eth_rx_adapter_conf *conf,
+			void *arg);
+
+/** Rx queue configuration structure */
+struct rte_event_eth_rx_adapter_queue_conf {
+	uint32_t rx_queue_flags;
+	 /**< Flags for handling received packets
+	  * @see RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID
+	  */
+	uint16_t servicing_weight;
+	/**< Relative polling frequency of ethernet receive queue, if this
+	 * is set to zero, the Rx queue is interrupt driven (unless rx queue
+	 * interrupts are not enabled for the ethernet device)
+	 */
+	struct rte_event ev;
+	/**<
+	 *  The values from the following event fields will be used when
+	 *  enqueuing mbuf events:
+	 *   - event_queue_id: Targeted event queue ID for received packets.
+	 *   - event_priority: Event priority of packets from this Rx queue in
+	 *                     the event queue relative to other events.
+	 *   - sched_type: Scheduling type for packets from this Rx queue.
+	 *   - flow_id: If the RTE_ETH_RX_EVENT_ADAPTER_QUEUE_FLOW_ID_VALID bit
+	 *		is set in rx_queue_flags, this flow_id is used for all
+	 *		packets received from this queue. Otherwise the flow ID
+	 *		is set to the RSS hash of the src and dst IPv4/6
+	 *		address.
+	 *
+	 * The event adapter sets ev.event_type to RTE_EVENT_TYPE_ETHDEV in the
+	 * enqueued event
+	 */
+};
+
+struct rte_event_eth_rx_adapter_stats {
+	uint64_t rx_poll_count;
+	/**< Receive queue poll count */
+	uint64_t rx_packets;
+	/**< Received packet count */
+	uint64_t rx_enq_count;
+	/**< Eventdev enqueue count */
+	uint64_t rx_enq_retry;
+	/**< Eventdev enqueue retry count */
+	uint64_t rx_enq_start_ts;
+	/**< Rx enqueue start timestamp */
+	uint64_t rx_enq_block_cycles;
+	/**< Cycles for which the service is blocked by the event device,
+	 * i.e, the service fails to enqueue to the event device.
+	 */
+	uint64_t rx_enq_end_ts;
+	/**< Latest timestamp at which the service is unblocked
+	 * by the event device. The start, end timestamps and
+	 * block cycles can be used to compute the percentage of
+	 * cycles the service is blocked by the event device.
+	 */
+};
+
+/**
+ * Create a new ethernet Rx event adapter with the specified identifier.
+ *
+ * @param id
+ *  The identifier of the ethernet Rx event adapter.
+ *
+ * @dev_id
+ *  The identifier of the device to configure.
+ *
+ * @eth_port_id
+ *  The identifier of the ethernet device.
+ *
+ * @param conf_cb
+ *  Callback function that fills in members of a
+ *  struct rte_event_eth_rx_adapter_conf struct passed into
+	 *    it.
+ *
+ * @param conf_arg
+ *  Argument that is passed to the conf_cb function.
+ *
+ * @return
+ *   - 0: Success
+ *   - <0: Error code on failure
+ */
+int rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
+					rx_adapter_conf_cb conf_cb,
+					void *conf_arg);
+
+/**
+ * Create a new ethernet Rx event adapter with the specified identifier.
+ * This function uses an internal configuration function that creates an event
+ * port. This default function reconfigures the event device with an
+ * additional event port and setups up the event port using the port_config
+ * parameter passed into this function. In case the application needs more
+ * control in configuration of the service, it should use the
+ * rte_event_eth_rx_adapter_create_ext() version.
+ *
+ * @param id
+ *  The identifier of the ethernet Rx event adapter.
+ *
+ * @dev_id
+ *  The identifier of the device to configure.
+ *
+ * @eth_port_id
+ *  The identifier of the ethernet device.
+ *
+ * @param conf_cb
+ *  Callback function that fills in members of a
+ *  struct rte_event_eth_rx_adapter_conf struct passed into
+ *  it.
+ *
+ * @param conf_arg
+ *  Argument of type *rte_event_port_conf* that is passed to the conf_cb
+ *  function.
+ *
+ * @return
+ *   - 0: Success
+ *   - <0: Error code on failure
+ */
+int rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
+				struct rte_event_port_conf *port_config);
+
+/**
+ * Free an event adapter
+ *
+ * @param id
+ *  Adapter identifier.
+ *
+ * @return
+ *   - 0: Success
+ *   - <0: Error code on failure, If the adapter still has Rx queues
+ *      added to it, the function returns -EBUSY.
+ */
+int rte_event_eth_rx_adapter_free(uint8_t id);
+
+/**
+ * Add receive queue to an event adapter. After a queue has been
+ * added to the event adapter, the result of the application calling
+ * rte_eth_rx_burst(eth_dev_id, rx_queue_id, ..) is undefined.
+ *
+ * @param id
+ *  Adapter identifier.
+ *
+ * @param eth_dev_id
+ *  Port identifier of Ethernet device.
+ *
+ * @param rx_queue_id
+ *  Ethernet device receive queue index.
+ *  If rx_queue_id is -1, then all Rx queues configured for
+ *  the device are added. If the ethdev Rx queues can only be
+ *  connected to a single event queue then rx_queue_id is
+ *  required to be -1.
+ *
+ * @param conf
+ *  Additonal configuration structure of type *rte_event_eth_rx_adapte_conf*
+ *
+ * @see
+ * @return
+ *  - 0: Success, Receive queue added correctly.
+ *  - <0: Error code on failure.
+ */
+int rte_event_eth_rx_adapter_queue_add(uint8_t id,
+			uint8_t eth_dev_id,
+			int32_t rx_queue_id,
+			const struct rte_event_eth_rx_adapter_queue_conf *conf);
+
+/**
+ * Delete receive queue from an event adapter.
+ *
+ * @param id
+ *  Adapter identifier.
+ *
+ * @param eth_dev_id
+ *  Port identifier of Ethernet device.
+ *
+ * @param rx_queue_id
+ *  Ethernet device receive queue index.
+ *  If rx_queue_id is -1, then all Rx queues configured for
+ *  the device are deleted. If the ethdev Rx queues can only be
+ *  connected to a single event queue then rx_queue_id is
+ *  required to be -1.
+ *
+ * @return
+ *  - 0: Success, Receive queue deleted correctly.
+ *  - <0: Error code on failure.
+ */
+int rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
+				       int32_t rx_queue_id);
+
+/**
+ * Start  ethernet Rx event adapter
+ *
+ * @param id
+ *  Adapter identifier.
+ *
+ * @return
+ *  - 0: Success, Adapter started correctly.
+ *  - <0: Error code on failure.
+ */
+int rte_event_eth_rx_adapter_start(uint8_t id);
+
+/**
+ * Stop  ethernet Rx event adapter
+ *
+ * @param id
+ *  Adapter identifier.
+ *
+ * @return
+ *  - 0: Success, Adapter started correctly.
+ *  - <0: Error code on failure.
+ */
+int rte_event_eth_rx_adapter_stop(uint8_t id);
+
+/**
+ * Retrieve statistics for an adapter
+ *
+ * @param id
+ *  Adapter identifier.
+ *
+ * @param stats
+ *  A pointer to structure used to retrieve statistics for an adapter.
+ *
+ * @return
+ *  - 0: Success, retrieved successfully.
+ *  - <0: Error code on failure.
+ */
+int rte_event_eth_rx_adapter_stats_get(uint8_t id,
+				struct rte_event_eth_rx_adapter_stats *stats);
+
+/**
+ * Reset statistics for an adapter
+ *
+ * @param id
+ *  Adapter identifier.
+ *
+ * @return
+ *  - 0: Success, statistics reset successfully.
+ *  - <0: Error code on failure.
+ */
+int rte_event_eth_rx_adapter_stats_reset(uint8_t id);
+
+/**
+ * Retrieve the service ID of an adapter. If the adapter doesn't use
+ * a rte_service function, this function returns -ESRCH
+ *
+ * @param id
+ *  Adapter identifier.
+ *
+ * @return
+ *  - 0: Success, statistics reset successfully.
+ *  - <0: Error code on failure, if the adapter doesn't use a rte_service
+ * function, this function returns -ESRCH.
+ */
+int rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id);
+
+#ifdef __cplusplus
+}
+#endif
+#endif	/* _RTE_EVENT_ETH_RX_ADAPTER_ */
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
new file mode 100644
index 0000000..cd19e7c
--- /dev/null
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -0,0 +1,1239 @@ 
+#include <rte_cycles.h>
+#include <rte_common.h>
+#include <rte_dev.h>
+#include <rte_errno.h>
+#include <rte_ethdev.h>
+#include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_service_component.h>
+#include <rte_thash.h>
+
+#include "rte_eventdev.h"
+#include "rte_eventdev_pmd.h"
+#include "rte_event_eth_rx_adapter.h"
+
+#define BATCH_SIZE		32
+#define BLOCK_CNT_THRESHOLD	10
+#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
+
+static const char adapter_mem_name[] = "rx_adapter_mem_";
+/*
+ * There is an instance of this struct per polled Rx queue added to the
+ * adapter
+ */
+struct eth_rx_poll_entry {
+	/* eth port to poll */
+	uint8_t eth_dev_id;
+	/* eth rx queue to poll */
+	uint16_t eth_rx_qid;
+};
+
+/* Instance per adapter */
+struct rte_eth_event_enqueue_buffer {
+	/* Count of events in this buffer */
+	uint16_t count;
+	/* Array of events in this buffer */
+	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
+};
+
+struct rte_event_eth_rx_adapter {
+	/* event device identifier */
+	uint8_t eventdev_id;
+	/* per ethernet device structure */
+	struct eth_device_info *eth_devices;
+	/* malloc name */
+	char mem_name[sizeof(adapter_mem_name) + 4];
+	/* socket identifier cached from eventdev */
+	int socket_id;
+
+	/* elements below are used by SW service */
+
+	/* event port identifier */
+	uint8_t event_port_id;
+	/* per adapter EAL service */
+	uint32_t service_id;
+	/* lock to serialize config updates with service function */
+	rte_spinlock_t rx_lock;
+	/* max mbufs processed in any service function invocation */
+	uint32_t max_nb_rx;
+	/* Receive queues that need to be polled */
+	struct eth_rx_poll_entry *eth_rx_poll;
+	/* size of the eth_rx_poll array */
+	uint16_t num_rx_polled;
+	/* Weighted round robin schedule */
+	uint32_t *wrr_sched;
+	/* wrr_sched[] size */
+	uint32_t wrr_len;
+	/* Next entry in wrr[] to begin polling */
+	uint32_t wrr_pos;
+	/* Event burst buffer */
+	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
+	/* per adapter stats */
+	struct rte_event_eth_rx_adapter_stats stats;
+	/* Block count, counts upto BLOCK_CNT_THRESHOLD */
+	uint16_t enq_block_count;
+	/* Block start ts */
+	uint64_t rx_enq_block_start_ts;
+	/* Configuration callback for rte_service configuration */
+	rx_adapter_conf_cb conf_cb;
+	/* Configuration callback argument */
+	void *conf_arg;
+	/* Service initialization state */
+	uint8_t service_inited;
+	/* Total count of Rx queues in adapter */
+	uint32_t nb_queues;
+} __rte_cache_aligned;
+
+/* Per eth device */
+struct eth_device_info {
+	struct rte_eth_dev *dev;
+	struct eth_rx_queue_info *rx_queue;
+	/* Set if ethdev->eventdev packet transfer uses a
+	 * hardware mechanism
+	 */
+	uint8_t internal_event_port;
+	/* set if the adapter is processing rx queues for
+	 * this eth device and packet processing has been
+	 * started, allows for the code to know if the PMD
+	 * rx_adapter_stop callback needs to be invoked
+	 */
+	uint8_t dev_rx_started;
+	/* if nb_dev_queues > 0, the start callback will
+	 * be invoked if not already invoked
+	 */
+	uint16_t nb_dev_queues;
+};
+
+/* Per Rx queue */
+struct eth_rx_queue_info {
+	int queue_enabled;	/* true if added */
+	uint16_t wt;		/* polling weight */
+	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
+	uint8_t sched_type;	/* sched type for events */
+	uint8_t priority;	/* event priority */
+	uint32_t flow_id;	/* app provided flow identifier */
+	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
+};
+
+static struct rte_event_eth_rx_adapter **rte_event_eth_rx_adapter;
+static struct rte_event_port_conf
+		create_port_conf[RTE_MAX_EVENT_ETH_RX_ADAPTER_INSTANCE];
+
+static uint8_t default_rss_key[] = {
+	0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
+	0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
+	0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
+	0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
+	0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
+};
+static uint8_t *rss_key_be;
+
+static inline int
+valid_id(uint8_t id)
+{
+	return id < RTE_MAX_EVENT_ETH_RX_ADAPTER_INSTANCE;
+}
+
+#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
+	if (!valid_id(id)) { \
+		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
+		return retval; \
+	} \
+} while (0)
+
+static inline int
+sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	return rx_adapter->num_rx_polled;
+}
+
+/* Greatest common divisor */
+static uint16_t gcd_u16(uint16_t a, uint16_t b)
+{
+	uint16_t r = a % b;
+
+	return r ? gcd_u16(b, r) : b;
+}
+
+/* Returns the next queue in the polling sequence
+ *
+ * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
+ */
+static int
+wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
+	 unsigned int n, int *cw,
+	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
+	 uint16_t gcd, int prev)
+{
+	int i = prev;
+	uint16_t w;
+
+	while (1) {
+		uint16_t q;
+		uint8_t d;
+
+		i = (i + 1) % n;
+		if (i == 0) {
+			*cw = *cw - gcd;
+			if (*cw <= 0)
+				*cw = max_wt;
+		}
+
+		q = eth_rx_poll[i].eth_rx_qid;
+		d = eth_rx_poll[i].eth_dev_id;
+		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
+
+		if ((int)w >= *cw)
+			return i;
+	}
+}
+
+/* Precalculate WRR polling sequence for all queues in rx_adapter */
+static int
+eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	uint8_t d;
+	uint16_t q;
+	unsigned int i;
+
+	/* Initialize variables for calculaton of wrr schedule */
+	uint16_t max_wrr_pos = 0;
+	unsigned int poll_q = 0;
+	uint16_t max_wt = 0;
+	uint16_t gcd = 0;
+
+	struct eth_rx_poll_entry *rx_poll = NULL;
+	uint32_t *rx_wrr = NULL;
+
+	if (rx_adapter->num_rx_polled) {
+		size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
+				sizeof(*rx_adapter->eth_rx_poll),
+				RTE_CACHE_LINE_SIZE);
+		rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
+					     len,
+					     RTE_CACHE_LINE_SIZE,
+					     rx_adapter->socket_id);
+		if (!rx_poll)
+			return -ENOMEM;
+
+		/* Generate array of all queues to poll, the size of this
+		 * array is poll_q
+		 */
+		for (d = 0; d < rte_eth_dev_count(); d++) {
+			uint16_t nb_rx_queues;
+			struct eth_device_info *dev_info =
+					&rx_adapter->eth_devices[d];
+			nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+			if (!dev_info->rx_queue)
+				continue;
+			for (q = 0; q < nb_rx_queues; q++) {
+				struct eth_rx_queue_info *queue_info =
+					&dev_info->rx_queue[q];
+				if (!queue_info->queue_enabled)
+					continue;
+
+				uint16_t wt = queue_info->wt;
+				rx_poll[poll_q].eth_dev_id = d;
+				rx_poll[poll_q].eth_rx_qid = q;
+				max_wrr_pos += wt;
+				max_wt = RTE_MAX(max_wt, wt);
+				gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
+				poll_q++;
+			}
+		}
+
+		len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
+				RTE_CACHE_LINE_SIZE);
+		rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
+					    len,
+					    RTE_CACHE_LINE_SIZE,
+					    rx_adapter->socket_id);
+		if (!rx_wrr) {
+			rte_free(rx_poll);
+			return -ENOMEM;
+		}
+
+		/* Generate polling sequence based on weights */
+		int prev = -1;
+		int cw = -1;
+		for (i = 0; i < max_wrr_pos; i++) {
+			rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
+					     rx_poll, max_wt, gcd, prev);
+			prev = rx_wrr[i];
+		}
+	}
+
+	rte_free(rx_adapter->eth_rx_poll);
+	rte_free(rx_adapter->wrr_sched);
+
+	rx_adapter->eth_rx_poll = rx_poll;
+	rx_adapter->wrr_sched = rx_wrr;
+	rx_adapter->wrr_len = max_wrr_pos;
+
+	return 0;
+}
+
+#if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
+#define BE_16(x)	(uint16_t)((x) >> 8 | (x) << 8)
+#else
+#define BE_16(x)	(x)
+#endif
+
+#define NETWORK_ORDER(x) BE_16(x)
+
+static inline void
+mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
+	struct ipv6_hdr **ipv6_hdr)
+{
+	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
+	struct vlan_hdr *vlan_hdr;
+
+	*ipv4_hdr = NULL;
+	*ipv6_hdr = NULL;
+
+	switch (eth_hdr->ether_type) {
+	case NETWORK_ORDER(ETHER_TYPE_IPv4):
+		*ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
+		break;
+
+	case NETWORK_ORDER(ETHER_TYPE_IPv6):
+		*ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
+		break;
+
+	case NETWORK_ORDER(ETHER_TYPE_VLAN):
+		vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
+		switch (vlan_hdr->eth_proto) {
+		case NETWORK_ORDER(ETHER_TYPE_IPv4):
+			*ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
+			break;
+		case NETWORK_ORDER(ETHER_TYPE_IPv6):
+			*ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
+			break;
+		default:
+			break;
+		}
+		break;
+
+	default:
+		break;
+	}
+}
+
+/* Calculate RSS hash for IPv4/6 */
+static inline uint32_t
+do_softrss(struct rte_mbuf *m)
+{
+	uint32_t input_len;
+	void *tuple;
+	struct rte_ipv4_tuple ipv4_tuple;
+	struct rte_ipv6_tuple ipv6_tuple;
+	struct ipv4_hdr *ipv4_hdr;
+	struct ipv6_hdr *ipv6_hdr;
+
+	mtoip(m, &ipv4_hdr, &ipv6_hdr);
+
+	if (ipv4_hdr) {
+		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
+		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
+		tuple = &ipv4_tuple;
+		input_len = RTE_THASH_V4_L3_LEN;
+	} else if (ipv6_hdr) {
+		rte_thash_load_v6_addrs(ipv6_hdr,
+					(union rte_thash_tuple *)&ipv6_tuple);
+		tuple = &ipv6_tuple;
+		input_len = RTE_THASH_V6_L3_LEN;
+	} else
+		return 0;
+
+	return rte_softrss_be(tuple, input_len, rss_key_be);
+}
+
+static inline int
+rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	return !!rx_adapter->enq_block_count;
+}
+
+static inline void
+rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	if (rx_adapter->rx_enq_block_start_ts)
+		return;
+
+	rx_adapter->enq_block_count++;
+	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
+		return;
+
+	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
+}
+
+static inline void
+rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
+		    struct rte_event_eth_rx_adapter_stats *stats)
+{
+	if (unlikely(!stats->rx_enq_start_ts))
+		stats->rx_enq_start_ts = rte_get_tsc_cycles();
+
+	if (likely(!rx_enq_blocked(rx_adapter)))
+		return;
+
+	rx_adapter->enq_block_count = 0;
+	if (rx_adapter->rx_enq_block_start_ts) {
+		stats->rx_enq_end_ts = rte_get_tsc_cycles();
+		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
+		    rx_adapter->rx_enq_block_start_ts;
+		rx_adapter->rx_enq_block_start_ts = 0;
+	}
+}
+
+/* Add event to buffer, free space check is done prior to calling
+ * this function
+ */
+static inline void
+buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
+		  struct rte_event *ev)
+{
+	struct rte_eth_event_enqueue_buffer *buf =
+	    &rx_adapter->event_enqueue_buffer;
+	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
+}
+
+/* Enqueue buffered events to event device */
+static inline uint16_t
+flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	struct rte_eth_event_enqueue_buffer *buf =
+	    &rx_adapter->event_enqueue_buffer;
+	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+
+	uint16_t n = rte_event_enqueue_burst(rx_adapter->eventdev_id,
+					rx_adapter->event_port_id,
+					buf->events,
+					buf->count);
+	if (n != buf->count) {
+		memmove(buf->events,
+			&buf->events[n],
+			(buf->count - n) * sizeof(struct rte_event));
+		stats->rx_enq_retry++;
+	}
+
+	n ? rx_enq_block_end_ts(rx_adapter, stats) :
+		rx_enq_block_start_ts(rx_adapter);
+
+	buf->count -= n;
+	stats->rx_enq_count += n;
+
+	return n;
+}
+
+static inline void
+fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
+	uint8_t dev_id,
+	uint16_t rx_queue_id,
+	struct rte_mbuf **mbufs,
+	uint16_t num)
+{
+	uint32_t i;
+	struct eth_device_info *eth_device_info =
+					&rx_adapter->eth_devices[dev_id];
+	struct eth_rx_queue_info *eth_rx_queue_info =
+					&eth_device_info->rx_queue[rx_queue_id];
+
+	int32_t qid = eth_rx_queue_info->event_queue_id;
+	uint8_t sched_type = eth_rx_queue_info->sched_type;
+	uint8_t priority = eth_rx_queue_info->priority;
+	uint32_t flow_id;
+	struct rte_event events[BATCH_SIZE];
+	struct rte_mbuf *m = mbufs[0];
+	uint32_t rss_mask;
+	uint32_t rss;
+	int do_rss;
+
+	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
+	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
+	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
+
+	for (i = 0; i < num; i++) {
+		m = mbufs[i];
+		struct rte_event *ev = &events[i];
+
+		rss = do_rss ? do_softrss(m) : m->hash.rss;
+		flow_id =
+		    eth_rx_queue_info->flow_id &
+				eth_rx_queue_info->flow_id_mask;
+		flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
+
+		ev->flow_id = flow_id;
+		ev->op = RTE_EVENT_OP_NEW;
+		ev->sched_type = sched_type;
+		ev->queue_id = qid;
+		ev->event_type = RTE_EVENT_TYPE_ETHDEV;
+		ev->sub_event_type = 0;
+		ev->priority = priority;
+		ev->mbuf = m;
+
+		buf_event_enqueue(rx_adapter, ev);
+	}
+}
+
+/*
+ * Polls receive queues added to the event adapter and enqueues received
+ * packets to the event device.
+ *
+ * The receive code enqueues initially to a temporary buffer, the
+ * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
+ *
+ * If there isn't space available in the temporary buffer, packets from the
+ * Rx queue arent dequeued from the eth device, this backpressures the
+ * eth device, in virtual device enviroments this backpressure is relayed to the
+ * hypervisor's switching layer where adjustments can be made to deal with
+ * it.
+ */
+static inline uint32_t
+eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	uint32_t num_queue;
+	uint16_t n;
+	uint32_t nb_rx = 0;
+	struct rte_mbuf *mbufs[BATCH_SIZE];
+	struct rte_eth_event_enqueue_buffer *buf;
+	uint32_t wrr_pos;
+	uint32_t max_nb_rx;
+
+	wrr_pos = rx_adapter->wrr_pos;
+	max_nb_rx = rx_adapter->max_nb_rx;
+	buf = &rx_adapter->event_enqueue_buffer;
+	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+
+	/* Iterate through a WRR sequence */
+	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
+		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
+		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
+		uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
+
+		/* Don't do a batch dequeue from the rx queue if there isn't
+		 * enough space in the enqueue buffer.
+		 */
+		if (buf->count >= BATCH_SIZE)
+			flush_event_buffer(rx_adapter);
+		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count))
+			break;
+
+		stats->rx_poll_count++;
+		n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
+
+		if (n) {
+			stats->rx_packets += n;
+			/* The check before rte_eth_rx_burst() ensures that
+			 * all n mbufs can be buffered
+			 */
+			fill_event_buffer(rx_adapter, d, qid, mbufs, n);
+			nb_rx += n;
+			if (nb_rx > max_nb_rx) {
+				rx_adapter->wrr_pos =
+				    (wrr_pos + 1) % rx_adapter->wrr_len;
+				return nb_rx;
+			}
+		}
+
+		if (++wrr_pos == rx_adapter->wrr_len)
+			wrr_pos = 0;
+	}
+
+	return nb_rx;
+}
+
+static int
+event_eth_rx_adapter_service_func(void *args)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter = args;
+	struct rte_eth_event_enqueue_buffer *buf;
+
+	buf = &rx_adapter->event_enqueue_buffer;
+	if (!rte_spinlock_trylock(&rx_adapter->rx_lock))
+		return 0;
+	if (eth_rx_poll(rx_adapter) == 0 && buf->count)
+		flush_event_buffer(rx_adapter);
+	rte_spinlock_unlock(&rx_adapter->rx_lock);
+	return 0;
+}
+
+static int
+rte_event_eth_rx_adapter_init(void)
+{
+	const char *name = "rte_event_eth_rx_adapter_array";
+	const struct rte_memzone *mz;
+	unsigned int sz;
+	unsigned int rss_key_off;
+
+	sz = sizeof(*rte_event_eth_rx_adapter) *
+	    RTE_MAX_EVENT_ETH_RX_ADAPTER_INSTANCE;
+	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+	rss_key_off = sz;
+	sz = RTE_ALIGN(sz + sizeof(default_rss_key), RTE_CACHE_LINE_SIZE);
+
+	mz = rte_memzone_lookup(name);
+	if (!mz) {
+		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
+						 RTE_CACHE_LINE_SIZE);
+		if (mz) {
+			rte_convert_rss_key((uint32_t *)default_rss_key,
+			    (uint32_t *)(uintptr_t)(mz->addr_64 + rss_key_off),
+			    RTE_DIM(default_rss_key));
+		} else {
+			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
+					PRId32, rte_errno);
+			return -rte_errno;
+		}
+	}
+
+	rte_event_eth_rx_adapter = mz->addr;
+	rss_key_be = (uint8_t *)(mz->addr_64 + rss_key_off);
+	return 0;
+}
+
+static inline struct rte_event_eth_rx_adapter *
+id_to_rx_adapter(uint8_t id)
+{
+	return rte_event_eth_rx_adapter ?
+		rte_event_eth_rx_adapter[id] : NULL;
+}
+
+static int
+default_conf_cb(uint8_t id, uint8_t dev_id,
+		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
+{
+	int ret;
+	struct rte_eventdev *dev;
+	struct rte_event_dev_config dev_conf;
+	int started;
+	uint8_t port_id;
+	struct rte_event_port_conf *port_conf = arg;
+	struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);
+
+	dev = &rte_eventdevs[rx_adapter->eventdev_id];
+	dev_conf = dev->data->dev_conf;
+
+	started = dev->data->dev_started;
+	if (started)
+		rte_event_dev_stop(dev_id);
+	port_id = dev_conf.nb_event_ports;
+	dev_conf.nb_event_ports += 1;
+	ret = rte_event_dev_configure(dev_id, &dev_conf);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
+						dev_id);
+		/* Conf. failed, OK to start ? */
+		if (started)
+			rte_event_dev_start(dev_id);
+		return ret;
+	}
+
+	ret = rte_event_port_setup(dev_id, port_id, port_conf);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
+					port_id);
+	} else {
+		conf->event_port_id = port_id;
+		conf->max_nb_rx = 128;
+	}
+
+	if (started)
+		rte_event_dev_start(dev_id);
+	return ret;
+}
+
+static int
+init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
+{
+	int ret;
+	struct rte_service_spec service;
+	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
+
+	if (rx_adapter->service_inited)
+		return 0;
+
+	memset(&service, 0, sizeof(service));
+	sprintf(service.name, RTE_EVENT_ETH_RX_ADAPTER_NAME_FORMAT, id);
+	service.socket_id = rx_adapter->socket_id;
+	service.callback = event_eth_rx_adapter_service_func;
+	service.callback_userdata = rx_adapter;
+	/* Service function handles locking for queue add/del updates */
+	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
+	ret = rte_service_component_register(&service, &rx_adapter->service_id);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
+			service.name, ret);
+		return ret;
+	}
+
+	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
+		&rx_adapter_conf, rx_adapter->conf_arg);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("confguration callback failed err = %" PRId32,
+			ret);
+		goto err_done;
+	}
+	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
+	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
+	rx_adapter->service_inited = 1;
+	return 0;
+
+err_done:
+	rte_service_component_unregister(rx_adapter->service_id);
+	return ret;
+}
+
+static void
+update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int32_t rx_queue_id,
+		uint8_t add)
+{
+	struct eth_rx_queue_info *queue_info;
+	int enabled;
+	uint16_t i;
+
+	if (!dev_info->rx_queue)
+		return;
+
+	if (rx_queue_id == -1) {
+		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
+			queue_info = &dev_info->rx_queue[i];
+			enabled = queue_info->queue_enabled;
+			if (add) {
+				rx_adapter->nb_queues += !enabled;
+				dev_info->nb_dev_queues += !enabled;
+			} else {
+				rx_adapter->nb_queues -= enabled;
+				dev_info->nb_dev_queues -= enabled;
+			}
+			queue_info->queue_enabled = !!add;
+		}
+	} else {
+		queue_info = &dev_info->rx_queue[rx_queue_id];
+		enabled = queue_info->queue_enabled;
+		if (add) {
+			rx_adapter->nb_queues += !enabled;
+			dev_info->nb_dev_queues += !enabled;
+		} else {
+			rx_adapter->nb_queues -= enabled;
+			dev_info->nb_dev_queues -= enabled;
+		}
+		queue_info->queue_enabled = !!add;
+	}
+}
+
+static int
+_rte_event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
+				    struct eth_device_info *dev_info,
+				    uint16_t rx_queue_id)
+{
+	struct eth_rx_queue_info *queue_info;
+
+	if (!rx_adapter->nb_queues)
+		return 0;
+
+	queue_info = &dev_info->rx_queue[rx_queue_id];
+	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
+	update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
+	return 0;
+}
+
+static void
+_rte_event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		uint16_t rx_queue_id,
+		const struct rte_event_eth_rx_adapter_queue_conf *conf)
+
+{
+	struct eth_rx_queue_info *queue_info;
+	const struct rte_event *ev = &conf->ev;
+
+	queue_info = &dev_info->rx_queue[rx_queue_id];
+	queue_info->event_queue_id = ev->queue_id;
+	queue_info->sched_type = ev->sched_type;
+	queue_info->priority = ev->priority;
+	queue_info->wt = conf->servicing_weight;
+
+	if (conf->
+	    rx_queue_flags & RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
+		queue_info->flow_id = ev->flow_id;
+		queue_info->flow_id_mask = ~0;
+	}
+
+	/* The same queue can be added more than once */
+	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
+	update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
+}
+
+static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+		uint8_t eth_dev_id,
+		int rx_queue_id,
+		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
+{
+	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
+	uint32_t i;
+	int ret;
+
+	if (queue_conf->servicing_weight == 0) {
+		struct rte_event_eth_rx_adapter_queue_conf temp_conf;
+
+		struct rte_eth_dev_data *data = dev_info->dev->data;
+		if (data->dev_conf.intr_conf.rxq) {
+			RTE_EDEV_LOG_ERR("Interrupt driven queues"
+					" not supported");
+			return -ENOTSUP;
+		}
+		temp_conf = *queue_conf;
+		temp_conf.servicing_weight = 1;
+		/* If Rx interrupts are disabled set wt = 1 */
+		queue_conf = &temp_conf;
+	}
+
+	if (!dev_info->rx_queue) {
+		dev_info->rx_queue =
+		    rte_zmalloc_socket(rx_adapter->mem_name,
+				       dev_info->dev->data->nb_rx_queues *
+				       sizeof(struct eth_rx_queue_info), 0,
+				       rx_adapter->socket_id);
+		if (!dev_info->rx_queue)
+			return -ENOMEM;
+	}
+
+	if (rx_queue_id == -1) {
+		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
+			_rte_event_eth_rx_adapter_queue_add(rx_adapter,
+							dev_info, i,
+							queue_conf);
+	} else {
+		_rte_event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
+						  (uint16_t)rx_queue_id,
+						  queue_conf);
+	}
+
+	ret = eth_poll_wrr_calc(rx_adapter);
+	if (ret) {
+		_rte_event_eth_rx_adapter_queue_del(rx_adapter,
+					dev_info, rx_queue_id);
+		return ret;
+	}
+
+	return ret;
+}
+
+static int
+_rx_adapter_ctrl(struct rte_event_eth_rx_adapter *rx_adapter, int start)
+{
+	struct rte_eventdev *dev;
+	struct eth_device_info *dev_info;
+	uint32_t i;
+	int use_service = 0;
+	int stop = !start;
+
+	dev = &rte_eventdevs[rx_adapter->eventdev_id];
+
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		dev_info = &rx_adapter->eth_devices[i];
+		/* if start  check for num dev queues */
+		if (start && !dev_info->nb_dev_queues)
+			continue;
+		/* if stop check if dev has been started */
+		if (stop && !dev_info->dev_rx_started)
+			continue;
+		use_service |= !dev_info->internal_event_port;
+		dev_info->dev_rx_started = start;
+		if (!dev_info->internal_event_port)
+			continue;
+		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev, i) :
+			(*dev->dev_ops->eth_rx_adapter_stop)(dev, i);
+	}
+
+	if (use_service)
+		rte_service_runstate_set(rx_adapter->service_id, start);
+
+	return 0;
+}
+
+static int
+rx_adapter_ctrl(uint8_t id, int start)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter;
+
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	rx_adapter = id_to_rx_adapter(id);
+	if (!rx_adapter)
+		return -EINVAL;
+	return _rx_adapter_ctrl(rx_adapter, start);
+}
+
+int
+rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
+				rx_adapter_conf_cb conf_cb, void *conf_arg)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter;
+	int ret;
+	int socket_id;
+	uint8_t i;
+	char mem_name[sizeof(adapter_mem_name) + 4];
+
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
+	if (!conf_cb)
+		return -EINVAL;
+
+	if (rte_event_eth_rx_adapter == NULL) {
+		ret = rte_event_eth_rx_adapter_init();
+		if (ret)
+			return ret;
+	}
+
+	rx_adapter = id_to_rx_adapter(id);
+	if (rx_adapter != NULL) {
+		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
+		return -EEXIST;
+	}
+
+	socket_id = rte_event_dev_socket_id(dev_id);
+	snprintf(mem_name, sizeof(adapter_mem_name) + 4, "%s%d",
+		adapter_mem_name, id);
+
+	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
+			RTE_CACHE_LINE_SIZE, socket_id);
+	if (rx_adapter == NULL) {
+		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
+		return -ENOMEM;
+	}
+
+	rx_adapter->eventdev_id = dev_id;
+	rx_adapter->socket_id = socket_id;
+	rx_adapter->conf_cb = conf_cb;
+	rx_adapter->conf_arg = conf_arg;
+	strcpy(rx_adapter->mem_name, mem_name);
+	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
+					rte_eth_dev_count() *
+					sizeof(struct eth_device_info), 0,
+					socket_id);
+	if (rx_adapter->eth_devices == NULL) {
+		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
+		rte_free(rx_adapter);
+		return -ENOMEM;
+	}
+	rte_spinlock_init(&rx_adapter->rx_lock);
+	for (i = 0; i < rte_eth_dev_count(); i++)
+		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
+
+	rte_event_eth_rx_adapter[id] = rx_adapter;
+	return 0;
+}
+
+int
+rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
+		struct rte_event_port_conf *port_config)
+{
+	if (!port_config)
+		return -EINVAL;
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	create_port_conf[id] = *port_config;
+	return rte_event_eth_rx_adapter_create_ext(id, dev_id,
+					default_conf_cb,
+					&create_port_conf[id]);
+}
+
+int
+rte_event_eth_rx_adapter_free(uint8_t id)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter;
+
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	rx_adapter = id_to_rx_adapter(id);
+	if (!rx_adapter)
+		return -EINVAL;
+
+	if (rx_adapter->nb_queues) {
+		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
+				rx_adapter->nb_queues);
+		return -EBUSY;
+	}
+
+	rte_free(rx_adapter->eth_devices);
+	rte_free(rx_adapter);
+	rte_event_eth_rx_adapter[id] = NULL;
+
+	return 0;
+}
+
+int
+rte_event_eth_rx_adapter_queue_add(uint8_t id,
+		uint8_t eth_dev_id,
+		int32_t rx_queue_id,
+		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
+{
+	int ret;
+	uint32_t rx_adapter_cap;
+	struct rte_event_eth_rx_adapter *rx_adapter;
+	struct rte_eventdev *dev;
+	struct eth_device_info *dev_info;
+	int start_service = 0;
+
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
+
+	rx_adapter = id_to_rx_adapter(id);
+	if (!rx_adapter || !queue_conf)
+		return -EINVAL;
+
+	dev = &rte_eventdevs[rx_adapter->eventdev_id];
+	ret = (*dev->dev_ops->eth_rx_adapter_caps_get)(dev, eth_dev_id,
+						&rx_adapter_cap);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
+			"eth port %" PRIu8, id, eth_dev_id);
+		return ret;
+	}
+
+	if (!(rx_adapter_cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_FLOW_ID) &&
+		!(queue_conf->rx_queue_flags &
+			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
+		RTE_EDEV_LOG_ERR("Flow ID required for configuration,"
+				" eth port: %" PRIu8 " adapter id: %" PRIu8,
+				eth_dev_id, id);
+		return -EINVAL;
+	}
+
+	if ((rx_adapter_cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_SINGLE_EVENTQ) &&
+		(rx_queue_id != -1)) {
+		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
+			"event queue id %u eth port %u", id, eth_dev_id);
+		return -EINVAL;
+	}
+
+	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
+			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
+		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
+			 (uint16_t)rx_queue_id);
+		return -EINVAL;
+	}
+
+	start_service = 0;
+	dev_info = &rx_adapter->eth_devices[eth_dev_id];
+
+	if (rx_adapter_cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
+		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
+					-ENOTSUP);
+		if (!dev_info->rx_queue) {
+			dev_info->rx_queue =
+			    rte_zmalloc_socket(rx_adapter->mem_name,
+					dev_info->dev->data->nb_rx_queues *
+					sizeof(struct eth_rx_queue_info), 0,
+					rx_adapter->socket_id);
+			if (!dev_info->rx_queue)
+				return -ENOMEM;
+		}
+
+		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev, eth_dev_id,
+				rx_queue_id, queue_conf);
+		if (!ret) {
+			update_queue_info(rx_adapter,
+					&rx_adapter->eth_devices[eth_dev_id],
+					rx_queue_id,
+					1);
+		}
+	} else {
+		rte_spinlock_lock(&rx_adapter->rx_lock);
+		ret = init_service(rx_adapter, id);
+		if (!ret)
+			ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
+					queue_conf);
+		rte_spinlock_unlock(&rx_adapter->rx_lock);
+		if (!ret)
+			start_service = !!sw_rx_adapter_queue_count(rx_adapter);
+	}
+
+	if (ret)
+		return ret;
+
+	if (start_service)
+		rte_service_component_runstate_set(rx_adapter->service_id, 1);
+
+	return 0;
+}
+
+int
+rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
+				int32_t rx_queue_id)
+{
+	int ret = 0;
+	struct rte_eventdev *dev;
+	struct rte_event_eth_rx_adapter *rx_adapter;
+	struct eth_device_info *dev_info;
+	uint32_t rx_adapter_cap;
+	uint16_t i;
+
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
+
+	rx_adapter = id_to_rx_adapter(id);
+	if (!rx_adapter)
+		return -EINVAL;
+
+	dev = &rte_eventdevs[rx_adapter->eventdev_id];
+	ret = dev->dev_ops->eth_rx_adapter_caps_get(dev, eth_dev_id,
+						&rx_adapter_cap);
+	if (ret)
+		return ret;
+
+	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
+		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
+		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
+			 (uint16_t)rx_queue_id);
+		return -EINVAL;
+	}
+
+	dev_info = &rx_adapter->eth_devices[eth_dev_id];
+
+	if (rx_adapter_cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
+		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
+				 -ENOTSUP);
+		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev, eth_dev_id,
+							rx_queue_id);
+		if (!ret) {
+			update_queue_info(rx_adapter,
+					&rx_adapter->eth_devices[eth_dev_id],
+					rx_queue_id,
+					0);
+			if (!dev_info->nb_dev_queues) {
+				rte_free(dev_info->rx_queue);
+				dev_info->rx_queue = NULL;
+			}
+		}
+	} else {
+		int rc;
+		rte_spinlock_lock(&rx_adapter->rx_lock);
+		if (rx_queue_id == -1) {
+			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
+				_rte_event_eth_rx_adapter_queue_del(rx_adapter,
+								dev_info,
+								i);
+		} else {
+			_rte_event_eth_rx_adapter_queue_del(rx_adapter,
+							dev_info,
+							(uint16_t)rx_queue_id);
+		}
+
+		rc = eth_poll_wrr_calc(rx_adapter);
+		if (rc)
+			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
+					rc);
+
+		if (!dev_info->nb_dev_queues) {
+			rte_free(dev_info->rx_queue);
+			dev_info->rx_queue = NULL;
+		}
+
+		rte_spinlock_unlock(&rx_adapter->rx_lock);
+		rte_service_component_runstate_set(rx_adapter->service_id,
+				sw_rx_adapter_queue_count(rx_adapter));
+	}
+
+	return ret;
+}
+
+
+int
+rte_event_eth_rx_adapter_start(uint8_t id)
+{
+	return rx_adapter_ctrl(id, 1);
+}
+
+int
+rte_event_eth_rx_adapter_stop(uint8_t id)
+{
+	return rx_adapter_ctrl(id, 0);
+}
+
+int
+rte_event_eth_rx_adapter_stats_get(uint8_t id,
+			       struct rte_event_eth_rx_adapter_stats *stats)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter;
+	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
+	struct rte_event_eth_rx_adapter_stats dev_stats;
+	struct rte_eventdev *dev;
+	struct eth_device_info *dev_info;
+	uint32_t i;
+	int ret;
+
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	rx_adapter = id_to_rx_adapter(id);
+	if (!rx_adapter || !stats)
+		return -EINVAL;
+
+	dev = &rte_eventdevs[rx_adapter->eventdev_id];
+	memset(stats, 0, sizeof(*stats));
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		dev_info = &rx_adapter->eth_devices[i];
+		if (!dev_info->internal_event_port)
+			continue;
+		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev, i,
+						&dev_stats);
+		if (ret)
+			continue;
+		dev_stats_sum.rx_packets += dev_stats.rx_packets;
+		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
+	}
+
+	if (rx_adapter->service_inited)
+		*stats = rx_adapter->stats;
+
+	stats->rx_packets += dev_stats_sum.rx_packets;
+	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
+	return 0;
+}
+
+int
+rte_event_eth_rx_adapter_stats_reset(uint8_t id)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter;
+	struct rte_eventdev *dev;
+	struct eth_device_info *dev_info;
+	uint32_t i;
+
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	rx_adapter = id_to_rx_adapter(id);
+	if (!rx_adapter)
+		return -EINVAL;
+
+	dev = &rte_eventdevs[rx_adapter->eventdev_id];
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		dev_info = &rx_adapter->eth_devices[i];
+		if (!dev_info->internal_event_port)
+			continue;
+		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev, i);
+	}
+
+	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
+	return 0;
+}
+
+int
+rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter;
+
+	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	rx_adapter = id_to_rx_adapter(id);
+	if (!rx_adapter || !service_id)
+		return -EINVAL;
+
+	if (rx_adapter->service_inited)
+		*service_id = rx_adapter->service_id;
+
+	return rx_adapter->service_inited ? 0 : -ESRCH;
+}
diff --git a/lib/Makefile b/lib/Makefile
index 86caba1..dbe9b3d 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -52,7 +52,7 @@  DIRS-$(CONFIG_RTE_LIBRTE_CRYPTODEV) += librte_cryptodev
 DEPDIRS-librte_cryptodev := librte_eal librte_mempool librte_ring librte_mbuf
 DEPDIRS-librte_cryptodev += librte_kvargs
 DIRS-$(CONFIG_RTE_LIBRTE_EVENTDEV) += librte_eventdev
-DEPDIRS-librte_eventdev := librte_eal librte_ring
+DEPDIRS-librte_eventdev := librte_eal librte_ring librte_hash librte_ether
 DIRS-$(CONFIG_RTE_LIBRTE_VHOST) += librte_vhost
 DEPDIRS-librte_vhost := librte_eal librte_mempool librte_mbuf librte_ether
 DIRS-$(CONFIG_RTE_LIBRTE_HASH) += librte_hash
diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
index 410578a..c404d67 100644
--- a/lib/librte_eventdev/Makefile
+++ b/lib/librte_eventdev/Makefile
@@ -43,6 +43,7 @@  CFLAGS += $(WERROR_FLAGS)
 # library source files
 SRCS-y += rte_eventdev.c
 SRCS-y += rte_event_ring.c
+SRCS-y += rte_event_eth_rx_adapter.c
 
 # export include files
 SYMLINK-y-include += rte_eventdev.h
@@ -50,6 +51,7 @@  SYMLINK-y-include += rte_eventdev_pmd.h
 SYMLINK-y-include += rte_eventdev_pmd_pci.h
 SYMLINK-y-include += rte_eventdev_pmd_vdev.h
 SYMLINK-y-include += rte_event_ring.h
+SYMLINK-y-include += rte_event_eth_rx_adapter.h
 
 # versioning export map
 EXPORT_MAP := rte_eventdev_version.map
diff --git a/lib/librte_eventdev/rte_eventdev_version.map b/lib/librte_eventdev/rte_eventdev_version.map
index 996b361..e10546f 100644
--- a/lib/librte_eventdev/rte_eventdev_version.map
+++ b/lib/librte_eventdev/rte_eventdev_version.map
@@ -56,6 +56,15 @@  DPDK_17.08 {
 DPDK_17.11 {
 	global:
 
+	rte_event_eth_rx_adapter_create_ext;
+	rte_event_eth_rx_adapter_create;
+	rte_event_eth_rx_adapter_free;
+	rte_event_eth_rx_adapter_queue_add;
+	rte_event_eth_rx_adapter_queue_del;
+	rte_event_eth_rx_adapter_start;
+	rte_event_eth_rx_adapter_stop;
+	rte_event_eth_rx_adapter_stats_get;
+	rte_event_eth_rx_adapter_stats_reset;
 	rte_event_eth_rx_adapter_caps_get;
-
+	rte_event_eth_rx_adapter_service_id_get;
 } DPDK_17.08;