[dpdk-dev,v9,14/18] examples/distributor: give distributor a core

Message ID 1488791433-186137-15-git-send-email-david.hunt@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Hunt, David March 6, 2017, 9:10 a.m. UTC
  Signed-off-by: David Hunt <david.hunt@intel.com>
---
 examples/distributor/main.c | 181 ++++++++++++++++++++++++++++++--------------
 1 file changed, 123 insertions(+), 58 deletions(-)
  

Comments

Bruce Richardson March 10, 2017, 4:49 p.m. UTC | #1
On Mon, Mar 06, 2017 at 09:10:29AM +0000, David Hunt wrote:
> Signed-off-by: David Hunt <david.hunt@intel.com>
> ---

Title could do with some rewording - e.g. "make distributor API calls on
dedicated core"

This also requires an explanation as to why the change is being made.
Does it not also need an update to the sample app guide about how the
app works?

/Bruce
  
Hunt, David March 14, 2017, 10:48 a.m. UTC | #2
On 10/3/2017 4:49 PM, Bruce Richardson wrote:
> On Mon, Mar 06, 2017 at 09:10:29AM +0000, David Hunt wrote:
>> Signed-off-by: David Hunt <david.hunt@intel.com>
>> ---
> Title could do with some rewording - e.g. "make distributor API calls on
> dedicated core"
>
> This also requires an explanation as to why the change is being made.
> Does it not also need an update to the sample app guide about how the
> app works?
>
> /Bruce

Yes, sure. And I'll add some more info into the dist_app.rst file.
Rgds,
Dave.
  

Patch

diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index aeb75a8..e9ebe5e 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -49,6 +49,8 @@ 
 #define NUM_MBUFS ((64*1024)-1)
 #define MBUF_CACHE_SIZE 250
 #define BURST_SIZE 32
+#define SCHED_RX_RING_SZ 8192
+#define SCHED_TX_RING_SZ 65536
 #define RTE_RING_SZ 1024
 
 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
@@ -193,37 +195,14 @@  port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 struct lcore_params {
 	unsigned worker_id;
 	struct rte_distributor *d;
-	struct rte_ring *r;
+	struct rte_ring *rx_dist_ring;
+	struct rte_ring *dist_tx_ring;
 	struct rte_mempool *mem_pool;
 };
 
 static int
-quit_workers(struct rte_distributor *d, struct rte_mempool *p)
-{
-	const unsigned num_workers = rte_lcore_count() - 2;
-	unsigned i;
-	struct rte_mbuf *bufs[num_workers];
-
-	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
-		printf("line %d: Error getting mbufs from pool\n", __LINE__);
-		return -1;
-	}
-
-	for (i = 0; i < num_workers; i++)
-		bufs[i]->hash.rss = i << 1;
-
-	rte_distributor_process(d, bufs, num_workers);
-	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
-
-	return 0;
-}
-
-static int
 lcore_rx(struct lcore_params *p)
 {
-	struct rte_distributor *d = p->d;
-	struct rte_mempool *mem_pool = p->mem_pool;
-	struct rte_ring *r = p->r;
 	const uint8_t nb_ports = rte_eth_dev_count();
 	const int socket_id = rte_socket_id();
 	uint8_t port;
@@ -260,9 +239,15 @@  lcore_rx(struct lcore_params *p)
 		}
 		app_stats.rx.rx_pkts += nb_rx;
 
-		rte_distributor_process(d, bufs, nb_rx);
-		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
-				bufs, BURST_SIZE*2);
+/*
+ * You can run the distributor on the rx core with this code. Returned
+ * packets are then send straight to the tx core.
+ */
+#if 0
+	rte_distributor_process(d, bufs, nb_rx);
+	const uint16_t nb_ret = rte_distributor_returned_pktsd,
+			bufs, BURST_SIZE*2);
+
 		app_stats.rx.returned_pkts += nb_ret;
 		if (unlikely(nb_ret == 0)) {
 			if (++port == nb_ports)
@@ -270,7 +255,22 @@  lcore_rx(struct lcore_params *p)
 			continue;
 		}
 
-		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
+		struct rte_ring *tx_ring = p->dist_tx_ring;
+		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
+				(void *)bufs, nb_ret);
+#else
+		uint16_t nb_ret = nb_rx;
+		/*
+		 * Swap the following two lines if you want the rx traffic
+		 * to go directly to tx, no distribution.
+		 */
+		struct rte_ring *out_ring = p->rx_dist_ring;
+		/* struct rte_ring *out_ring = p->dist_tx_ring; */
+
+		uint16_t sent = rte_ring_enqueue_burst(out_ring,
+				(void *)bufs, nb_ret);
+#endif
+
 		app_stats.rx.enqueued_pkts += sent;
 		if (unlikely(sent < nb_ret)) {
 			RTE_LOG_DP(DEBUG, DISTRAPP,
@@ -281,20 +281,9 @@  lcore_rx(struct lcore_params *p)
 		if (++port == nb_ports)
 			port = 0;
 	}
-	rte_distributor_process(d, NULL, 0);
-	/* flush distributor to bring to known state */
-	rte_distributor_flush(d);
 	/* set worker & tx threads quit flag */
+	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
 	quit_signal = 1;
-	/*
-	 * worker threads may hang in get packet as
-	 * distributor process is not running, just make sure workers
-	 * get packets till quit_signal is actually been
-	 * received and they gracefully shutdown
-	 */
-	if (quit_workers(d, mem_pool) != 0)
-		return -1;
-	/* rx thread should quit at last */
 	return 0;
 }
 
@@ -331,6 +320,58 @@  flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
 	}
 }
 
+
+
+static int
+lcore_distributor(struct lcore_params *p)
+{
+	struct rte_ring *in_r = p->rx_dist_ring;
+	struct rte_ring *out_r = p->dist_tx_ring;
+	struct rte_mbuf *bufs[BURST_SIZE * 4];
+	struct rte_distributor *d = p->d;
+
+	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
+	while (!quit_signal_dist) {
+		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
+				(void *)bufs, BURST_SIZE*1);
+		if (nb_rx) {
+			app_stats.dist.in_pkts += nb_rx;
+
+			/* Distribute the packets */
+			rte_distributor_process(d, bufs, nb_rx);
+			/* Handle Returns */
+			const uint16_t nb_ret =
+				rte_distributor_returned_pkts(d,
+					bufs, BURST_SIZE*2);
+
+			if (unlikely(nb_ret == 0))
+				continue;
+			app_stats.dist.ret_pkts += nb_ret;
+
+			uint16_t sent = rte_ring_enqueue_burst(out_r,
+					(void *)bufs, nb_ret);
+			app_stats.dist.sent_pkts += sent;
+			if (unlikely(sent < nb_ret)) {
+				app_stats.dist.enqdrop_pkts += nb_ret - sent;
+				RTE_LOG(DEBUG, DISTRAPP,
+					"%s:Packet loss due to full out ring\n",
+					__func__);
+				while (sent < nb_ret)
+					rte_pktmbuf_free(bufs[sent++]);
+			}
+		}
+	}
+	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
+	quit_signal_work = 1;
+
+	rte_distributor_flush(d);
+	/* Unblock any returns so workers can exit */
+	rte_distributor_clear_returns(d);
+	quit_signal_rx = 1;
+	return 0;
+}
+
+
 static int
 lcore_tx(struct rte_ring *in_r)
 {
@@ -403,7 +444,7 @@  int_handler(int sig_num)
 {
 	printf("Exiting on signal %d\n", sig_num);
 	/* set quit flag for rx thread to exit */
-	quit_signal_rx = 1;
+	quit_signal_dist = 1;
 }
 
 static void
@@ -517,7 +558,7 @@  lcore_worker(struct lcore_params *p)
 		buf[i] = NULL;
 
 	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
-	while (!quit_signal) {
+	while (!quit_signal_work) {
 		num = rte_distributor_get_pkt(d, id, buf, buf, num);
 		/* Do a little bit of work for each packet */
 		for (i = 0; i < num; i++) {
@@ -608,7 +649,8 @@  main(int argc, char *argv[])
 {
 	struct rte_mempool *mbuf_pool;
 	struct rte_distributor *d;
-	struct rte_ring *output_ring;
+	struct rte_ring *dist_tx_ring;
+	struct rte_ring *rx_dist_ring;
 	unsigned lcore_id, worker_id = 0;
 	unsigned nb_ports;
 	uint8_t portid;
@@ -630,10 +672,11 @@  main(int argc, char *argv[])
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
-	if (rte_lcore_count() < 3)
+	if (rte_lcore_count() < 4)
 		rte_exit(EXIT_FAILURE, "Error, This application needs at "
-				"least 3 logical cores to run:\n"
-				"1 lcore for packet RX and distribution\n"
+				"least 4 logical cores to run:\n"
+				"1 lcore for packet RX\n"
+				"1 lcore for distribution\n"
 				"1 lcore for packet TX\n"
 				"and at least 1 lcore for worker threads\n");
 
@@ -673,30 +716,52 @@  main(int argc, char *argv[])
 	}
 
 	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
-			rte_lcore_count() - 2,
+			rte_lcore_count() - 3,
 			RTE_DIST_ALG_BURST);
 	if (d == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
 
 	/*
-	 * scheduler ring is read only by the transmitter core, but written to
-	 * by multiple threads
+	 * scheduler ring is read by the transmitter core, and written to
+	 * by scheduler core
 	 */
-	output_ring = rte_ring_create("Output_ring", RTE_RING_SZ,
-			rte_socket_id(), RING_F_SC_DEQ);
-	if (output_ring == NULL)
+	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (dist_tx_ring == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
+
+	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (rx_dist_ring == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
 
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
-		if (worker_id == rte_lcore_count() - 2)
+		if (worker_id == rte_lcore_count() - 3) {
+			printf("Starting distributor on lcore_id %d\n",
+					lcore_id);
+			/* distributor core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d,
+				rx_dist_ring, dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch(
+				(lcore_function_t *)lcore_distributor,
+				p, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 4) {
+			printf("Starting tx  on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* tx core */
 			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
-					output_ring, lcore_id);
-		else {
+					dist_tx_ring, lcore_id);
+		} else {
 			struct lcore_params *p =
 					rte_malloc(NULL, sizeof(*p), 0);
 			if (!p)
 				rte_panic("malloc failure\n");
-			*p = (struct lcore_params){worker_id, d, output_ring, mbuf_pool};
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
 
 			rte_eal_remote_launch((lcore_function_t *)lcore_worker,
 					p, lcore_id);
@@ -704,7 +769,7 @@  main(int argc, char *argv[])
 		worker_id++;
 	}
 	/* call lcore_main on master core only */
-	struct lcore_params p = { 0, d, output_ring, mbuf_pool};
+	struct lcore_params p = { 0, d, rx_dist_ring, dist_tx_ring, mbuf_pool};
 
 	if (lcore_rx(&p) != 0)
 		return -1;