[dpdk-dev] [PATCH] librte_reorder: New reorder library with unit tests and app

Pattan, Reshma reshma.pattan at intel.com
Wed Jan 7 17:37:45 CET 2015


Self-NACKed.
I will send this as a series of smaller sub-patches instead of this one big patch.

> -----Original Message-----
> From: Pattan, Reshma
> Sent: Wednesday, January 7, 2015 3:28 PM
> To: dev at dpdk.org
> Cc: Pattan, Reshma
> Subject: [PATCH] librte_reorder: New reorder library with unit tests and app
> 
> From: Reshma Pattan <reshma.pattan at intel.com>
> 
>         1)New library to provide reordering of out-of-order
>         mbufs based on sequence number of mbuf. Library uses reorder buffer
> structure
>         which in turn uses two circular buffers called ready and order buffers.
>         *rte_reorder_create API creates instance of reorder buffer.
>         *rte_reorder_init API initializes given reorder buffer instance.
>         *rte_reorder_reset API resets given reorder buffer instance.
>         *rte_reorder_insert API inserts the mbuf into order circular buffer.
>         *rte_reorder_fill_overflow moves mbufs from order buffer to ready buffer
>         to accommodate early packets in order buffer.
>         *rte_reorder_drain API provides draining facility to fetch out
>         reordered mbufs from order and ready buffers.
> 
>         2)New unit test cases added.
> 
>         3)New application added to verify the performance of library.
> 
>     Signed-off-by: Reshma Pattan <reshma.pattan at intel.com>
>     Signed-off-by: Richardson Bruce <bruce.richardson at intel.com>
> ---
>  app/test/Makefile                              |   2 +
>  app/test/test_reorder.c                        | 452 ++++++++++++++++++
>  config/common_bsdapp                           |   5 +
>  config/common_linuxapp                         |   5 +
>  examples/packet_ordering/Makefile              |  50 ++
>  examples/packet_ordering/main.c                | 637 +++++++++++++++++++++++++
>  lib/Makefile                                   |   1 +
>  lib/librte_eal/common/include/rte_tailq_elem.h |   2 +
>  lib/librte_mbuf/rte_mbuf.h                     |   3 +
>  lib/librte_reorder/Makefile                    |  50 ++
>  lib/librte_reorder/rte_reorder.c               | 464 ++++++++++++++++++
>  lib/librte_reorder/rte_reorder.h               | 184 +++++++
>  mk/rte.app.mk                                  |   4 +
>  13 files changed, 1859 insertions(+)
>  create mode 100644 app/test/test_reorder.c
>  create mode 100644 examples/packet_ordering/Makefile
>  create mode 100644 examples/packet_ordering/main.c
>  create mode 100644 lib/librte_reorder/Makefile
>  create mode 100644 lib/librte_reorder/rte_reorder.c
>  create mode 100644 lib/librte_reorder/rte_reorder.h
> 
> diff --git a/app/test/Makefile b/app/test/Makefile
> index 4311f96..24b27d7 100644
> --- a/app/test/Makefile
> +++ b/app/test/Makefile
> @@ -124,6 +124,8 @@ SRCS-$(CONFIG_RTE_LIBRTE_IVSHMEM) +=
> test_ivshmem.c
>  SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += test_distributor.c
>  SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += test_distributor_perf.c
> 
> +SRCS-$(CONFIG_RTE_LIBRTE_REORDER) += test_reorder.c
> +
>  SRCS-y += test_devargs.c
>  SRCS-y += virtual_pmd.c
>  SRCS-y += packet_burst_generator.c
> diff --git a/app/test/test_reorder.c b/app/test/test_reorder.c
> new file mode 100644
> index 0000000..6a673e2
> --- /dev/null
> +++ b/app/test/test_reorder.c
> @@ -0,0 +1,452 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
> NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
> OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
> ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> + */
> +
> +#include "test.h"
> +#include "stdio.h"
> +
> +#include <unistd.h>
> +#include <string.h>
> +
> +#include <rte_cycles.h>
> +#include <rte_errno.h>
> +#include <rte_mbuf.h>
> +#include <rte_reorder.h>
> +#include <rte_lcore.h>
> +#include <rte_malloc.h>
> +
> +#include "test.h"
> +
> +#define BURST 32
> +#define REORDER_BUFFER_SIZE 16384
> +#define NUM_MBUFS (2*REORDER_BUFFER_SIZE)
> +#define REORDER_BUFFER_SIZE_INVALID 2049
> +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) +
> RTE_PKTMBUF_HEADROOM)
> +
> +struct reorder_unittest_params {
> +	struct rte_mempool *p;
> +	struct rte_reorder_buffer *b;
> +};
> +
> +static struct reorder_unittest_params default_params  = {
> +	.p = NULL,
> +	.b = NULL
> +};
> +
> +static struct reorder_unittest_params *test_params = &default_params;
> +
> +static int
> +test_reorder_create_inval_name(void)
> +{
> +	struct rte_reorder_buffer *b = NULL;
> +	char *name = NULL;
> +
> +	b = rte_reorder_create(name, rte_socket_id(), REORDER_BUFFER_SIZE);
> +	TEST_ASSERT_EQUAL(b, NULL, "No error on create() with invalid name
> param.");
> +	TEST_ASSERT_EQUAL(rte_errno, EINVAL,
> +				"No error on create() with invalid name
> param.");
> +	return 0;
> +}
> +
> +static int
> +test_reorder_create_inval_size(void)
> +{
> +	struct rte_reorder_buffer *b = NULL;
> +
> +	b = rte_reorder_create("PKT", rte_socket_id(),
> REORDER_BUFFER_SIZE_INVALID);
> +	TEST_ASSERT_EQUAL(b, NULL,
> +				"No error on create() with invalid buffer size
> param.");
> +	TEST_ASSERT_EQUAL(rte_errno, EINVAL,
> +				"No error on create() with invalid buffer size
> param.");
> +	return 0;
> +}
> +
> +static int
> +test_reorder_init_null_buffer(void)
> +{
> +	struct rte_reorder_buffer *b = NULL;
> +	/*
> +	 * The minimum memory area size that should be passed to library is,
> +	 * sizeof(struct rte_reorder_buffer) + (2 * size * sizeof(struct rte_mbuf
> *));
> +	 * Otherwise error will be thrown
> +	 */
> +	unsigned int mzsize = 262336;
> +	b = rte_reorder_init(b, mzsize, "PKT1", REORDER_BUFFER_SIZE);
> +	TEST_ASSERT_EQUAL(b, NULL, "No error on init with NULL buffer.");
> +	TEST_ASSERT_EQUAL(rte_errno, EINVAL, "No error on init with NULL
> buffer.");
> +	return 0;
> +}
> +
> +static int
> +test_reorder_init_inval_mzsize(void)
> +{
> +	struct rte_reorder_buffer *b = NULL;
> +	unsigned int mzsize =  100;
> +	b = rte_malloc(NULL, mzsize, 0);
> +	b = rte_reorder_init(b, mzsize, "PKT1", REORDER_BUFFER_SIZE);
> +	TEST_ASSERT_EQUAL(b, NULL, "No error on init with invalid mem zone
> size.");
> +	TEST_ASSERT_EQUAL(rte_errno, ENOMEM,
> +				"No error on init with invalid mem zone size.");
> +	rte_free(b);
> +	return 0;
> +}
> +
> +static int
> +test_reorder_init_inval_size(void)
> +{
> +	struct rte_reorder_buffer *b = NULL;
> +	unsigned int mzsize =  262336;
> +	b = rte_malloc(NULL, mzsize, 0);
> +	b = rte_reorder_init(b, mzsize, "PKT1",
> REORDER_BUFFER_SIZE_INVALID);
> +	TEST_ASSERT_EQUAL(b, NULL, "No error on init with invalid buffer size
> param.");
> +	TEST_ASSERT_EQUAL(rte_errno, EINVAL,
> +				"No error on init with invalid buffer size
> param.");
> +	rte_free(b);
> +	return 0;
> +}
> +
> +static int
> +test_reorder_init_inval_name(void)
> +{
> +	struct rte_reorder_buffer *b = NULL;
> +	char *name = NULL;
> +	unsigned int mzsize =  262336;
> +	b = rte_malloc(NULL, mzsize, 0);
> +	b = rte_reorder_init(b, mzsize, name, REORDER_BUFFER_SIZE);
> +	TEST_ASSERT_EQUAL(b, NULL, "No error on init with invalid name.");
> +	TEST_ASSERT_EQUAL(rte_errno, EINVAL, "No error on init with invalid
> name.");
> +	rte_free(b);
> +	return 0;
> +}
> +
> +static int
> +test_reorder_buf_instance_existance(void)
> +{
> +	struct rte_reorder_buffer *result = NULL;
> +	struct rte_reorder_buffer *b1 = NULL;
> +	struct rte_reorder_buffer *b2 = NULL;
> +	unsigned int mzsize =  262336;
> +
> +	/* Try to find existing reorder buffer instance */
> +	result = rte_reorder_find_existing("PKT_RO1");
> +	TEST_ASSERT_EQUAL(test_params->b, result,
> +			"existing reorder buffer instance not found");
> +
> +	/* Try to find non existing reorder buffer instance */
> +	result = rte_reorder_find_existing("ro_find_non_existing");
> +	TEST_ASSERT_EQUAL(result, NULL,
> +			"non existing reorder buffer instance found");
> +	TEST_ASSERT_EQUAL(rte_errno, ENOENT,
> +			"non existing reorder buffer instance found");
> +
> +	b1 = rte_malloc(NULL, mzsize, 0);
> +	b2 = rte_reorder_init(b1, mzsize, "PKT_RO1", REORDER_BUFFER_SIZE);
> +	TEST_ASSERT_EQUAL(b2, test_params->b,
> +			"no error on init with existing reorder instance name");
> +	rte_free(b1);
> +
> +	b1 = rte_malloc(NULL, mzsize, 0);
> +	b2 = rte_reorder_init(b1, mzsize, "ro_find_nonexisting1",
> REORDER_BUFFER_SIZE);
> +	TEST_ASSERT_EQUAL(b2, b1,
> +			"error on init with non existing reorder instance name");
> +	rte_reorder_free(b1);
> +
> +	return 0;
> +}
> +
> +static int
> +test_reorder_insert(void)
> +{
> +	struct rte_reorder_buffer *b = test_params->b;
> +	struct rte_mempool *p = test_params->p;
> +	rte_reorder_reset(b);
> +	int num_bufs = 4;
> +	struct rte_mbuf *bufs[num_bufs];
> +	int ret = 0;
> +	if (rte_mempool_get_bulk(p, (void *)bufs, num_bufs) != 0) {
> +		printf("%s: Error getting mbuf from pool\n", __func__);
> +		return -1;
> +	}
> +
> +	/* too early packet */
> +	bufs[0]->seqn = (3*REORDER_BUFFER_SIZE);
> +	ret = rte_reorder_insert(b, bufs[0]);
> +	if (ret != -1 || rte_errno != ERANGE) {
> +		printf("%s:%d: No error on insert() of too early packet with
> seqn:"
> +				" (3*REORDER_BUFFER_SIZE)\n", __func__,
> __LINE__);
> +		rte_mempool_put_bulk(p, (void *)bufs, num_bufs);
> +		return -1;
> +	}
> +
> +	/* early packet */
> +	bufs[1]->seqn = (2*REORDER_BUFFER_SIZE)-2;
> +	ret = rte_reorder_insert(b, bufs[1]);
> +	if (ret == -1 || rte_errno == ENOSPC) {
> +		printf("%s:%d: Error on insert of early packet with seqn:"
> +			" (2*REORDER_BUFFER_SIZE)-2\n", __func__ ,
> __LINE__);
> +		rte_mempool_put_bulk(p, (void *)bufs, num_bufs);
> +		return -1;
> +	}
> +
> +	bufs[2]->seqn = (3*REORDER_BUFFER_SIZE)-1;
> +	ret = rte_reorder_insert(b, bufs[2]);
> +	if (ret != -1 && rte_errno != ENOSPC) {
> +		printf("%s:%d: Error on insert of early packet with seqn:"
> +			" (3*REORDER_BUFFER_SIZE)-3\n", __func__ ,
> __LINE__);
> +		rte_mempool_put_bulk(p, (void *)bufs, num_bufs);
> +		return -1;
> +	}
> +
> +	rte_mempool_put_bulk(p, (void *)bufs, num_bufs);
> +	return 0;
> +}
> +
> +/* Test case covers draining conditions on order buffer */
> +static int
> +test_reorder_drain_order_buf(void)
> +{
> +
> +	struct rte_reorder_buffer *b = test_params->b;
> +	struct rte_mempool *p = test_params->p;
> +	rte_reorder_reset(b);
> +	struct rte_mbuf *bufs[REORDER_BUFFER_SIZE+10] = {NULL};
> +	struct rte_mbuf *robufs[REORDER_BUFFER_SIZE+10] = {NULL};
> +	int cnt;
> +	int i = 0;
> +
> +	if (rte_mempool_get_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10)
> != 0) {
> +		printf("%s: Error getting mbuf from pool\n", __func__);
> +		return -1;
> +	}
> +
> +	/* insert mbufs in order buffer with gaps i.e seqn 0 to 5 and 8,9 inserted
> */
> +	for (i = 0; i < 10; ) {
> +		bufs[i]->seqn = i;
> +		rte_reorder_insert(b, bufs[i]);
> +		if (i == 5)
> +			i += 3;
> +		else
> +			i++;
> +	}
> +
> +	/* should drain till first gap */
> +	cnt = rte_reorder_drain(b, robufs, BURST);
> +	if (cnt != 6) {
> +		printf("%s:%d:%d: number of expected packets not drained\n",
> +			__func__, __LINE__, cnt);
> +		rte_mempool_put_bulk(p, (void *)bufs,
> REORDER_BUFFER_SIZE+10);
> +		return -1;
> +	}
> +
> +	/* now add missing entries and remaining entries till end of order buf */
> +	bufs[6]->seqn = 6;
> +	bufs[7]->seqn = 7;
> +	rte_reorder_insert(b, bufs[6]);
> +	rte_reorder_insert(b, bufs[7]);
> +	for (i = 10; i < REORDER_BUFFER_SIZE; i++) {
> +		bufs[i]->seqn = i;
> +		rte_reorder_insert(b, bufs[i]);
> +	}
> +
> +	/*
> +	 * since the gaps are now filled, drain should return the entries
> +	 * from the last gap through to the end
> +	 */
> +	cnt = rte_reorder_drain(b, robufs, REORDER_BUFFER_SIZE+1);
> +	if (cnt != REORDER_BUFFER_SIZE-6) {
> +		printf("%s:%d: number of expected packets not drained\n",
> +			__func__, __LINE__);
> +		rte_mempool_put_bulk(p, (void *)bufs,
> REORDER_BUFFER_SIZE+10);
> +		return -1;
> +	}
> +	rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
> +	return 0;
> +}
> +
> +/* Test case covers draining conditions on ready buffer */
> +static int
> +test_reorder_drain_ready_buf(void)
> +{
> +
> +	struct rte_reorder_buffer *b = test_params->b;
> +	struct rte_mempool *p = test_params->p;
> +	rte_reorder_reset(b);
> +
> +	struct rte_mbuf *bufs[REORDER_BUFFER_SIZE+10] = {NULL};
> +	struct rte_mbuf *robufs[REORDER_BUFFER_SIZE+10] = {NULL};
> +	int cnt = 0;
> +	int i;
> +	int ret = 0;
> +
> +	if (rte_mempool_get_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10)
> != 0) {
> +		printf("%s: Error getting mbuf from pool\n", __func__);
> +		return -1;
> +	}
> +
> +	/*1: draining of ready buffer with tail == 0 */
> +	for (i = 0; i < REORDER_BUFFER_SIZE; i++) {
> +		bufs[i]->seqn = i;
> +		ret = rte_reorder_insert(b, bufs[i]);
> +		if (ret) {
> +			printf("%s: Error on insert of bufs[%u]\n",
> +				__func__, i);
> +			rte_mempool_put_bulk(p, (void *)bufs,
> REORDER_BUFFER_SIZE+10);
> +			return -1;
> +		}
> +	}
> +
> +	/*
> +	 * insert early packet, this moves entries from order buffer
> +	 * to ready buffer
> +	 */
> +	bufs[REORDER_BUFFER_SIZE]->seqn = (2*REORDER_BUFFER_SIZE)-1;
> +	rte_reorder_insert(b, bufs[REORDER_BUFFER_SIZE]);
> +
> +	/*
> +	 * since ready buffer is full, could drain REORDER_BUFFER_SIZE
> +	 * entries  from ready buffer
> +	 */
> +	cnt = rte_reorder_drain(b, robufs, REORDER_BUFFER_SIZE);
> +	if (cnt != REORDER_BUFFER_SIZE) {
> +		printf("%s:%d:%d: number of expected packets not drained\n",
> +			__func__, __LINE__, cnt);
> +		rte_mempool_put_bulk(p, (void *)bufs,
> REORDER_BUFFER_SIZE+10);
> +		return -1;
> +	}
> +
> +	/*2: draining of ready buffer with tail != 0 */
> +
> +	/* insert mbufs with seqn: REORDER_BUFFER_SIZE+1 to
> 2*REORDER_BUFFER_SIZE */
> +	for (i = 0; i < REORDER_BUFFER_SIZE; i++) {
> +		bufs[i]->seqn = REORDER_BUFFER_SIZE+1+i;
> +		ret = rte_reorder_insert(b, bufs[i]);
> +		if (ret) {
> +			printf("%s: Error on insert of bufs[%u]\n",
> +				__func__, i);
> +			rte_mempool_put_bulk(p, (void *)bufs,
> REORDER_BUFFER_SIZE+10);
> +			return -1;
> +		}
> +	}
> +
> +	/*
> +	 * insert early packet, this will move entries
> +	 * from order buffer to ready buffer
> +	 */
> +	bufs[REORDER_BUFFER_SIZE]->seqn = (3*REORDER_BUFFER_SIZE)-5;
> +	rte_reorder_insert(b, bufs[REORDER_BUFFER_SIZE]);
> +
> +	/*
> +	 * drain only 3 mbufs, this will drain ready buffer
> +	 * and advance tail by 3
> +	 */
> +	cnt = rte_reorder_drain(b, robufs, 3);
> +	if (cnt != 3) {
> +		printf("%s:%d:%d: number of expected packets not drained\n",
> +			__func__, __LINE__, cnt);
> +		rte_mempool_put_bulk(p, (void *)bufs,
> REORDER_BUFFER_SIZE+10);
> +		return -1;
> +	}
> +
> +	/* insert early packet */
> +	bufs[REORDER_BUFFER_SIZE]->seqn = (3*REORDER_BUFFER_SIZE)+2;
> +	rte_reorder_insert(b, bufs[REORDER_BUFFER_SIZE]);
> +
> +	/*
> +	 * perform drain on ready buffer with advanced tail,
> +	 * validates if(tail == size) in drain
> +	 */
> +	rte_reorder_drain(b, robufs, REORDER_BUFFER_SIZE);
> +	rte_mempool_put_bulk(p, (void *)bufs, REORDER_BUFFER_SIZE+10);
> +	return 0;
> +}
> +
> +static int
> +test_setup(void)
> +{
> +	/* reorder buffer instance creation */
> +	if (test_params->b == NULL) {
> +		test_params->b = rte_reorder_create("PKT_RO1",
> rte_socket_id(),
> +
> 	REORDER_BUFFER_SIZE);
> +		if (test_params->b == NULL) {
> +			printf("%s: Error creating reorder buffer instance b\n",
> +					__func__);
> +			return -1;
> +		}
> +	} else
> +		rte_reorder_reset(test_params->b);
> +
> +	/* mempool creation */
> +	if (test_params->p == NULL) {
> +		test_params->p = rte_mempool_create("RO_MBUF_POOL",
> NUM_MBUFS,
> +				MBUF_SIZE, BURST,
> +				sizeof(struct rte_pktmbuf_pool_private),
> +				rte_pktmbuf_pool_init, NULL,
> +				rte_pktmbuf_init, NULL,
> +				rte_socket_id(), 0);
> +		if (test_params->p == NULL) {
> +			printf("%s: Error creating mempool\n", __func__);
> +			return -1;
> +		}
> +	}
> +	return 0;
> +}
> +
> +static struct unit_test_suite reorder_test_suite  = {
> +
> +	.setup = test_setup,
> +	.suite_name = "Reorder Unit Test Suite",
> +	.unit_test_cases = {
> +		TEST_CASE(test_reorder_create_inval_name),
> +		TEST_CASE(test_reorder_create_inval_size),
> +		TEST_CASE(test_reorder_init_null_buffer),
> +		TEST_CASE(test_reorder_init_inval_mzsize),
> +		TEST_CASE(test_reorder_init_inval_size),
> +		TEST_CASE(test_reorder_init_inval_name),
> +		TEST_CASE(test_reorder_buf_instance_existance),
> +		TEST_CASE(test_reorder_insert),
> +		TEST_CASE(test_reorder_drain_order_buf),
> +		TEST_CASE(test_reorder_drain_ready_buf),
> +		TEST_CASES_END()
> +	}
> +};
> +
> +static int
> +test_reorder(void)
> +{
> +	return unit_test_suite_runner(&reorder_test_suite);
> +}
> +
> +static struct test_command reorder_cmd = {
> +	.command = "reorder_autotest",
> +	.callback = test_reorder,
> +};
> +REGISTER_TEST_COMMAND(reorder_cmd);
> diff --git a/config/common_bsdapp b/config/common_bsdapp
> index 9177db1..e3e0e94 100644
> --- a/config/common_bsdapp
> +++ b/config/common_bsdapp
> @@ -334,6 +334,11 @@ CONFIG_RTE_SCHED_PORT_N_GRINDERS=8
>  CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> 
>  #
> +# Compile the reorder library
> +#
> +CONFIG_RTE_LIBRTE_REORDER=y
> +
> +#
>  # Compile librte_port
>  #
>  CONFIG_RTE_LIBRTE_PORT=y
> diff --git a/config/common_linuxapp b/config/common_linuxapp
> index 2f9643b..b5ec730 100644
> --- a/config/common_linuxapp
> +++ b/config/common_linuxapp
> @@ -342,6 +342,11 @@ CONFIG_RTE_SCHED_PORT_N_GRINDERS=8
>  CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> 
>  #
> +# Compile the reorder library
> +#
> +CONFIG_RTE_LIBRTE_REORDER=y
> +
> +#
>  # Compile librte_port
>  #
>  CONFIG_RTE_LIBRTE_PORT=y
> diff --git a/examples/packet_ordering/Makefile
> b/examples/packet_ordering/Makefile
> new file mode 100644
> index 0000000..44bd2e1
> --- /dev/null
> +++ b/examples/packet_ordering/Makefile
> @@ -0,0 +1,50 @@
> +#   BSD LICENSE
> +#
> +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> +#   All rights reserved.
> +#
> +#   Redistribution and use in source and binary forms, with or without
> +#   modification, are permitted provided that the following conditions
> +#   are met:
> +#
> +#     * Redistributions of source code must retain the above copyright
> +#       notice, this list of conditions and the following disclaimer.
> +#     * Redistributions in binary form must reproduce the above copyright
> +#       notice, this list of conditions and the following disclaimer in
> +#       the documentation and/or other materials provided with the
> +#       distribution.
> +#     * Neither the name of Intel Corporation nor the names of its
> +#       contributors may be used to endorse or promote products derived
> +#       from this software without specific prior written permission.
> +#
> +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> CONTRIBUTORS
> +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> FITNESS FOR
> +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> COPYRIGHT
> +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> INCIDENTAL,
> +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
> NOT
> +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
> USE,
> +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
> ON ANY
> +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> THE USE
> +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> +
> +ifeq ($(RTE_SDK),)
> +$(error "Please define RTE_SDK environment variable")
> +endif
> +
> +# Default target, can be overridden by command line or environment
> +RTE_TARGET ?= x86_64-ivshmem-linuxapp-gcc
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# binary name
> +APP = packet_ordering
> +
> +# all sources are stored in SRCS-y
> +SRCS-y := main.c
> +
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS)
> +
> +include $(RTE_SDK)/mk/rte.extapp.mk
> diff --git a/examples/packet_ordering/main.c
> b/examples/packet_ordering/main.c
> new file mode 100644
> index 0000000..8b65275
> --- /dev/null
> +++ b/examples/packet_ordering/main.c
> @@ -0,0 +1,637 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
> NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
> OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
> ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> + */
> +
> +#include <signal.h>
> +#include <getopt.h>
> +
> +#include <rte_eal.h>
> +#include <rte_common.h>
> +#include <rte_errno.h>
> +#include <rte_ethdev.h>
> +#include <rte_lcore.h>
> +#include <rte_mbuf.h>
> +#include <rte_mempool.h>
> +#include <rte_ring.h>
> +#include <rte_reorder.h>
> +
> +#define RX_DESC_PER_QUEUE 128
> +#define TX_DESC_PER_QUEUE 512
> +
> +#define MAX_PKTS_BURST 32
> +#define REORDER_BUFFER_SIZE 8192
> +#define MBUF_PER_POOL 65535
> +#define MBUF_SIZE (1600 + sizeof(struct rte_mbuf) +
> RTE_PKTMBUF_HEADROOM)
> +#define MBUF_POOL_CACHE_SIZE 250
> +
> +#define RING_SIZE 16384
> +
> +/* uncomment the line below to enable debug logs */
> +/* #define DEBUG */
> +
> +#ifdef DEBUG
> +#define LOG_LEVEL RTE_LOG_DEBUG
> +#define LOG_DEBUG(log_type, fmt, args...) RTE_LOG(DEBUG, log_type, fmt,
> ##args)
> +#else
> +#define LOG_LEVEL RTE_LOG_INFO
> +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
> +#endif
> +
> +/* Macros for printing using RTE_LOG */
> +#define RTE_LOGTYPE_REORDERAPP          RTE_LOGTYPE_USER1
> +
> +unsigned int portmask;
> +volatile uint8_t quit_signal;
> +
> +static struct rte_mempool *mbuf_pool;
> +
> +static struct rte_eth_conf port_conf_default;
> +
> +struct worker_thread_args {
> +	struct rte_ring *ring_in;
> +	struct rte_ring *ring_out;
> +};
> +
> +struct output_buffer {
> +	unsigned count;
> +	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
> +};
> +
> +volatile struct app_stats {
> +	struct {
> +		uint64_t rx_pkts;
> +		uint64_t enqueue_pkts;
> +		uint64_t enqueue_failed_pkts;
> +	} rx __rte_cache_aligned;
> +
> +	struct {
> +		uint64_t dequeue_pkts;
> +		uint64_t enqueue_pkts;
> +		uint64_t enqueue_failed_pkts;
> +	} wkr __rte_cache_aligned;
> +
> +	struct {
> +		uint64_t dequeue_pkts;
> +		/* Too early pkts transmitted directly w/o reordering */
> +		uint64_t early_pkts_txtd_woro;
> +		/* Too early pkts failed from direct transmit */
> +		uint64_t early_pkts_tx_failed_woro;
> +		uint64_t ro_tx_pkts;
> +		uint64_t ro_tx_failed_pkts;
> +	} tx __rte_cache_aligned;
> +} app_stats;
> +
> +/**
> + * Get the last enabled lcore ID
> + *
> + * @return
> + *   The last enabled lcore ID.
> + */
> +static unsigned int
> +get_last_lcore_id(void)
> +{
> +	int i;
> +
> +	for (i = RTE_MAX_LCORE - 1; i >= 0; i--)
> +		if (rte_lcore_is_enabled(i))
> +			return i;
> +	return 0;
> +}
> +
> +/**
> + * Get the previous enabled lcore ID
> + * @param id
> + *  The current lcore ID
> + * @return
> + *   The previous enabled lcore ID or the current lcore
> + *   ID if it is the first available core.
> + */
> +static unsigned int
> +get_previous_lcore_id(unsigned int id)
> +{
> +	int i;
> +
> +	for (i = id - 1; i >= 0; i--)
> +		if (rte_lcore_is_enabled(i))
> +			return i;
> +	return id;
> +}
> +
> +static inline void
> +pktmbuf_free_bulk(struct rte_mbuf *mbuf_table[], unsigned n)
> +{
> +	unsigned int i;
> +
> +	for (i = 0; i < n; i++)
> +		rte_pktmbuf_free(mbuf_table[i]);
> +}
> +
> +/* display usage */
> +static void
> +print_usage(const char *prgname)
> +{
> +	printf("%s [EAL options] -- -p PORTMASK\n"
> +			"  -p PORTMASK: hexadecimal bitmask of ports to
> configure\n",
> +			prgname);
> +}
> +
> +static int
> +parse_portmask(const char *portmask)
> +{
> +	unsigned long pm;
> +	char *end = NULL;
> +
> +	/* parse hexadecimal string */
> +	pm = strtoul(portmask, &end, 16);
> +	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
> +		return -1;
> +
> +	if (pm == 0)
> +		return -1;
> +
> +	return pm;
> +}
> +
> +/* Parse the argument given in the command line of the application */
> +static int
> +parse_args(int argc, char **argv)
> +{
> +	int opt;
> +	int option_index;
> +	char **argvopt;
> +	char *prgname = argv[0];
> +	static struct option lgopts[] = {
> +		{NULL, 0, 0, 0}
> +	};
> +
> +	argvopt = argv;
> +
> +	while ((opt = getopt_long(argc, argvopt, "p:",
> +					lgopts, &option_index)) != EOF) {
> +		switch (opt) {
> +		/* portmask */
> +		case 'p':
> +			portmask = parse_portmask(optarg);
> +			if (portmask == 0) {
> +				printf("invalid portmask\n");
> +				print_usage(prgname);
> +				return -1;
> +			}
> +			break;
> +		default:
> +			print_usage(prgname);
> +			return -1;
> +		}
> +	}
> +	if (optind <= 1) {
> +		print_usage(prgname);
> +		return -1;
> +	}
> +
> +	argv[optind-1] = prgname;
> +	optind = 0; /* reset getopt lib */
> +	return 0;
> +}
> +
> +static inline int
> +configure_eth_port(uint8_t port_id)
> +{
> +	const uint16_t rxRings = 1, txRings = 1;
> +	const uint8_t nb_ports = rte_eth_dev_count();
> +	int ret;
> +	uint16_t q;
> +
> +	if (port_id > nb_ports)
> +		return -1;
> +
> +	ret = rte_eth_dev_configure(port_id, rxRings, txRings ,
> &port_conf_default);
> +	if (ret != 0)
> +		return ret;
> +
> +	for (q = 0; q < rxRings; q++) {
> +		ret = rte_eth_rx_queue_setup(port_id, q,
> RX_DESC_PER_QUEUE,
> +				rte_eth_dev_socket_id(port_id), NULL,
> +				mbuf_pool);
> +		if (ret < 0)
> +			return ret;
> +	}
> +
> +	for (q = 0; q < txRings; q++) {
> +		ret = rte_eth_tx_queue_setup(port_id, q,
> TX_DESC_PER_QUEUE,
> +				rte_eth_dev_socket_id(port_id), NULL);
> +		if (ret < 0)
> +			return ret;
> +	}
> +
> +	ret = rte_eth_dev_start(port_id);
> +	if (ret < 0)
> +		return ret;
> +
> +	struct ether_addr addr;
> +	rte_eth_macaddr_get(port_id, &addr);
> +	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> +			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> +			(unsigned)port_id,
> +			addr.addr_bytes[0], addr.addr_bytes[1],
> +			addr.addr_bytes[2], addr.addr_bytes[3],
> +			addr.addr_bytes[4], addr.addr_bytes[5]);
> +
> +	rte_eth_promiscuous_enable(port_id);
> +
> +	return 0;
> +}
> +
> +static void
> +print_stats(void)
> +{
> +	const uint8_t nb_ports = rte_eth_dev_count();
> +	unsigned i;
> +	struct rte_eth_stats eth_stats;
> +
> +	printf("\nRX thread stats:\n");
> +	printf(" - Pkts rxd:				%"PRIu64"\n",
> +						app_stats.rx.rx_pkts);
> +	printf(" - Pkts enqd to workers ring:		%"PRIu64"\n",
> +						app_stats.rx.enqueue_pkts);
> +
> +	printf("\nWorker thread stats:\n");
> +	printf(" - Pkts deqd from workers ring:		%"PRIu64"\n",
> +						app_stats.wkr.dequeue_pkts);
> +	printf(" - Pkts enqd to tx ring:		%"PRIu64"\n",
> +						app_stats.wkr.enqueue_pkts);
> +	printf(" - Pkts enq to tx failed:		%"PRIu64"\n",
> +
> 	app_stats.wkr.enqueue_failed_pkts);
> +
> +	printf("\nTX stats:\n");
> +	printf(" - Pkts deqd from tx ring:		%"PRIu64"\n",
> +						app_stats.tx.dequeue_pkts);
> +	printf(" - Ro Pkts transmitted:			%"PRIu64"\n",
> +						app_stats.tx.ro_tx_pkts);
> +	printf(" - Ro Pkts tx failed:			%"PRIu64"\n",
> +
> 	app_stats.tx.ro_tx_failed_pkts);
> +	printf(" - Pkts transmitted w/o reorder:	%"PRIu64"\n",
> +
> 	app_stats.tx.early_pkts_txtd_woro);
> +	printf(" - Pkts tx failed w/o reorder:		%"PRIu64"\n",
> +
> 	app_stats.tx.early_pkts_tx_failed_woro);
> +
> +	for (i = 0; i < nb_ports; i++) {
> +		rte_eth_stats_get(i, &eth_stats);
> +		printf("\nPort %u stats:\n", i);
> +		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
> +		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
> +		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
> +		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
> +		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
> +	}
> +}
> +
> +static void
> +int_handler(int sig_num)
> +{
> +	printf("Exiting on signal %d\n", sig_num);
> +	quit_signal = 1;
> +}
> +
> +/**
> + * This thread receives mbufs from the port and assigns each of them an
> + * internal sequence number, stored in the mbuf structure, to keep track
> + * of their order of arrival.
> + * The mbufs are then passed to the worker threads via the rx_to_workers
> + * ring.
> + */
> +static int
> +rx_thread(struct rte_ring *ring_out)
> +{
> +	const uint8_t nb_ports = rte_eth_dev_count();
> +	uint32_t seqn = 0;
> +	uint16_t i, ret = 0;
> +	uint16_t nb_rx_pkts;
> +	uint8_t port_id;
> +	struct rte_mbuf *pkts[MAX_PKTS_BURST];
> +
> +	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
> +							rte_lcore_id());
> +
> +	while (!quit_signal) {
> +
> +		for (port_id = 0; port_id < nb_ports; port_id++) {
> +			if ((portmask & (1 << port_id)) != 0) {
> +
> +				/* receive packets */
> +				nb_rx_pkts = rte_eth_rx_burst(port_id, 0,
> +								pkts,
> MAX_PKTS_BURST);
> +				if (nb_rx_pkts == 0) {
> +					LOG_DEBUG(REORDERAPP,
> +					"%s():Received zero packets\n",
> 	__func__);
> +					continue;
> +				}
> +				app_stats.rx.rx_pkts += nb_rx_pkts;
> +
> +				/* mark sequence number */
> +				for (i = 0; i < nb_rx_pkts; )
> +					pkts[i++]->seqn = seqn++;
> +
> +				/* enqueue to rx_to_workers ring */
> +				ret = rte_ring_enqueue_burst(ring_out, (void *)
> pkts,
> +								nb_rx_pkts);
> +				app_stats.rx.enqueue_pkts += ret;
> +				if (unlikely(ret < nb_rx_pkts)) {
> +					app_stats.rx.enqueue_failed_pkts +=
> +
> 	(nb_rx_pkts-ret);
> +					pktmbuf_free_bulk(&pkts[ret],
> nb_rx_pkts - ret);
> +				}
> +			}
> +		}
> +	}
> +	return 0;
> +}
> +
> +/**
> + * Worker stage: takes bursts of mbufs from the input ring, rewrites the
> + * port field of each one (its low bit is flipped when more than one port
> + * is present, otherwise it is left untouched) and feeds the burst to the
> + * output ring. Mbufs that do not fit in the output ring are freed.
> + *
> + * @param args_ptr
> + *   Pointer to a struct worker_thread_args holding ring_in and ring_out.
> + * @return
> + *   0 once quit_signal has been raised.
> + */
> +static int
> +worker_thread(void *args_ptr)
> +{
> +	const uint8_t nb_ports = rte_eth_dev_count();
> +	const unsigned xor_val = (nb_ports > 1);
> +	struct worker_thread_args *args = (struct worker_thread_args *) args_ptr;
> +	struct rte_ring *ring_in = args->ring_in;
> +	struct rte_ring *ring_out = args->ring_out;
> +	struct rte_mbuf *burst_buffer[MAX_PKTS_BURST] = { NULL };
> +	uint16_t burst_size, nb_enq, idx;
> +
> +	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
> +							rte_lcore_id());
> +
> +	while (!quit_signal) {
> +		/* dequeue the mbufs from rx_to_workers ring */
> +		burst_size = rte_ring_dequeue_burst(ring_in,
> +				(void *)burst_buffer, MAX_PKTS_BURST);
> +		if (unlikely(burst_size == 0))
> +			continue;
> +
> +		/* stats are shared between worker lcores, hence atomic adds */
> +		__sync_fetch_and_add(&app_stats.wkr.dequeue_pkts, burst_size);
> +
> +		/* just do some operation on mbuf */
> +		for (idx = 0; idx < burst_size; idx++)
> +			burst_buffer[idx]->port ^= xor_val;
> +
> +		/* enqueue the modified mbufs to workers_to_tx ring */
> +		nb_enq = rte_ring_enqueue_burst(ring_out, (void *)burst_buffer,
> +				burst_size);
> +		__sync_fetch_and_add(&app_stats.wkr.enqueue_pkts, nb_enq);
> +		if (unlikely(nb_enq < burst_size)) {
> +			/* Return the mbufs to their respective pool, dropping packets */
> +			__sync_fetch_and_add(&app_stats.wkr.enqueue_failed_pkts,
> +					(int)burst_size - nb_enq);
> +			pktmbuf_free_bulk(&burst_buffer[nb_enq],
> +					burst_size - nb_enq);
> +		}
> +	}
> +	return 0;
> +}
> +
> +/*
> + * Transmit every mbuf queued in outbuf on TX queue 0 of port outp.
> + * Mbufs the driver did not accept are counted as failed and freed, so the
> + * output buffer is always empty (count == 0) when this returns.
> + */
> +static inline void
> +flush_one_port(struct output_buffer *outbuf, uint8_t outp)
> +{
> +	unsigned nb_tx;
> +
> +	nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs, outbuf->count);
> +	app_stats.tx.ro_tx_pkts += nb_tx;
> +
> +	/* free the mbufs which failed from transmit */
> +	if (unlikely(nb_tx < outbuf->count)) {
> +		LOG_DEBUG(REORDERAPP, "%s:Packet loss with tx_burst\n", __func__);
> +		app_stats.tx.ro_tx_failed_pkts += (outbuf->count - nb_tx);
> +		pktmbuf_free_bulk(&outbuf->mbufs[nb_tx], outbuf->count - nb_tx);
> +	}
> +	outbuf->count = 0;
> +}
> +
> +/**
> + * TX stage: dequeues mbufs from the workers_to_tx ring, inserts them into
> + * a reorder buffer and transmits whatever the reorder buffer releases in
> + * sequence order.
> + *
> + * Mbufs rejected by rte_reorder_insert() are handled here:
> + *  - rte_errno == ERANGE: the mbuf is sent straight to its port, bypassing
> + *    the reorder buffer (or freed if that port is disabled / TX fails);
> + *  - rte_errno == ENOSPC: the mbuf is dropped.
> + *
> + * Drained mbufs are accumulated per output port and flushed with
> + * flush_one_port() each time MAX_PKTS_BURST of them are queued.
> + *
> + * @param ring_in
> + *   Ring fed by the worker threads (workers_to_tx).
> + * @return
> + *   0 once quit_signal has been raised. The application exits if the
> + *   reorder buffer cannot be created.
> + */
> +static int
> +send_thread(struct rte_ring *ring_in)
> +{
> +	int ret;
> +	unsigned int i, dret;
> +	uint16_t nb_dq_mbufs;
> +	uint8_t outp;
> +	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
> +	struct rte_mbuf *mbufs[MAX_PKTS_BURST];
> +	struct rte_mbuf *rombufs[MAX_PKTS_BURST] = {NULL};
> +	struct rte_reorder_buffer *buffer;
> +
> +	RTE_LOG(INFO, REORDERAPP, "%s() started on lcore %u\n", __func__,
> +							rte_lcore_id());
> +	buffer = rte_reorder_create("PKT_RO", rte_socket_id(),
> +			REORDER_BUFFER_SIZE);
> +	/* every later call dereferences the buffer: do not run without it */
> +	if (buffer == NULL)
> +		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
> +
> +	while (!quit_signal) {
> +
> +		/* dequeue the mbufs from workers_to_tx ring */
> +		nb_dq_mbufs = rte_ring_dequeue_burst(ring_in,
> +				(void *)mbufs, MAX_PKTS_BURST);
> +
> +		if (unlikely(nb_dq_mbufs == 0))
> +			continue;
> +
> +		app_stats.tx.dequeue_pkts += nb_dq_mbufs;
> +
> +		for (i = 0; i < nb_dq_mbufs; i++) {
> +			/* send dequeued mbufs for reordering */
> +			ret = rte_reorder_insert(buffer, mbufs[i]);
> +
> +			if (ret == -1 && rte_errno == ERANGE) {
> +				/* Too early pkts should be transmitted out directly */
> +				LOG_DEBUG(REORDERAPP, "%s():Cannot reorder early packet "
> +						"direct enqueuing to TX\n", __func__);
> +				outp = mbufs[i]->port;
> +				if ((portmask & (1 << outp)) == 0) {
> +					rte_pktmbuf_free(mbufs[i]);
> +					continue;
> +				}
> +				/*
> +				 * tx_burst expects an array of mbuf pointers:
> +				 * pass the address of the slot, not the mbuf.
> +				 */
> +				if (rte_eth_tx_burst(outp, 0, &mbufs[i], 1) != 1) {
> +					rte_pktmbuf_free(mbufs[i]);
> +					app_stats.tx.early_pkts_tx_failed_woro++;
> +				} else
> +					app_stats.tx.early_pkts_txtd_woro++;
> +			} else if (ret == -1 && rte_errno == ENOSPC) {
> +				/**
> +				 * Early pkts just outside of window should be dropped
> +				 */
> +				rte_pktmbuf_free(mbufs[i]);
> +			}
> +		}
> +
> +		/*
> +		 * drain MAX_PKTS_BURST of reordered
> +		 * mbufs for transmit
> +		 */
> +		dret = rte_reorder_drain(buffer, rombufs, MAX_PKTS_BURST);
> +		for (i = 0; i < dret; i++) {
> +
> +			struct output_buffer *outbuf;
> +			uint8_t outp1;
> +
> +			outp1 = rombufs[i]->port;
> +			/* skip ports that are not enabled */
> +			if ((portmask & (1 << outp1)) == 0) {
> +				rte_pktmbuf_free(rombufs[i]);
> +				continue;
> +			}
> +
> +			outbuf = &tx_buffers[outp1];
> +			outbuf->mbufs[outbuf->count++] = rombufs[i];
> +			if (outbuf->count == MAX_PKTS_BURST)
> +				flush_one_port(outbuf, outp1);
> +		}
> +	}
> +	return 0;
> +}
> +
> +/*
> + * Application entry point.
> + *
> + * Initializes the EAL, parses the application arguments, creates the mbuf
> + * pool, configures every port enabled in portmask and creates the two
> + * rings linking the pipeline stages. It then launches worker_thread() on
> + * every enabled slave lcore except the last one, send_thread() on the last
> + * lcore, and runs rx_thread() on the master lcore. Statistics are printed
> + * once all lcores have returned.
> + */
> +int
> +main(int argc, char **argv)
> +{
> +	struct worker_thread_args worker_args = {NULL, NULL};
> +	struct rte_ring *rx_to_workers;
> +	struct rte_ring *workers_to_tx;
> +	unsigned int lcore_id, last_lcore_id, master_lcore_id;
> +	unsigned nb_ports;
> +	uint8_t port_id;
> +	uint8_t nb_ports_available;
> +	int rc;
> +
> +	/* catch ctrl-c so we can print on exit */
> +	signal(SIGINT, int_handler);
> +
> +	/* Initialize EAL; it returns the number of arguments it consumed */
> +	rc = rte_eal_init(argc, argv);
> +	if (rc < 0)
> +		return -1;
> +	argc -= rc;
> +	argv += rc;
> +
> +	/* Parse the application specific arguments */
> +	if (parse_args(argc, argv) < 0)
> +		return -1;
> +
> +	/* Check if we have enough cores */
> +	if (rte_lcore_count() < 3)
> +		rte_exit(EXIT_FAILURE, "Error, This application needs at "
> +				"least 3 logical cores to run:\n"
> +				"1 lcore for packet RX\n"
> +				"1 lcore for packet TX\n"
> +				"and at least 1 lcore for worker threads\n");
> +
> +	nb_ports = rte_eth_dev_count();
> +	if (nb_ports == 0)
> +		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
> +	if (nb_ports != 1 && (nb_ports & 1))
> +		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
> +				"when using a single port\n");
> +
> +	mbuf_pool = rte_mempool_create("mbuf_pool", MBUF_PER_POOL, MBUF_SIZE,
> +			MBUF_POOL_CACHE_SIZE,
> +			sizeof(struct rte_pktmbuf_pool_private),
> +			rte_pktmbuf_pool_init, NULL,
> +			rte_pktmbuf_init, NULL,
> +			rte_socket_id(), 0);
> +	if (mbuf_pool == NULL)
> +		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
> +
> +	/* initialize all ports, counting down for each one that is masked out */
> +	nb_ports_available = nb_ports;
> +	for (port_id = 0; port_id < nb_ports; port_id++) {
> +		if ((portmask & (1 << port_id)) == 0) {
> +			printf("\nSkipping disabled port %d\n", port_id);
> +			nb_ports_available--;
> +			continue;
> +		}
> +
> +		printf("Initializing port %u... done\n", (unsigned) port_id);
> +		if (configure_eth_port(port_id) != 0)
> +			rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n",
> +					port_id);
> +	}
> +	if (nb_ports_available == 0)
> +		rte_exit(EXIT_FAILURE,
> +			"All available ports are disabled. Please set portmask.\n");
> +
> +	/* Create rings for inter core communication */
> +	rx_to_workers = rte_ring_create("rx_to_workers", RING_SIZE,
> +			rte_socket_id(), RING_F_SP_ENQ);
> +	if (rx_to_workers == NULL)
> +		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
> +
> +	workers_to_tx = rte_ring_create("workers_to_tx", RING_SIZE,
> +			rte_socket_id(), RING_F_SC_DEQ);
> +	if (workers_to_tx == NULL)
> +		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
> +
> +	last_lcore_id   = get_last_lcore_id();
> +	master_lcore_id = rte_get_master_lcore();
> +
> +	worker_args.ring_in  = rx_to_workers;
> +	worker_args.ring_out = workers_to_tx;
> +
> +	/* Start worker_thread() on all the available slave cores but the last 1 */
> +	for (lcore_id = 0; lcore_id <= get_previous_lcore_id(last_lcore_id);
> +			lcore_id++) {
> +		if (!rte_lcore_is_enabled(lcore_id) || lcore_id == master_lcore_id)
> +			continue;
> +		rte_eal_remote_launch(worker_thread, (void *)&worker_args,
> +					lcore_id);
> +	}
> +
> +	/* Start send_thread() on the last slave core */
> +	rte_eal_remote_launch((lcore_function_t *)send_thread, workers_to_tx,
> +				last_lcore_id);
> +
> +	/* Start rx_thread() on the master core */
> +	rx_thread(rx_to_workers);
> +
> +	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
> +		if (rte_eal_wait_lcore(lcore_id) < 0)
> +			return -1;
> +	}
> +
> +	print_stats();
> +	return 0;
> +}
> diff --git a/lib/Makefile b/lib/Makefile
> index 0ffc982..5919d32 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -65,6 +65,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) +=
> librte_distributor
>  DIRS-$(CONFIG_RTE_LIBRTE_PORT) += librte_port
>  DIRS-$(CONFIG_RTE_LIBRTE_TABLE) += librte_table
>  DIRS-$(CONFIG_RTE_LIBRTE_PIPELINE) += librte_pipeline
> +DIRS-$(CONFIG_RTE_LIBRTE_REORDER) += librte_reorder
> 
>  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
>  DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
> diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h
> b/lib/librte_eal/common/include/rte_tailq_elem.h
> index f74fc7c..3013869 100644
> --- a/lib/librte_eal/common/include/rte_tailq_elem.h
> +++ b/lib/librte_eal/common/include/rte_tailq_elem.h
> @@ -84,6 +84,8 @@ rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")
> 
>  rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
> 
> +rte_tailq_elem(RTE_TAILQ_REORDER, "RTE_REORDER")
> +
>  rte_tailq_end(RTE_TAILQ_NUM)
> 
>  #undef rte_tailq_elem
> diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
> index 16059c6..ed27eb8 100644
> --- a/lib/librte_mbuf/rte_mbuf.h
> +++ b/lib/librte_mbuf/rte_mbuf.h
> @@ -262,6 +262,9 @@ struct rte_mbuf {
>  		uint32_t usr;	  /**< User defined tags. See
> @rte_distributor_process */
>  	} hash;                   /**< hash information */
> 
> +	/* sequence number - field used in distributor and reorder library */
> +	uint32_t seqn;
> +
>  	/* second cache line - fields only used in slow path or on TX */
>  	MARKER cacheline1 __rte_cache_aligned;
> 
> diff --git a/lib/librte_reorder/Makefile b/lib/librte_reorder/Makefile
> new file mode 100644
> index 0000000..12b916f
> --- /dev/null
> +++ b/lib/librte_reorder/Makefile
> @@ -0,0 +1,50 @@
> +#   BSD LICENSE
> +#
> +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> +#   All rights reserved.
> +#
> +#   Redistribution and use in source and binary forms, with or without
> +#   modification, are permitted provided that the following conditions
> +#   are met:
> +#
> +#     * Redistributions of source code must retain the above copyright
> +#       notice, this list of conditions and the following disclaimer.
> +#     * Redistributions in binary form must reproduce the above copyright
> +#       notice, this list of conditions and the following disclaimer in
> +#       the documentation and/or other materials provided with the
> +#       distribution.
> +#     * Neither the name of Intel Corporation nor the names of its
> +#       contributors may be used to endorse or promote products derived
> +#       from this software without specific prior written permission.
> +#
> +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> CONTRIBUTORS
> +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> FITNESS FOR
> +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> COPYRIGHT
> +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> INCIDENTAL,
> +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
> NOT
> +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
> USE,
> +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
> ON ANY
> +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> THE USE
> +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# library name
> +LIB = librte_reorder.a
> +
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> +
> +# all source are stored in SRCS-y
> +SRCS-$(CONFIG_RTE_LIBRTE_REORDER) := rte_reorder.c
> +
> +# install this header file
> +SYMLINK-$(CONFIG_RTE_LIBRTE_REORDER)-include := rte_reorder.h
> +
> +# this lib depends upon:
> +DEPDIRS-$(CONFIG_RTE_LIBRTE_REORDER) += lib/librte_mbuf
> +DEPDIRS-$(CONFIG_RTE_LIBRTE_REORDER) += lib/librte_eal
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_reorder/rte_reorder.c b/lib/librte_reorder/rte_reorder.c
> new file mode 100644
> index 0000000..fb3e986
> --- /dev/null
> +++ b/lib/librte_reorder/rte_reorder.c
> @@ -0,0 +1,464 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
> NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
> OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
> ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> + */
> +
> +#include <inttypes.h>
> +#include <string.h>
> +
> +#include <rte_log.h>
> +#include <rte_mbuf.h>
> +#include <rte_memzone.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_errno.h>
> +#include <rte_tailq.h>
> +#include <rte_malloc.h>
> +
> +#include "rte_reorder.h"
> +
> +TAILQ_HEAD(rte_reorder_list, rte_tailq_entry);
> +
> +#define NO_FLAGS 0
> +#define RTE_REORDER_PREFIX "RO_"
> +#define RTE_REORDER_NAMESIZE 32
> +
> +/* Macros for printing using RTE_LOG */
> +#define RTE_LOGTYPE_REORDER	RTE_LOGTYPE_USER1
> +
> +/* A generic circular buffer of mbuf pointers */
> +struct cir_buffer {
> +	unsigned int size;   /**< Number of entries that can be stored */
> +	unsigned int mask;   /**< [buffer_size - 1]: used for wrap-around */
> +	unsigned int head;   /**< insertion point in buffer */
> +	unsigned int tail;   /**< extraction point in buffer */
> +	struct rte_mbuf **entries; /**< array of 'size' mbuf pointer slots */
> +} __rte_cache_aligned;
> +
> +/*
> + * The reorder buffer data structure itself.
> + * The entry arrays of both circular buffers are laid out in the memory
> + * that immediately follows this structure (ready_buf first, then
> + * order_buf).
> + */
> +struct rte_reorder_buffer {
> +	char name[RTE_REORDER_NAMESIZE];
> +	uint32_t min_seqn;  /**< Lowest seq. number that can be in the buffer */
> +	unsigned int memsize; /**< memory area size of reorder buffer */
> +	struct cir_buffer ready_buf; /**< temp buffer for dequeued entries */
> +	struct cir_buffer order_buf; /**< buffer used to reorder entries */
> +} __rte_cache_aligned;
> +
> +/*
> + * Lay out an empty reorder buffer in the 'bufsize' bytes starting at 'b':
> + * the header is zeroed and filled in, and the ready/order entry arrays
> + * are placed back to back right after it.
> + * 'name' must not point inside 'b', since the area is zeroed before the
> + * name is copied.
> + */
> +static void
> +reorder_buffer_setup(struct rte_reorder_buffer *b, unsigned int bufsize,
> +	const char *name, unsigned int size)
> +{
> +	memset(b, 0, bufsize);
> +	snprintf(b->name, sizeof(b->name), "%s", name);
> +	b->memsize = bufsize;
> +	b->order_buf.size = b->ready_buf.size = size;
> +	b->order_buf.mask = b->ready_buf.mask = size - 1;
> +	b->ready_buf.entries = (void *)&b[1];
> +	b->order_buf.entries = RTE_PTR_ADD(&b[1],
> +			size * sizeof(b->ready_buf.entries[0]));
> +}
> +
> +/*
> + * Initialize a reorder buffer in caller-provided memory and register it
> + * in the reorder tailq.
> + *
> + * If a buffer with the same name is already registered, that registered
> + * buffer is re-initialized and returned (this is the path used by
> + * rte_reorder_reset()); mbufs it still holds are not freed here. When the
> + * registered buffer is not 'buf' itself, 'bufsize' must not exceed the
> + * registered buffer's memory area.
> + *
> + * Returns the initialized buffer, or NULL with rte_errno set:
> + *  - E_RTE_NO_TAILQ: the reorder tailq is not available
> + *  - EINVAL: size is not a power of 2, or buf/name is NULL
> + *  - ENOMEM: bufsize is too small, or the tailq entry allocation failed
> + */
> +struct rte_reorder_buffer *
> +rte_reorder_init(void *buf, unsigned int bufsize,
> +	const char *name, unsigned int size)
> +{
> +	struct rte_reorder_buffer *b = (struct rte_reorder_buffer *)buf;
> +	const unsigned int min_bufsize = sizeof(*b) +
> +					(2 * size * sizeof(struct rte_mbuf *));
> +
> +	struct rte_reorder_buffer *be = NULL;
> +	struct rte_tailq_entry *te;
> +	struct rte_reorder_list *reorder_list;
> +
> +	/* check that we have an initialised tail queue */
> +	reorder_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_REORDER,
> +			rte_reorder_list);
> +	if (!reorder_list) {
> +		rte_errno = E_RTE_NO_TAILQ;
> +		return NULL;
> +	}
> +
> +	if (!rte_is_power_of_2(size)) {
> +		RTE_LOG(ERR, REORDER, "Invalid reorder buffer size"
> +				" - Not a power of 2\n");
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +	if (b == NULL) {
> +		RTE_LOG(ERR, REORDER, "Invalid reorder buffer parameter:"
> +					" NULL\n");
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +	if (name == NULL) {
> +		RTE_LOG(ERR, REORDER, "Invalid reorder buffer name ptr:"
> +					" NULL\n");
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +	if (bufsize < min_bufsize) {
> +		RTE_LOG(ERR, REORDER, "Invalid reorder buffer size:%u, "
> +			"should be minimum:%u\n", bufsize, min_bufsize);
> +		rte_errno = ENOMEM;
> +		return NULL;
> +	}
> +
> +	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	/* look for an already registered buffer with this name */
> +	TAILQ_FOREACH(te, reorder_list, next) {
> +		be = (struct rte_reorder_buffer *) te->data;
> +		if (strncmp(name, be->name, RTE_REORDER_NAMESIZE) == 0)
> +			break;
> +	}
> +	if (te != NULL) {
> +		/*
> +		 * The registered buffer is the one that gets rewritten, so
> +		 * never clear more bytes than its own memory area holds.
> +		 */
> +		if (be != b && bufsize > be->memsize) {
> +			RTE_LOG(ERR, REORDER, "Invalid reorder buffer size:%u, "
> +				"existing buffer holds only:%u\n",
> +				bufsize, be->memsize);
> +			rte_errno = ENOMEM;
> +			b = NULL;
> +			goto exit;
> +		}
> +		b = be;
> +		reorder_buffer_setup(b, bufsize, name, size);
> +		goto exit;
> +	}
> +
> +	/* allocate tailq entry */
> +	te = rte_zmalloc("REORDER_TAILQ_ENTRY", sizeof(*te), 0);
> +	if (te == NULL) {
> +		RTE_LOG(ERR, REORDER, "Failed to allocate tailq entry\n");
> +		/* report the failure instead of returning an unregistered buffer */
> +		rte_errno = ENOMEM;
> +		b = NULL;
> +		goto exit;
> +	}
> +
> +	reorder_buffer_setup(b, bufsize, name, size);
> +
> +	te->data = (void *) b;
> +
> +	TAILQ_INSERT_TAIL(reorder_list, te, next);
> +
> +exit:
> +	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +	return b;
> +}
> +
> +/*
> + * Reset a reorder buffer to its initial, empty state.
> + *
> + * Every mbuf still held by the buffer is freed, then the buffer is
> + * re-initialized in place with its current name, memory size and number
> + * of entries. A NULL buffer is ignored.
> + */
> +void rte_reorder_reset(struct rte_reorder_buffer *b)
> +{
> +	unsigned int i;
> +	char name[RTE_REORDER_NAMESIZE];
> +	struct cir_buffer *order_buf, *ready_buf;
> +
> +	if (b == NULL)
> +		return;
> +
> +	order_buf = &b->order_buf;
> +	ready_buf = &b->ready_buf;
> +
> +	/* order buffer slots are set to NULL whenever an mbuf leaves them */
> +	for (i = 0; i < order_buf->size; i++) {
> +		if (order_buf->entries[i] != NULL)
> +			rte_pktmbuf_free(order_buf->entries[i]);
> +	}
> +
> +	/*
> +	 * Only the slots between tail and head of the ready buffer hold
> +	 * mbufs owned by the buffer. Slots outside that range may still
> +	 * contain pointers to mbufs that were already handed to the caller
> +	 * by rte_reorder_drain() and must not be freed a second time.
> +	 */
> +	for (i = ready_buf->tail; i != ready_buf->head; ) {
> +		if (ready_buf->entries[i] != NULL)
> +			rte_pktmbuf_free(ready_buf->entries[i]);
> +		if (++i == ready_buf->size)
> +			i = 0;
> +	}
> +
> +	/* the name lives inside b, which is zeroed by init: copy it first */
> +	snprintf(name, sizeof(name), "%s", b->name);
> +	rte_reorder_init(b, b->memsize, name, b->order_buf.size);
> +}
> +
> +/*
> + * Allocate and register a new, empty reorder buffer.
> + *
> + * The buffer is allocated with rte_zmalloc_socket() on the requested
> + * socket so that rte_reorder_free() can release it with rte_free().
> + * If a buffer with the same name is already registered, that existing
> + * buffer is returned unchanged.
> + *
> + * Returns the buffer, or NULL with rte_errno set:
> + *  - E_RTE_NO_TAILQ: the reorder tailq is not available
> + *  - EINVAL: size is not a power of 2, or name is NULL
> + *  - ENOMEM: allocation of the tailq entry or of the buffer failed
> + */
> +struct rte_reorder_buffer*
> +rte_reorder_create(const char *name, unsigned socket_id, unsigned int size)
> +{
> +	struct rte_reorder_buffer *b = NULL;
> +	struct rte_reorder_buffer *existing;
> +	struct rte_tailq_entry *te;
> +	struct rte_reorder_list *reorder_list;
> +	const unsigned int bufsize = sizeof(struct rte_reorder_buffer) +
> +					(2 * size * sizeof(struct rte_mbuf *));
> +
> +	/* check that we have an initialised tail queue */
> +	reorder_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_REORDER,
> +			rte_reorder_list);
> +	if (!reorder_list) {
> +		rte_errno = E_RTE_NO_TAILQ;
> +		return NULL;
> +	}
> +
> +	/* Check user arguments. */
> +	if (!rte_is_power_of_2(size)) {
> +		RTE_LOG(ERR, REORDER, "Invalid reorder buffer size"
> +				" - Not a power of 2\n");
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +	if (name == NULL) {
> +		RTE_LOG(ERR, REORDER, "Invalid reorder buffer name ptr:"
> +					" NULL\n");
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +
> +	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	/* hand back the registered buffer if the name is already in use */
> +	TAILQ_FOREACH(te, reorder_list, next) {
> +		existing = (struct rte_reorder_buffer *) te->data;
> +		if (strncmp(name, existing->name, RTE_REORDER_NAMESIZE) == 0) {
> +			b = existing;
> +			goto exit;
> +		}
> +	}
> +
> +	/* allocate tailq entry; b is still NULL if this fails */
> +	te = rte_zmalloc("REORDER_TAILQ_ENTRY", sizeof(*te), 0);
> +	if (te == NULL) {
> +		RTE_LOG(ERR, REORDER, "Failed to allocate tailq entry\n");
> +		rte_errno = ENOMEM;
> +		goto exit;
> +	}
> +
> +	/* Allocate (zeroed) memory to store the reorder buffer structure. */
> +	b = rte_zmalloc_socket("REORDER_BUFFER", bufsize, 0, socket_id);
> +	if (b == NULL) {
> +		RTE_LOG(ERR, REORDER,
> +			"Failed to allocate memory for reorder buffer\n");
> +		rte_errno = ENOMEM;
> +		/* do not leak the entry, and leave through the unlock path */
> +		rte_free(te);
> +		goto exit;
> +	}
> +
> +	snprintf(b->name, sizeof(b->name), "%s", name);
> +	b->memsize = bufsize;
> +	b->order_buf.size = b->ready_buf.size = size;
> +	b->order_buf.mask = b->ready_buf.mask = size - 1;
> +	/* both entry arrays live right behind the header, ready_buf first */
> +	b->ready_buf.entries = (void *)&b[1];
> +	b->order_buf.entries = RTE_PTR_ADD(&b[1],
> +			size * sizeof(b->ready_buf.entries[0]));
> +
> +	te->data = (void *) b;
> +
> +	TAILQ_INSERT_TAIL(reorder_list, te, next);
> +
> +exit:
> +	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +	return b;
> +}
> +
> +/*
> + * Unregister a reorder buffer from the reorder tailq and release it.
> + *
> + * Nothing happens when b is NULL or is not registered. When the tailq
> + * itself is unavailable rte_errno is set to E_RTE_NO_TAILQ. The buffer
> + * and its tailq entry are released with rte_free(); mbufs still stored
> + * in the buffer are not freed by this function.
> + */
> +void
> +rte_reorder_free(struct rte_reorder_buffer *b)
> +{
> +	struct rte_tailq_entry *entry;
> +	struct rte_reorder_list *list;
> +
> +	/* Check user arguments. */
> +	if (b == NULL)
> +		return;
> +
> +	/* check that we have an initialised tail queue */
> +	list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_REORDER, rte_reorder_list);
> +	if (list == NULL) {
> +		rte_errno = E_RTE_NO_TAILQ;
> +		return;
> +	}
> +
> +	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	/* find our tailq entry and unlink it while the lock is held */
> +	TAILQ_FOREACH(entry, list, next) {
> +		if (entry->data == (void *) b)
> +			break;
> +	}
> +	if (entry != NULL)
> +		TAILQ_REMOVE(list, entry, next);
> +
> +	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	/* not registered: nothing to release */
> +	if (entry == NULL)
> +		return;
> +
> +	rte_free(b);
> +	rte_free(entry);
> +}
> +
> +/*
> + * Look up a registered reorder buffer by name.
> + *
> + * Returns the buffer, or NULL with rte_errno set:
> + *  - EINVAL: name is NULL
> + *  - E_RTE_NO_TAILQ: the reorder tailq is not available
> + *  - ENOENT: no registered buffer has this name
> + */
> +struct rte_reorder_buffer *
> +rte_reorder_find_existing(const char *name)
> +{
> +	struct rte_reorder_buffer *b = NULL;
> +	struct rte_tailq_entry *te;
> +	struct rte_reorder_list *reorder_list;
> +
> +	/* a NULL name would be dereferenced by strncmp() below */
> +	if (name == NULL) {
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +
> +	/* check that we have an initialised tail queue */
> +	reorder_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_REORDER,
> +			rte_reorder_list);
> +	if (!reorder_list) {
> +		rte_errno = E_RTE_NO_TAILQ;
> +		return NULL;
> +	}
> +
> +	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
> +	TAILQ_FOREACH(te, reorder_list, next) {
> +		b = (struct rte_reorder_buffer *) te->data;
> +		if (strncmp(name, b->name, RTE_REORDER_NAMESIZE) == 0)
> +			break;
> +	}
> +	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	/* te is NULL when the whole list was walked without a match */
> +	if (te == NULL) {
> +		rte_errno = ENOENT;
> +		return NULL;
> +	}
> +
> +	return b;
> +}
> +
> +/*
> + * Advance the head of the order buffer by at least n positions, moving
> + * the mbufs found on the way into the ready buffer and skipping empty
> + * slots (packets that never arrived).
> + *
> + * The walk stops early when the ready buffer becomes full. min_seqn is
> + * advanced by the number of positions actually covered, which is also the
> + * return value; it can be larger than n when a run of consecutive mbufs
> + * extends past the n-th position.
> + */
> +static unsigned
> +rte_reorder_fill_overflow(struct rte_reorder_buffer *b, unsigned n)
> +{
> +	struct cir_buffer *order_buf = &b->order_buf;
> +	struct cir_buffer *ready_buf = &b->ready_buf;
> +	unsigned int moved = 0;
> +
> +	for (;;) {
> +		/* done once the head has been advanced far enough ... */
> +		if (moved >= n)
> +			break;
> +		/* ... or once the ready buffer has no free slot left */
> +		if (((ready_buf->head + 1) & ready_buf->mask) == ready_buf->tail)
> +			break;
> +
> +		/* if we are blocked waiting on a packet, skip it */
> +		if (order_buf->entries[order_buf->head] == NULL) {
> +			moved++;
> +			if (++order_buf->head == order_buf->size)
> +				order_buf->head = 0;
> +		}
> +
> +		/* Move all ready entries that fit to the ready_buf */
> +		while (order_buf->entries[order_buf->head] != NULL) {
> +			struct rte_mbuf *m = order_buf->entries[order_buf->head];
> +
> +			order_buf->entries[order_buf->head] = NULL;
> +			ready_buf->entries[ready_buf->head] = m;
> +			moved++;
> +
> +			if (++ready_buf->head == ready_buf->size)
> +				ready_buf->head = 0;
> +			if (++order_buf->head == order_buf->size)
> +				order_buf->head = 0;
> +
> +			if (((ready_buf->head + 1) & ready_buf->mask) ==
> +					ready_buf->tail)
> +				break;
> +		}
> +	}
> +
> +	b->min_seqn += moved;
> +	/* Return the number of positions the order_buf head has moved */
> +	return moved;
> +}
> +
> +/*
> + * Store an mbuf in the order buffer slot that matches its sequence
> + * number.
> + *
> + * Returns 0 when the mbuf was stored. Returns -1 and leaves the mbuf with
> + * the caller when it could not be stored, with rte_errno set to:
> + *  - ENOSPC: the window could not be moved far enough because the ready
> + *    buffer is full
> + *  - ERANGE: the sequence number is at least two buffer sizes away from
> + *    the start of the window
> + */
> +int
> +rte_reorder_insert(struct rte_reorder_buffer *b, struct rte_mbuf *mbuf)
> +{
> +	uint32_t offset, position;
> +	struct cir_buffer *order_buf = &b->order_buf;
> +
> +	/*
> +	 * calculate the offset from the head pointer we need to go.
> +	 * The subtraction takes care of the sequence number wrapping.
> +	 * For example (using 16-bit for brevity):
> +	 *	min_seqn  = 0xFFFD
> +	 *	mbuf_seqn = 0x0010
> +	 *	offset    = 0x0010 - 0xFFFD = 0x13
> +	 */
> +	offset = mbuf->seqn - b->min_seqn;
> +
> +	/*
> +	 * action to take depends on offset.
> +	 * offset < size: the mbuf fits within the current window of
> +	 *    sequence numbers we can reorder. EXPECTED CASE.
> +	 * size <= offset < 2 * size: the packet sequence is just outside the
> +	 *    window, so the head pointer is shifted and any ready to return
> +	 *    packets are taken out of the ring. If there was a delayed or
> +	 *    dropped packet preventing drains from shifting the window, this
> +	 *    case skips over it, and any packets dequeued here are returned
> +	 *    on the next drain call.
> +	 * offset >= 2 * size: the packet sequence number is vastly outside
> +	 *    our window; it is rejected and left to the caller.
> +	 */
> +	if (offset < order_buf->size) {
> +		position = (order_buf->head + offset) & order_buf->mask;
> +		order_buf->entries[position] = mbuf;
> +	} else if (offset < 2 * order_buf->size) {
> +		/*
> +		 * The last slot of the window is at offset size - 1, so the
> +		 * window has to advance by offset - (size - 1) positions.
> +		 * Advancing by only offset - size would leave the mbuf at
> +		 * offset == size, which wraps onto the head slot.
> +		 */
> +		const uint32_t shift = offset + 1 - order_buf->size;
> +
> +		if (rte_reorder_fill_overflow(b, shift) < shift) {
> +			/* Put in handling for enqueue straight to output */
> +			rte_errno = ENOSPC;
> +			return -1;
> +		}
> +		/* min_seqn has moved: recompute the offset in the new window */
> +		offset = mbuf->seqn - b->min_seqn;
> +		position = (order_buf->head + offset) & order_buf->mask;
> +		order_buf->entries[position] = mbuf;
> +	} else {
> +		/* Put in handling for enqueue straight to output */
> +		rte_errno = ERANGE;
> +		return -1;
> +	}
> +	return 0;
> +}
> +
> +/*
> + * Fetch up to max_mbufs in-order mbufs from the reorder buffer.
> + *
> + * Mbufs waiting in the ready buffer are returned first; the remainder is
> + * taken from the head of the order buffer for as long as consecutive
> + * sequence numbers are present, advancing min_seqn for each one.
> + * Ownership of every returned mbuf passes to the caller, and its slot is
> + * cleared so the buffer keeps no pointer to it.
> + *
> + * Returns the number of mbufs written to mbufs[].
> + */
> +unsigned int
> +rte_reorder_drain(struct rte_reorder_buffer *b, struct rte_mbuf **mbufs,
> +		unsigned max_mbufs)
> +{
> +	unsigned int drain_cnt = 0;
> +
> +	struct cir_buffer *order_buf = &b->order_buf,
> +			*ready_buf = &b->ready_buf;
> +
> +	/* Try to fetch requested number of mbufs from ready buffer */
> +	while ((drain_cnt < max_mbufs) &&
> +			(ready_buf->tail != ready_buf->head)) {
> +		mbufs[drain_cnt++] = ready_buf->entries[ready_buf->tail];
> +		/* do not keep a stale pointer to an mbuf we no longer own */
> +		ready_buf->entries[ready_buf->tail] = NULL;
> +		if (++ready_buf->tail == ready_buf->size)
> +			ready_buf->tail = 0;
> +	}
> +
> +	/*
> +	 * If requested number of buffers not fetched from ready buffer, fetch
> +	 * remaining buffers from order buffer
> +	 */
> +	while ((drain_cnt < max_mbufs) &&
> +			(order_buf->entries[order_buf->head] != NULL)) {
> +		mbufs[drain_cnt++] = order_buf->entries[order_buf->head];
> +		order_buf->entries[order_buf->head] = NULL;
> +		b->min_seqn++;
> +		order_buf->head++;
> +		if (order_buf->head == order_buf->size)
> +			order_buf->head = 0;
> +	}
> +
> +	return drain_cnt;
> +}
> diff --git a/lib/librte_reorder/rte_reorder.h b/lib/librte_reorder/rte_reorder.h
> new file mode 100644
> index 0000000..3ec7011
> --- /dev/null
> +++ b/lib/librte_reorder/rte_reorder.h
> @@ -0,0 +1,184 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
> NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
> OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
> ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> + */
> +
> +#ifndef _RTE_REORDER_H_
> +#define _RTE_REORDER_H_
> +
> +/**
> + * @file
> + * RTE reorder
> + *
> + * Reorder library is a component which is designed to
> + * provide ordering of out of ordered packets based on
> + * sequence number present in mbuf.
> + *
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +struct rte_reorder_buffer;
> +
> +/**
> + * Create a new reorder buffer instance
> + *
> + * Allocate memory and initialize a new reorder buffer in that
> + * memory, returning the reorder buffer pointer to the user
> + *
> + * @param name
> + *   The name to be given to the reorder buffer instance.
> + * @param socket_id
> + *   The NUMA node on which the memory for the reorder buffer
> + *   instance is to be reserved.
> + * @param size
> + *   Max number of elements that can be stored in the reorder buffer
> + * @return
> + *   The initialized reorder buffer instance, or NULL on error
> + *   On error case, rte_errno will be set appropriately:
> + *    - ENOMEM - no appropriate memory area found in which to create
> memzone
> + *    - EINVAL - invalid parameters
> + */
> +struct rte_reorder_buffer *
> +rte_reorder_create(const char *name, unsigned socket_id, unsigned int size);
> +
> +/**
> + * Initializes given reorder buffer instance
> + *
> + * @param buf
> + *   Pointer to memory area where reorder buffer instance
> + *   should be initialized
> + * @param bufsize
> + *   Size of the memory area to be used for reorder buffer instance
> + *   initialization
> + * @param name
> + *   The name to be given to the reorder buffer instance
> + * @param size
> + *   Number of elements that can be stored in reorder buffer
> + * @return
> + *   The initialized reorder buffer instance, or NULL on error
> + *   On error case, rte_errno will be set appropriately:
> + *    - EINVAL - invalid parameters
> + *    - ENOMEM - not enough memory for reorder buffer instance
> + *    initialization
> + */
> +struct rte_reorder_buffer *
> +rte_reorder_init(void *buf, unsigned int bufsize,
> +		const char *name, unsigned int size);
> +
> +/**
> + * Reset the given reorder buffer instance with initial values.
> + *
> + * @param b
> + *   Reorder buffer instance which has to be reset
> + */
> +void rte_reorder_reset(struct rte_reorder_buffer *b);
> +
> +/**
> + * Find an existing reorder buffer instance
> + * and return a pointer to it.
> + *
> + * @param name
> + *   Name of the reorder buffer instance as passed to rte_reorder_create()
> + * @return
> + *   Pointer to reorder buffer instance or NULL if object not found with
> rte_errno
> + *   set appropriately. Possible rte_errno values include:
> + *    - ENOENT - required entry not available to return.
> + *    - E_RTE_NO_TAILQ - no tailq list could be got for the
> + *    reorder instance list
> + */
> +struct rte_reorder_buffer *
> +rte_reorder_find_existing(const char *name);
> +
> +/**
> + * Free reorder buffer instance.
> + *
> + * @param b
> + *   reorder buffer instance
> + * @return
> + *   None
> + */
> +void
> +rte_reorder_free(struct rte_reorder_buffer *b);
> +
> +/**
> + * Insert given mbuf in reorder buffer in its correct position
> + *
> + * The given mbuf is to be reordered relative to other mbufs in the system.
> + * The mbuf must contain a sequence number which is then used to place
> + * the buffer in the correct position in the reorder buffer. Reordered
> + * packets can later be taken from the buffer using the rte_reorder_drain()
> + * API.
> + *
> + * @param b
> + *   Reorder buffer where the mbuf has to be inserted.
> + * @param mbuf
> + *   mbuf of packet that needs to be inserted in reorder buffer.
> + * @return
> + *   0 on success
> + *   -1 on error
> + *   On error case, rte_errno will be set appropriately:
> + *    - ENOSPC - Cannot move existing mbufs from reorder buffer to
> accommodate early mbuf.
> + *    However, the mbuf can be accommodated by performing a drain and then an insert.
> + *    - ERANGE - Too early or late mbuf which is vastly out of
> + *    range of expected window; it should be ignored without any handling.
> + */
> +int
> +rte_reorder_insert(struct rte_reorder_buffer *b, struct rte_mbuf *mbuf);
> +
> +/**
> + * Fetch reordered buffers
> + *
> + * Returns a set of in-order buffers from the reorder buffer structure. Gaps
> + * may be present in the sequence numbers of the mbuf if packets have been
> + * delayed too long before reaching the reorder window, or have been
> previously
> + * dropped by the system.
> + *
> + * @param b
> + *   Reorder buffer instance from which packets are to be drained
> + * @param mbufs
> + *   array of mbufs where reordered packets will be inserted from reorder
> buffer
> + * @param max_mbufs
> + *   the number of elements in the mbufs array.
> + * @return
> + *   number of mbuf pointers written to mbufs. 0 <= N < max_mbufs.
> + */
> +unsigned int
> +rte_reorder_drain(struct rte_reorder_buffer *b, struct rte_mbuf **mbufs,
> +	unsigned max_mbufs);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_REORDER_H_ */
> diff --git a/mk/rte.app.mk b/mk/rte.app.mk
> index e1a0dbf..2a08acb 100644
> --- a/mk/rte.app.mk
> +++ b/mk/rte.app.mk
> @@ -67,6 +67,10 @@ ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
>  LDLIBS += -lrte_distributor
>  endif
> 
> +ifeq ($(CONFIG_RTE_LIBRTE_REORDER),y)
> +LDLIBS += -lrte_reorder
> +endif
> +
>  ifeq ($(CONFIG_RTE_LIBRTE_KNI),y)
>  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
>  LDLIBS += -lrte_kni
> --
> 1.8.3.1



More information about the dev mailing list