[dpdk-dev] Application used for DSW event_dev performance testing

Mattias Rönnblom mattias.ronnblom at ericsson.com
Wed Nov 28 18:09:02 CET 2018


On 2018-11-28 17:55, Mattias Rönnblom wrote:
> Attached is a small DSW throughput test program that I thought might
> help you find the issue.

Looks like DPDK's mailman didn't like my attachment.

--

/*
  * dswtp - A simple DSW eventdev scheduler throughput demo program.
  *
  * SPDX-License-Identifier: BSD-3-Clause
  *
  * Copyright(c) 2018 Ericsson AB
  * Mattias Rönnblom <mattias.ronnblom at ericsson.com>
  */

#include <errno.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_eventdev.h>
#include <rte_lcore.h>
#include <rte_malloc.h>
#include <rte_pause.h>
#include <rte_random.h>
/* Id of the event device that is configured and used by all workers. */
#define EVENT_DEV_ID (0)
/* Total number of new events injected; split evenly across the workers. */
#define NUM_IN_FLIGHT_EVENTS (4096)
/* Value passed as nb_events_limit when configuring the event device. */
#define EVENTDEV_MAX_EVENTS (NUM_IN_FLIGHT_EVENTS * 2)
/* Value passed as new_event_threshold when setting up each port. */
#define EVENTDEV_PORT_NEW_THRESHOLD (NUM_IN_FLIGHT_EVENTS)
/*
 * Number of distinct flow ids assigned (at random) to new events; also
 * used as the per-queue nb_atomic_flows setting.
 */
#define NUM_FLOWS (1024)

/*
 * Number of worker loop iterations between updates of the shared
 * finished-events counter.
 */
#define ITER_PER_SYNC (32)

/* Maximum burst sizes; also used as the configured port depths. */
#define DEQUEUE_BURST_SIZE (32)
#define ENQUEUE_BURST_SIZE (32)

/*
 * Per-worker state handed to worker_start(). One instance exists per
 * lcore taking part in the test.
 */
struct worker_ctx
{
	/* Event device and port this worker enqueues to/dequeues from. */
	uint8_t event_dev_id;
	uint8_t event_port_id;

	/* Number of new events this worker injects into queue 0. */
	uint32_t events_to_produce;

	/*
	 * Number of stages (queues) an event passes through; an event
	 * leaving the last stage is counted as finished and is sent back
	 * to queue 0.
	 */
	uint16_t num_stages;
	/* Timer cycles busy-waited per event and stage. */
	uint32_t stage_work;
	/* Total finished-event count at which the worker loop exits. */
	int64_t num_events;

	/* Finished-events counter shared by all workers. */
	rte_atomic64_t *events_finished;
} __rte_cache_aligned;

/*
 * Print a one-line synopsis of the expected command-line arguments to
 * standard output.
 *
 * name: program name shown at the start of the synopsis.
 */
static void
usage(const char *name)
{
	fprintf(stdout,
		"%s <num-stages> <stage-proc-cycles> <num-events[M]>\n",
		name);
}

/*
 * Fold a worker's locally counted finished events into the shared
 * counter.
 *
 * total_events_finished: counter shared between the workers.
 * finished_since_sync: the caller's local count of events finished
 *   since the previous call; reset to zero once it has been added.
 *
 * Returns the shared total as seen after this worker's contribution
 * (or simply the current total when there was nothing to add).
 */
static int64_t
sync_event_count(rte_atomic64_t *total_events_finished,
		 uint32_t *finished_since_sync)
{
	const uint32_t pending = *finished_since_sync;

	/* Nothing new to report; avoid the atomic read-modify-write. */
	if (pending == 0)
		return rte_atomic64_read(total_events_finished);

	*finished_since_sync = 0;

	return rte_atomic64_add_return(total_events_finished, pending);
}

/*
 * Busy-wait until at least 'work' timer cycles have elapsed, to
 * emulate per-event application processing. Returns immediately when
 * 'work' is zero.
 */
static void
cycle_consume(uint64_t work)
{
	if (unlikely(work != 0)) {
		const uint64_t deadline = rte_get_timer_cycles() + work;

		for (;;) {
			if (rte_get_timer_cycles() >= deadline)
				break;
			rte_pause();
		}
	}
}

static int
worker_start(void *arg)
{
	struct worker_ctx *ctx = arg;
	uint8_t dev_id = ctx->event_dev_id;
	uint8_t port_id = ctx->event_port_id;
	uint32_t num_produced = 0;
	uint32_t finished_since_sync = 0;
	uint16_t iter_since_sync = 0;

	for (;;) {
		uint16_t dequeued;
		uint16_t i;
                 uint16_t enqueued = 0;

		if (unlikely(num_produced < ctx->events_to_produce)) {
			struct rte_event ev = {
				.op = RTE_EVENT_OP_NEW,
				.queue_id = 0,
				.sched_type = RTE_SCHED_TYPE_ATOMIC,
				.flow_id = rte_rand() % NUM_FLOWS
			};
			if (rte_event_enqueue_new_burst(dev_id, port_id,
							&ev, 1) == 1)
				num_produced++;
		}

		struct rte_event evs[DEQUEUE_BURST_SIZE];
		dequeued = rte_event_dequeue_burst(dev_id, port_id, evs,
						     DEQUEUE_BURST_SIZE, 0);

		for (i = 0; i < dequeued; i++) {
			struct rte_event *ev = &evs[i];
			uint16_t this_stage = ev->queue_id;
			uint16_t next_stage_num = this_stage + 1;

			cycle_consume(ctx->stage_work);

			ev->op = RTE_EVENT_OP_FORWARD;

			if (next_stage_num == ctx->num_stages) {
				finished_since_sync++;
				ev->queue_id = 0;
			} else
				ev->queue_id = next_stage_num;
		}

                 do {
			uint16_t left = dequeued - enqueued;
			uint16_t burst_size =
				RTE_MIN(left, ENQUEUE_BURST_SIZE);
			enqueued +=
				rte_event_enqueue_burst(dev_id, port_id,
							evs+enqueued,
							burst_size);
                 } while (unlikely(enqueued != dequeued));

		iter_since_sync++;
		if (unlikely(iter_since_sync == ITER_PER_SYNC)) {
			int64_t total =
				sync_event_count(ctx->events_finished,
						 &finished_since_sync);
			if (total >= ctx->num_events)
				break;
			iter_since_sync = 0;
		}
	}

	return 0;
}

static void
setup_event_dev(uint16_t num_stages, struct worker_ctx *worker_ctxs,
		unsigned num_workers)
{
	unsigned i;
	struct rte_event_dev_info dev_info;

	for (i=0; i < num_workers; i++)
		worker_ctxs[i].event_dev_id = EVENT_DEV_ID;

	rte_event_dev_info_get(EVENT_DEV_ID, &dev_info);

	struct rte_event_dev_config config = {
		.nb_event_queues = num_stages,
		.nb_event_ports = num_workers,
		.nb_events_limit = EVENTDEV_MAX_EVENTS,
		.nb_event_queue_flows = dev_info.max_event_queue_flows,
		.nb_event_port_dequeue_depth = DEQUEUE_BURST_SIZE,
		.nb_event_port_enqueue_depth = ENQUEUE_BURST_SIZE
	};

	int rc = rte_event_dev_configure(EVENT_DEV_ID, &config);
	if (rc)
		rte_panic("Failed to configure the event dev\n");

	struct rte_event_queue_conf queue_config = {
		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
	};

	for (i=0; i<num_stages; i++) {
		uint8_t queue_id = i;
		queue_config.schedule_type = RTE_SCHED_TYPE_ATOMIC;
		queue_config.nb_atomic_flows = NUM_FLOWS;
		queue_config.nb_atomic_order_sequences = NUM_FLOWS;

		if (rte_event_queue_setup(EVENT_DEV_ID, queue_id,
                                           &queue_config))
			rte_panic("Unable to setup queue %d\n", queue_id);
	}

	struct rte_event_port_conf port_config = {
		.new_event_threshold = EVENTDEV_PORT_NEW_THRESHOLD,
		.dequeue_depth = DEQUEUE_BURST_SIZE,
		.enqueue_depth = ENQUEUE_BURST_SIZE
	};

	for (i=0; i<num_workers; i++) {
		uint8_t event_port_id = i;
		worker_ctxs[i].event_port_id = event_port_id;
		if (rte_event_port_setup(EVENT_DEV_ID, event_port_id,
					 &port_config) < 0)
			rte_panic("Failed to create worker port #%d\n",
				  event_port_id);
	}

	for (i=0; i<num_workers; i++) {
		uint8_t event_port_id = i;
		if (rte_event_port_link(EVENT_DEV_ID, event_port_id,
					NULL, NULL, 0)
		    != (int)num_stages)
			rte_panic("Failed to map worker ports\n");
	}

	if (rte_event_dev_start(EVENT_DEV_ID))
		rte_panic("Unable to start eventdev\n");
}

/*
 * Convert a number of timer cycles into seconds, using the frequency
 * reported by rte_get_timer_hz().
 */
static double
tsc_to_s(uint64_t tsc)
{
	const double cycles = (double)tsc;
	const double hz = (double)rte_get_timer_hz();

	return cycles / hz;
}

/*
 * Parse a decimal integer command-line argument.
 *
 * Returns true and stores the number in *value if str consists solely
 * of a decimal integer within [min, max]; returns false otherwise.
 */
static bool
parse_int_arg(const char *str, long long min, long long max,
	      long long *value)
{
	char *end;
	long long parsed;

	errno = 0;
	parsed = strtoll(str, &end, 10);

	if (errno != 0 || end == str || *end != '\0')
		return false;
	if (parsed < min || parsed > max)
		return false;

	*value = parsed;

	return true;
}

/*
 * Parse the event count argument, which is given in millions of
 * events and may be fractional. A single trailing 'M' is tolerated.
 *
 * Returns true and stores the number of events (at least one) in
 * *num_events on success; returns false on malformed or out-of-range
 * input.
 */
static bool
parse_mevents_arg(const char *str, int64_t *num_events)
{
	char *end;
	double events;

	errno = 0;
	events = strtod(str, &end) * 1e6;

	if (errno != 0 || end == str)
		return false;
	if (*end == 'M')
		end++;
	if (*end != '\0')
		return false;

	/*
	 * The negated comparison also rejects NaN. The upper bound keeps
	 * num_events * num_stages (at most UINT8_MAX stages) within the
	 * range of an int64_t.
	 */
	if (!(events >= 1.0) || events >= (double)(INT64_MAX / UINT8_MAX))
		return false;

	*num_events = (int64_t)events;

	return true;
}

/*
 * Program entry point.
 *
 * After EAL initialization, three application arguments are expected:
 * the number of stages, the number of timer cycles to spend per event
 * and stage, and the number of events (in millions) to run through the
 * pipeline. One worker is run on every lcore, including the one
 * executing main(), and a throughput/latency summary is printed once
 * all workers have returned.
 */
int main(int argc, char *argv[])
{
	int rc;
	unsigned i;
	unsigned num_workers;
	long long stages_arg;
	long long work_arg;
	uint16_t num_stages;
	uint32_t stage_work;
	int64_t num_events;
	struct worker_ctx *worker_ctxs;
	rte_atomic64_t *events_finished;
	unsigned lcore_id;
	uint64_t start;
	uint64_t latency;
	uint64_t ideal_latency;

	rc = rte_eal_init(argc, argv);
	if (rc < 0)
		rte_panic("Invalid EAL arguments\n");

	argc -= rc;
	argv += rc;

	/*
	 * The stage count is limited to UINT8_MAX since the queue ids
	 * used when setting up the event device are 8 bits wide. At
	 * least one stage and one event are required; the summary below
	 * divides by their product.
	 */
	if (argc != 4 ||
	    !parse_int_arg(argv[1], 1, UINT8_MAX, &stages_arg) ||
	    !parse_int_arg(argv[2], 0, UINT32_MAX, &work_arg) ||
	    !parse_mevents_arg(argv[3], &num_events)) {
		usage(argv[0]);
		exit(EXIT_FAILURE);
	}

	num_stages = (uint16_t)stages_arg;
	stage_work = (uint32_t)work_arg;

	num_workers = rte_lcore_count();

	worker_ctxs = rte_malloc("worker-ctx",
				 sizeof(struct worker_ctx) * num_workers,
				 RTE_CACHE_LINE_SIZE);
	events_finished = rte_malloc("finished-events", sizeof(rte_atomic64_t),
				   RTE_CACHE_LINE_SIZE);

	if (worker_ctxs == NULL || events_finished == NULL)
		rte_panic("Unable to allocate memory\n");

	rte_atomic64_init(events_finished);

	for (i = 0; i < num_workers; i++) {
		struct worker_ctx *w = &worker_ctxs[i];
		*w = (struct worker_ctx) {
			.event_dev_id = EVENT_DEV_ID,
			.event_port_id = i,
			.events_to_produce = NUM_IN_FLIGHT_EVENTS/num_workers,
			.num_stages = num_stages,
			.stage_work = stage_work,
			.num_events = num_events,
			.events_finished = events_finished
		};
	}

	setup_event_dev(num_stages, worker_ctxs, num_workers);

	start = rte_get_timer_cycles();
	rte_compiler_barrier();

	/* The first num_workers-1 contexts go to the other lcores... */
	i = 0;
	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_remote_launch(worker_start, &(worker_ctxs[i]),
					  lcore_id))
			rte_panic("Failed to launch worker\n");
		i++;
	}

	/* ...and the last one is run on the lcore executing main(). */
	worker_start(&worker_ctxs[num_workers-1]);

	rte_eal_mp_wait_lcore();

	rte_compiler_barrier();
	latency = rte_get_timer_cycles() - start;

	/* Widen before multiplying; the product may not fit in 32 bits. */
	ideal_latency = ((uint64_t)stage_work * num_stages *
			 (uint64_t)num_events) / num_workers;

	printf("Workers: %u\n", num_workers);
	printf("Stages: %"PRIu16"\n", num_stages);
	printf("Per-stage application processing: %"PRIu32" TSC cycles\n",
	       stage_work);
	printf("Events: %"PRId64" M\n", num_events/1000000);
	if (stage_work > 0)
		printf("Ideal latency: %.2f s\n", tsc_to_s(ideal_latency));
	printf("Actual latency: %.2f s\n", tsc_to_s(latency));

	if (stage_work > 0)
		printf("Ideal scheduling rate: %.2f M events/s\n",
		       (num_events*num_stages)/tsc_to_s(ideal_latency)/1e6);
	printf("Actual scheduling rate: %.2f M events/s\n",
	       (num_events*num_stages)/tsc_to_s(latency)/1e6);

	if (stage_work > 0) {
		/* Clamp to avoid unsigned wrap-around. */
		uint64_t overhead =
			latency > ideal_latency ? latency - ideal_latency : 0;
		uint64_t per_stage_oh =
			overhead / ((uint64_t)num_events * num_stages);
		printf("Scheduling overhead: %"PRIu64" TSC cycles/stage\n",
		       per_stage_oh);
	}

	rte_event_dev_stop(EVENT_DEV_ID);

	/*
	 * NOTE(review): a NULL format string is passed here, unchanged
	 * from the original; confirm that rte_exit() accepts it.
	 */
	rte_exit(0, NULL);
}


More information about the dev mailing list