From patchwork Mon Mar 20 10:08:37 2017 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Hunt, David" X-Patchwork-Id: 22018 Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [IPv6:::1]) by dpdk.org (Postfix) with ESMTP id CFC89D148; Mon, 20 Mar 2017 18:08:46 +0100 (CET) Received: from mga03.intel.com (mga03.intel.com [134.134.136.65]) by dpdk.org (Postfix) with ESMTP id CDFB4568A for ; Mon, 20 Mar 2017 18:08:13 +0100 (CET) DKIM-Signature: v=1; a=rsa-sha256; c=simple/simple; d=intel.com; i=@intel.com; q=dns/txt; s=intel; t=1490029694; x=1521565694; h=from:to:cc:subject:date:message-id:in-reply-to: references; bh=GYmCMkhwDhelY/j+C2vgpZlhKXJdgMIMxJQHzFtf6e8=; b=SxRoulp0cHBaTf6ZnMAzHp0W0+N0VNg981cTOgJXrO/nFZnxcjwwYkkA UmtYVitDOugi9nJ3beO2g761SvuzrQ==; Received: from orsmga004.jf.intel.com ([10.7.209.38]) by orsmga103.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 20 Mar 2017 10:08:13 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.36,195,1486454400"; d="scan'208";a="69069006" Received: from silpixa00397515.ir.intel.com (HELO silpixa00397515.ger.corp.intel.com) ([10.237.223.14]) by orsmga004.jf.intel.com with ESMTP; 20 Mar 2017 10:08:12 -0700 From: David Hunt To: dev@dpdk.org Cc: bruce.richardson@intel.com, David Hunt Date: Mon, 20 Mar 2017 10:08:37 +0000 Message-Id: <1490004522-183515-14-git-send-email-david.hunt@intel.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1490004522-183515-1-git-send-email-david.hunt@intel.com> References: <1489558767-56329-2-git-send-email-david.hunt@intel.com> <1490004522-183515-1-git-send-email-david.hunt@intel.com> Subject: [dpdk-dev] [PATCH v11 13/18] examples/distributor: add dedicated core for dist X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Give the distribution functionality it's own core for performance, otherwise it's limited by the Rx core. Signed-off-by: David Hunt Acked-by: Bruce Richardson --- examples/distributor/main.c | 181 ++++++++++++++++++++++++++++++-------------- 1 file changed, 123 insertions(+), 58 deletions(-) diff --git a/examples/distributor/main.c b/examples/distributor/main.c index 75c001d..96d6454 100644 --- a/examples/distributor/main.c +++ b/examples/distributor/main.c @@ -49,6 +49,8 @@ #define NUM_MBUFS ((64*1024)-1) #define MBUF_CACHE_SIZE 250 #define BURST_SIZE 32 +#define SCHED_RX_RING_SZ 8192 +#define SCHED_TX_RING_SZ 65536 #define RTE_RING_SZ 1024 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 @@ -193,37 +195,14 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool) struct lcore_params { unsigned worker_id; struct rte_distributor *d; - struct rte_ring *r; + struct rte_ring *rx_dist_ring; + struct rte_ring *dist_tx_ring; struct rte_mempool *mem_pool; }; static int -quit_workers(struct rte_distributor *d, struct rte_mempool *p) -{ - const unsigned num_workers = rte_lcore_count() - 2; - unsigned i; - struct rte_mbuf *bufs[num_workers]; - - if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) { - printf("line %d: Error getting mbufs from pool\n", __LINE__); - return -1; - } - - for (i = 0; i < num_workers; i++) - bufs[i]->hash.rss = i << 1; - - rte_distributor_process(d, bufs, num_workers); - rte_mempool_put_bulk(p, (void *)bufs, num_workers); - - return 0; -} - -static int lcore_rx(struct lcore_params *p) { - struct rte_distributor *d = p->d; - struct rte_mempool *mem_pool = p->mem_pool; - struct rte_ring *r = p->r; const uint8_t nb_ports = rte_eth_dev_count(); const int socket_id = rte_socket_id(); uint8_t port; @@ -260,9 +239,15 @@ lcore_rx(struct lcore_params *p) } app_stats.rx.rx_pkts += nb_rx; - rte_distributor_process(d, bufs, nb_rx); - const uint16_t nb_ret = rte_distributor_returned_pkts(d, - bufs, BURST_SIZE*2); +/* + * You can run the distributor on the rx core with this code. Returned + * packets are then send straight to the tx core. + */ +#if 0 + rte_distributor_process(d, bufs, nb_rx); + const uint16_t nb_ret = rte_distributor_returned_pktsd, + bufs, BURST_SIZE*2); + app_stats.rx.returned_pkts += nb_ret; if (unlikely(nb_ret == 0)) { if (++port == nb_ports) @@ -270,7 +255,22 @@ lcore_rx(struct lcore_params *p) continue; } - uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret); + struct rte_ring *tx_ring = p->dist_tx_ring; + uint16_t sent = rte_ring_enqueue_burst(tx_ring, + (void *)bufs, nb_ret); +#else + uint16_t nb_ret = nb_rx; + /* + * Swap the following two lines if you want the rx traffic + * to go directly to tx, no distribution. + */ + struct rte_ring *out_ring = p->rx_dist_ring; + /* struct rte_ring *out_ring = p->dist_tx_ring; */ + + uint16_t sent = rte_ring_enqueue_burst(out_ring, + (void *)bufs, nb_ret); +#endif + app_stats.rx.enqueued_pkts += sent; if (unlikely(sent < nb_ret)) { RTE_LOG_DP(DEBUG, DISTRAPP, @@ -281,20 +281,9 @@ lcore_rx(struct lcore_params *p) if (++port == nb_ports) port = 0; } - rte_distributor_process(d, NULL, 0); - /* flush distributor to bring to known state */ - rte_distributor_flush(d); /* set worker & tx threads quit flag */ + printf("\nCore %u exiting rx task.\n", rte_lcore_id()); quit_signal = 1; - /* - * worker threads may hang in get packet as - * distributor process is not running, just make sure workers - * get packets till quit_signal is actually been - * received and they gracefully shutdown - */ - if (quit_workers(d, mem_pool) != 0) - return -1; - /* rx thread should quit at last */ return 0; } @@ -331,6 +320,58 @@ flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports) } } + + +static int +lcore_distributor(struct lcore_params *p) +{ + struct rte_ring *in_r = p->rx_dist_ring; + struct rte_ring *out_r = p->dist_tx_ring; + struct rte_mbuf *bufs[BURST_SIZE * 4]; + struct rte_distributor *d = p->d; + + printf("\nCore %u acting as distributor core.\n", rte_lcore_id()); + while (!quit_signal_dist) { + const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, + (void *)bufs, BURST_SIZE*1); + if (nb_rx) { + app_stats.dist.in_pkts += nb_rx; + + /* Distribute the packets */ + rte_distributor_process(d, bufs, nb_rx); + /* Handle Returns */ + const uint16_t nb_ret = + rte_distributor_returned_pkts(d, + bufs, BURST_SIZE*2); + + if (unlikely(nb_ret == 0)) + continue; + app_stats.dist.ret_pkts += nb_ret; + + uint16_t sent = rte_ring_enqueue_burst(out_r, + (void *)bufs, nb_ret); + app_stats.dist.sent_pkts += sent; + if (unlikely(sent < nb_ret)) { + app_stats.dist.enqdrop_pkts += nb_ret - sent; + RTE_LOG(DEBUG, DISTRAPP, + "%s:Packet loss due to full out ring\n", + __func__); + while (sent < nb_ret) + rte_pktmbuf_free(bufs[sent++]); + } + } + } + printf("\nCore %u exiting distributor task.\n", rte_lcore_id()); + quit_signal_work = 1; + + rte_distributor_flush(d); + /* Unblock any returns so workers can exit */ + rte_distributor_clear_returns(d); + quit_signal_rx = 1; + return 0; +} + + static int lcore_tx(struct rte_ring *in_r) { @@ -403,7 +444,7 @@ int_handler(int sig_num) { printf("Exiting on signal %d\n", sig_num); /* set quit flag for rx thread to exit */ - quit_signal_rx = 1; + quit_signal_dist = 1; } static void @@ -517,7 +558,7 @@ lcore_worker(struct lcore_params *p) buf[i] = NULL; printf("\nCore %u acting as worker core.\n", rte_lcore_id()); - while (!quit_signal) { + while (!quit_signal_work) { num = rte_distributor_get_pkt(d, id, buf, buf, num); /* Do a little bit of work for each packet */ for (i = 0; i < num; i++) { @@ -608,7 +649,8 @@ main(int argc, char *argv[]) { struct rte_mempool *mbuf_pool; struct rte_distributor *d; - struct rte_ring *output_ring; + struct rte_ring *dist_tx_ring; + struct rte_ring *rx_dist_ring; unsigned lcore_id, worker_id = 0; unsigned nb_ports; uint8_t portid; @@ -630,10 +672,11 @@ main(int argc, char *argv[]) if (ret < 0) rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n"); - if (rte_lcore_count() < 3) + if (rte_lcore_count() < 4) rte_exit(EXIT_FAILURE, "Error, This application needs at " - "least 3 logical cores to run:\n" - "1 lcore for packet RX and distribution\n" + "least 4 logical cores to run:\n" + "1 lcore for packet RX\n" + "1 lcore for distribution\n" "1 lcore for packet TX\n" "and at least 1 lcore for worker threads\n"); @@ -673,30 +716,52 @@ main(int argc, char *argv[]) } d = rte_distributor_create("PKT_DIST", rte_socket_id(), - rte_lcore_count() - 2, + rte_lcore_count() - 3, RTE_DIST_ALG_BURST); if (d == NULL) rte_exit(EXIT_FAILURE, "Cannot create distributor\n"); /* - * scheduler ring is read only by the transmitter core, but written to - * by multiple threads + * scheduler ring is read by the transmitter core, and written to + * by scheduler core */ - output_ring = rte_ring_create("Output_ring", RTE_RING_SZ, - rte_socket_id(), RING_F_SC_DEQ); - if (output_ring == NULL) + dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ, + rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); + if (dist_tx_ring == NULL) + rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); + + rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ, + rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); + if (rx_dist_ring == NULL) rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); RTE_LCORE_FOREACH_SLAVE(lcore_id) { - if (worker_id == rte_lcore_count() - 2) + if (worker_id == rte_lcore_count() - 3) { + printf("Starting distributor on lcore_id %d\n", + lcore_id); + /* distributor core */ + struct lcore_params *p = + rte_malloc(NULL, sizeof(*p), 0); + if (!p) + rte_panic("malloc failure\n"); + *p = (struct lcore_params){worker_id, d, + rx_dist_ring, dist_tx_ring, mbuf_pool}; + rte_eal_remote_launch( + (lcore_function_t *)lcore_distributor, + p, lcore_id); + } else if (worker_id == rte_lcore_count() - 4) { + printf("Starting tx on worker_id %d, lcore_id %d\n", + worker_id, lcore_id); + /* tx core */ rte_eal_remote_launch((lcore_function_t *)lcore_tx, - output_ring, lcore_id); - else { + dist_tx_ring, lcore_id); + } else { struct lcore_params *p = rte_malloc(NULL, sizeof(*p), 0); if (!p) rte_panic("malloc failure\n"); - *p = (struct lcore_params){worker_id, d, output_ring, mbuf_pool}; + *p = (struct lcore_params){worker_id, d, rx_dist_ring, + dist_tx_ring, mbuf_pool}; rte_eal_remote_launch((lcore_function_t *)lcore_worker, p, lcore_id); @@ -704,7 +769,7 @@ main(int argc, char *argv[]) worker_id++; } /* call lcore_main on master core only */ - struct lcore_params p = { 0, d, output_ring, mbuf_pool}; + struct lcore_params p = { 0, d, rx_dist_ring, dist_tx_ring, mbuf_pool}; if (lcore_rx(&p) != 0) return -1;