[dpdk-dev] [PATCH v5 1/3] examples/eventdev_pipeline: added sample app

Hunt, David david.hunt at intel.com
Tue Jul 4 09:55:25 CEST 2017


Hi Jerin,


On 3/7/2017 4:57 AM, Jerin Jacob wrote:
> -----Original Message-----
>> From: Harry van Haaren<harry.van.haaren at intel.com>
>>
>> This commit adds a sample app for the eventdev library.
>> The app has been tested with DPDK 17.05-rc2, hence this
>> release (or later) is recommended.
>>
>> The sample app showcases a pipeline processing use-case,
>> with event scheduling and processing defined per stage.
>> The application receives traffic as normal, with each
>> packet traversing the pipeline. Once the packet has
>> been processed by each of the pipeline stages, it is
>> transmitted again.
>>
>> The app provides a framework to utilize cores for a single
>> role or multiple roles. Examples of roles are the RX core,
>> TX core, Scheduling core (in the case of the event/sw PMD),
>> and worker cores.
>>
>> Various flags are available to configure numbers of stages,
>> cycles of work at each stage, type of scheduling, number of
>> worker cores, queue depths, etc. For a full explanation,
>> please refer to the documentation.
> A few comments on bugs and "to avoid future rework on the base code when the
> HW PMD is introduced". As we agreed, we will keep the functionality intact to
> provide an application to test ethdev + eventdev with the _SW PMD_ for 17.08.
>

Sure, OK. I will address these.

>> ---
>>   examples/Makefile                   |   2 +
>>   examples/eventdev_pipeline/Makefile |  49 ++
>>   examples/eventdev_pipeline/main.c   | 999 ++++++++++++++++++++++++++++++++++++
>>   3 files changed, 1050 insertions(+)
>>   create mode 100644 examples/eventdev_pipeline/Makefile
>>   create mode 100644 examples/eventdev_pipeline/main.c
> Do we need to update the MAINTAINERS file?

Updated
>> diff --git a/examples/Makefile b/examples/Makefile
>> index 6298626..a6dcc2b 100644
>> --- a/examples/Makefile
>> +++ b/examples/Makefile
>> @@ -100,4 +100,6 @@ $(info vm_power_manager requires libvirt >= 0.9.3)
>>   endif
>>   endif
>>   
>> +DIRS-y += eventdev_pipeline
> Can you change it to eventdev_pipeline_sw_pmd to emphasize the scope?
> We will rename to eventdev_pipeline once it working effectively on both SW and HW
> PMD with ethdev.

OK, I've updated the directory, app name and relevant docs across the board so
they're all eventdev_pipeline_sw_pmd. This should make it clear to anyone using
it that it's for the sw_pmd only, and an updated version will be provided
later.


>> +
>>   include $(RTE_SDK)/mk/rte.extsubdir.mk
>> diff --git a/examples/eventdev_pipeline/Makefile b/examples/eventdev_pipeline/Makefile
>> new file mode 100644
>> index 0000000..4c26e15
>> --- /dev/null
>> +++ b/examples/eventdev_pipeline/Makefile
>> @@ -0,0 +1,49 @@
>> +#   BSD LICENSE
>> +#
>> +#   Copyright(c) 2016 Intel Corporation. All rights reserved.
> 2016-2017

Done.

>> +#
>> +#   Redistribution and use in source and binary forms, with or without
>> +#   modification, are permitted provided that the following conditions
>> +#   are met:
>> +#
>> +
>> +static unsigned int active_cores;
>> +static unsigned int num_workers;
>> +static long num_packets = (1L << 25); /* do ~32M packets */
>> +static unsigned int num_fids = 512;
>> +static unsigned int num_stages = 1;
>> +static unsigned int worker_cq_depth = 16;
>> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
>> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
>> +static int16_t qid[MAX_NUM_STAGES] = {-1};
>> +static int worker_cycles;
>> +static int enable_queue_priorities;
>> +
>> +struct prod_data {
>> +	uint8_t dev_id;
>> +	uint8_t port_id;
>> +	int32_t qid;
>> +	unsigned int num_nic_ports;
>> +} __rte_cache_aligned;
>> +
>> +struct cons_data {
>> +	uint8_t dev_id;
>> +	uint8_t port_id;
>> +} __rte_cache_aligned;
>> +
>> +static struct prod_data prod_data;
>> +static struct cons_data cons_data;
>> +
>> +struct worker_data {
>> +	uint8_t dev_id;
>> +	uint8_t port_id;
>> +} __rte_cache_aligned;
>> +
>> +static unsigned int *enqueue_cnt;
>> +static unsigned int *dequeue_cnt;
> Not been consumed. Remove it.

Done.

>> +
>> +static volatile int done;
>> +static int quiet;
>> +static int dump_dev;
>> +static int dump_dev_signal;
>> +
>> +static uint32_t rx_lock;
>> +static uint32_t tx_lock;
>> +static uint32_t sched_lock;
>> +static bool rx_single;
>> +static bool tx_single;
>> +static bool sched_single;
>> +
>> +static unsigned int rx_core[MAX_NUM_CORE];
>> +static unsigned int tx_core[MAX_NUM_CORE];
>> +static unsigned int sched_core[MAX_NUM_CORE];
>> +static unsigned int worker_core[MAX_NUM_CORE];
>> +
>> +static struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
> Could you please remove this global variable and group it under a structure
> for "command line parsing specific" and "fast path specific" data (anything
> that comes up in producer(), worker() and consumer())? And please allocate
> the "fast path specific" structure variable from the huge page area, so that
> we can easily add new parsing and fast path variables in future.
>

Done. Fast path vars are now allocated using rte_malloc().
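
For reference, a minimal sketch of the kind of grouping and allocation this
amounts to (struct and field names here are illustrative, reusing the app's
existing MAX_NUM_CORE define; the actual code in v6 may differ):

#include <stdbool.h>
#include <stdlib.h>
#include <rte_debug.h>
#include <rte_ethdev.h>
#include <rte_malloc.h>
#include <rte_memory.h>

/* Illustrative grouping only -- the final struct/field names may differ. */
struct fastpath_data {
	volatile int done;
	uint32_t rx_lock;
	uint32_t tx_lock;
	uint32_t sched_lock;
	bool rx_single;
	bool tx_single;
	bool sched_single;
	unsigned int rx_core[MAX_NUM_CORE];
	unsigned int tx_core[MAX_NUM_CORE];
	unsigned int sched_core[MAX_NUM_CORE];
	unsigned int worker_core[MAX_NUM_CORE];
	struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct fastpath_data *fdata;

static void
allocate_fastpath_data(void)
{
	/* rte_zmalloc() returns zeroed, cache-aligned memory from the DPDK
	 * heap, which is huge-page backed when huge pages are in use. */
	fdata = rte_zmalloc("fastpath_data", sizeof(*fdata),
			RTE_CACHE_LINE_SIZE);
	if (fdata == NULL)
		rte_exit(EXIT_FAILURE, "Failed to allocate fastpath memory\n");
}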

>> +
>> +static int
>> +consumer(void)
>> +{
>> +	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
>> +	struct rte_event packets[BATCH_SIZE];
>> +
>> +	static uint64_t received;
>> +	static uint64_t last_pkts;
>> +	static uint64_t last_time;
>> +	static uint64_t start_time;
>> +	unsigned int i, j;
>> +	uint8_t dev_id = cons_data.dev_id;
>> +	uint8_t port_id = cons_data.port_id;
>> +
>> +	uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
>> +			packets, RTE_DIM(packets), 0);
>> +
>> +	if (n == 0) {
>> +		for (j = 0; j < rte_eth_dev_count(); j++)
>> +			rte_eth_tx_buffer_flush(j, 0, tx_buf[j]);
>> +		return 0;
>> +	}
>> +	if (start_time == 0)
>> +		last_time = start_time = rte_get_timer_cycles();
>> +
>> +	received += n;
>> +	for (i = 0; i < n; i++) {
>> +		uint8_t outport = packets[i].mbuf->port;
>> +		rte_eth_tx_buffer(outport, 0, tx_buf[outport],
>> +				packets[i].mbuf);
>> +	}
>> +
>> +	/* Print out mpps every 1<<22 packets */
>> +	if (!quiet && received >= last_pkts + (1<<22)) {
>> +		const uint64_t now = rte_get_timer_cycles();
>> +		const uint64_t total_ms = (now - start_time) / freq_khz;
>> +		const uint64_t delta_ms = (now - last_time) / freq_khz;
>> +		uint64_t delta_pkts = received - last_pkts;
>> +
>> +		printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
>> +			"avg %.3f mpps [current %.3f mpps]\n",
>> +				received,
>> +				total_ms,
>> +				received / (total_ms * 1000.0),
>> +				delta_pkts / (delta_ms * 1000.0));
>> +		last_pkts = received;
>> +		last_time = now;
>> +	}
>> +
>> +	dequeue_cnt[0] += n;
> Not really used.

Removed

>> +
>> +	num_packets -= n;
>> +	if (num_packets <= 0)
>> +		done = 1;
>> +
>> +	return 0;
>> +}
>> +
>> +static int
>> +producer(void)
>> +{
>> +	static uint8_t eth_port;
>> +	struct rte_mbuf *mbufs[BATCH_SIZE+2];
>> +	struct rte_event ev[BATCH_SIZE+2];
>> +	uint32_t i, num_ports = prod_data.num_nic_ports;
>> +	int32_t qid = prod_data.qid;
>> +	uint8_t dev_id = prod_data.dev_id;
>> +	uint8_t port_id = prod_data.port_id;
>> +	uint32_t prio_idx = 0;
>> +
>> +	const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
>> +	if (++eth_port == num_ports)
>> +		eth_port = 0;
>> +	if (nb_rx == 0) {
>> +		rte_pause();
>> +		return 0;
>> +	}
>> +
>> +	for (i = 0; i < nb_rx; i++) {
>> +		ev[i].flow_id = mbufs[i]->hash.rss;
>> +		ev[i].op = RTE_EVENT_OP_NEW;
>> +		ev[i].sched_type = queue_type;
>> +		ev[i].queue_id = qid;
>> +		ev[i].event_type = RTE_EVENT_TYPE_ETHDEV;
>> +		ev[i].sub_event_type = 0;
>> +		ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
>> +		ev[i].mbuf = mbufs[i];
>> +		RTE_SET_USED(prio_idx);
>> +	}
>> +
>> +	const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);
>> +	if (nb_tx != nb_rx) {
>> +		for (i = nb_tx; i < nb_rx; i++)
>> +			rte_pktmbuf_free(mbufs[i]);
>> +	}
>> +	enqueue_cnt[0] += nb_tx;
> Not really used.

Removed

>> +
>> +	return 0;
>> +}
>> +
>> +
>> +static inline void
>> +work(struct rte_mbuf *m)
>> +{
>> +	struct ether_hdr *eth;
>> +	struct ether_addr addr;
>> +
>> +	/* change mac addresses on packet (to use mbuf data) */
>> +	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
>> +	ether_addr_copy(&eth->d_addr, &addr);
>> +	ether_addr_copy(&eth->s_addr, &eth->d_addr);
>> +	ether_addr_copy(&addr, &eth->s_addr);
> If there is an even number of stages (say 2), will the MAC swap be negated,
> as we are swapping at each stage and NOT in the consumer?

The mac swap is just to touch the mbuf. It does not matter if it is negated.

>> +
>> +	/* do a number of cycles of work per packet */
>> +	volatile uint64_t start_tsc = rte_rdtsc();
>> +	while (rte_rdtsc() < start_tsc + worker_cycles)
>> +		rte_pause();
>> +}
>> +
>> +static int
>> +worker(void *arg)
> Looks good.
>
>> +/*
>> + * Initializes a given port using global settings and with the RX buffers
>> + * coming from the mbuf_pool passed as a parameter.
>> + */
>> +static inline int
>> +port_init(uint8_t port, struct rte_mempool *mbuf_pool)
> Looks good.
>
>> +static int
>> +setup_eventdev(struct prod_data *prod_data,
>> +		struct cons_data *cons_data,
>> +		struct worker_data *worker_data)
>> +{
>> +	/* final queue for sending to TX core */
>> +	if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
>> +		printf("%d: error creating qid %d\n", __LINE__, i);
>> +		return -1;
>> +	}
>> +	tx_queue.queue_id = i;
>> +	tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
>> +
>> +	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
>> +		tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
> s/tx_p_conf.dequeue_depth/wkr_p_conf.dequeue_depth

Done, along with other similar coding errors.
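
i.e. the clamp in setup_eventdev() now reads:

	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;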

--snip--

>> +
>> +int
>> +main(int argc, char **argv)
>> +{
>> +	struct worker_data *worker_data;
>> +	unsigned int num_ports;
>> +	int lcore_id;
>> +	int err;
>> +
>> +	signal(SIGINT, signal_handler);
>> +	signal(SIGTERM, signal_handler);
>> +	signal(SIGTSTP, signal_handler);
>> +
>> +	if (!quiet) {
>> +		printf("\nPort Workload distribution:\n");
>> +		uint32_t i;
>> +		uint64_t tot_pkts = 0;
>> +		uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
>> +		for (i = 0; i < num_workers; i++) {
>> +			char statname[64];
>> +			snprintf(statname, sizeof(statname), "port_%u_rx",
>> +					worker_data[i].port_id);
>> +			pkts_per_wkr[i] = rte_event_dev_xstats_by_name_get(
>> +					dev_id, statname, NULL);
> As discussed, check that the given xstat is supported on the PMD first.

Checking has now been implemented. It's done by calling
rte_event_dev_xstats_by_name_get() and seeing if the result is -ENOTSUP.
However, there is a bug in that function: it is declared as returning a
uint64_t, but it returns -ENOTSUP, so I have to cast -ENOTSUP to uint64_t for
the comparison. This will need to be fixed when the function is patched.

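                        /* treat -ENOTSUP as "xstat not supported on this PMD";
                         * the cast is needed because the API returns uint64_t */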
                         retval = rte_event_dev_xstats_by_name_get(
                                         dev_id, statname, NULL);
                         if (retval != (uint64_t)-ENOTSUP) {
                                 pkts_per_wkr[i] =  retval;
                                 tot_pkts += pkts_per_wkr[i];
                         }



>> +			tot_pkts += pkts_per_wkr[i];
>> +		}
>> +		for (i = 0; i < num_workers; i++) {
>> +			float pc = pkts_per_wkr[i]  * 100 /
>> +				((float)tot_pkts);
>> +			printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
>> +					i, pc, pkts_per_wkr[i]);
>> +		}
>> +
>> +	}
>> +
>> +	return 0;
>> +}
> With above changes,
>
> Jerin Jacob <jerin.jacob at caviumnetworks.com>


Thanks for the reviews.

Regards,
Dave.


