[dpdk-dev] [PATCH 03/11] ip_pipeline: modified init to match new params struct

Maciej Gajdzica maciejx.t.gajdzica at intel.com
Fri May 29 17:43:10 CEST 2015


After the changes in the config parser, the app params struct has changed
and the initialization procedures require corresponding modifications.

Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica at intel.com>
---
 examples/ip_pipeline/Makefile |    2 +-
 examples/ip_pipeline/init.c   | 1498 +++++++++++++++++++++++++++++------------
 examples/ip_pipeline/main.c   |    3 +
 3 files changed, 1073 insertions(+), 430 deletions(-)

diff --git a/examples/ip_pipeline/Makefile b/examples/ip_pipeline/Makefile
index c893952..443f7e4 100644
--- a/examples/ip_pipeline/Makefile
+++ b/examples/ip_pipeline/Makefile
@@ -45,7 +45,7 @@ APP = ip_pipeline
 SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) := main.c
 SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_parse.c
 SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_check.c
-#SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += init.c
+SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += init.c
 SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += cpu_core_map.c
 #SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += cmdline.c
 #SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += pipeline_rx.c
diff --git a/examples/ip_pipeline/init.c b/examples/ip_pipeline/init.c
index d79762f..77d5f07 100644
--- a/examples/ip_pipeline/init.c
+++ b/examples/ip_pipeline/init.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -32,561 +32,1201 @@
  */
 
 #include <stdio.h>
-#include <stdlib.h>
-#include <stdint.h>
-#include <inttypes.h>
-#include <sys/types.h>
 #include <string.h>
-#include <sys/queue.h>
-#include <stdarg.h>
-#include <errno.h>
-#include <getopt.h>
-
-#include <rte_common.h>
-#include <rte_byteorder.h>
-#include <rte_log.h>
-#include <rte_memory.h>
-#include <rte_memcpy.h>
-#include <rte_memzone.h>
-#include <rte_eal.h>
-#include <rte_per_lcore.h>
-#include <rte_launch.h>
-#include <rte_atomic.h>
+
 #include <rte_cycles.h>
-#include <rte_prefetch.h>
-#include <rte_lcore.h>
-#include <rte_per_lcore.h>
-#include <rte_branch_prediction.h>
-#include <rte_interrupts.h>
-#include <rte_pci.h>
-#include <rte_random.h>
-#include <rte_debug.h>
-#include <rte_ether.h>
 #include <rte_ethdev.h>
-#include <rte_ring.h>
-#include <rte_mempool.h>
-#include <rte_malloc.h>
-#include <rte_mbuf.h>
-#include <rte_string_fns.h>
+#include <rte_ether.h>
 #include <rte_ip.h>
-#include <rte_tcp.h>
-#include <rte_lpm.h>
-#include <rte_lpm6.h>
-
-#include "main.h"
-
-#define NA                             APP_SWQ_INVALID
-
-struct app_params app = {
-	/* CPU cores */
-	.cores = {
-	{0, APP_CORE_MASTER, {15, 16, 17, NA, NA, NA, NA, NA},
-		{12, 13, 14, NA, NA, NA, NA, NA} },
-	{0, APP_CORE_RX,     {NA, NA, NA, NA, NA, NA, NA, 12},
-		{ 0,  1,  2,  3, NA, NA, NA, 15} },
-	{0, APP_CORE_FC,     { 0,  1,  2,  3, NA, NA, NA, 13},
-		{ 4,  5,  6,  7, NA, NA, NA, 16} },
-	{0, APP_CORE_RT,     { 4,  5,  6,  7, NA, NA, NA, 14},
-		{ 8,  9, 10, 11, NA, NA, NA, 17} },
-	{0, APP_CORE_TX,     { 8,  9, 10, 11, NA, NA, NA, NA},
-		{NA, NA, NA, NA, NA, NA, NA, NA} },
-	},
-
-	/* Ports*/
-	.n_ports = APP_MAX_PORTS,
-	.rsz_hwq_rx = 128,
-	.rsz_hwq_tx = 512,
-	.bsz_hwq_rd = 64,
-	.bsz_hwq_wr = 64,
-
-	.port_conf = {
-		.rxmode = {
-			.split_hdr_size = 0,
-			.header_split   = 0, /* Header Split disabled */
-			.hw_ip_checksum = 1, /* IP checksum offload enabled */
-			.hw_vlan_filter = 0, /* VLAN filtering disabled */
-			.jumbo_frame    = 1, /* Jumbo Frame Support enabled */
-			.max_rx_pkt_len = 9000, /* Jumbo Frame MAC pkt length */
-			.hw_strip_crc   = 0, /* CRC stripped by hardware */
-		},
-		.rx_adv_conf = {
-			.rss_conf = {
-				.rss_key = NULL,
-				.rss_hf = ETH_RSS_IP,
-			},
-		},
-		.txmode = {
-			.mq_mode = ETH_MQ_TX_NONE,
-		},
-	},
-
-	.rx_conf = {
-		.rx_thresh = {
-			.pthresh = 8,
-			.hthresh = 8,
-			.wthresh = 4,
-		},
-		.rx_free_thresh = 64,
-		.rx_drop_en = 0,
-	},
-
-	.tx_conf = {
-		.tx_thresh = {
-			.pthresh = 36,
-			.hthresh = 0,
-			.wthresh = 0,
-		},
-		.tx_free_thresh = 0,
-		.tx_rs_thresh = 0,
-	},
-
-	/* SWQs */
-	.rsz_swq = 128,
-	.bsz_swq_rd = 64,
-	.bsz_swq_wr = 64,
-
-	/* Buffer pool */
-	.pool_buffer_size = RTE_MBUF_DEFAULT_BUF_SIZE,
-	.pool_size = 32 * 1024,
-	.pool_cache_size = 256,
-
-	/* Message buffer pool */
-	.msg_pool_buffer_size = 256,
-	.msg_pool_size = 1024,
-	.msg_pool_cache_size = 64,
-
-	/* Rule tables */
-	.max_arp_rules = 1 << 10,
-	.max_firewall_rules = 1 << 5,
-	.max_routing_rules = 1 << 24,
-	.max_flow_rules = 1 << 24,
-
-	/* Application processing */
-	.ether_hdr_pop_push = 0,
-};
-
-struct app_core_params *
-app_get_core_params(uint32_t core_id)
+#include <rte_eal.h>
+#include <rte_malloc.h>
+
+#include "app.h"
+#include "pipeline.h"
+
+#define APP_NAME_SIZE	32
+
+static void
+app_init_core_map(struct app_params *app)
 {
+	RTE_LOG(INFO, USER1, "Creating CPU core map ...\n");
+	app->core_map = cpu_core_map_init(4, 32, 4, 0);
+
+	if (app->core_map == NULL)
+		rte_panic("Cannot create CPU core map\n");
+
+	cpu_core_map_print(app->core_map);
+}
+
+static void
+app_init_core_mask(struct app_params *app)
+{
+	uint64_t mask = 0;
 	uint32_t i;
 
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		struct app_core_params *p = &app.cores[i];
+	for (i = 0; i < APP_MAX_PIPELINES; i++) {
+		struct app_pipeline_params *p = &app->pipeline_params[i];
+		int lcore_id;
 
-		if (p->core_id != core_id)
+		if (!APP_PARAM_VALID(&app->pipeline_params[i]))
 			continue;
 
-		return p;
+		lcore_id = cpu_core_map_get_lcore_id(app->core_map,
+			p->socket_id, p->core_id, p->hyper_th_id);
+
+		if (lcore_id < 0)
+			rte_panic("Cannot create CPU core mask\n");
+
+		mask |= 1LLU << lcore_id;
 	}
 
-	return NULL;
+	app->core_mask = mask;
+	RTE_LOG(INFO, USER1, "CPU core mask = 0x%016lx\n", app->core_mask);
 }
 
-static uint32_t
-app_get_n_swq_in(void)
+static void
+app_init_eal(struct app_params *app)
 {
-	uint32_t max_swq_id = 0, i, j;
+	char buffer[32];
+	int status;
+	app->eal_argc = 0;
 
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		struct app_core_params *p = &app.cores[i];
+	app->eal_argv[app->eal_argc++] = strdup(app->app_name);
 
-		if (p->core_type == APP_CORE_NONE)
-			continue;
+	snprintf(buffer, sizeof(buffer), "-c %lx", app->core_mask);
+	app->eal_argv[app->eal_argc++] = strdup(buffer);
 
-		for (j = 0; j < APP_MAX_SWQ_PER_CORE; j++) {
-			uint32_t swq_id = p->swq_in[j];
+	snprintf(buffer, sizeof(buffer), "-n 4");
+	app->eal_argv[app->eal_argc++] = strdup(buffer);
 
-			if ((swq_id != APP_SWQ_INVALID) &&
-				(swq_id > max_swq_id))
-				max_swq_id = swq_id;
-		}
-	}
+	snprintf(buffer, sizeof(buffer), "--");
+	app->eal_argv[app->eal_argc++] = strdup(buffer);
 
-	return (1 + max_swq_id);
+	status = rte_eal_init(app->eal_argc, app->eal_argv);
+	if (status < 0)
+		rte_panic("EAL init error\n");
 }
 
-static uint32_t
-app_get_n_swq_out(void)
+static void
+app_init_mempool(struct app_params *app)
 {
-	uint32_t max_swq_id = 0, i, j;
+	int i;
 
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		struct app_core_params *p = &app.cores[i];
+	for (i = 0; i < APP_MAX_MEMPOOLS; i++) {
+		struct app_mempool_params *p = &app->mempool_params[i];
 
-		if (p->core_type == APP_CORE_NONE)
+		if (!APP_PARAM_VALID(p))
 			continue;
 
-		for (j = 0; j < APP_MAX_SWQ_PER_CORE; j++) {
-			uint32_t swq_id = p->swq_out[j];
-
-			if ((swq_id != APP_SWQ_INVALID) &&
-				(swq_id > max_swq_id))
-				max_swq_id = swq_id;
-		}
+		RTE_LOG(INFO, USER1, "Creating %s ...\n", p->name);
+		app->mempool[i] = rte_mempool_create(
+				p->name,
+				p->pool_size,
+				p->buffer_size,
+				p->cache_size,
+				sizeof(struct rte_pktmbuf_pool_private),
+				rte_pktmbuf_pool_init, NULL,
+				rte_pktmbuf_init, NULL,
+				p->cpu_socket_id,
+				0);
+
+		if (app->mempool[i] == NULL)
+			rte_panic("Cannot create mempool\n");
 	}
+}
 
-	return (1 + max_swq_id);
+static inline int
+app_link_filter_arp_add(struct app_link_params *link)
+{
+	struct rte_eth_ethertype_filter filter = {
+			.ether_type = ETHER_TYPE_ARP,
+			.flags = 0,
+			.queue = link->arp_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(link->pmd_id, RTE_ETH_FILTER_ETHERTYPE,
+			RTE_ETH_FILTER_ADD, &filter);
 }
 
-static uint32_t
-app_get_swq_in_count(uint32_t swq_id)
+static inline int
+app_link_filter_arp_del(struct app_link_params *link)
 {
-	uint32_t n, i;
+	struct rte_eth_ethertype_filter filter = {
+			.ether_type = ETHER_TYPE_ARP,
+			.flags = 0,
+			.queue = link->arp_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(link->pmd_id, RTE_ETH_FILTER_ETHERTYPE,
+			RTE_ETH_FILTER_DELETE, &filter);
+}
 
-	for (n = 0, i = 0; i < RTE_MAX_LCORE; i++) {
-		struct app_core_params *p = &app.cores[i];
-		uint32_t j;
+static inline int
+app_link_filter_tcp_syn_add(struct app_link_params *link)
+{
+	struct rte_eth_syn_filter filter = {
+			.hig_pri = 1,
+			.queue = link->tcp_syn_local_q,
+	};
 
-		if (p->core_type == APP_CORE_NONE)
-			continue;
+	return rte_eth_dev_filter_ctrl(link->pmd_id, RTE_ETH_FILTER_SYN,
+			RTE_ETH_FILTER_ADD, &filter);
+}
 
-		for (j = 0; j < APP_MAX_SWQ_PER_CORE; j++)
-			if (p->swq_in[j] == swq_id)
-				n++;
-	}
+static inline int
+app_link_filter_tcp_syn_del(struct app_link_params *link)
+{
+	struct rte_eth_syn_filter filter = {
+			.hig_pri = 1,
+			.queue = link->tcp_syn_local_q,
+	};
 
-	return n;
+	return rte_eth_dev_filter_ctrl(link->pmd_id, RTE_ETH_FILTER_SYN,
+			RTE_ETH_FILTER_DELETE, &filter);
 }
 
-static uint32_t
-app_get_swq_out_count(uint32_t swq_id)
+static inline int
+app_link_filter_ip_add(struct app_link_params *l1, struct app_link_params *l2)
 {
-	uint32_t n, i;
+	struct rte_eth_ntuple_filter filter = {
+			.flags = RTE_5TUPLE_FLAGS,
+			.dst_ip = rte_bswap32(l2->ip),
+			.dst_ip_mask = 0, /* Enable */
+			.src_ip = 0,
+			.src_ip_mask = 1, /* Disable */
+			.dst_port = 0,
+			.dst_port_mask = 1, /* Disable */
+			.src_port = 0,
+			.src_port_mask = 1, /* Disable */
+			.proto = 0,
+			.proto_mask = 1, /* Disable */
+			.tcp_flags = 0,
+			.priority = 1, /* Lowest */
+			.queue = l1->ip_local_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(l1->pmd_id, RTE_ETH_FILTER_NTUPLE,
+			RTE_ETH_FILTER_ADD, &filter);
+}
 
-	for (n = 0, i = 0; i < RTE_MAX_LCORE; i++) {
-		struct app_core_params *p = &app.cores[i];
-		uint32_t j;
+static inline int
+app_link_filter_ip_del(struct app_link_params *l1, struct app_link_params *l2)
+{
+	struct rte_eth_ntuple_filter filter = {
+			.flags = RTE_5TUPLE_FLAGS,
+			.dst_ip = rte_bswap32(l2->ip),
+			.dst_ip_mask = 0, /* Enable */
+			.src_ip = 0,
+			.src_ip_mask = 1, /* Disable */
+			.dst_port = 0,
+			.dst_port_mask = 1, /* Disable */
+			.src_port = 0,
+			.src_port_mask = 1, /* Disable */
+			.proto = 0,
+			.proto_mask = 1, /* Disable */
+			.tcp_flags = 0,
+			.priority = 1, /* Lowest */
+			.queue = l1->ip_local_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(l1->pmd_id, RTE_ETH_FILTER_NTUPLE,
+			RTE_ETH_FILTER_DELETE, &filter);
+}
 
-		if (p->core_type == APP_CORE_NONE)
-			continue;
+static inline int
+app_link_filter_tcp_add(struct app_link_params *l1, struct app_link_params *l2)
+{
+	struct rte_eth_ntuple_filter filter = {
+			.flags = RTE_5TUPLE_FLAGS,
+			.dst_ip = rte_bswap32(l2->ip),
+			.dst_ip_mask = 0, /* Enable */
+			.src_ip = 0,
+			.src_ip_mask = 1, /* Disable */
+			.dst_port = 0,
+			.dst_port_mask = 1, /* Disable */
+			.src_port = 0,
+			.src_port_mask = 1, /* Disable */
+			.proto = IPPROTO_TCP,
+			.proto_mask = 0, /* Enable */
+			.tcp_flags = 0,
+			.priority = 2, /* Higher priority than IP */
+			.queue = l1->tcp_local_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(l1->pmd_id, RTE_ETH_FILTER_NTUPLE,
+			RTE_ETH_FILTER_ADD, &filter);
+}
 
-		for (j = 0; j < APP_MAX_SWQ_PER_CORE; j++)
-			if (p->swq_out[j] == swq_id)
-				n++;
-	}
+static inline int
+app_link_filter_tcp_del(struct app_link_params *l1, struct app_link_params *l2)
+{
+	struct rte_eth_ntuple_filter filter = {
+			.flags = RTE_5TUPLE_FLAGS,
+			.dst_ip = rte_bswap32(l2->ip),
+			.dst_ip_mask = 0, /* Enable */
+			.src_ip = 0,
+			.src_ip_mask = 1, /* Disable */
+			.dst_port = 0,
+			.dst_port_mask = 1, /* Disable */
+			.src_port = 0,
+			.src_port_mask = 1, /* Disable */
+			.proto = IPPROTO_TCP,
+			.proto_mask = 0, /* Enable */
+			.tcp_flags = 0,
+			.priority = 2, /* Higher priority than IP */
+			.queue = l1->tcp_local_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(l1->pmd_id, RTE_ETH_FILTER_NTUPLE,
+			RTE_ETH_FILTER_DELETE, &filter);
+}
 
-	return n;
+static inline int
+app_link_filter_udp_add(struct app_link_params *l1, struct app_link_params *l2)
+{
+	struct rte_eth_ntuple_filter filter = {
+			.flags = RTE_5TUPLE_FLAGS,
+			.dst_ip = rte_bswap32(l2->ip),
+			.dst_ip_mask = 0, /* Enable */
+			.src_ip = 0,
+			.src_ip_mask = 1, /* Disable */
+			.dst_port = 0,
+			.dst_port_mask = 1, /* Disable */
+			.src_port = 0,
+			.src_port_mask = 1, /* Disable */
+			.proto = IPPROTO_UDP,
+			.proto_mask = 0, /* Enable */
+			.tcp_flags = 0,
+			.priority = 2, /* Higher priority than IP */
+			.queue = l1->udp_local_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(l1->pmd_id, RTE_ETH_FILTER_NTUPLE,
+			RTE_ETH_FILTER_ADD, &filter);
+}
+
+static inline int
+app_link_filter_udp_del(struct app_link_params *l1, struct app_link_params *l2)
+{
+	struct rte_eth_ntuple_filter filter = {
+			.flags = RTE_5TUPLE_FLAGS,
+			.dst_ip = rte_bswap32(l2->ip),
+			.dst_ip_mask = 0, /* Enable */
+			.src_ip = 0,
+			.src_ip_mask = 1, /* Disable */
+			.dst_port = 0,
+			.dst_port_mask = 1, /* Disable */
+			.src_port = 0,
+			.src_port_mask = 1, /* Disable */
+			.proto = IPPROTO_UDP,
+			.proto_mask = 0, /* Enable */
+			.tcp_flags = 0,
+			.priority = 2, /* Higher priority than IP */
+			.queue = l1->udp_local_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(l1->pmd_id, RTE_ETH_FILTER_NTUPLE,
+			RTE_ETH_FILTER_DELETE, &filter);
+}
+
+static inline int
+app_link_filter_sctp_add(struct app_link_params *l1, struct app_link_params *l2)
+{
+	struct rte_eth_ntuple_filter filter = {
+			.flags = RTE_5TUPLE_FLAGS,
+			.dst_ip = rte_bswap32(l2->ip),
+			.dst_ip_mask = 0, /* Enable */
+			.src_ip = 0,
+			.src_ip_mask = 1, /* Disable */
+			.dst_port = 0,
+			.dst_port_mask = 1, /* Disable */
+			.src_port = 0,
+			.src_port_mask = 1, /* Disable */
+			.proto = IPPROTO_SCTP,
+			.proto_mask = 0, /* Enable */
+			.tcp_flags = 0,
+			.priority = 2, /* Higher priority than IP */
+			.queue = l1->sctp_local_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(l1->pmd_id, RTE_ETH_FILTER_NTUPLE,
+			RTE_ETH_FILTER_ADD, &filter);
+}
+
+static inline int
+app_link_filter_sctp_del(struct app_link_params *l1, struct app_link_params *l2)
+{
+	struct rte_eth_ntuple_filter filter = {
+			.flags = RTE_5TUPLE_FLAGS,
+			.dst_ip = rte_bswap32(l2->ip),
+			.dst_ip_mask = 0, /* Enable */
+			.src_ip = 0,
+			.src_ip_mask = 1, /* Disable */
+			.dst_port = 0,
+			.dst_port_mask = 1, /* Disable */
+			.src_port = 0,
+			.src_port_mask = 1, /* Disable */
+			.proto = IPPROTO_SCTP,
+			.proto_mask = 0, /* Enable */
+			.tcp_flags = 0,
+			.priority = 2, /* Higher priority than IP */
+			.queue = l1->sctp_local_q,
+	};
+
+	return rte_eth_dev_filter_ctrl(l1->pmd_id, RTE_ETH_FILTER_NTUPLE,
+			RTE_ETH_FILTER_DELETE, &filter);
 }
 
 void
-app_check_core_params(void)
+app_link_up_internal(struct app_params *app, struct app_link_params *cp)
 {
-	uint32_t n_swq_in = app_get_n_swq_in();
-	uint32_t n_swq_out = app_get_n_swq_out();
-	uint32_t i;
+	int status;
+	uint32_t link_id, i;
+
+	sscanf(cp->name, "LINK%u", &link_id);
+
+	/* Mark port as UP */
+	cp->state = 1;
 
-	/* Check that range of SW queues is contiguous and each SW queue has
-	   exactly one reader and one writer */
-	if (n_swq_in != n_swq_out)
-		rte_panic("Number of input SW queues is not equal to the "
-			"number of output SW queues\n");
+	/* Start port */
+	status = rte_eth_dev_start(cp->pmd_id);
+	if (status < 0)
+		rte_panic("Cannot start LINK%u (%u) (error %d)\n",
+			link_id, cp->pmd_id, status);
 
-	for (i = 0; i < n_swq_in; i++) {
-		uint32_t n = app_get_swq_in_count(i);
+	/* Apply ARP filter for current port */
+	if (cp->arp_q != 0) {
+		status = app_link_filter_arp_add(cp);
 
-		if (n == 0)
-			rte_panic("SW queue %u has no reader\n", i);
+		RTE_LOG(INFO, USER1, "LINK%u (%u): Adding ARP filter "
+			"(queue = %u)\n",
+			link_id, cp->pmd_id, cp->arp_q);
 
-		if (n > 1)
-			rte_panic("SW queue %u has more than one reader\n", i);
+		if (status)
+			rte_panic("LINK%u (%u): Error while adding ARP filter "
+				"(queue = %u) (%d)\n",
+				link_id, cp->pmd_id, cp->arp_q, status);
 	}
 
-	for (i = 0; i < n_swq_out; i++) {
-		uint32_t n = app_get_swq_out_count(i);
+	/* Apply TCP SYN filter for current port */
+	if (cp->tcp_syn_local_q != 0) {
+		status = app_link_filter_tcp_syn_add(cp);
 
-		if (n == 0)
-			rte_panic("SW queue %u has no writer\n", i);
+		RTE_LOG(INFO, USER1, "Port %u (%u): Adding TCP SYN filter "
+			"(queue = %u)\n",
+			link_id, cp->pmd_id, cp->tcp_syn_local_q);
 
-		if (n > 1)
-			rte_panic("SW queue %u has more than one writer\n", i);
+		if (status)
+			rte_panic("Port %u (%u): Error while adding TCP SYN filter "
+				"(queue = %u) (%d)\n",
+				link_id, cp->pmd_id, cp->tcp_syn_local_q, status);
 	}
 
-	/* Check the request and response queues are valid */
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		struct app_core_params *p = &app.cores[i];
-		uint32_t ring_id_req, ring_id_resp;
+	/* For each port that is currently UP, add filters for current port */
+	if (cp->ip != 0) {
+		for (i = 0; i < APP_MAX_LINKS; i++) {
+			struct app_link_params *p = &app->link_params[i];
+
+			if (!APP_PARAM_VALID(p))
+				continue;
+
+			/* Skip ports that are DOWN */
+			if (p->state == 0) {
+				continue;
+			}
+
+			/* IP */
+			if (p->ip_local_q != 0) {
+				status = app_link_filter_ip_add(p, cp);
+
+				RTE_LOG(INFO, USER1, "LINK%u (%u): Adding IP filter "
+					"(queue= %u, IP = 0x%08x)\n",
+					i, p->pmd_id, p->ip_local_q, cp->ip);
+
+				if (status)
+					rte_panic("LINK%u (%u): Error while adding IP filter "
+						"(queue= %u, IP = 0x%08x) (%d)\n",
+						i, p->pmd_id, p->ip_local_q, cp->ip, status);
+			}
+
+			/* TCP */
+			if (p->tcp_local_q != 0) {
+				status = app_link_filter_tcp_add(p, cp);
+
+				RTE_LOG(INFO, USER1, "LINK%u (%u): Adding TCP filter "
+					"(queue = %u, IP = 0x%08x)\n",
+					i, p->pmd_id, p->tcp_local_q, cp->ip);
+
+				if (status)
+					rte_panic("LINK%u (%u): Error while adding TCP filter "
+						"(queue = %u, IP = 0x%08x) (%d)\n",
+						i, p->pmd_id, p->tcp_local_q, cp->ip, status);
+			}
+
+			/* UDP */
+			if (p->udp_local_q != 0) {
+				status = app_link_filter_udp_add(p, cp);
+
+				RTE_LOG(INFO, USER1, "LINK%u (%u): Adding UDP filter "
+					"(queue = %u, IP = 0x%08x)\n",
+					i, p->pmd_id, p->udp_local_q, cp->ip);
+
+				if (status)
+					rte_panic("LINK%u (%u): Error while adding UDP filter "
+						"(queue = %u, IP = 0x%08x) (%d)\n",
+						i, p->pmd_id, p->udp_local_q, cp->ip, status);
+			}
+
+			/* SCTP */
+			if (p->sctp_local_q != 0) {
+				status = app_link_filter_sctp_add(p, cp);
+
+				RTE_LOG(INFO, USER1, "LINK%u (%u): Adding SCTP filter "
+					"(queue = %u, IP = 0x%08x)\n",
+					i, p->pmd_id, p->sctp_local_q, cp->ip);
+
+				if (status)
+					rte_panic("LINK%u (%u): Error while adding SCTP filter "
+						"(queue = %u, IP = 0x%08x) (%d)\n",
+						i, p->pmd_id, p->sctp_local_q, cp->ip, status);
+			}
+		}
+	}
+
+	/* For current port, add filters for each port that is UP */
+	for (i = 0; i < APP_MAX_LINKS; i++) {
+		struct app_link_params *p = &app->link_params[i];
+		int status;
+
+		if (!APP_PARAM_VALID(p))
+			continue;
 
-		if ((p->core_type != APP_CORE_FC) &&
-		    (p->core_type != APP_CORE_FW) &&
-			(p->core_type != APP_CORE_RT)) {
+		/* Skip current port and ports that are DOWN */
+		if ((i == link_id) || (p->state == 0) || (p->ip == 0)) {
 			continue;
 		}
 
-		ring_id_req = p->swq_in[APP_SWQ_IN_REQ];
-		if (ring_id_req == APP_SWQ_INVALID)
-			rte_panic("Core %u of type %u has invalid request "
-				"queue ID\n", p->core_id, p->core_type);
+		/* IP */
+		if (cp->ip_local_q != 0) {
+			status = app_link_filter_ip_add(cp, p);
 
-		ring_id_resp = p->swq_out[APP_SWQ_OUT_RESP];
-		if (ring_id_resp == APP_SWQ_INVALID)
-			rte_panic("Core %u of type %u has invalid response "
-				"queue ID\n", p->core_id, p->core_type);
-	}
+			RTE_LOG(INFO, USER1, "LINK%u (%u): Adding IP filter "
+				"(queue= %u, IP = 0x%08x)\n",
+				link_id, cp->pmd_id, cp->ip_local_q, p->ip);
 
-	return;
-}
+			if (status)
+				rte_panic("LINK%u (%u): Error while adding IP filter "
+					"(queue= %u, IP = 0x%08x) (%d)\n",
+					link_id, cp->pmd_id, cp->ip_local_q, p->ip, status);
+		}
 
-uint32_t
-app_get_first_core_id(enum app_core_type core_type)
-{
-	uint32_t i;
+		/* TCP */
+		if (cp->tcp_local_q != 0) {
+			status = app_link_filter_tcp_add(cp, p);
 
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		struct app_core_params *p = &app.cores[i];
+			RTE_LOG(INFO, USER1, "LINK%u (%u): Adding TCP filter "
+				"(queue = %u, IP = 0x%08x)\n",
+				link_id, cp->pmd_id, cp->tcp_local_q, p->ip);
 
-		if (p->core_type == core_type)
-			return p->core_id;
-	}
+			if (status)
+				rte_panic("LINK%u (%u): Error while adding TCP filter "
+					"(queue = %u, IP = 0x%08x) (%d)\n",
+					link_id, cp->pmd_id, cp->tcp_local_q, p->ip, status);
+		}
 
-	return RTE_MAX_LCORE;
-}
+		/* UDP */
+		if (cp->udp_local_q != 0) {
+			status = app_link_filter_udp_add(cp, p);
 
-struct rte_ring *
-app_get_ring_req(uint32_t core_id)
-{
-	struct app_core_params *p = app_get_core_params(core_id);
-	uint32_t ring_req_id = p->swq_in[APP_SWQ_IN_REQ];
+			RTE_LOG(INFO, USER1, "LINK%u (%u): Adding UDP filter "
+				"(queue = %u, IP = 0x%08x)\n",
+				link_id, cp->pmd_id, cp->udp_local_q, p->ip);
+
+			if (status)
+				rte_panic("LINK%u (%u): Error while adding UDP filter "
+					"(queue = %u, IP = 0x%08x) (%d)\n",
+					link_id, cp->pmd_id, cp->udp_local_q, p->ip, status);
+		}
+
+		/* SCTP */
+		if (cp->sctp_local_q != 0) {
+			status = app_link_filter_sctp_add(cp, p);
 
-	return app.rings[ring_req_id];
+			RTE_LOG(INFO, USER1, "LINK%u (%u): Adding SCTP filter "
+				"(queue = %u, IP = 0x%08x)\n",
+				link_id, cp->pmd_id, cp->sctp_local_q, p->ip);
+
+			if (status)
+				rte_panic("LINK%u (%u): Error while adding SCTP filter "
+					"(queue = %u, IP = 0x%08x) (%d)\n",
+					link_id, cp->pmd_id, cp->sctp_local_q, p->ip, status);
+		}
+	}
 }
 
-struct rte_ring *
-app_get_ring_resp(uint32_t core_id)
+void
+app_link_down_internal(struct app_params *app, struct app_link_params *cp)
 {
-	struct app_core_params *p = app_get_core_params(core_id);
-	uint32_t ring_resp_id = p->swq_out[APP_SWQ_OUT_RESP];
+	uint32_t link_id, i;
+	int status;
+
+	sscanf(cp->name, "LINK%u", &link_id);
+
+	/* Mark link_id as DOWN */
+	cp->state = 0;
+
+	/* Stop link_id */
+	rte_eth_dev_stop(cp->pmd_id);
+
+	/* Return if IP of current link_id is not valid */
+	if (cp->ip == 0)
+		return;
+
+	/* For each link_id that is currently UP, remove filters for current port */
+	for (i = 0; i < APP_MAX_LINKS; i ++) {
+		struct app_link_params *p = &app->link_params[i];
+
+		if (!APP_PARAM_VALID(p))
+			continue;
+
+		/* Skip ports that are down */
+		if (p->state == 0) {
+			continue;
+		}
+
+		/* IP */
+		if (p->ip_local_q != 0) {
+			status = app_link_filter_ip_del(p, cp);
+
+			RTE_LOG(INFO, USER1, "LINK%u (%u): Deleting IP filter "
+				"(queue = %u)\n",
+				i, p->pmd_id, p->ip_local_q);
+
+			if (status)
+				rte_panic("LINK%u (%u): Error while deleting IP filter "
+					"(queue = %u) (%d)\n",
+					i, p->pmd_id, p->ip_local_q, status);
+		}
+
+		/* TCP */
+		if (p->tcp_local_q != 0) {
+			status = app_link_filter_tcp_del(p, cp);
+
+			RTE_LOG(INFO, USER1, "LINK%u (%u): Deleting TCP filter "
+				"(queue = %u)\n",
+				i, p->pmd_id, p->tcp_local_q);
+
+			if (status)
+				rte_panic("LINK%u (%u): Error while deleting TCP filter "
+					"(queue = %u) (%d)\n",
+					i, p->pmd_id, p->tcp_local_q, status);
+		}
+
+		/* UDP */
+		if (p->udp_local_q != 0) {
+			status = app_link_filter_udp_del(p, cp);
+
+			RTE_LOG(INFO, USER1, "LINK%u (%u): Deleting UDP filter "
+				"(queue = %u)\n",
+				i, p->pmd_id, p->udp_local_q);
 
-	return app.rings[ring_resp_id];
+			if (status)
+				rte_panic("LINK%u (%u): Error while deleting UDP filter "
+					"(queue = %u) (%d)\n",
+					i, p->pmd_id, p->udp_local_q, status);
+		}
+
+		/* SCTP */
+		if (p->sctp_local_q != 0) {
+			status = app_link_filter_sctp_del(p, cp);
+
+			RTE_LOG(INFO, USER1, "LINK%u (%u): Deleting SCTP filter "
+				"(queue = %u)\n",
+				i, p->pmd_id, p->sctp_local_q);
+
+			if (status)
+				rte_panic("LINK%u (%u): Error while deleting SCTP filter "
+					"(queue = %u) (%d)\n",
+					i, p->pmd_id, p->sctp_local_q, status);
+		}
+	}
 }
 
 static void
-app_init_mbuf_pools(void)
+app_check_link(struct app_params *app)
 {
-	/* Init the buffer pool */
-	RTE_LOG(INFO, USER1, "Creating the mbuf pool ...\n");
-	app.pool = rte_pktmbuf_pool_create("mempool", app.pool_size,
-		app.pool_cache_size, 0, app.pool_buffer_size, rte_socket_id());
-	if (app.pool == NULL)
-		rte_panic("Cannot create mbuf pool\n");
-
-	/* Init the indirect buffer pool */
-	RTE_LOG(INFO, USER1, "Creating the indirect mbuf pool ...\n");
-	app.indirect_pool = rte_pktmbuf_pool_create("indirect mempool",
-		app.pool_size, app.pool_cache_size,
-		sizeof(struct app_pkt_metadata), 0, rte_socket_id());
-	if (app.indirect_pool == NULL)
-		rte_panic("Cannot create mbuf pool\n");
-
-	/* Init the message buffer pool */
-	RTE_LOG(INFO, USER1, "Creating the message pool ...\n");
-	app.msg_pool = rte_mempool_create(
-		"mempool msg",
-		app.msg_pool_size,
-		app.msg_pool_buffer_size,
-		app.msg_pool_cache_size,
-		0,
-		NULL, NULL,
-		rte_ctrlmbuf_init, NULL,
-		rte_socket_id(),
-		0);
-	if (app.msg_pool == NULL)
-		rte_panic("Cannot create message pool\n");
+	uint32_t all_links_up, i;
+
+	all_links_up = 1;
+
+	for (i = 0; i < APP_MAX_LINKS; i++) {
+		struct rte_eth_link link_params;
+		uint32_t link;
+
+		if (!APP_PARAM_VALID(&app->link_params[i]))
+			continue;
+
+		link = app->link_params[i].pmd_id;
+		memset(&link_params, 0, sizeof(link_params));
+		rte_eth_link_get(link, &link_params);
+		RTE_LOG(INFO, USER1, "LINK%u (%u) (%u Gbps) %s\n",
+			i,
+			link,
+			link_params.link_speed / 1000,
+			link_params.link_status ? "UP" : "DOWN");
+
+		if (link_params.link_status == 0)
+			all_links_up = 0;
+	}
+
+	if (all_links_up == 0)
+		rte_panic("Some NIC ports are DOWN\n");
 }
 
 static void
-app_init_rings(void)
+app_init_link(struct app_params *app)
 {
-	uint32_t n_swq, i;
+	uint32_t i;
 
-	n_swq = app_get_n_swq_in();
-	RTE_LOG(INFO, USER1, "Initializing %u SW rings ...\n", n_swq);
+	for (i = 0; i < APP_MAX_LINKS; i++) {
+		struct app_link_params *p_link = &app->link_params[i];
+		uint32_t link_id, n_hwq_in ,n_hwq_out;
+		int ret, j;
 
-	app.rings = rte_malloc_socket(NULL, n_swq * sizeof(struct rte_ring *),
-		RTE_CACHE_LINE_SIZE, rte_socket_id());
-	if (app.rings == NULL)
-		rte_panic("Cannot allocate memory to store ring pointers\n");
+		if (!APP_PARAM_VALID(p_link))
+			continue;
 
-	for (i = 0; i < n_swq; i++) {
-		struct rte_ring *ring;
-		char name[32];
+		sscanf(p_link->name, "LINK%u", &link_id);
+		n_hwq_in = app_n_hwq_in_get(app, link_id);
+		n_hwq_out = app_n_hwq_out_get(app, link_id);
 
-		snprintf(name, sizeof(name), "app_ring_%u", i);
+		RTE_LOG(INFO, USER1, "Initializing %s (%u) "
+			"(%u RXQ, %u TXQ) ...\n",
+			p_link->name,
+			p_link->pmd_id,
+			n_hwq_in,
+			n_hwq_out);
 
-		ring = rte_ring_create(
-			name,
-			app.rsz_swq,
-			rte_socket_id(),
-			RING_F_SP_ENQ | RING_F_SC_DEQ);
+		/* LINK */
+		ret = rte_eth_dev_configure(
+			p_link->pmd_id,
+			n_hwq_in,
+			n_hwq_out,
+			&p_link->conf);
+		if (ret < 0)
+			rte_panic("%s (%d): init error (%d)\n",
+				p_link->name, p_link->pmd_id, ret);
+
+		rte_eth_macaddr_get(p_link->pmd_id,
+			(struct ether_addr *) &p_link->mac_addr);
+
+		if (p_link->promisc)
+			rte_eth_promiscuous_enable(p_link->pmd_id);
+
+		/* RXQ */
+		for (j = 0; j < APP_MAX_HWQ_IN; j++) {
+			struct app_pktq_hwq_in_params *p_rxq = &app->hwq_in_params[j];
+			uint32_t rxq_link_id, rxq_queue_id;
+
+			if (!APP_PARAM_VALID(p_rxq))
+				continue;
+
+			sscanf(p_rxq->name, "RXQ%u.%u",
+				&rxq_link_id, &rxq_queue_id);
+			if (rxq_link_id != link_id)
+				continue;
+
+			ret = rte_eth_rx_queue_setup(
+				p_link->pmd_id,
+				rxq_queue_id,
+				p_rxq->size,
+				rte_eth_dev_socket_id(p_link->pmd_id),
+				&p_rxq->conf,
+				app->mempool[p_rxq->mempool_id]);
+			if (ret < 0)
+				rte_panic("%s (%u): %s init error (%d)\n",
+					p_link->name,
+					p_link->pmd_id,
+					p_rxq->name,
+					ret);
+		}
 
-		if (ring == NULL)
-			rte_panic("Cannot create ring %u\n", i);
+		/* TXQ */
+		for (j = 0; j < APP_MAX_HWQ_OUT; j++) {
+			struct app_pktq_hwq_out_params *p_txq = &app->hwq_out_params[j];
+			uint32_t txq_link_id, txq_queue_id;
+
+			if (!APP_PARAM_VALID(p_txq))
+				continue;
+
+			sscanf(p_txq->name, "TXQ%u.%u",
+				&txq_link_id, &txq_queue_id);
+			if (txq_link_id != link_id)
+				continue;
+
+			ret = rte_eth_tx_queue_setup(
+				p_link->pmd_id,
+				txq_queue_id,
+				p_txq->size,
+				rte_eth_dev_socket_id(p_link->pmd_id),
+				&p_txq->conf);
+			if (ret < 0)
+				rte_panic("%s (%u): %s init error (%d)\n",
+					p_link->name,
+					p_link->pmd_id,
+					p_txq->name,
+					ret);
+		}
 
-		app.rings[i] = ring;
+		/* LINK UP */
+		app_link_up_internal(app, p_link);
 	}
+
+	app_check_link(app);
 }
 
 static void
-app_ports_check_link(void)
+app_init_swq(struct app_params *app)
 {
-	uint32_t all_ports_up, i;
+	int i;
 
-	all_ports_up = 1;
+	for (i = 0; i < APP_MAX_PKTQ_SWQ; i++) {
+		struct app_pktq_swq_params *p = &app->swq_params[i];
 
-	for (i = 0; i < app.n_ports; i++) {
-		struct rte_eth_link link;
-		uint32_t port;
+		if (!APP_PARAM_VALID(p))
+			continue;
 
-		port = app.ports[i];
-		memset(&link, 0, sizeof(link));
-		rte_eth_link_get_nowait(port, &link);
-		RTE_LOG(INFO, USER1, "Port %u (%u Gbps) %s\n",
-			port,
-			link.link_speed / 1000,
-			link.link_status ? "UP" : "DOWN");
+		RTE_LOG(INFO, USER1, "Initializing %s...\n", p->name);
+		app->swq[i] = rte_ring_create(
+				p->name,
+				p->size,
+				p->cpu_socket_id,
+				RING_F_SP_ENQ | RING_F_SC_DEQ);
 
-		if (link.link_status == 0)
-			all_ports_up = 0;
+		if (app->swq[i] == NULL)
+			rte_panic("%s init error\n", p->name);
 	}
+}
 
-	if (all_ports_up == 0)
-		rte_panic("Some NIC ports are DOWN\n");
+/*
+ * app_init_tm: create and configure one rte_sched traffic-manager instance
+ * for every valid TM packet queue in the application configuration.
+ * The scheduler port rate is derived from the current speed of the
+ * associated NIC link; all subports and their pipes are then configured.
+ * Panics on any configuration failure.
+ */
+static void
+app_init_tm(struct app_params *app)
+{
+	uint32_t i;
+
+	for (i = 0; i < APP_MAX_PKTQ_TM; i++) {
+		struct app_pktq_tm_params *p_tm = &app->tm_params[i];
+		struct app_link_params *p_link;
+		struct rte_eth_link link_eth_params;
+		struct rte_sched_port *sched;
+		uint32_t n_subports, subport_id;
+		int status;
+
+		/* Skip unused TM slots */
+		if (!APP_PARAM_VALID(p_tm))
+			continue;
+
+		p_link = app_get_link_for_tm(app, p_tm);
+		/* LINK: blocking query - may wait for link negotiation to
+		 * complete (unlike rte_eth_link_get_nowait) */
+		rte_eth_link_get(p_link->pmd_id, &link_eth_params);
+
+		/* TM: link_speed is in Mbps; convert to bytes/second */
+		p_tm->sched_port_params.name = p_tm->name;
+		p_tm->sched_port_params.socket = rte_eth_dev_socket_id(p_link->pmd_id);
+		p_tm->sched_port_params.rate =
+			(uint64_t) link_eth_params.link_speed * 1000 * 1000 / 8;
+		p_tm->sched_port_params.mtu = ETHER_MTU;
+
+		sched = rte_sched_port_config(&p_tm->sched_port_params);
+		if (sched == NULL)
+			rte_panic("%s init error\n", p_tm->name);
+		app->tm[i] = sched;
+
+		/* Subport */
+		n_subports = p_tm->sched_port_params.n_subports_per_port;
+		for (subport_id = 0; subport_id < n_subports; subport_id ++) {
+			uint32_t n_pipes_per_subport, pipe_id;
+
+			status = rte_sched_subport_config(sched,
+				subport_id,
+				&p_tm->sched_subport_params[subport_id]);
+			if (status)
+				rte_panic("%s subport %u init error (%d)\n",
+					p_tm->name, subport_id, status);
+
+			/* Pipe: profile -1 marks a pipe that is deliberately
+			 * left unconfigured */
+			n_pipes_per_subport =
+					p_tm->sched_port_params.n_pipes_per_subport;
+			for (pipe_id = 0; pipe_id < n_pipes_per_subport; pipe_id++) {
+				int profile_id = p_tm->sched_pipe_to_profile[subport_id
+								* APP_MAX_SCHED_PIPES + pipe_id];
+
+				if (profile_id == -1)
+					continue;
+
+				status = rte_sched_pipe_config(sched, subport_id,
+					pipe_id, profile_id);
+				if (status)
+					rte_panic("%s subport %u pipe %u "
+						"(profile %d) init error (%d)\n",
+						p_tm->name, subport_id, pipe_id, profile_id, status);
+			}
+		}
+	}
}
 
+/*
+ * app_init_msgq: create a single-producer / single-consumer rte_ring for
+ * every valid message queue in the application configuration.
+ * Panics if any ring cannot be created.
+ */
static void
-app_init_ports(void)
+app_init_msgq(struct app_params *app)
+{
+	/* NOTE(review): sibling init loops use uint32_t for the index;
+	 * int works here but is inconsistent */
+	int i;
+
+	for (i = 0; i < APP_MAX_MSGQ; i++) {
+		struct app_msgq_params *p = &app->msgq_params[i];
+
+		/* Skip unused message-queue slots */
+		if (!APP_PARAM_VALID(p))
+			continue;
+
+		RTE_LOG(INFO, USER1, "Creating %s ...\n", p->name);
+		app->msgq[i] = rte_ring_create(
+				p->name,
+				p->size,
+				p->cpu_socket_id,
+				RING_F_SP_ENQ | RING_F_SC_DEQ);
+
+		if (app->msgq[i] == NULL)
+			rte_panic("%s init error\n", p->name);
+	}
+}
+
+/*
+ * app_pipeline_params_get: translate the application-level pipeline
+ * configuration (p_in) into the back-end pipeline_params structure (p_out):
+ * input/output packet queues are mapped to the matching librte_port
+ * reader/writer types, message queues are resolved to rte_ring pointers,
+ * and the free-form arguments are copied by reference.
+ */
+static void app_pipeline_params_get(struct app_params *app,
+	struct app_pipeline_params *p_in,
+	struct pipeline_params *p_out)
{
	uint32_t i;

-	/* Init NIC ports, then start the ports */
-	for (i = 0; i < app.n_ports; i++) {
-		uint32_t port;
-		int ret;
+	/* NOTE(review): unbounded strcpy - relies on p_in->name always
+	 * fitting p_out->name; consider snprintf */
+	strcpy(p_out->name, p_in->name);

-		port = app.ports[i];
-		RTE_LOG(INFO, USER1, "Initializing NIC port %u ...\n", port);
+	p_out->socket_id = (int) p_in->socket_id;

-		/* Init port */
-		ret = rte_eth_dev_configure(
-			port,
-			1,
-			1,
-			&app.port_conf);
-		if (ret < 0)
-			rte_panic("Cannot init NIC port %u (%d)\n", port, ret);
-		rte_eth_promiscuous_enable(port);
-
-		/* Init RX queues */
-		ret = rte_eth_rx_queue_setup(
-			port,
-			0,
-			app.rsz_hwq_rx,
-			rte_eth_dev_socket_id(port),
-			&app.rx_conf,
-			app.pool);
-		if (ret < 0)
-			rte_panic("Cannot init RX for port %u (%d)\n",
-				(uint32_t) port, ret);
-
-		/* Init TX queues */
-		ret = rte_eth_tx_queue_setup(
-			port,
-			0,
-			app.rsz_hwq_tx,
-			rte_eth_dev_socket_id(port),
-			&app.tx_conf);
-		if (ret < 0)
-			rte_panic("Cannot init TX for port %u (%d)\n", port,
-				ret);
+	/* pktq_in: map each input packet queue to a port-in descriptor */
+	p_out->n_ports_in = p_in->n_pktq_in;
+	for (i = 0; i < p_in->n_pktq_in; i++) {
+		struct app_pktq_in_params *in = &p_in->pktq_in[i];
+		struct pipeline_port_in_params *out = &p_out->port_in[i];

-		/* Start port */
-		ret = rte_eth_dev_start(port);
-		if (ret < 0)
-			rte_panic("Cannot start port %u (%d)\n", port, ret);
+		switch (in->type) {
+		case APP_PKTQ_IN_HWQ:
+		{
+			struct app_pktq_hwq_in_params *p_hwq_in = &app->hwq_in_params[in->id];
+			struct app_link_params *p_link = app_get_link_for_rxq(app, p_hwq_in);
+			uint32_t rxq_link_id, rxq_queue_id;
+
+			/* NOTE(review): sscanf return value unchecked - assumes
+			 * the queue name always matches "RXQ%u.%u" */
+			sscanf(p_hwq_in->name, "RXQ%u.%u", &rxq_link_id, &rxq_queue_id);
+
+			out->type = PIPELINE_PORT_IN_ETHDEV_READER;
+			out->params.ethdev.port_id = p_link->pmd_id;
+			out->params.ethdev.queue_id = rxq_queue_id;
+			out->burst_size = p_hwq_in->burst;
+			break;
+		}
+		case APP_PKTQ_IN_SWQ:
+			out->type = PIPELINE_PORT_IN_RING_READER;
+			out->params.ring.ring = app->swq[in->id];
+			out->burst_size = app->swq_params[in->id].burst_read;
+			/* What about frag and ras ports? */
+			break;
+		case APP_PKTQ_IN_TM:
+			out->type = PIPELINE_PORT_IN_SCHED_READER;
+			out->params.sched.sched = app->tm[in->id];
+			out->burst_size = app->tm_params[in->id].burst_read;
+			break;
+		case APP_PKTQ_IN_SOURCE:
+			out->type = PIPELINE_PORT_IN_SOURCE;
+			out->params.source.mempool = app->mempool[in->id];
+			out->burst_size = app->source_params[in->id].burst;
+			break;
+		default:
+			break;
+		}
	}

-	app_ports_check_link();
-}
+	/* pktq_out: map each output packet queue to a port-out descriptor;
+	 * dropless queues use the nodrop writer variants with retries */
+	p_out->n_ports_out = p_in->n_pktq_out;
+	for (i = 0; i < p_in->n_pktq_out; i++) {
+		struct app_pktq_out_params *in = &p_in->pktq_out[i];
+		struct pipeline_port_out_params *out = &p_out->port_out[i];
+
+		switch (in->type) {
+		case APP_PKTQ_OUT_HWQ:
+		{
+			struct app_pktq_hwq_out_params *p_hwq_out = &app->hwq_out_params[in->id];
+			struct app_link_params *p_link = app_get_link_for_txq(app, p_hwq_out);
+			uint32_t txq_link_id, txq_queue_id;
+
+			/* NOTE(review): sscanf return value unchecked here too */
+			sscanf(p_hwq_out->name, "TXQ%u.%u", &txq_link_id, &txq_queue_id);
+
+			if (p_hwq_out->dropless == 0) {
+				struct rte_port_ethdev_writer_params *params = &out->params.ethdev;
+
+				out->type = PIPELINE_PORT_OUT_ETHDEV_WRITER;
+				params->port_id = p_link->pmd_id;
+				params->queue_id = txq_queue_id;
+				params->tx_burst_sz = app->hwq_out_params[in->id].burst;
+			}
+			else {
+				struct rte_port_ethdev_writer_nodrop_params *params =
+						&out->params.ethdev_nodrop;
+
+				out->type = PIPELINE_PORT_OUT_ETHDEV_WRITER_NODROP;
+				params->port_id = p_link->pmd_id;
+				params->queue_id = txq_queue_id;
+				params->tx_burst_sz = p_hwq_out->burst;
+				params->n_retries = p_hwq_out->n_retries;
+			}
+			break;
+		}
+		case APP_PKTQ_OUT_SWQ:
+			if (app->swq_params[in->id].dropless == 0) {
+				struct rte_port_ring_writer_params *params = &out->params.ring;
+
+				out->type = PIPELINE_PORT_OUT_RING_WRITER;
+				params->ring = app->swq[in->id];
+				params->tx_burst_sz = app->swq_params[in->id].burst_write;
+			}
+			else {
+				struct rte_port_ring_writer_nodrop_params *params =
+						&out->params.ring_nodrop;
+
+				out->type = PIPELINE_PORT_OUT_RING_WRITER_NODROP;
+				params->ring = app->swq[in->id];
+				params->tx_burst_sz = app->swq_params[in->id].burst_write;
+				params->n_retries = app->swq_params[in->id].n_retries;
+			}
+			/* What about frag and ras ports? */
+			break;
+		case APP_PKTQ_OUT_TM: {
+			struct rte_port_sched_writer_params *params = &out->params.sched;
+
+			out->type = PIPELINE_PORT_OUT_SCHED_WRITER;
+			params->sched = app->tm[in->id];
+			params->tx_burst_sz = app->tm_params[in->id].burst_write;
+			break;
+		}
+		case APP_PKTQ_OUT_SINK:
+			out->type = PIPELINE_PORT_OUT_SINK;
+			break;
+		default:
+			break;
+		}
+	}

-#define APP_PING_TIMEOUT_SEC                               5
+	/* msgq: resolve message queue ids to rte_ring pointers */
+	p_out->n_msgq = p_in->n_msgq_in;

-void
-app_ping(void)
-{
-	unsigned i;
-	uint64_t timestamp, diff_tsc;
+	for (i = 0; i < p_in->n_msgq_in; i++)
+		p_out->msgq_in[i] = app->msgq[p_in->msgq_in[i]];

-	const uint64_t timeout = rte_get_tsc_hz() * APP_PING_TIMEOUT_SEC;
+	for (i = 0; i < p_in->n_msgq_out; i++)
+		p_out->msgq_out[i] = app->msgq[p_in->msgq_out[i]];

-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		struct app_core_params *p = &app.cores[i];
-		struct rte_ring *ring_req, *ring_resp;
-		void *msg;
-		struct app_msg_req *req;
-		int status;
+	/* args: shallow copy - pointers into p_in remain live */
+	p_out->n_args = p_in->n_args;
+	for (i = 0; i < p_in->n_args; i++){
+		p_out->args_name[i] = p_in->args_name[i];
+		p_out->args_value[i] = p_in->args_value[i];
+	}
+}
 
-		if ((p->core_type != APP_CORE_FC) &&
-		    (p->core_type != APP_CORE_FW) &&
-			(p->core_type != APP_CORE_RT) &&
-			(p->core_type != APP_CORE_RX))
+/*
+ * app_init_pipelines: instantiate the back-end and (optionally) front-end
+ * of every valid pipeline. The pipeline type is looked up by name; a
+ * missing type or a failed f_init() is fatal.
+ */
+static void
+app_init_pipelines(struct app_params *app)
+{
+	uint32_t p_id;
+
+	for (p_id = 0; p_id < APP_MAX_PIPELINES; p_id++) {
+		struct app_pipeline_params *params = &app->pipeline_params[p_id];
+		struct app_pipeline_data *data = &app->pipeline_data[p_id];
+		struct pipeline_type *ptype;
+		struct pipeline_params pp;
+
+		/* Skip unused pipeline slots */
+		if (!APP_PARAM_VALID(params))
			continue;

-		ring_req = app_get_ring_req(p->core_id);
-		ring_resp = app_get_ring_resp(p->core_id);
-
-		/* Fill request message */
-		msg = (void *)rte_ctrlmbuf_alloc(app.msg_pool);
-		if (msg == NULL)
-			rte_panic("Unable to allocate new message\n");
-
-		req = (struct app_msg_req *)
-				rte_ctrlmbuf_data((struct rte_mbuf *)msg);
-		req->type = APP_MSG_REQ_PING;
-
-		/* Send request */
-		do {
-			status = rte_ring_sp_enqueue(ring_req, msg);
-		} while (status == -ENOBUFS);
-
-		/* Wait for response */
-		timestamp = rte_rdtsc();
-		do {
-			status = rte_ring_sc_dequeue(ring_resp, &msg);
-			diff_tsc = rte_rdtsc() - timestamp;
-
-			if (unlikely(diff_tsc > timeout))
-				rte_panic("Core %u of type %d does not respond "
-					"to requests\n", p->core_id,
-					p->core_type);
-		} while (status != 0);
-
-		/* Free message buffer */
-		rte_ctrlmbuf_free(msg);
+		RTE_LOG(INFO, USER1, "Initializing %s ...\n", params->name);
+
+		ptype = app_pipeline_type_find(app, params->type);
+		if (ptype == NULL)
+			rte_panic("Init error: Unknown pipeline type \"%s\"\n", params->type);
+
+		/* Flatten the app-level config into pipeline_params */
+		app_pipeline_params_get(app, params, &pp);
+
+		/* Back-end */
+		data->be = NULL;
+		if (ptype->ops->f_init) {
+			data->be = ptype->ops->f_init(&pp, (void *) app);
+
+			if (data->be == NULL)
+				rte_panic("Pipeline instance \"%s\" back-end init error\n", params->name);
+		}
+
+		/* Front-end */
+		data->fe = NULL;
+		if (ptype->fe_ops->f_init) {
+			data->fe = ptype->fe_ops->f_init(&pp, (void *) app);
+
+			if (data->fe == NULL)
+				rte_panic("Pipeline instance \"%s\" front-end init error\n", params->name);
+		}
+
+		/* Convert timer period to TSC cycles; the /1000 suggests the
+		 * configured value is in milliseconds - TODO confirm against
+		 * the config parser */
+		data->timer_period = (rte_get_tsc_hz() * params->timer_period) / 1000;
	}
}
 
+/*
+ * app_init_threads: assign every valid pipeline to the thread running on
+ * its configured (socket, core, hyper-thread) position. Pipelines with a
+ * custom f_run() go to the thread's "custom" list, the rest to "regular".
+ * Panics if the configured core does not map to an enabled lcore or the
+ * pipeline type is unknown.
+ */
static void
-app_init_etc(void)
+app_init_threads(struct app_params *app)
{
-	if ((app_get_first_core_id(APP_CORE_IPV4_FRAG) != RTE_MAX_LCORE) ||
-		(app_get_first_core_id(APP_CORE_IPV4_RAS) != RTE_MAX_LCORE)) {
-		RTE_LOG(INFO, USER1,
-			"Activating the Ethernet header pop/push ...\n");
-		app.ether_hdr_pop_push = 1;
+	/* Common baseline for all initial timer deadlines */
+	uint64_t time = rte_get_tsc_cycles();
+	uint32_t p_id;
+
+	for (p_id = 0; p_id < APP_MAX_PIPELINES; p_id++) {
+		struct app_pipeline_params *params = &app->pipeline_params[p_id];
+		struct app_pipeline_data *data = &app->pipeline_data[p_id];
+		struct pipeline_type *ptype;
+		struct app_thread_data *t;
+		struct app_thread_pipeline_data *p;
+		int lcore_id;
+
+		if (!APP_PARAM_VALID(params))
+			continue;
+
+		lcore_id = cpu_core_map_get_lcore_id(app->core_map,
+			params->socket_id,
+			params->core_id,
+			params->hyper_th_id);
+
+		if (lcore_id < 0)
+			rte_panic("Invalid core s%uc%u%s\n",
+				params->socket_id,
+				params->core_id,
+				(params->hyper_th_id)? "h" : "");
+
+		t = &app->thread_data[lcore_id];
+
+		ptype = app_pipeline_type_find(app, params->type);
+		if (ptype == NULL)
+			rte_panic("Init error: Unknown pipeline type \"%s\"\n", params->type);
+
+		/* NULL f_run means the generic runner drives this pipeline */
+		p = (ptype->ops->f_run == NULL)?
+			&t->regular[t->n_regular] :
+			&t->custom[t->n_custom];
+
+		p->be = data->be;
+		p->f_run = ptype->ops->f_run;
+		p->f_timer = ptype->ops->f_timer;
+		p->timer_period = data->timer_period;
+		p->deadline = time + data->timer_period;
+
+		if (ptype->ops->f_run == NULL)
+			t->n_regular ++;
+		else
+			t->n_custom ++;
	}
}
 
-void
-app_init(void)
+/*
+ * app_init: top-level initialization entry point. Runs each subsystem
+ * init in dependency order (core map/mask, EAL, mempools, links, SWQs,
+ * TM, MSGQs, then pipelines and thread assignment).
+ * Always returns 0: any failure panics inside the respective helper.
+ */
+int app_init(struct app_params *app)
+{
+	app_init_core_map(app);
+	app_init_core_mask(app);
+
+	app_init_eal(app);
+	app_init_mempool(app);
+	app_init_link(app);
+	app_init_swq(app);
+	app_init_tm(app);
+	app_init_msgq(app);
+
+	app_init_pipelines(app);
+	app_init_threads(app);
+
+	return 0;
+}
+
+/*
+ * app_pipeline_type_cmd_push: append a pipeline type's CLI commands to the
+ * application-wide command array and point each command's user data at the
+ * app. Keeps the array NULL-terminated.
+ * Returns 0 on success, -EINVAL on bad arguments, -ENOMEM when the command
+ * array would overflow.
+ */
+static int
+app_pipeline_type_cmd_push(struct app_params *app, struct pipeline_type *ptype)
+{
+	cmdline_parse_ctx_t *cmds;
+	uint32_t n_cmds, i;
+
+	/* Check input arguments */
+	if ((app == NULL) ||
+		(ptype == NULL))
+		return -EINVAL;
+
+	n_cmds = pipeline_type_cmds_count(ptype);
+	if (n_cmds == 0)
+		return 0;
+
+	cmds = ptype->fe_ops->cmds;
+
+	/* Check for available slots in the application commands array */
+	if (n_cmds > APP_MAX_CMDS - app->n_cmds)
+		return -ENOMEM;
+
+	/* Push pipeline commands into the application.
+	 * NOTE(review): element size uses sizeof(cmdline_parse_ctx_t *) while
+	 * the arrays hold cmdline_parse_ctx_t; this is only correct because
+	 * the typedef is itself a pointer type - verify and prefer
+	 * sizeof(cmdline_parse_ctx_t) */
+	memcpy(&app->cmds[app->n_cmds],
+		cmds,
+		n_cmds * sizeof(cmdline_parse_ctx_t *));
+
+	for (i = 0; i < n_cmds; i++)
+		app->cmds[app->n_cmds + i]->data = app;
+
+	app->n_cmds += n_cmds;
+	app->cmds[app->n_cmds] = NULL;
+
+	return 0;
+}
+
+/*
+ * app_pipeline_type_register: add a new pipeline type (by copy) to the
+ * application registry and hook up its CLI commands.
+ * Returns 0 on success, -EINVAL on bad arguments, -EEXIST on duplicate
+ * name, -ENOMEM when the type table or command array is full.
+ */
+int
+app_pipeline_type_register(struct app_params *app, struct pipeline_type *ptype)
{
-	if ((sizeof(struct app_pkt_metadata) % RTE_CACHE_LINE_SIZE) != 0)
-		rte_panic("Application pkt meta-data size mismatch\n");
+	uint32_t n_cmds, i;
+
+	/* Check input arguments */
+	if ((app == NULL) ||
+		(ptype == NULL) ||
+		(ptype->name == NULL) ||
+		(strlen(ptype->name) == 0) ||
+		(ptype->ops->f_init == NULL))
+		return -EINVAL;
+
+	/* Check for duplicate entry
+	 * NOTE(review): stray space before the comma below (style) */
+	for (i = 0; i < app->n_pipeline_types; i++)
+		if (strcmp(app->pipeline_type[i].name ,ptype->name) == 0)
+			return -EEXIST;
+
+	/* Check for resource availability */
+	n_cmds = pipeline_type_cmds_count(ptype);
+	if ((app->n_pipeline_types == APP_MAX_PIPELINE_TYPES) ||
+		(n_cmds > APP_MAX_CMDS - app->n_cmds))
+		return -ENOMEM;
+
+	/* Copy pipeline type (shallow struct copy) */
+	memcpy(&app->pipeline_type[app->n_pipeline_types++],
+		ptype,
+		sizeof(struct pipeline_type));
+
+	/* Copy CLI commands (cannot fail here: slot check done above) */
+	if (n_cmds)
+		app_pipeline_type_cmd_push(app, ptype);
+
+	return 0;
+}
 
-	app_check_core_params();
+/*
+ * app_pipeline_type_find: linear search of the registered pipeline types
+ * by name. Returns a pointer into the registry, or NULL if not found.
+ */
+struct
+pipeline_type *app_pipeline_type_find(struct app_params *app, char *name)
+{
+	uint32_t i;

-	app_init_mbuf_pools();
-	app_init_rings();
-	app_init_ports();
-	app_init_etc();
+	for (i = 0; i < app->n_pipeline_types; i++)
+		if (strcmp(app->pipeline_type[i].name, name) == 0)
+			return &app->pipeline_type[i];

-	RTE_LOG(INFO, USER1, "Initialization completed\n");
+	return NULL;
}
diff --git a/examples/ip_pipeline/main.c b/examples/ip_pipeline/main.c
index 612eea9..ef68c86 100644
--- a/examples/ip_pipeline/main.c
+++ b/examples/ip_pipeline/main.c
@@ -49,5 +49,8 @@ main(int argc, char **argv)
 
 	app_config_check(&app);
 
+	/* Init */
+	app_init(&app);
+
 	return 0;
 }
-- 
1.7.9.5



More information about the dev mailing list