[dpdk-dev,v4,1/6] lib: distributor performance enhancements

Message ID 1483948248-91364-2-git-send-email-david.hunt@intel.com (mailing list archive)
State Superseded, archived
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel compilation success Compilation OK

Commit Message

Hunt, David Jan. 9, 2017, 7:50 a.m. UTC
  Now sends bursts of up to 8 mbufs to each worker, and tracks
the in-flight flow-ids (atomic scheduling)

New file with a new api, similar to the old API except with _burst
at the end of the function names

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 lib/librte_distributor/Makefile                    |   2 +
 lib/librte_distributor/rte_distributor.c           |  72 +--
 lib/librte_distributor/rte_distributor_burst.c     | 558 +++++++++++++++++++++
 lib/librte_distributor/rte_distributor_burst.h     | 255 ++++++++++
 lib/librte_distributor/rte_distributor_priv.h      | 189 +++++++
 lib/librte_distributor/rte_distributor_version.map |   9 +
 6 files changed, 1014 insertions(+), 71 deletions(-)
 create mode 100644 lib/librte_distributor/rte_distributor_burst.c
 create mode 100644 lib/librte_distributor/rte_distributor_burst.h
 create mode 100644 lib/librte_distributor/rte_distributor_priv.h
  

Comments

Bruce Richardson Jan. 16, 2017, 4:36 p.m. UTC | #1
On Mon, Jan 09, 2017 at 07:50:43AM +0000, David Hunt wrote:
> Now sends bursts of up to 8 mbufs to each worker, and tracks
> the in-flight flow-ids (atomic scheduling)
> 
> New file with a new api, similar to the old API except with _burst
> at the end of the function names
>

Can you explain why this is necessary, and also how the new version
works compared to the old. I know this is explained in the cover letter,
but the cover letter does not make the git commit log.

> Signed-off-by: David Hunt <david.hunt@intel.com>
> ---
<snip>
> diff --git a/lib/librte_distributor/rte_distributor_burst.c b/lib/librte_distributor/rte_distributor_burst.c
> new file mode 100644
> index 0000000..ae7cf9d
> --- /dev/null
> +++ b/lib/librte_distributor/rte_distributor_burst.c
> @@ -0,0 +1,558 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2016 Intel Corporation. All rights reserved.

Update year since we aren't in 2016 any more.

> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#include <stdio.h>
> +#include <sys/queue.h>
> +#include <string.h>
> +#include <rte_mbuf.h>
> +#include <rte_memory.h>
> +#include <rte_cycles.h>
> +#include <rte_memzone.h>
> +#include <rte_errno.h>
> +#include <rte_string_fns.h>
> +#include <rte_eal_memconfig.h>
> +#include "rte_distributor_priv.h"
> +#include "rte_distributor_burst.h"
> +
> +TAILQ_HEAD(rte_dist_burst_list, rte_distributor_burst);
> +
> +static struct rte_tailq_elem rte_dist_burst_tailq = {
> +	.name = "RTE_DIST_BURST",
> +};
> +EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
> +
> +/**** APIs called by workers ****/
> +
> +/**** Burst Packet APIs called by workers ****/
> +
> +/* This function should really be called return_pkt_burst() */
1) Why should it be? 
2) Why isn't it called that? 
Please explain the naming. :-)

> +void
> +rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
> +		unsigned int worker_id, struct rte_mbuf **oldpkt,
> +		unsigned int count)
> +{
> +	struct rte_distributor_buffer_burst *buf = &(d->bufs[worker_id]);
> +	unsigned int i;
> +
> +	volatile int64_t *retptr64;
> +
> +
> +	/* if we dont' have any packets to return, return. */
> +	if (count == 0)
> +		return;
> +
So if we don't return anything we don't get any more packets, right?
What happens if we return fewer packets than we were previously given?
If that is allowed, why the restriction on returning at least one?

> +	retptr64 = &(buf->retptr64[0]);
<snip>
> +
> +int
> +rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
> +		unsigned int worker_id, struct rte_mbuf **pkts,
> +		struct rte_mbuf **oldpkt, unsigned int return_count)
> +{
> +	unsigned int count;
> +	uint64_t retries = 0;
> +
> +	rte_distributor_request_pkt_burst(d, worker_id, oldpkt, return_count);
> +
> +	count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +	while (count == 0) {
> +		rte_pause();
> +		retries++;
> +		if (retries > 1000)
> +			return 0;

This behaviour is different to the original get_pkt() behaviour in that
it has a timeout. Why the change to add the timeout, and should the
timeout not be user configurable in some way?

> +
> +		uint64_t t = rte_rdtsc()+100;

need spaces around the "+"

> +
> +		while (rte_rdtsc() < t)
> +			rte_pause();
> +
> +		count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +	}
> +	return count;
> +}
> +
> +int
> +rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
> +		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
> +{
> +	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
> +	unsigned int i;
> +
> +	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
> +		/* Switch off the return bit first */
> +		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
> +
> +	for (i = num; i-- > 0; )
> +		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
> +			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
> +
> +	/* set the GET_BUF but even if we got no returns */
> +	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;

Does this mean we are requesting more packets here?

> +
> +	return 0;
> +}
> +
> +/**** APIs called on distributor core ***/
> +
<snip>
> +
> +static unsigned int
> +release(struct rte_distributor_burst *d, unsigned int wkr)

I think this function needs a comment describing what it is doing,
and where is it called from and why. Other functions on distributor side
probably need the same thing too.

> +{
> +	struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
> +	unsigned int i;
> +
> +	if (d->backlog[wkr].count == 0)
> +		return 0;
> +
> +	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
> +		rte_pause();
> +
> +	handle_returns(d, wkr);
> +
> +	buf->count = 0;
> +
> +	for (i = 0; i < d->backlog[wkr].count; i++) {
> +		d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
> +				RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
> +		d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
> +	}
> +	buf->count = i;
> +	for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
> +		buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
> +		d->in_flight_tags[wkr][i] = 0;
> +	}
> +
> +	d->backlog[wkr].count = 0;
> +
> +	/* Clear the GET bit */
> +	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
> +	return  buf->count;
> +
> +}
<snip>
> +/**
> + * API called by a worker to get new packets to process. Any previous packets
> + * given to the worker is assumed to have completed processing, and may be
> + * optionally returned to the distributor via the oldpkt parameter.
> + *
> + * @param d
> + *   The distributor instance to be used
> + * @param worker_id
> + *   The worker instance number to use - must be less that num_workers passed
> + *   at distributor creation time.
> + * @param pkts
> + *   The mbufs pointer array to be filled in (up to 8 packets)
> + * @param oldpkt
> + *   The previous packet, if any, being processed by the worker
> + * @param retcount
> + *   The number of packets being returneda

I think you need to document that it can't be zero, if I read the above
C implementation correctly.

> + *
> + * @return
> + *   The number of packets in the pkts array
> + */
> +int
> +rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
> +	unsigned int worker_id, struct rte_mbuf **pkts,
> +	struct rte_mbuf **oldpkt, unsigned int retcount);
> +
> +/**
<snip>
> +
> +/**
> + * Number of packets to deal with in bursts. Needs to be 8 so as to
> + * fit in one cache line.
> + */
> +#define RTE_DIST_BURST_SIZE (sizeof(__m128i) / sizeof(uint16_t))

Does this compile for non-x86 with the references to __m128i?

> +
<snip>
> +
> +	struct rte_distributor_returned_pkts returns;
> +};
> +
> +/* All different signature compare functions */
> +enum rte_distributor_match_function {
> +	RTE_DIST_MATCH_SCALAR = 0,
> +	RTE_DIST_MATCH_NUM

I think this last entry should be "RTE_DIST_NUM_MATCH_FNS", as
"NUM" is not a match function, and the define doesn't ready right.

> +};
> +
> +struct rte_distributor_burst {
> +	TAILQ_ENTRY(rte_distributor_burst) next;    /**< Next in list. */
> +
> +	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
> +	unsigned int num_workers;             /**< Number of workers polling */
> +
> +	/**>
> +	 * First cache line in the this array are the tags inflight
> +	 * on the worker core. Second cache line are the backlog
> +	 * that are going to go to the worker core.
> +	 */
> +	uint16_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS][RTE_DIST_BURST_SIZE*2]
> +			__rte_cache_aligned;
> +
> +	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]
> +			__rte_cache_aligned;
> +
> +	struct rte_distributor_buffer_burst bufs[RTE_DISTRIB_MAX_WORKERS];
> +
> +	struct rte_distributor_returned_pkts returns;
> +
> +	enum rte_distributor_match_function dist_match_fn;
> +};
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif
> diff --git a/lib/librte_distributor/rte_distributor_version.map b/lib/librte_distributor/rte_distributor_version.map
> index 73fdc43..39795a1 100644
> --- a/lib/librte_distributor/rte_distributor_version.map
> +++ b/lib/librte_distributor/rte_distributor_version.map
> @@ -2,14 +2,23 @@ DPDK_2.0 {
>  	global:
>  
>  	rte_distributor_clear_returns;
> +	rte_distributor_clear_returns_burst;
>  	rte_distributor_create;
> +	rte_distributor_create_burst;
>  	rte_distributor_flush;
> +	rte_distributor_flush_burst;
>  	rte_distributor_get_pkt;
> +	rte_distributor_get_pkt_burst;
>  	rte_distributor_poll_pkt;
> +	rte_distributor_poll_pkt_burst;
>  	rte_distributor_process;
> +	rte_distributor_process_burst;
>  	rte_distributor_request_pkt;
> +	rte_distributor_request_pkt_burst;
>  	rte_distributor_return_pkt;
> +	rte_distributor_return_pkt_burst;
>  	rte_distributor_returned_pkts;
> +	rte_distributor_returned_pkts_burst;
>  
>  	local: *;
>  };

The new functions are not present in DPDK 2.0, so you need a new node
for the 17.02 release.

Regards,
/Bruce
  
Hunt, David Jan. 19, 2017, 12:07 p.m. UTC | #2
Thanks for the comments Bruce. Addressed below.


On 16/1/2017 4:36 PM, Bruce Richardson wrote:
> On Mon, Jan 09, 2017 at 07:50:43AM +0000, David Hunt wrote:
>> Now sends bursts of up to 8 mbufs to each worker, and tracks
>> the in-flight flow-ids (atomic scheduling)
>>
>> New file with a new api, similar to the old API except with _burst
>> at the end of the function names
>>
> Can you explain why this is necessary, and also how the new version
> works compared to the old. I know this is explained in the cover letter,
> but the cover letter does not make the git commit log.

Sure. I'll add extra comments into the git comment. The main reason is to
preserve the original API. This gives the user the choice to migrate to 
the new
API should they wish to.

>> Signed-off-by: David Hunt <david.hunt@intel.com>
>> ---
> <snip>
>> diff --git a/lib/librte_distributor/rte_distributor_burst.c 
>> b/lib/librte_distributor/rte_distributor_burst.c
>> new file mode 100644
>> index 0000000..ae7cf9d
>> --- /dev/null
>> +++ b/lib/librte_distributor/rte_distributor_burst.c
>> @@ -0,0 +1,558 @@
>> +/*-
>> + *   BSD LICENSE
>> + *
>> + *   Copyright(c) 2016 Intel Corporation. All rights reserved.
> Update year since we aren't in 2016 any more.
>
>> + *
>> + *   Redistribution and use in source and binary forms, with or without
>> + *   modification, are permitted provided that the following conditions
>> + *   are met:
>> + *
>> + *     * Redistributions of source code must retain the above copyright
>> + *       notice, this list of conditions and the following disclaimer.
>> + *     * Redistributions in binary form must reproduce the above 
>> copyright
>> + *       notice, this list of conditions and the following 
>> disclaimer in
>> + *       the documentation and/or other materials provided with the
>> + *       distribution.
>> + *     * Neither the name of Intel Corporation nor the names of its
>> + *       contributors may be used to endorse or promote products 
>> derived
>> + *       from this software without specific prior written permission.
>> + *
>> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND 
>> CONTRIBUTORS
>> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
>> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
>> FITNESS FOR
>> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
>> COPYRIGHT
>> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
>> INCIDENTAL,
>> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS 
>> OF USE,
>> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 
>> ON ANY
>> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR 
>> TORT
>> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
>> THE USE
>> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 
>> DAMAGE.
>> + */
>> +
>> +#include <stdio.h>
>> +#include <sys/queue.h>
>> +#include <string.h>
>> +#include <rte_mbuf.h>
>> +#include <rte_memory.h>
>> +#include <rte_cycles.h>
>> +#include <rte_memzone.h>
>> +#include <rte_errno.h>
>> +#include <rte_string_fns.h>
>> +#include <rte_eal_memconfig.h>
>> +#include "rte_distributor_priv.h"
>> +#include "rte_distributor_burst.h"
>> +
>> +TAILQ_HEAD(rte_dist_burst_list, rte_distributor_burst);
>> +
>> +static struct rte_tailq_elem rte_dist_burst_tailq = {
>> +    .name = "RTE_DIST_BURST",
>> +};
>> +EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
>> +
>> +/**** APIs called by workers ****/
>> +
>> +/**** Burst Packet APIs called by workers ****/
>> +
>> +/* This function should really be called return_pkt_burst() */
> 1) Why should it be?
> 2) Why isn't it called that?
> Please explain the naming.

It seemed to me that the main use of this function was to return
the packets from the worker rather than requesting new packets,
whilst also toggling the bit to tell the distributor to send more packets.
So I guess it's OK as it is. I've removed the comment to remove this 
confusion.

>> +void
>> +rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
>> +        unsigned int worker_id, struct rte_mbuf **oldpkt,
>> +        unsigned int count)
>> +{
>> +    struct rte_distributor_buffer_burst *buf = &(d->bufs[worker_id]);
>> +    unsigned int i;
>> +
>> +    volatile int64_t *retptr64;
>> +
>> +
>> +    /* if we dont' have any packets to return, return. */
>> +    if (count == 0)
>> +        return;
>> +
> So if we don't return anything we don't get any more packets, right?
> What happens if we return fewer packets than we were previously given?
> If that is allowed, why the restriction on returning at least one?

You are correct. We should be able to return 0, and still flip the 
handshake
bit to request more packets. This check will be removed.

>> +    retptr64 = &(buf->retptr64[0]);
> <snip>
>> +
>> +int
>> +rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
>> +        unsigned int worker_id, struct rte_mbuf **pkts,
>> +        struct rte_mbuf **oldpkt, unsigned int return_count)
>> +{
>> +    unsigned int count;
>> +    uint64_t retries = 0;
>> +
>> +    rte_distributor_request_pkt_burst(d, worker_id, oldpkt, 
>> return_count);
>> +
>> +    count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
>> +    while (count == 0) {
>> +        rte_pause();
>> +        retries++;
>> +        if (retries > 1000)
>> +            return 0;
> This behaviour is different to the original get_pkt() behaviour in that
> it has a timeout. Why the change to add the timeout, and should the
> timeout not be user configurable in some way?

I had another look at this, and managed to clean up this logic. There is 
no longer a need for the retry.

In the old logic, the poll_pkt function returned a pointer, or NULL when 
the handshake bit was not ready.
In the new logic, up until now, I had similar logic, but return 0 for 
both the case where the bit was not ready,
and the bit was ready and the number of valid pointers was 0. This meant 
that there was no way for the loop
to break out when the application was exiting or flushing. I've now 
introduced a -1 return when the bit
is not ready, so will continue looping. But when the distributor sets 
the bot with no packets, the poll_pkt
function will return 0, allowing the loop to exit and return to the caller.

Thanks for that comment, Bruce, it's fixed a major shortcoming in the 
logic.


>> +
>> +        uint64_t t = rte_rdtsc()+100;
> need spaces around the "+"

Done

>> +
>> +        while (rte_rdtsc() < t)
>> +            rte_pause();
>> +
>> +        count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
>> +    }
>> +    return count;
>> +}
>> +
>> +int
>> +rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
>> +        unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
>> +{
>> +    struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
>> +    unsigned int i;
>> +
>> +    for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
>> +        /* Switch off the return bit first */
>> +        buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
>> +
>> +    for (i = num; i-- > 0; )
>> +        buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
>> +            RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
>> +
>> +    /* set the GET_BUF but even if we got no returns */
>> +    buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
> Does this mean we are requesting more packets here?

No, we're setting retptr which means that the distributor will start 
processing the returns cacheline.
The only way to request more packets is
         buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;

This is usually called when you are shutting down a thread and want to 
return what you have, and not
request any new packets from the distributor.

>
>> +
>> +    return 0;
>> +}
>> +
>> +/**** APIs called on distributor core ***/
>> +
> <snip>
>> +
>> +static unsigned int
>> +release(struct rte_distributor_burst *d, unsigned int wkr)
> I think this function needs a comment describing what it is doing,
> and where is it called from and why. Other functions on distributor side
> probably need the same thing too.

Done.

>> +{
>> +    struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
>> +    unsigned int i;
>> +
>> +    if (d->backlog[wkr].count == 0)
>> +        return 0;
>> +
>> +    while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
>> +        rte_pause();
>> +
>> +    handle_returns(d, wkr);
>> +
>> +    buf->count = 0;
>> +
>> +    for (i = 0; i < d->backlog[wkr].count; i++) {
>> +        d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
>> +                RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
>> +        d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
>> +    }
>> +    buf->count = i;
>> +    for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
>> +        buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
>> +        d->in_flight_tags[wkr][i] = 0;
>> +    }
>> +
>> +    d->backlog[wkr].count = 0;
>> +
>> +    /* Clear the GET bit */
>> +    buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
>> +    return  buf->count;
>> +
>> +}
> <snip>
>> +/**
>> + * API called by a worker to get new packets to process. Any 
>> previous packets
>> + * given to the worker is assumed to have completed processing, and 
>> may be
>> + * optionally returned to the distributor via the oldpkt parameter.
>> + *
>> + * @param d
>> + *   The distributor instance to be used
>> + * @param worker_id
>> + *   The worker instance number to use - must be less that 
>> num_workers passed
>> + *   at distributor creation time.
>> + * @param pkts
>> + *   The mbufs pointer array to be filled in (up to 8 packets)
>> + * @param oldpkt
>> + *   The previous packet, if any, being processed by the worker
>> + * @param retcount
>> + *   The number of packets being returneda
> I think you need to document that it can't be zero, if I read the above
> C implementation correctly.

Can be zero now, after resolving some issues indicated above. We should 
be able to return zero
to indicated that we've processed all in the burst but are not returning 
any (i.e. drop)

>> + *
>> + * @return
>> + *   The number of packets in the pkts array
>> + */
>> +int
>> +rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
>> +    unsigned int worker_id, struct rte_mbuf **pkts,
>> +    struct rte_mbuf **oldpkt, unsigned int retcount);
>> +
>> +/**
> <snip>
>> +
>> +/**
>> + * Number of packets to deal with in bursts. Needs to be 8 so as to
>> + * fit in one cache line.
>> + */
>> +#define RTE_DIST_BURST_SIZE (sizeof(__m128i) / sizeof(uint16_t))
> Does this compile for non-x86 with the references to __m128i?

Changed to rte_xmm_t


>> +
> <snip>
>> +
>> +    struct rte_distributor_returned_pkts returns;
>> +};
>> +
>> +/* All different signature compare functions */
>> +enum rte_distributor_match_function {
>> +    RTE_DIST_MATCH_SCALAR = 0,
>> +    RTE_DIST_MATCH_NUM
> I think this last entry should be "RTE_DIST_NUM_MATCH_FNS", as
> "NUM" is not a match function, and the define doesn't ready right.

Done.

>> +};
>> +
>> +struct rte_distributor_burst {
>> +    TAILQ_ENTRY(rte_distributor_burst) next;    /**< Next in list. */
>> +
>> +    char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
>> +    unsigned int num_workers;             /**< Number of workers 
>> polling */
>> +
>> +    /**>
>> +     * First cache line in the this array are the tags inflight
>> +     * on the worker core. Second cache line are the backlog
>> +     * that are going to go to the worker core.
>> +     */
>> +    uint16_t 
>> in_flight_tags[RTE_DISTRIB_MAX_WORKERS][RTE_DIST_BURST_SIZE*2]
>> +            __rte_cache_aligned;
>> +
>> +    struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]
>> +            __rte_cache_aligned;
>> +
>> +    struct rte_distributor_buffer_burst bufs[RTE_DISTRIB_MAX_WORKERS];
>> +
>> +    struct rte_distributor_returned_pkts returns;
>> +
>> +    enum rte_distributor_match_function dist_match_fn;
>> +};
>> +
>> +#ifdef __cplusplus
>> +}
>> +#endif
>> +
>> +#endif
>> diff --git a/lib/librte_distributor/rte_distributor_version.map 
>> b/lib/librte_distributor/rte_distributor_version.map
>> index 73fdc43..39795a1 100644
>> --- a/lib/librte_distributor/rte_distributor_version.map
>> +++ b/lib/librte_distributor/rte_distributor_version.map
>> @@ -2,14 +2,23 @@ DPDK_2.0 {
>>       global:
>>         rte_distributor_clear_returns;
>> +    rte_distributor_clear_returns_burst;
>>       rte_distributor_create;
>> +    rte_distributor_create_burst;
>>       rte_distributor_flush;
>> +    rte_distributor_flush_burst;
>>       rte_distributor_get_pkt;
>> +    rte_distributor_get_pkt_burst;
>>       rte_distributor_poll_pkt;
>> +    rte_distributor_poll_pkt_burst;
>>       rte_distributor_process;
>> +    rte_distributor_process_burst;
>>       rte_distributor_request_pkt;
>> +    rte_distributor_request_pkt_burst;
>>       rte_distributor_return_pkt;
>> +    rte_distributor_return_pkt_burst;
>>       rte_distributor_returned_pkts;
>> +    rte_distributor_returned_pkts_burst;
>>         local: *;
>>   };
> The new functions are not present in DPDK 2.0, so you need a new node
> for the 17.02 release.

Sure.

> Regards,
> /Bruce
>

Thanks Bruce. I'll get a new revision up later today.

Regards,
Dave.
  
Hunt, David Jan. 20, 2017, 9:18 a.m. UTC | #3
This patch aims to improve the throughput of the distributor library.

It uses a similar handshake mechanism to the previous version of
the library, in that bits are used to indicate when packets are ready
to be sent to a worker and ready to be returned from a worker. One main
difference is that instead of sending one packet in a cache line, it makes
use of the 7 free spaces in the same cache line in order to send up to
8 packets at a time to/from a worker.

The flow matching algorithm has had significant re-work, and now keeps an
array of inflight flows and an array of backlog flows, and matches incoming
flows to the inflight/backlog flows of all workers so that flow pinning to
workers can be maintained.

The Flow Match algorithm has both scalar and a vector versions, and a
function pointer is used to select the post appropriate function at run time,
depending on the presence of the SSE2 cpu flag. On non-x86 platforms, the
the scalar match function is selected, which should still gives a good boost
in performance over the non-burst API.

v2 changes:
  * Created a common distributor_priv.h header file with common
    definitions and structures.
  * Added a scalar version so it can be built and used on machines without
    sse2 instruction set
  * Added unit autotests
  * Added perf autotest

v3 changes:
  * Addressed mailing list review comments
  * Test code removal
  * Split out SSE match into separate file to facilitate NEON addition
  * Cleaned up conditional compilation flags for SSE2
  * Addressed c99 style compilation errors
  * rebased on latest head (Jan 2 2017, Happy New Year to all)

v4 changes:
   * fixed issue building shared libraries

v5 changes:
   * Removed some un-needed code around retries in worker API calls
   * Cleanup due to review comments on mailing list
   * Cleanup of non-x86 platform compilation, fallback to scalar match

Notes:
   Apps must now work in bursts, as up to 8 are given to a worker at a time
   For performance in matching, Flow ID's are 15-bits
   Original API (and code) is kept for backward compatibility

Performance Gains
   2.2GHz Intel(R) Xeon(R) CPU E5-2699 v4 @ 2.20GHz
   2 x XL710 40GbE NICS to 2 x 40Gbps traffic generator channels 64b packets
   separate cores for rx, tx, distributor
    1 worker  - 4.8x
    4 workers - 2.9x
    8 workers - 1.8x
   12 workers - 2.1x
   16 workers - 1.8x

[PATCH v5 1/6] lib: distributor performance enhancements
[PATCH v5 2/6] lib: add distributor vector flow matching
[PATCH v5 3/6] test: unit tests for new distributor burst API
[PATCH v5 4/6] test: add distributor perf autotest
[PATCH v5 5/6] examples/distributor_app: showing burst API
[PATCH v5 6/6] doc: distributor library changes for new burst API
  

Patch

diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
index 4c9af17..2acc54d 100644
--- a/lib/librte_distributor/Makefile
+++ b/lib/librte_distributor/Makefile
@@ -43,9 +43,11 @@  LIBABIVER := 1
 
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += rte_distributor_burst.c
 
 # install this header file
 SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include += rte_distributor_burst.h
 
 # this lib needs eal
 DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index f3f778c..c05f6e3 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -40,79 +40,9 @@ 
 #include <rte_errno.h>
 #include <rte_string_fns.h>
 #include <rte_eal_memconfig.h>
+#include "rte_distributor_priv.h"
 #include "rte_distributor.h"
 
-#define NO_FLAGS 0
-#define RTE_DISTRIB_PREFIX "DT_"
-
-/* we will use the bottom four bits of pointer for flags, shifting out
- * the top four bits to make room (since a 64-bit pointer actually only uses
- * 48 bits). An arithmetic-right-shift will then appropriately restore the
- * original pointer value with proper sign extension into the top bits. */
-#define RTE_DISTRIB_FLAG_BITS 4
-#define RTE_DISTRIB_FLAGS_MASK (0x0F)
-#define RTE_DISTRIB_NO_BUF 0       /**< empty flags: no buffer requested */
-#define RTE_DISTRIB_GET_BUF (1)    /**< worker requests a buffer, returns old */
-#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
-
-#define RTE_DISTRIB_BACKLOG_SIZE 8
-#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
-
-#define RTE_DISTRIB_MAX_RETURNS 128
-#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
-
-/**
- * Maximum number of workers allowed.
- * Be aware of increasing the limit, becaus it is limited by how we track
- * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
- */
-#define RTE_DISTRIB_MAX_WORKERS	64
-
-/**
- * Buffer structure used to pass the pointer data between cores. This is cache
- * line aligned, but to improve performance and prevent adjacent cache-line
- * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
- * the next cache line to worker 0, we pad this out to three cache lines.
- * Only 64-bits of the memory is actually used though.
- */
-union rte_distributor_buffer {
-	volatile int64_t bufptr64;
-	char pad[RTE_CACHE_LINE_SIZE*3];
-} __rte_cache_aligned;
-
-struct rte_distributor_backlog {
-	unsigned start;
-	unsigned count;
-	int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
-};
-
-struct rte_distributor_returned_pkts {
-	unsigned start;
-	unsigned count;
-	struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
-};
-
-struct rte_distributor {
-	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
-
-	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
-	unsigned num_workers;                 /**< Number of workers polling */
-
-	uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
-		/**< Tracks the tag being processed per core */
-	uint64_t in_flight_bitmask;
-		/**< on/off bits for in-flight tags.
-		 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
-		 * the bitmask has to expand.
-		 */
-
-	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
-
-	union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
-
-	struct rte_distributor_returned_pkts returns;
-};
-
 TAILQ_HEAD(rte_distributor_list, rte_distributor);
 
 static struct rte_tailq_elem rte_distributor_tailq = {
diff --git a/lib/librte_distributor/rte_distributor_burst.c b/lib/librte_distributor/rte_distributor_burst.c
new file mode 100644
index 0000000..ae7cf9d
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_burst.c
@@ -0,0 +1,558 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <string.h>
+#include <rte_mbuf.h>
+#include <rte_memory.h>
+#include <rte_cycles.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor_priv.h"
+#include "rte_distributor_burst.h"
+
+TAILQ_HEAD(rte_dist_burst_list, rte_distributor_burst);
+
+static struct rte_tailq_elem rte_dist_burst_tailq = {
+	.name = "RTE_DIST_BURST",
+};
+EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
+
+/**** APIs called by workers ****/
+
+/**** Burst Packet APIs called by workers ****/
+
+/* This function should really be called return_pkt_burst() */
+void
+rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt,
+		unsigned int count)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[worker_id]);
+	unsigned int i;
+
+	volatile int64_t *retptr64;
+
+
+	/* if we dont' have any packets to return, return. */
+	if (count == 0)
+		return;
+
+	retptr64 = &(buf->retptr64[0]);
+	/* Spin while handshake bits are set (scheduler clears it) */
+	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
+		rte_pause();
+		uint64_t t = rte_rdtsc()+100;
+
+		while (rte_rdtsc() < t)
+			rte_pause();
+	}
+
+	/*
+	 * OK, if we've got here, then the scheduler has just cleared the
+	 * handshake bits. Populate the retptrs with returning packets.
+	 */
+
+	for (i = count; i < RTE_DIST_BURST_SIZE; i++)
+		buf->retptr64[i] = 0;
+
+	/* Set Return bit for each packet returned */
+	for (i = count; i-- > 0; )
+		buf->retptr64[i] =
+			(((int64_t)(uintptr_t)(oldpkt[i])) <<
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+
+	/*
+	 * Finally, set the GET_BUF  to signal to distributor that cache
+	 * line is ready for processing
+	 */
+	*retptr64 |= RTE_DISTRIB_GET_BUF;
+}
+
+int
+rte_distributor_poll_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **pkts)
+{
+	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
+	uint64_t ret;
+	int count = 0;
+	unsigned int i;
+
+	/* If bit is set, return */
+	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
+		return 0;
+
+	/* since bufptr64 is signed, this should be an arithmetic shift */
+	for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+		if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
+			ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
+			pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
+		}
+	}
+
+	/*
+	 * so now we've got the contents of the cacheline into an  array of
+	 * mbuf pointers, so toggle the bit so scheduler can start working
+	 * on the next cacheline while we're working.
+	 */
+	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
+
+
+	return count;
+}
+
+int
+rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **pkts,
+		struct rte_mbuf **oldpkt, unsigned int return_count)
+{
+	unsigned int count;
+	uint64_t retries = 0;
+
+	rte_distributor_request_pkt_burst(d, worker_id, oldpkt, return_count);
+
+	count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
+	while (count == 0) {
+		rte_pause();
+		retries++;
+		if (retries > 1000)
+			return 0;
+
+		uint64_t t = rte_rdtsc()+100;
+
+		while (rte_rdtsc() < t)
+			rte_pause();
+
+		count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
+	}
+	return count;
+}
+
+int
+rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
+{
+	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
+	unsigned int i;
+
+	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
+		/* Switch off the return bit first */
+		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+
+	for (i = num; i-- > 0; )
+		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+
+	/* set the GET_BUF but even if we got no returns */
+	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
+
+	return 0;
+}
+
+/**** APIs called on distributor core ***/
+
+/* stores a packet returned from a worker inside the returns array */
+static inline void
+store_return(uintptr_t oldbuf, struct rte_distributor_burst *d,
+		unsigned int *ret_start, unsigned int *ret_count)
+{
+	if (!oldbuf)
+		return;
+	/* store returns in a circular buffer */
+	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
+			= (void *)oldbuf;
+	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
+	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
+}
+
+static inline void
+find_match_scalar(struct rte_distributor_burst *d,
+			uint16_t *data_ptr,
+			uint16_t *output_ptr)
+{
+	struct rte_distributor_backlog *bl;
+	uint16_t i, j, w;
+
+	/*
+	 * Function overview:
+	 * 1. Loop through all worker ID's
+	 * 2. Compare the current inflights to the incoming tags
+	 * 3. Compare the current backlog to the incoming tags
+	 * 4. Add any matches to the output
+	 */
+
+	for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
+		output_ptr[j] = 0;
+
+	for (i = 0; i < d->num_workers; i++) {
+		bl = &d->backlog[i];
+
+		for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
+			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
+				if (d->in_flight_tags[i][j] == data_ptr[w]) {
+					output_ptr[j] = i+1;
+					break;
+				}
+		for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
+			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
+				if (bl->tags[j] == data_ptr[w]) {
+					output_ptr[j] = i+1;
+					break;
+				}
+	}
+
+	/*
+	 * At this stage, the output contains 8 16-bit values, with
+	 * each non-zero value containing the worker ID on which the
+	 * corresponding flow is pinned to.
+	 */
+}
+
+
+
+static unsigned int
+handle_returns(struct rte_distributor_burst *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
+	uintptr_t oldbuf;
+	unsigned int ret_start = d->returns.start,
+			ret_count = d->returns.count;
+	unsigned int count = 0;
+	unsigned int i;
+	/*
+	 * wait for the GET_BUF bit to go high, otherwise we can't send
+	 * the packets to the worker
+	 */
+
+	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
+		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
+				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
+					RTE_DISTRIB_FLAG_BITS));
+				/* store returns in a circular buffer */
+				store_return(oldbuf, d, &ret_start, &ret_count);
+				count++;
+				buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+			}
+		}
+		d->returns.start = ret_start;
+		d->returns.count = ret_count;
+		/* Clear for the worker to populate with more returns */
+		buf->retptr64[0] = 0;
+	}
+	return count;
+}
+
+static unsigned int
+release(struct rte_distributor_burst *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
+	unsigned int i;
+
+	if (d->backlog[wkr].count == 0)
+		return 0;
+
+	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+		rte_pause();
+
+	handle_returns(d, wkr);
+
+	buf->count = 0;
+
+	for (i = 0; i < d->backlog[wkr].count; i++) {
+		d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
+				RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
+		d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
+	}
+	buf->count = i;
+	for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
+		buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
+		d->in_flight_tags[wkr][i] = 0;
+	}
+
+	d->backlog[wkr].count = 0;
+
+	/* Clear the GET bit */
+	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
+	return  buf->count;
+
+}
+
+
+/* process a set of packets to distribute them to workers */
+int
+rte_distributor_process_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int num_mbufs)
+{
+	unsigned int next_idx = 0;
+	static unsigned int wkr;
+	struct rte_mbuf *next_mb = NULL;
+	int64_t next_value = 0;
+	uint16_t new_tag = 0;
+	uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
+	unsigned int i, wid;
+	int j, w;
+
+	if (unlikely(num_mbufs == 0)) {
+		/* Flush out all non-full cache-lines to workers. */
+		for (wid = 0 ; wid < d->num_workers; wid++) {
+			if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
+				release(d, wid);
+				handle_returns(d, wid);
+			}
+		}
+		return 0;
+	}
+
+	while (next_idx < num_mbufs) {
+		uint16_t matches[RTE_DIST_BURST_SIZE];
+		int pkts;
+
+		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
+			d->bufs[wkr].count = 0;
+
+		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+			if (mbufs[next_idx + i]) {
+				/* flows have to be non-zero */
+				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
+			} else
+				flows[i] = 0;
+		}
+
+		switch (d->dist_match_fn) {
+		default:
+			find_match_scalar(d, &flows[0], &matches[0]);
+		}
+
+		/*
+		 * Matches array now contain the intended worker ID (+1) of
+		 * the incoming packets. Any zeroes need to be assigned
+		 * workers.
+		 */
+
+		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
+			pkts = num_mbufs - next_idx;
+		else
+			pkts = RTE_DIST_BURST_SIZE;
+
+		for (j = 0; j < pkts; j++) {
+
+			next_mb = mbufs[next_idx++];
+			next_value = (((int64_t)(uintptr_t)next_mb) <<
+					RTE_DISTRIB_FLAG_BITS);
+			/*
+			 * User is advocated to set tag vaue for each
+			 * mbuf before calling rte_distributor_process.
+			 * User defined tags are used to identify flows,
+			 * or sessions.
+			 */
+			/* flows MUST be non-zero */
+			new_tag = (uint16_t)(next_mb->hash.usr) | 1;
+
+			/*
+			 * Uncommenting the next line will cause the find_match
+			 * function to be optimised out, making this function
+			 * do parallel (non-atomic) distribution
+			 */
+			/* matches[j] = 0; */
+
+			if (matches[j]) {
+				struct rte_distributor_backlog *bl =
+						&d->backlog[matches[j]-1];
+				if (unlikely(bl->count ==
+						RTE_DIST_BURST_SIZE)) {
+					release(d, matches[j]-1);
+				}
+
+				/* Add to worker that already has flow */
+				unsigned int idx = bl->count++;
+
+				bl->tags[idx] = new_tag;
+				bl->pkts[idx] = next_value;
+
+			} else {
+				struct rte_distributor_backlog *bl =
+						&d->backlog[wkr];
+				if (unlikely(bl->count ==
+						RTE_DIST_BURST_SIZE)) {
+					release(d, wkr);
+				}
+
+				/* Add to current worker worker */
+				unsigned int idx = bl->count++;
+
+				bl->tags[idx] = new_tag;
+				bl->pkts[idx] = next_value;
+				/*
+				 * Now that we've just added an unpinned flow
+				 * to a worker, we need to ensure that all
+				 * other packets with that same flow will go
+				 * to the same worker in this burst.
+				 */
+				for (w = j; w < pkts; w++)
+					if (flows[w] == new_tag)
+						matches[w] = wkr+1;
+			}
+		}
+		wkr++;
+		if (wkr >= d->num_workers)
+			wkr = 0;
+	}
+
+	/* Flush out all non-full cache-lines to workers. */
+	for (wid = 0 ; wid < d->num_workers; wid++)
+		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+			release(d, wid);
+
+	return num_mbufs;
+}
+
+/* return to the caller, packets returned from workers */
+int
+rte_distributor_returned_pkts_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int max_mbufs)
+{
+	struct rte_distributor_returned_pkts *returns = &d->returns;
+	unsigned int retval = (max_mbufs < returns->count) ?
+			max_mbufs : returns->count;
+	unsigned int i;
+
+	for (i = 0; i < retval; i++) {
+		unsigned int idx = (returns->start + i) &
+				RTE_DISTRIB_RETURNS_MASK;
+
+		mbufs[i] = returns->mbufs[idx];
+	}
+	returns->start += i;
+	returns->count -= i;
+
+	return retval;
+}
+
+/*
+ * Return the number of packets in-flight in a distributor, i.e. packets
+ * being workered on or queued up in a backlog.
+ */
+static inline unsigned int
+total_outstanding(const struct rte_distributor_burst *d)
+{
+	unsigned int wkr, total_outstanding = 0;
+
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		total_outstanding += d->backlog[wkr].count;
+
+	return total_outstanding;
+}
+
+/*
+ * Flush the distributor, so that there are no outstanding packets in flight or
+ * queued up.
+ */
+int
+rte_distributor_flush_burst(struct rte_distributor_burst *d)
+{
+	const unsigned int flushed = total_outstanding(d);
+	unsigned int wkr;
+
+	while (total_outstanding(d) > 0)
+		rte_distributor_process_burst(d, NULL, 0);
+
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		handle_returns(d, wkr);
+
+	return flushed;
+}
+
+/* clears the internal returns array in the distributor */
+void
+rte_distributor_clear_returns_burst(struct rte_distributor_burst *d)
+{
+	unsigned int wkr;
+
+	/* throw away returns, so workers can exit */
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		d->bufs[wkr].retptr64[0] = 0;
+}
+
+/* creates a distributor instance */
+struct rte_distributor_burst *
+rte_distributor_create_burst(const char *name,
+		unsigned int socket_id,
+		unsigned int num_workers)
+{
+	struct rte_distributor_burst *d;
+	struct rte_dist_burst_list *dist_burst_list;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	const struct rte_memzone *mz;
+	unsigned int i;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
+
+	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+	if (mz == NULL) {
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	d = mz->addr;
+	snprintf(d->name, sizeof(d->name), "%s", name);
+	d->num_workers = num_workers;
+
+	d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
+
+	/*
+	 * Set up the backog tags so they're pointing at the second cache
+	 * line for performance during flow matching
+	 */
+	for (i = 0 ; i < num_workers ; i++)
+		d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
+
+	dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
+					  rte_dist_burst_list);
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+	TAILQ_INSERT_TAIL(dist_burst_list, d, next);
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	return d;
+}
diff --git a/lib/librte_distributor/rte_distributor_burst.h b/lib/librte_distributor/rte_distributor_burst.h
new file mode 100644
index 0000000..5096b13
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_burst.h
@@ -0,0 +1,255 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DIST_BURST_H_
+#define _RTE_DIST_BURST_H_
+
+/**
+ * @file
+ * RTE distributor
+ *
+ * The distributor is a component which is designed to pass packets
+ * one-at-a-time to workers, with dynamic load balancing.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rte_distributor_burst;
+struct rte_mbuf;
+
+/**
+ * Function to create a new distributor instance
+ *
+ * Reserves the memory needed for the distributor operation and
+ * initializes the distributor to work with the configured number of workers.
+ *
+ * @param name
+ *   The name to be given to the distributor instance.
+ * @param socket_id
+ *   The NUMA node on which the memory is to be allocated
+ * @param num_workers
+ *   The maximum number of workers that will request packets from this
+ *   distributor
+ * @return
+ *   The newly created distributor instance
+ */
+struct rte_distributor_burst *
+rte_distributor_create_burst(const char *name, unsigned int socket_id,
+		unsigned int num_workers);
+
+/*  *** APIS to be called on the distributor lcore ***  */
+/*
+ * The following APIs are the public APIs which are designed for use on a
+ * single lcore which acts as the distributor lcore for a given distributor
+ * instance. These functions cannot be called on multiple cores simultaneously
+ * without using locking to protect access to the internals of the distributor.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * Process a set of packets by distributing them among workers that request
+ * packets. The distributor will ensure that no two packets that have the
+ * same flow id, or tag, in the mbuf will be processed on different cores at
+ * the same time.
+ *
+ * The user is advocated to set tag for each mbuf before calling this function.
+ * If user doesn't set the tag, the tag value can be various values depending on
+ * driver implementation and configuration.
+ *
+ * This is not multi-thread safe and should only be called on a single lcore.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param mbufs
+ *   The mbufs to be distributed
+ * @param num_mbufs
+ *   The number of mbufs in the mbufs array
+ * @return
+ *   The number of mbufs processed.
+ */
+int
+rte_distributor_process_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int num_mbufs);
+
+/**
+ * Get a set of mbufs that have been returned to the distributor by workers
+ *
+ * This should only be called on the same lcore as rte_distributor_process()
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param mbufs
+ *   The mbufs pointer array to be filled in
+ * @param max_mbufs
+ *   The size of the mbufs array
+ * @return
+ *   The number of mbufs returned in the mbufs array.
+ */
+int
+rte_distributor_returned_pkts_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int max_mbufs);
+
+/**
+ * Flush the distributor component, so that there are no in-flight or
+ * backlogged packets awaiting processing
+ *
+ * This should only be called on the same lcore as rte_distributor_process()
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @return
+ *   The number of queued/in-flight packets that were completed by this call.
+ */
+int
+rte_distributor_flush_burst(struct rte_distributor_burst *d);
+
+/**
+ * Clears the array of returned packets used as the source for the
+ * rte_distributor_returned_pkts() API call.
+ *
+ * This should only be called on the same lcore as rte_distributor_process()
+ *
+ * @param d
+ *   The distributor instance to be used
+ */
+void
+rte_distributor_clear_returns_burst(struct rte_distributor_burst *d);
+
+/*  *** APIS to be called on the worker lcores ***  */
+/*
+ * The following APIs are the public APIs which are designed for use on
+ * multiple lcores which act as workers for a distributor. Each lcore should use
+ * a unique worker id when requesting packets.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * API called by a worker to get new packets to process. Any previous packets
+ * given to the worker is assumed to have completed processing, and may be
+ * optionally returned to the distributor via the oldpkt parameter.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less that num_workers passed
+ *   at distributor creation time.
+ * @param pkts
+ *   The mbufs pointer array to be filled in (up to 8 packets)
+ * @param oldpkt
+ *   The previous packet, if any, being processed by the worker
+ * @param retcount
+ *   The number of packets being returned
+ *
+ * @return
+ *   The number of packets in the pkts array
+ */
+int
+rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
+	unsigned int worker_id, struct rte_mbuf **pkts,
+	struct rte_mbuf **oldpkt, unsigned int retcount);
+
+/**
+ * API called by a worker to return a completed packet without requesting a
+ * new packet, for example, because a worker thread is shutting down
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less that num_workers passed
+ *   at distributor creation time.
+ * @param mbuf
+ *   The previous packet being processed by the worker
+ */
+int
+rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
+	unsigned int worker_id, struct rte_mbuf **oldpkt, int num);
+
+/**
+ * API called by a worker to request a new packet to process.
+ * Any previous packet given to the worker is assumed to have completed
+ * processing, and may be optionally returned to the distributor via
+ * the oldpkt parameter.
+ * Unlike rte_distributor_get_pkt_burst(), this function does not wait for a
+ * new packet to be provided by the distributor.
+ *
+ * NOTE: after calling this function, rte_distributor_poll_pkt_burst() should
+ * be used to poll for the packet requested. The rte_distributor_get_pkt_burst()
+ * API should *not* be used to try and retrieve the new packet.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less that num_workers passed
+ *   at distributor creation time.
+ * @param oldpkt
+ *   The returning packets, if any, processed by the worker
+ * @param count
+ *   The number of returning packets
+ */
+void
+rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt,
+		unsigned int count);
+
+/**
+ * API called by a worker to check for a new packet that was previously
+ * requested by a call to rte_distributor_request_pkt(). It does not wait
+ * for the new packet to be available, but returns NULL if the request has
+ * not yet been fulfilled by the distributor.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less that num_workers passed
+ *   at distributor creation time.
+ * @param mbufs
+ *   The array of mbufs being given to the worker
+ *
+ * @return
+ *   The number of packets being given to the worker thread, zero if no
+ *   packet is yet available.
+ */
+int
+rte_distributor_poll_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **mbufs);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/librte_distributor/rte_distributor_priv.h b/lib/librte_distributor/rte_distributor_priv.h
new file mode 100644
index 0000000..833855f
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_priv.h
@@ -0,0 +1,189 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DIST_PRIV_H_
+#define _RTE_DIST_PRIV_H_
+
+/**
+ * @file
+ * RTE distributor
+ *
+ * The distributor is a component which is designed to pass packets
+ * one-at-a-time to workers, with dynamic load balancing.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define NO_FLAGS 0
+#define RTE_DISTRIB_PREFIX "DT_"
+
+/*
+ * We will use the bottom four bits of pointer for flags, shifting out
+ * the top four bits to make room (since a 64-bit pointer actually only uses
+ * 48 bits). An arithmetic-right-shift will then appropriately restore the
+ * original pointer value with proper sign extension into the top bits.
+ */
+#define RTE_DISTRIB_FLAG_BITS 4
+#define RTE_DISTRIB_FLAGS_MASK (0x0F)
+#define RTE_DISTRIB_NO_BUF 0       /**< empty flags: no buffer requested */
+#define RTE_DISTRIB_GET_BUF (1)    /**< worker requests a buffer, returns old */
+#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
+#define RTE_DISTRIB_VALID_BUF (4)  /**< set if bufptr contains ptr */
+
+#define RTE_DISTRIB_BACKLOG_SIZE 8
+#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
+
+#define RTE_DISTRIB_MAX_RETURNS 128
+#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+
+/**
+ * Maximum number of workers allowed.
+ * Be aware of increasing the limit, becaus it is limited by how we track
+ * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
+ */
+#define RTE_DISTRIB_MAX_WORKERS 64
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to three cache lines.
+ * Only 64-bits of the memory is actually used though.
+ */
+union rte_distributor_buffer {
+	volatile int64_t bufptr64;
+	char pad[RTE_CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+/**
+ * Number of packets to deal with in bursts. Needs to be 8 so as to
+ * fit in one cache line.
+ */
+#define RTE_DIST_BURST_SIZE (sizeof(__m128i) / sizeof(uint16_t))
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to two cache lines.
+ * We can pass up to 8 mbufs at a time in one cacheline.
+ * There is a separate cacheline for returns in the burst API.
+ */
+struct rte_distributor_buffer_burst {
+	volatile int64_t bufptr64[RTE_DIST_BURST_SIZE]
+			__rte_cache_aligned; /* <= outgoing to worker */
+
+	int64_t pad1 __rte_cache_aligned;    /* <= one cache line  */
+
+	volatile int64_t retptr64[RTE_DIST_BURST_SIZE]
+			__rte_cache_aligned; /* <= incoming from worker */
+
+	int64_t pad2 __rte_cache_aligned;    /* <= one cache line  */
+
+	int count __rte_cache_aligned;       /* <= number of current mbufs */
+};
+
+
+struct rte_distributor_backlog {
+	unsigned int start;
+	unsigned int count;
+	int64_t pkts[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
+	uint16_t *tags; /* will point to second cacheline of inflights */
+} __rte_cache_aligned;
+
+
+struct rte_distributor_returned_pkts {
+	unsigned int start;
+	unsigned int count;
+	struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
+};
+
+struct rte_distributor {
+	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
+
+	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
+	unsigned int num_workers;             /**< Number of workers polling */
+
+	uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
+		/**< Tracks the tag being processed per core */
+	uint64_t in_flight_bitmask;
+		/**< on/off bits for in-flight tags.
+		 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
+		 * the bitmask has to expand.
+		 */
+
+	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
+
+	union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
+
+	struct rte_distributor_returned_pkts returns;
+};
+
+/* All different signature compare functions */
+enum rte_distributor_match_function {
+	RTE_DIST_MATCH_SCALAR = 0,
+	RTE_DIST_MATCH_NUM
+};
+
+struct rte_distributor_burst {
+	TAILQ_ENTRY(rte_distributor_burst) next;    /**< Next in list. */
+
+	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
+	unsigned int num_workers;             /**< Number of workers polling */
+
+	/**>
+	 * First cache line in the this array are the tags inflight
+	 * on the worker core. Second cache line are the backlog
+	 * that are going to go to the worker core.
+	 */
+	uint16_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS][RTE_DIST_BURST_SIZE*2]
+			__rte_cache_aligned;
+
+	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]
+			__rte_cache_aligned;
+
+	struct rte_distributor_buffer_burst bufs[RTE_DISTRIB_MAX_WORKERS];
+
+	struct rte_distributor_returned_pkts returns;
+
+	enum rte_distributor_match_function dist_match_fn;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/librte_distributor/rte_distributor_version.map b/lib/librte_distributor/rte_distributor_version.map
index 73fdc43..39795a1 100644
--- a/lib/librte_distributor/rte_distributor_version.map
+++ b/lib/librte_distributor/rte_distributor_version.map
@@ -2,14 +2,23 @@  DPDK_2.0 {
 	global:
 
 	rte_distributor_clear_returns;
+	rte_distributor_clear_returns_burst;
 	rte_distributor_create;
+	rte_distributor_create_burst;
 	rte_distributor_flush;
+	rte_distributor_flush_burst;
 	rte_distributor_get_pkt;
+	rte_distributor_get_pkt_burst;
 	rte_distributor_poll_pkt;
+	rte_distributor_poll_pkt_burst;
 	rte_distributor_process;
+	rte_distributor_process_burst;
 	rte_distributor_request_pkt;
+	rte_distributor_request_pkt_burst;
 	rte_distributor_return_pkt;
+	rte_distributor_return_pkt_burst;
 	rte_distributor_returned_pkts;
+	rte_distributor_returned_pkts_burst;
 
 	local: *;
 };