[dpdk-dev] [PATCH v5 1/3] examples/eventdev_pipeline: added sample app
Hunt, David
david.hunt at intel.com
Tue Jul 4 09:55:25 CEST 2017
Hi Jerin,
On 3/7/2017 4:57 AM, Jerin Jacob wrote:
> -----Original Message-----
>> From: Harry van Haaren <harry.van.haaren at intel.com>
>>
>> This commit adds a sample app for the eventdev library.
>> The app has been tested with DPDK 17.05-rc2, hence this
>> release (or later) is recommended.
>>
>> The sample app showcases a pipeline processing use-case,
>> with event scheduling and processing defined per stage.
>> The application receives traffic as normal, with each
>> packet traversing the pipeline. Once the packet has
>> been processed by each of the pipeline stages, it is
>> transmitted again.
>>
>> The app provides a framework to utilize cores for a single
>> role or multiple roles. Examples of roles are the RX core,
>> TX core, Scheduling core (in the case of the event/sw PMD),
>> and worker cores.
>>
>> Various flags are available to configure numbers of stages,
>> cycles of work at each stage, type of scheduling, number of
>> worker cores, queue depths etc. For a full explanation,
>> please refer to the documentation.
> A few comments on bugs and "to avoid future rework on the base code when
> the HW PMD is introduced". As we agreed, we will keep the functionality intact to
> provide an application to test ethdev + eventdev with the _SW PMD_ for 17.08.
>
Sure, OK. I will address these.
>> ---
>> examples/Makefile | 2 +
>> examples/eventdev_pipeline/Makefile | 49 ++
>> examples/eventdev_pipeline/main.c | 999 ++++++++++++++++++++++++++++++++++++
>> 3 files changed, 1050 insertions(+)
>> create mode 100644 examples/eventdev_pipeline/Makefile
>> create mode 100644 examples/eventdev_pipeline/main.c
> Do we need to update the MAINTAINERS file?
Updated
>> diff --git a/examples/Makefile b/examples/Makefile
>> index 6298626..a6dcc2b 100644
>> --- a/examples/Makefile
>> +++ b/examples/Makefile
>> @@ -100,4 +100,6 @@ $(info vm_power_manager requires libvirt >= 0.9.3)
>> endif
>> endif
>>
>> +DIRS-y += eventdev_pipeline
> Can you change it to eventdev_pipeline_sw_pmd to emphasize the scope?
> We will rename it to eventdev_pipeline once it is working effectively on both SW and HW
> PMDs with ethdev.
OK, I've updated the directory, app name and relevant docs across the board
so they're all eventdev_pipeline_sw_pmd. This should make it clear to anyone
using it that it's for the sw PMD only, and an updated version will be
provided later.
>> +
>> include $(RTE_SDK)/mk/rte.extsubdir.mk
>> diff --git a/examples/eventdev_pipeline/Makefile b/examples/eventdev_pipeline/Makefile
>> new file mode 100644
>> index 0000000..4c26e15
>> --- /dev/null
>> +++ b/examples/eventdev_pipeline/Makefile
>> @@ -0,0 +1,49 @@
>> +# BSD LICENSE
>> +#
>> +# Copyright(c) 2016 Intel Corporation. All rights reserved.
> 2016-2017
Done.
>> +#
>> +# Redistribution and use in source and binary forms, with or without
>> +# modification, are permitted provided that the following conditions
>> +# are met:
>> +#
>> +
>> +static unsigned int active_cores;
>> +static unsigned int num_workers;
>> +static long num_packets = (1L << 25); /* do ~32M packets */
>> +static unsigned int num_fids = 512;
>> +static unsigned int num_stages = 1;
>> +static unsigned int worker_cq_depth = 16;
>> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
>> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
>> +static int16_t qid[MAX_NUM_STAGES] = {-1};
>> +static int worker_cycles;
>> +static int enable_queue_priorities;
>> +
>> +struct prod_data {
>> + uint8_t dev_id;
>> + uint8_t port_id;
>> + int32_t qid;
>> + unsigned int num_nic_ports;
>> +} __rte_cache_aligned;
>> +
>> +struct cons_data {
>> + uint8_t dev_id;
>> + uint8_t port_id;
>> +} __rte_cache_aligned;
>> +
>> +static struct prod_data prod_data;
>> +static struct cons_data cons_data;
>> +
>> +struct worker_data {
>> + uint8_t dev_id;
>> + uint8_t port_id;
>> +} __rte_cache_aligned;
>> +
>> +static unsigned int *enqueue_cnt;
>> +static unsigned int *dequeue_cnt;
> These are not consumed anywhere. Remove them.
Done.
>> +
>> +static volatile int done;
>> +static int quiet;
>> +static int dump_dev;
>> +static int dump_dev_signal;
>> +
>> +static uint32_t rx_lock;
>> +static uint32_t tx_lock;
>> +static uint32_t sched_lock;
>> +static bool rx_single;
>> +static bool tx_single;
>> +static bool sched_single;
>> +
>> +static unsigned int rx_core[MAX_NUM_CORE];
>> +static unsigned int tx_core[MAX_NUM_CORE];
>> +static unsigned int sched_core[MAX_NUM_CORE];
>> +static unsigned int worker_core[MAX_NUM_CORE];
>> +
>> +static struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
> Could you please remove these global variables and group them under structures
> for "command line parsing specific" and "fast path specific" fields (anything used
> in producer(), worker() and consumer())? And please
> allocate the "fast path specific" structure from the huge page area,
> so that we can easily add new parsing and fast path variables in the future.
>
Done. Fast path variables are now allocated using rte_malloc().
>> +
>> +static int
>> +consumer(void)
>> +{
>> + const uint64_t freq_khz = rte_get_timer_hz() / 1000;
>> + struct rte_event packets[BATCH_SIZE];
>> +
>> + static uint64_t received;
>> + static uint64_t last_pkts;
>> + static uint64_t last_time;
>> + static uint64_t start_time;
>> + unsigned int i, j;
>> + uint8_t dev_id = cons_data.dev_id;
>> + uint8_t port_id = cons_data.port_id;
>> +
>> + uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
>> + packets, RTE_DIM(packets), 0);
>> +
>> + if (n == 0) {
>> + for (j = 0; j < rte_eth_dev_count(); j++)
>> + rte_eth_tx_buffer_flush(j, 0, tx_buf[j]);
>> + return 0;
>> + }
>> + if (start_time == 0)
>> + last_time = start_time = rte_get_timer_cycles();
>> +
>> + received += n;
>> + for (i = 0; i < n; i++) {
>> + uint8_t outport = packets[i].mbuf->port;
>> + rte_eth_tx_buffer(outport, 0, tx_buf[outport],
>> + packets[i].mbuf);
>> + }
>> +
>> + /* Print out mpps every 1<<22 packets */
>> + if (!quiet && received >= last_pkts + (1<<22)) {
>> + const uint64_t now = rte_get_timer_cycles();
>> + const uint64_t total_ms = (now - start_time) / freq_khz;
>> + const uint64_t delta_ms = (now - last_time) / freq_khz;
>> + uint64_t delta_pkts = received - last_pkts;
>> +
>> + printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
>> + "avg %.3f mpps [current %.3f mpps]\n",
>> + received,
>> + total_ms,
>> + received / (total_ms * 1000.0),
>> + delta_pkts / (delta_ms * 1000.0));
>> + last_pkts = received;
>> + last_time = now;
>> + }
>> +
>> + dequeue_cnt[0] += n;
> Not really used.
Removed
>> +
>> + num_packets -= n;
>> + if (num_packets <= 0)
>> + done = 1;
>> +
>> + return 0;
>> +}
>> +
>> +static int
>> +producer(void)
>> +{
>> + static uint8_t eth_port;
>> + struct rte_mbuf *mbufs[BATCH_SIZE+2];
>> + struct rte_event ev[BATCH_SIZE+2];
>> + uint32_t i, num_ports = prod_data.num_nic_ports;
>> + int32_t qid = prod_data.qid;
>> + uint8_t dev_id = prod_data.dev_id;
>> + uint8_t port_id = prod_data.port_id;
>> + uint32_t prio_idx = 0;
>> +
>> + const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
>> + if (++eth_port == num_ports)
>> + eth_port = 0;
>> + if (nb_rx == 0) {
>> + rte_pause();
>> + return 0;
>> + }
>> +
>> + for (i = 0; i < nb_rx; i++) {
>> + ev[i].flow_id = mbufs[i]->hash.rss;
>> + ev[i].op = RTE_EVENT_OP_NEW;
>> + ev[i].sched_type = queue_type;
>> + ev[i].queue_id = qid;
>> + ev[i].event_type = RTE_EVENT_TYPE_ETHDEV;
>> + ev[i].sub_event_type = 0;
>> + ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
>> + ev[i].mbuf = mbufs[i];
>> + RTE_SET_USED(prio_idx);
>> + }
>> +
>> + const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);
>> + if (nb_tx != nb_rx) {
>> + for (i = nb_tx; i < nb_rx; i++)
>> + rte_pktmbuf_free(mbufs[i]);
>> + }
>> + enqueue_cnt[0] += nb_tx;
> Not really used.
Removed
>> +
>> + return 0;
>> +}
>> +
>> +
>> +static inline void
>> +work(struct rte_mbuf *m)
>> +{
>> + struct ether_hdr *eth;
>> + struct ether_addr addr;
>> +
>> + /* change mac addresses on packet (to use mbuf data) */
>> + eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
>> + ether_addr_copy(&eth->d_addr, &addr);
>> + ether_addr_copy(&eth->s_addr, &eth->d_addr);
>> + ether_addr_copy(&addr, &eth->s_addr);
> If there is an even number of stages (say 2), will the MAC swap be negated, as we are
> swapping at each stage, NOT in the consumer?
The MAC swap is just to touch the mbuf; it does not matter if it is negated.
>> +
>> + /* do a number of cycles of work per packet */
>> + volatile uint64_t start_tsc = rte_rdtsc();
>> + while (rte_rdtsc() < start_tsc + worker_cycles)
>> + rte_pause();
>> +}
>> +
>> +static int
>> +worker(void *arg)
> Looks good.
>
>> +/*
>> + * Initializes a given port using global settings and with the RX buffers
>> + * coming from the mbuf_pool passed as a parameter.
>> + */
>> +static inline int
>> +port_init(uint8_t port, struct rte_mempool *mbuf_pool)
> Looks good.
>
>> +static int
>> +setup_eventdev(struct prod_data *prod_data,
>> + struct cons_data *cons_data,
>> + struct worker_data *worker_data)
>> +{
>> + /* final queue for sending to TX core */
>> + if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
>> + printf("%d: error creating qid %d\n", __LINE__, i);
>> + return -1;
>> + }
>> + tx_queue.queue_id = i;
>> + tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
>> +
>> + if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
>> + tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
> s/tx_p_conf.dequeue_depth/wkr_p_conf.dequeue_depth
Done, along with other similar coding errors.
--snip--
>> +
>> +int
>> +main(int argc, char **argv)
>> +{
>> + struct worker_data *worker_data;
>> + unsigned int num_ports;
>> + int lcore_id;
>> + int err;
>> +
>> + signal(SIGINT, signal_handler);
>> + signal(SIGTERM, signal_handler);
>> + signal(SIGTSTP, signal_handler);
>> +
>> + if (!quiet) {
>> + printf("\nPort Workload distribution:\n");
>> + uint32_t i;
>> + uint64_t tot_pkts = 0;
>> + uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
>> + for (i = 0; i < num_workers; i++) {
>> + char statname[64];
>> + snprintf(statname, sizeof(statname), "port_%u_rx",
>> + worker_data[i].port_id);
>> + pkts_per_wkr[i] = rte_event_dev_xstats_by_name_get(
>> + dev_id, statname, NULL);
> As discussed, check first that the given xstat is supported on the PMD.
Checking has now been implemented. It's done by calling
rte_event_dev_xstats_by_name_get() and checking whether the result is
-ENOTSUP. However, there is a bug in that function: it is declared as
returning a uint64_t, but then returns -ENOTSUP, so I have to cast
-ENOTSUP to uint64_t for the comparison. This will need to be fixed when
the function is patched.

    retval = rte_event_dev_xstats_by_name_get(
            dev_id, statname, NULL);
    if (retval != (uint64_t)-ENOTSUP) {
        pkts_per_wkr[i] = retval;
        tot_pkts += pkts_per_wkr[i];
    }
>> + tot_pkts += pkts_per_wkr[i];
>> + }
>> + for (i = 0; i < num_workers; i++) {
>> + float pc = pkts_per_wkr[i] * 100 /
>> + ((float)tot_pkts);
>> + printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
>> + i, pc, pkts_per_wkr[i]);
>> + }
>> +
>> + }
>> +
>> + return 0;
>> +}
> With above changes,
>
> Acked-by: Jerin Jacob <jerin.jacob at caviumnetworks.com>
Thanks for the reviews.
Regards,
Dave.