[dpdk-dev,v9,16/18] examples/distributor: give Rx thread a core

Message ID 1488791433-186137-17-git-send-email-david.hunt@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Hunt, David March 6, 2017, 9:10 a.m. UTC
  This so that with the increased amount of stats we are counting,
we don't interfere with the rx core.

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 examples/distributor/main.c | 50 ++++++++++++++++++++++++++++++---------------
 1 file changed, 34 insertions(+), 16 deletions(-)
  

Comments

Bruce Richardson March 10, 2017, 4:51 p.m. UTC | #1
On Mon, Mar 06, 2017 at 09:10:31AM +0000, David Hunt wrote:
> This so that with the increased amount of stats we are counting,
> we don't interfere with the rx core.
> 
Where are the stats being counted in the current code and how would they
interfere?

/Bruce
  
Hunt, David March 14, 2017, 9:34 a.m. UTC | #2
On 10/3/2017 4:51 PM, Bruce Richardson wrote:
> On Mon, Mar 06, 2017 at 09:10:31AM +0000, David Hunt wrote:
>> This so that with the increased amount of stats we are counting,
>> we don't interfere with the rx core.
>>
> Where are the stats being counted in the current code and how would they
> interfere?
>
> /Bruce

Previous version of the distributor example did not print out several 
lines of
statistics every second, which the new version does. I felt that it it 
would be
better to separate this off to it's own core, so that the rx core 
performance
would not be impacted.

I'll add an extra comment in the commit message.

Regards,
Dave.
  

Patch

diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index cf2e826..8daf43d 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -278,6 +278,7 @@  lcore_rx(struct lcore_params *p)
 
 		app_stats.rx.enqueued_pkts += sent;
 		if (unlikely(sent < nb_ret)) {
+			app_stats.rx.enqdrop_pkts +=  nb_ret - sent;
 			RTE_LOG_DP(DEBUG, DISTRAPP,
 				"%s:Packet loss due to full ring\n", __func__);
 			while (sent < nb_ret)
@@ -295,13 +296,12 @@  lcore_rx(struct lcore_params *p)
 static inline void
 flush_one_port(struct output_buffer *outbuf, uint8_t outp)
 {
-	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
-			outbuf->count);
-	app_stats.tx.tx_pkts += nb_tx;
+	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
+			outbuf->mbufs, outbuf->count);
+	app_stats.tx.tx_pkts += outbuf->count;
 
 	if (unlikely(nb_tx < outbuf->count)) {
-		RTE_LOG_DP(DEBUG, DISTRAPP,
-			"%s:Packet loss with tx_burst\n", __func__);
+		app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
 		do {
 			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
 		} while (++nb_tx < outbuf->count);
@@ -313,6 +313,7 @@  static inline void
 flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
 {
 	uint8_t outp;
+
 	for (outp = 0; outp < nb_ports; outp++) {
 		/* skip ports that are not enabled */
 		if ((enabled_port_mask & (1 << outp)) == 0)
@@ -405,9 +406,9 @@  lcore_tx(struct rte_ring *in_r)
 			if ((enabled_port_mask & (1 << port)) == 0)
 				continue;
 
-			struct rte_mbuf *bufs[BURST_SIZE];
+			struct rte_mbuf *bufs[BURST_SIZE_TX];
 			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-					(void *)bufs, BURST_SIZE);
+					(void *)bufs, BURST_SIZE_TX);
 			app_stats.tx.dequeue_pkts += nb_rx;
 
 			/* if we get no traffic, flush anything we have */
@@ -436,11 +437,12 @@  lcore_tx(struct rte_ring *in_r)
 
 				outbuf = &tx_buffers[outp];
 				outbuf->mbufs[outbuf->count++] = bufs[i];
-				if (outbuf->count == BURST_SIZE)
+				if (outbuf->count == BURST_SIZE_TX)
 					flush_one_port(outbuf, outp);
 			}
 		}
 	}
+	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -562,6 +564,8 @@  lcore_worker(struct lcore_params *p)
 	for (i = 0; i < 8; i++)
 		buf[i] = NULL;
 
+	app_stats.worker_pkts[p->worker_id] = 1;
+
 	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
 	while (!quit_signal_work) {
 		num = rte_distributor_get_pkt(d, id, buf, buf, num);
@@ -573,6 +577,10 @@  lcore_worker(struct lcore_params *p)
 				rte_pause();
 			buf[i]->port ^= xor_val;
 		}
+
+		app_stats.worker_pkts[p->worker_id] += num;
+		if (num > 0)
+			app_stats.worker_bursts[p->worker_id][num-1]++;
 	}
 	return 0;
 }
@@ -677,9 +685,10 @@  main(int argc, char *argv[])
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
-	if (rte_lcore_count() < 4)
+	if (rte_lcore_count() < 5)
 		rte_exit(EXIT_FAILURE, "Error, This application needs at "
-				"least 4 logical cores to run:\n"
+				"least 5 logical cores to run:\n"
+				"1 lcore for stats (can be core 0)\n"
 				"1 lcore for packet RX\n"
 				"1 lcore for distribution\n"
 				"1 lcore for packet TX\n"
@@ -721,7 +730,7 @@  main(int argc, char *argv[])
 	}
 
 	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
-			rte_lcore_count() - 3,
+			rte_lcore_count() - 4,
 			RTE_DIST_ALG_BURST);
 	if (d == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
@@ -760,7 +769,21 @@  main(int argc, char *argv[])
 			/* tx core */
 			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
 					dist_tx_ring, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 2) {
+			printf("Starting rx on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* rx core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+					p, lcore_id);
 		} else {
+			printf("Starting worker on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
 			struct lcore_params *p =
 					rte_malloc(NULL, sizeof(*p), 0);
 			if (!p)
@@ -773,11 +796,6 @@  main(int argc, char *argv[])
 		}
 		worker_id++;
 	}
-	/* call lcore_main on master core only */
-	struct lcore_params p = { 0, d, rx_dist_ring, dist_tx_ring, mbuf_pool};
-
-	if (lcore_rx(&p) != 0)
-		return -1;
 
 	freq = rte_get_timer_hz();
 	t = rte_rdtsc() + freq;