[dpdk-dev] [PATCH v6 11/21] event/sw: add scheduling logic

Hunt, David david.hunt at intel.com
Thu Mar 30 12:07:11 CEST 2017


On 30/3/2017 12:25 AM, Harry van Haaren wrote:
> From: Bruce Richardson <bruce.richardson at intel.com>
>
> Add in the scheduling function which takes the events from the
> producer queues and buffers them before scheduling them to consumer
> queues. The scheduling logic includes support for atomic, reordered,
> and parallel scheduling of flows.
>
> Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>
>
> ---
>
> v6:
> - Fix handling of event priority normalization (Jerin)
> ---
>   drivers/event/sw/Makefile             |   1 +
>   drivers/event/sw/sw_evdev.c           |   1 +
>   drivers/event/sw/sw_evdev.h           |  11 +
>   drivers/event/sw/sw_evdev_scheduler.c | 601 ++++++++++++++++++++++++++++++++++
>   4 files changed, 614 insertions(+)
>   create mode 100644 drivers/event/sw/sw_evdev_scheduler.c
>
> diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
> index b6ecd91..a7f5b3d 100644
> --- a/drivers/event/sw/Makefile
> +++ b/drivers/event/sw/Makefile
> @@ -54,6 +54,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
>   # library source files
>   SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
>   SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_scheduler.c
>   
>   # export include files
>   SYMLINK-y-include +=
> diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
> index 2c28547..f91a04b 100644
> --- a/drivers/event/sw/sw_evdev.c
> +++ b/drivers/event/sw/sw_evdev.c
> @@ -557,6 +557,7 @@ sw_probe(const char *name, const char *params)
>   	dev->enqueue_burst = sw_event_enqueue_burst;
>   	dev->dequeue = sw_event_dequeue;
>   	dev->dequeue_burst = sw_event_dequeue_burst;
> +	dev->schedule = sw_event_schedule;
>   
>   	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
>   		return 0;
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index ab372fd..7c157c7 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -248,8 +248,18 @@ struct sw_evdev {
>   	/* Cache how many packets are in each cq */
>   	uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned;
>   
> +	/* Array of pointers to load-balanced QIDs sorted by priority level */
> +	struct sw_qid *qids_prioritized[RTE_EVENT_MAX_QUEUES_PER_DEV];
> +
> +	/* Stats */
> +	struct sw_point_stats stats __rte_cache_aligned;
> +	uint64_t sched_called;
>   	int32_t sched_quanta;
> +	uint64_t sched_no_iq_enqueues;
> +	uint64_t sched_no_cq_enqueues;
> +	uint64_t sched_cq_qid_called;
>   
> +	uint8_t started;
>   	uint32_t credit_update_quanta;
>   };
>   
> @@ -272,5 +282,6 @@ uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[],
>   uint16_t sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
>   uint16_t sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
>   			uint64_t wait);
> +void sw_event_schedule(struct rte_eventdev *dev);
>   
>   #endif /* _SW_EVDEV_H_ */
> diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
> new file mode 100644
> index 0000000..c0fe6a3
> --- /dev/null
> +++ b/drivers/event/sw/sw_evdev_scheduler.c
> @@ -0,0 +1,601 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#include <rte_ring.h>
> +#include <rte_hash_crc.h>
> +#include "sw_evdev.h"
> +#include "iq_ring.h"
> +#include "event_ring.h"
> +
> +#define SW_IQS_MASK (SW_IQS_MAX-1)
> +
> +/* Retrieve the highest priority IQ that has packets (the lowest set bit of
> + * the IQ packet mask), or SW_IQS_MAX if no pkts are available. The sentinel
> + * bit OR-ed in at position SW_IQS_MAX guarantees __builtin_ctz() is never
> + * called with zero, for which its result is undefined. Doing the CTZ twice
> + * is faster than caching the value due to data dependencies
> + */
> +#define PKT_MASK_TO_IQ(pkts) \
> +	(__builtin_ctz((pkts) | (1 << SW_IQS_MAX)))
> +
> +/* PRIO_TO_IQ drops the low six bits of the priority; for an 8-bit priority
> + * value that leaves exactly four IQ indexes, so the build is stopped if
> + * SW_IQS_MAX no longer matches.
> + */
> +#if SW_IQS_MAX != 4
> +#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
> +#endif
> +#define PRIO_TO_IQ(prio) ((prio) >> 6)
> +
> +/* upper bound on the events handled per IQ per scheduling call */
> +#define MAX_PER_IQ_DEQUEUE 48
> +/* mask applied to a hashed flow id to form an index into qid->fids[] */
> +#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
> +
> +/* Schedule events from one IQ of an atomic QID to consumer queues.
> + *
> + * At most MAX_PER_IQ_DEQUEUE events are dequeued per call. Each event's
> + * flow id is hashed into qid->fids[]; a flow that is not yet pinned
> + * (fid->cq < 0) is pinned to the mapped CQ with the most cached free
> + * space, a round-robin pick winning ties. Events whose CQ has no cached
> + * space, or whose port already has SW_PORT_HIST_LIST events in flight,
> + * are handed back to the IQ with iq_ring_put_back().
> + *
> + * Returns the number of events placed in port CQ buffers.
> + */
> +static inline uint32_t
> +sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> +		uint32_t iq_num, unsigned int count)
> +{
> +	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
> +	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
> +	uint32_t nb_blocked = 0;
> +	uint32_t i;
> +
> +	/* With no CQ mapped there is no valid cq_map[] entry to pin a flow
> +	 * to, so leave the events in the IQ.
> +	 */
> +	if (qid->cq_num_mapped_cqs == 0)
> +		return 0;
> +
> +	if (count > MAX_PER_IQ_DEQUEUE)
> +		count = MAX_PER_IQ_DEQUEUE;
> +
> +	/* This is the QID ID. The QID ID is static, hence it can be
> +	 * used to identify the stage of processing in history lists etc
> +	 */
> +	uint32_t qid_id = qid->id;
> +
> +	iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
> +	for (i = 0; i < count; i++) {
> +		const struct rte_event *qe = &qes[i];
> +		/* use cheap bit mixing, we only need to lose a few bits */
> +		uint32_t flow_id32 = (qes[i].flow_id) ^ (qes[i].flow_id >> 10);
> +		const uint16_t flow_id = FLOWID_MASK & flow_id32;
> +		struct sw_fid_t *fid = &qid->fids[flow_id];
> +		int cq = fid->cq;
> +
> +		if (cq < 0) {
> +			uint32_t cq_idx;
> +
> +			/* Wrap with >= *before* indexing, so that a cursor
> +			 * left beyond the mapped range (e.g. if the number
> +			 * of mapped CQs shrank since the last call) can
> +			 * never index past the mapped cq_map[] entries.
> +			 */
> +			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
> +				qid->cq_next_tx = 0;
> +			cq_idx = qid->cq_next_tx++;
> +			cq = qid->cq_map[cq_idx];
> +
> +			/* find least used */
> +			int cq_free_cnt = sw->cq_ring_space[cq];
> +			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
> +					cq_idx++) {
> +				int test_cq = qid->cq_map[cq_idx];
> +				int test_cq_free = sw->cq_ring_space[test_cq];
> +				if (test_cq_free > cq_free_cnt) {
> +					cq = test_cq;
> +					cq_free_cnt = test_cq_free;
> +				}
> +			}
> +
> +			fid->cq = cq; /* this pins early */
> +		}
> +
> +		/* destination cannot take the event now: keep it aside so it
> +		 * can be returned to the IQ after the burst
> +		 */
> +		if (sw->cq_ring_space[cq] == 0 ||
> +				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
> +			blocked_qes[nb_blocked++] = *qe;
> +			continue;
> +		}
> +
> +		struct sw_port *p = &sw->ports[cq];
> +
> +		/* at this point we can queue up the packet on the cq_buf */
> +		fid->pcount++;
> +		p->cq_buf[p->cq_buf_count++] = *qe;
> +		p->inflights++;
> +		sw->cq_ring_space[cq]--;
> +
> +		/* record flow and QID so the completion can be matched up */
> +		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
> +		p->hist_list[head].fid = flow_id;
> +		p->hist_list[head].qid = qid_id;
> +
> +		p->stats.tx_pkts++;
> +		qid->stats.tx_pkts++;
> +
> +		/* if we just filled in the last slot, flush the buffer */
> +		if (sw->cq_ring_space[cq] == 0) {
> +			struct qe_ring *worker = p->cq_worker_ring;
> +			qe_ring_enqueue_burst(worker, p->cq_buf,
> +					p->cq_buf_count,
> +					&sw->cq_ring_space[cq]);
> +			p->cq_buf_count = 0;
> +		}
> +	}
> +	iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);
> +
> +	return count - nb_blocked;
> +}
> +
> +/* Schedule events from one IQ of a parallel or ordered QID.
> + *
> + * Events are spread over the QID's mapped CQs round-robin, starting at
> + * qid->cq_next_tx and skipping CQs whose worker ring is full or whose
> + * port already has SW_PORT_HIST_LIST events in flight. When keep_order
> + * is set, the burst is limited to the number of entries available on
> + * reorder_buffer_freelist, and one entry is taken per scheduled event and
> + * stored in the port's history list.
> + *
> + * Returns the number of events placed in port CQ buffers.
> + */
> +static inline uint32_t
> +sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> +		uint32_t iq_num, unsigned int count, int keep_order)
> +{
> +	uint32_t i;
> +	uint32_t cq_idx = qid->cq_next_tx;
> +
> +	/* This is the QID ID. The QID ID is static, hence it can be
> +	 * used to identify the stage of processing in history lists etc
> +	 */
> +	uint32_t qid_id = qid->id;
> +
> +	if (count > MAX_PER_IQ_DEQUEUE)
> +		count = MAX_PER_IQ_DEQUEUE;
> +
> +	if (keep_order)
> +		/* only schedule as many as we have reorder buffer entries */
> +		count = RTE_MIN(count,
> +				rte_ring_count(qid->reorder_buffer_freelist));
> +
> +	for (i = 0; i < count; i++) {
> +		const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
> +		uint32_t cq_check_count = 0;
> +		uint32_t cq;
> +
> +		/*
> +		 *  for parallel, just send to next available CQ in round-robin
> +		 * fashion. So scan for an available CQ. If all CQs are full
> +		 * just return and move on to next QID
> +		 */
> +		do {
> +			if (++cq_check_count > qid->cq_num_mapped_cqs)
> +				goto exit;
> +			cq = qid->cq_map[cq_idx];
> +			if (++cq_idx == qid->cq_num_mapped_cqs)
> +				cq_idx = 0;
> +		} while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
> +				sw->ports[cq].inflights == SW_PORT_HIST_LIST);
> +
> +		struct sw_port *p = &sw->ports[cq];
> +		if (sw->cq_ring_space[cq] == 0 ||
> +				p->inflights == SW_PORT_HIST_LIST)
> +			break;
> +
> +		sw->cq_ring_space[cq]--;
> +
> +		qid->stats.tx_pkts++;
> +
> +		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
> +
> +		/* Store the same hashed and masked flow id as the atomic
> +		 * path does: the completion path uses this value as an
> +		 * index into qid->fids[], so the raw flow_id could index
> +		 * beyond that table.
> +		 */
> +		const uint32_t flow_id32 = qe->flow_id ^ (qe->flow_id >> 10);
> +
> +		p->hist_list[head].fid = FLOWID_MASK & flow_id32;
> +		p->hist_list[head].qid = qid_id;
> +
> +		if (keep_order)
> +			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
> +					(void *)&p->hist_list[head].rob_entry);
> +
> +		/* copy the event out before removing it from the IQ */
> +		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
> +		iq_ring_pop(qid->iq[iq_num]);
> +
> +		/* keep the history-list writes above ahead of the counter
> +		 * updates below in program order
> +		 */
> +		rte_compiler_barrier();
> +		p->inflights++;
> +		p->stats.tx_pkts++;
> +		p->hist_head++;
> +	}
> +exit:
> +	/* remember where the round-robin scan stopped for the next call */
> +	qid->cq_next_tx = cq_idx;
> +	return i;
> +}
> +
> +/* Move events from one IQ of a directed QID straight to its consumer.
> + *
> + * The destination is always qid->cq_map[0]. As many events as the cached
> + * CQ space allows are dequeued from the IQ directly into the tail of the
> + * port's cq_buf; the count parameter is unused.
> + *
> + * Returns the number of events moved.
> + */
> +static uint32_t
> +sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> +		uint32_t iq_num, unsigned int count __rte_unused)
> +{
> +	const uint32_t dest_cq = qid->cq_map[0];
> +	const uint32_t space = sw->cq_ring_space[dest_cq];
> +	struct sw_port *p;
> +	uint32_t moved;
> +
> +	/* nothing can be sent while the cached CQ space is exhausted */
> +	if (space == 0)
> +		return 0;
> +
> +	/* burst dequeue from the QID IQ ring into the port's CQ buffer */
> +	p = &sw->ports[dest_cq];
> +	moved = iq_ring_dequeue_burst(qid->iq[iq_num],
> +			&p->cq_buf[p->cq_buf_count], space);
> +	p->cq_buf_count += moved;
> +
> +	/* account for the events in the QID and port TX stats */
> +	qid->stats.tx_pkts += moved;
> +	p->stats.tx_pkts += moved;
> +
> +	/* consume the cached CQ space */
> +	sw->cq_ring_space[dest_cq] -= moved;
> +
> +	return moved;
> +}
> +
> +/* Run one scheduling pass over all QIDs, in qids_prioritized[] order.
> + *
> + * For each QID the IQ selected by PKT_MASK_TO_IQ(iq_pkt_mask) is handed to
> + * the scheduler matching the QID type. If every event counted in that IQ
> + * was scheduled, its bit is cleared from iq_pkt_mask.
> + *
> + * Returns the total number of events scheduled to CQs.
> + */
> +static uint32_t
> +sw_schedule_qid_to_cq(struct sw_evdev *sw)
> +{
> +	uint32_t pkts = 0;
> +	uint32_t qid_idx;
> +
> +	sw->sched_cq_qid_called++;
> +
> +	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
> +		struct sw_qid *qid = sw->qids_prioritized[qid_idx];
> +
> +		int type = qid->type;
> +		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
> +
> +		/* no bit set in the mask: nothing is queued on this QID */
> +		if (iq_num >= SW_IQS_MAX)
> +			continue;
> +
> +		/* The load-balanced schedulers pick their destination from
> +		 * the first cq_num_mapped_cqs entries of cq_map[]; with
> +		 * none mapped there is nowhere to send events, so leave
> +		 * them queued. NOTE(review): the directed path reads
> +		 * cq_map[0] unconditionally - confirm a directed QID is
> +		 * always linked before scheduling starts.
> +		 */
> +		if (type != SW_SCHED_TYPE_DIRECT &&
> +				qid->cq_num_mapped_cqs == 0)
> +			continue;
> +
> +		uint32_t pkts_done = 0;
> +		uint32_t count = iq_ring_count(qid->iq[iq_num]);
> +
> +		if (count > 0) {
> +			if (type == SW_SCHED_TYPE_DIRECT)
> +				pkts_done += sw_schedule_dir_to_cq(sw, qid,
> +						iq_num, count);
> +			else if (type == RTE_SCHED_TYPE_ATOMIC)
> +				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
> +						iq_num, count);
> +			else
> +				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
> +						iq_num, count,
> +						type == RTE_SCHED_TYPE_ORDERED);
> +		}
> +
> +		/* Check if the IQ that was polled is now empty, and unset it
> +		 * in the IQ mask if its empty.
> +		 */
> +		int all_done = (pkts_done == count);
> +
> +		qid->iq_pkt_mask &= ~(all_done << (iq_num));
> +		pkts += pkts_done;
> +	}
> +
> +	return pkts;
> +}
> +
> +/* This function will perform re-ordering of packets, and injecting into
> + * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
> + * contiguous in that array, this function accepts a "range" of QIDs to scan.
> + *
> + * Only QIDs of type RTE_SCHED_TYPE_ORDERED in [qid_start, qid_end) are
> + * processed. Reorder buffer entries are visited in order starting at
> + * qid->reorder_buffer_index, and processing of a QID stops at the first
> + * entry that is not marked ready. A fragment addressed to a queue id that
> + * is out of range is counted in stats.rx_dropped and skipped; a full
> + * destination IQ stops the entry, which then stays ready with its
> + * remaining fragments so that a later call can resume.
> + *
> + * Returns the number of events enqueued to IQs (the 32-bit counter is
> + * returned through a uint16_t).
> + */
> +static uint16_t
> +sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
> +{
> +	/* Perform egress reordering */
> +	struct rte_event *qe;
> +	uint32_t pkts_iter = 0;
> +
> +	for (; qid_start < qid_end; qid_start++) {
> +		struct sw_qid *qid = &sw->qids[qid_start];
> +		int i, num_entries_in_use;
> +
> +		if (qid->type != RTE_SCHED_TYPE_ORDERED)
> +			continue;
> +
> +		/* free slots on the freelist ring are taken as the number of
> +		 * reorder buffer entries currently handed out
> +		 */
> +		num_entries_in_use = rte_ring_free_count(
> +					qid->reorder_buffer_freelist);
> +
> +		for (i = 0; i < num_entries_in_use; i++) {
> +			struct reorder_buffer_entry *entry;
> +			int j;
> +
> +			entry = &qid->reorder_buffer[qid->reorder_buffer_index];
> +
> +			/* oldest entry not complete yet: nothing more can be
> +			 * released for this QID
> +			 */
> +			if (!entry->ready)
> +				break;
> +
> +			for (j = 0; j < entry->num_fragments; j++) {
> +				uint16_t dest_qid;
> +				uint16_t dest_iq;
> +
> +				int idx = entry->fragment_index + j;
> +				qe = &entry->fragments[idx];
> +
> +				dest_qid = qe->queue_id;
> +				dest_iq  = PRIO_TO_IQ(qe->priority);
> +
> +				/* invalid destination: drop, but still count
> +				 * the fragment as consumed (j advances)
> +				 */
> +				if (dest_qid >= sw->qid_count) {
> +					sw->stats.rx_dropped++;
> +					continue;
> +				}
> +
> +				struct sw_qid *dest_qid_ptr =
> +					&sw->qids[dest_qid];
> +				const struct iq_ring *dest_iq_ptr =
> +					dest_qid_ptr->iq[dest_iq];
> +				/* destination full: stop here, j records how
> +				 * many fragments were consumed so far
> +				 */
> +				if (iq_ring_free_count(dest_iq_ptr) == 0)
> +					break;
> +
> +				pkts_iter++;
> +
> +				struct sw_qid *q = &sw->qids[dest_qid];
> +				struct iq_ring *r = q->iq[dest_iq];
> +
> +				/* we checked for space above, so enqueue must
> +				 * succeed
> +				 */
> +				iq_ring_enqueue(r, qe);
> +				q->iq_pkt_mask |= (1 << (dest_iq));
> +				q->iq_pkt_count[dest_iq]++;
> +				q->stats.rx_pkts++;
> +			}
> +
> +			/* stay ready only if fragments remain; the count and
> +			 * start index are advanced past the consumed ones
> +			 */
> +			entry->ready = (j != entry->num_fragments);
> +			entry->num_fragments -= j;
> +			entry->fragment_index += j;
> +
> +			/* fully drained: recycle the entry on the freelist
> +			 * and step to the next one, wrapping at window_size
> +			 */
> +			if (!entry->ready) {
> +				entry->fragment_index = 0;
> +
> +				rte_ring_sp_enqueue(
> +						qid->reorder_buffer_freelist,
> +						entry);
> +
> +				qid->reorder_buffer_index++;
> +				qid->reorder_buffer_index %= qid->window_size;
> +			}
> +		}
> +	}
> +	return pkts_iter;
> +}
> +
> +/* Refill a port's pp_buf shadow buffer with a burst dequeued from the
> + * port's rx_worker_ring, and restart consumption at index 0. The sw
> + * argument is not used.
> + */
> +static inline void __attribute__((always_inline))
> +sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
> +{
> +	RTE_SET_USED(sw);
> +
> +	port->pp_buf_start = 0;
> +	port->pp_buf_count = qe_ring_dequeue_burst(port->rx_worker_ring,
> +			port->pp_buf, RTE_DIM(port->pp_buf));
> +}
> +
> +/* Drain the events buffered for a load-balanced port into the QID IQs.
> + *
> + * The port's pp_buf is refilled from its worker ring when empty, then
> + * consumed until it is empty or a valid event's destination IQ has no
> + * free space. An event carrying QE_FLAG_COMPLETE releases the oldest
> + * history-list entry of the port (when the event is end-of-packet); an
> + * event carrying QE_FLAG_VALID is enqueued to the IQ selected by its
> + * queue_id and priority.
> + *
> + * allow_reorder: when non-zero, a completion whose history entry holds a
> + * reorder buffer entry stores the new event as a fragment in that entry
> + * instead of enqueuing it directly; when zero, a non-EOP event is treated
> + * as a plain new event.
> + *
> + * Returns the number of events enqueued directly to IQs.
> + */
> +static inline uint32_t __attribute__((always_inline))
> +__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
> +{
> +	/* Scratch target for the branch-free "ready" store below. It must
> +	 * NOT be const: it is written whenever the history entry has no
> +	 * reorder buffer entry, and writing to a const object is undefined
> +	 * behaviour.
> +	 */
> +	static struct reorder_buffer_entry dummy_rob;
> +	uint32_t pkts_iter = 0;
> +	struct sw_port *port = &sw->ports[port_id];
> +
> +	/* If shadow ring has 0 pkts, pull from worker ring */
> +	if (port->pp_buf_count == 0)
> +		sw_refill_pp_buf(sw, port);
> +
> +	while (port->pp_buf_count) {
> +		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> +		struct sw_hist_list_entry *hist_entry = NULL;
> +		uint8_t flags = qe->op;
> +		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
> +		int needs_reorder = 0;
> +		/* if no-reordering, having PARTIAL == NEW */
> +		if (!allow_reorder && !eop)
> +			flags = QE_FLAG_VALID;
> +
> +		/*
> +		 * if we don't have space for this packet in an IQ,
> +		 * then move on to next queue. Technically, for a
> +		 * packet that needs reordering, we don't need to check
> +		 * here, but it simplifies things not to special-case
> +		 */
> +		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
> +		struct sw_qid *qid = &sw->qids[qe->queue_id];
> +
> +		if ((flags & QE_FLAG_VALID) &&
> +				iq_ring_free_count(qid->iq[iq_num]) == 0)
> +			break;
> +
> +		/* now process based on flags. Note that for directed
> +		 * queues, the enqueue_flush masks off all but the
> +		 * valid flag. This makes FWD and PARTIAL enqueues just
> +		 * NEW type, and makes DROPS no-op calls.
> +		 */
> +		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
> +			const uint32_t hist_tail = port->hist_tail &
> +					(SW_PORT_HIST_LIST - 1);
> +
> +			hist_entry = &port->hist_list[hist_tail];
> +			const uint32_t hist_qid = hist_entry->qid;
> +			const uint32_t hist_fid = hist_entry->fid;
> +
> +			/* drop the flow's in-flight count and unpin the flow
> +			 * from its CQ once nothing is outstanding
> +			 */
> +			struct sw_fid_t *fid =
> +				&sw->qids[hist_qid].fids[hist_fid];
> +			fid->pcount -= eop;
> +			if (fid->pcount == 0)
> +				fid->cq = -1;
> +
> +			if (allow_reorder) {
> +				/* set reorder ready if an ordered QID */
> +				uintptr_t rob_ptr =
> +					(uintptr_t)hist_entry->rob_entry;
> +				const uintptr_t valid = (rob_ptr != 0);
> +				needs_reorder = valid;
> +				/* branch-free select: with no ROB entry,
> +				 * (valid - 1) is all ones and the store is
> +				 * redirected to dummy_rob
> +				 */
> +				rob_ptr |=
> +					((valid - 1) & (uintptr_t)&dummy_rob);
> +				struct reorder_buffer_entry *tmp_rob_ptr =
> +					(struct reorder_buffer_entry *)rob_ptr;
> +				tmp_rob_ptr->ready = eop * needs_reorder;
> +			}
> +
> +			/* the history entry is only retired at end of packet */
> +			port->inflights -= eop;
> +			port->hist_tail += eop;
> +		}
> +		if (flags & QE_FLAG_VALID) {
> +			port->stats.rx_pkts++;
> +
> +			if (allow_reorder && needs_reorder) {
> +				struct reorder_buffer_entry *rob_entry =
> +						hist_entry->rob_entry;
> +
> +				/* Although fragmentation not currently
> +				 * supported by eventdev API, we support it
> +				 * here. Open: How do we alert the user that
> +				 * they've exceeded max frags?
> +				 */
> +				int num_frag = rob_entry->num_fragments;
> +				if (num_frag == SW_FRAGMENTS_MAX)
> +					sw->stats.rx_dropped++;
> +				else {
> +					int idx = rob_entry->num_fragments++;
> +					rob_entry->fragments[idx] = *qe;
> +				}
> +				goto end_qe;
> +			}
> +
> +			/* Use the iq_num from above to push the QE
> +			 * into the qid at the right priority
> +			 */
> +
> +			qid->iq_pkt_mask |= (1 << (iq_num));
> +			iq_ring_enqueue(qid->iq[iq_num], qe);
> +			qid->iq_pkt_count[iq_num]++;
> +			qid->stats.rx_pkts++;
> +			pkts_iter++;
> +		}
> +
> +end_qe:
> +		port->pp_buf_start++;
> +		port->pp_buf_count--;
> +	} /* while (avail_qes) */
> +
> +	return pkts_iter;
> +}
> +
> +/* Pull events from a load-balanced port with reordering enabled
> + * (__pull_port_lb with allow_reorder = 1). Returns its result unchanged.
> + */
> +static uint32_t
> +sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
> +{
> +	return __pull_port_lb(sw, port_id, 1);
> +}
> +
> +/* Pull events from a load-balanced port with reordering disabled
> + * (__pull_port_lb with allow_reorder = 0). Returns its result unchanged.
> + */
> +static uint32_t
> +sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
> +{
> +	return __pull_port_lb(sw, port_id, 0);
> +}
> +
> +/* Drain the events buffered for a directed port into the QID IQs.
> + *
> + * The port's pp_buf is refilled from its worker ring when empty. Events
> + * without QE_FLAG_VALID are consumed and ignored; valid events are
> + * enqueued to the IQ selected by their queue_id and priority. Draining
> + * stops early, leaving the event buffered, when the destination IQ has
> + * no free space.
> + *
> + * Returns the number of events enqueued to IQs.
> + */
> +static uint32_t
> +sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
> +{
> +	struct sw_port *port = &sw->ports[port_id];
> +	uint32_t nb_enq = 0;
> +
> +	/* If shadow ring has 0 pkts, pull from worker ring */
> +	if (port->pp_buf_count == 0)
> +		sw_refill_pp_buf(sw, port);
> +
> +	/* the loop step consumes the current event; "break" skips it so a
> +	 * blocked event stays at the head of the buffer
> +	 */
> +	for (; port->pp_buf_count;
> +			port->pp_buf_start++, port->pp_buf_count--) {
> +		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> +		const uint8_t flags = qe->op;
> +
> +		if (!(flags & QE_FLAG_VALID))
> +			continue;
> +
> +		const uint32_t iq_num = PRIO_TO_IQ(qe->priority);
> +		struct sw_qid *qid = &sw->qids[qe->queue_id];
> +		struct iq_ring *iq_ring = qid->iq[iq_num];
> +
> +		if (iq_ring_free_count(iq_ring) == 0)
> +			break; /* move to next port */
> +
> +		port->stats.rx_pkts++;
> +
> +		/* Use the iq_num from above to push the QE
> +		 * into the qid at the right priority
> +		 */
> +		qid->iq_pkt_mask |= (1 << (iq_num));
> +		iq_ring_enqueue(iq_ring, qe);
> +		qid->iq_pkt_count[iq_num]++;
> +		qid->stats.rx_pkts++;
> +		nb_enq++;
> +	}
> +
> +	return nb_enq;
> +}
> +
> +/* Scheduler entry point: pull events from the ports into the QID IQs,
> + * release reordered events, schedule IQ contents to the consumer queues,
> + * then flush the per-port CQ buffers to the worker rings.
> + *
> + * The call is counted in sched_called and returns immediately when the
> + * device is not started. Work per call is bounded by sw->sched_quanta.
> + */
> +void
> +sw_event_schedule(struct rte_eventdev *dev)
> +{
> +	struct sw_evdev *sw = sw_pmd_priv(dev);
> +	uint32_t in_pkts, out_pkts;
> +	uint32_t out_pkts_total = 0, in_pkts_total = 0;
> +	int32_t sched_quanta = sw->sched_quanta;
> +	uint32_t i;
> +
> +	sw->sched_called++;
> +	if (!sw->started)
> +		return;
> +
> +	do {
> +		uint32_t in_pkts_this_iteration = 0;
> +
> +		/* Pull from rx_ring for ports */
> +		do {
> +			in_pkts = 0;
> +			/* pick the pull routine by port kind: directed,
> +			 * load-balanced with ordered QIDs, or load-balanced
> +			 * without reordering
> +			 */
> +			for (i = 0; i < sw->port_count; i++)
> +				if (sw->ports[i].is_directed)
> +					in_pkts += sw_schedule_pull_port_dir(sw, i);
> +				else if (sw->ports[i].num_ordered_qids > 0)
> +					in_pkts += sw_schedule_pull_port_lb(sw, i);
> +				else
> +					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
> +
> +			/* QID scan for re-ordered */
> +			in_pkts += sw_schedule_reorder(sw, 0,
> +					sw->qid_count);
> +			in_pkts_this_iteration += in_pkts;
> +		/* keep pulling only while a pass still brought in more than
> +		 * 4 events and the ingress quanta is not used up
> +		 */
> +		} while (in_pkts > 4 &&
> +				(int)in_pkts_this_iteration < sched_quanta);
> +
> +		out_pkts = 0;
> +		out_pkts += sw_schedule_qid_to_cq(sw);
> +		out_pkts_total += out_pkts;
> +		in_pkts_total += in_pkts_this_iteration;
> +
> +		/* in_pkts holds the count of the last pull pass: stop when
> +		 * neither that pass nor the CQ scheduling moved anything
> +		 */
> +		if (in_pkts == 0 && out_pkts == 0)
> +			break;
> +	} while ((int)out_pkts_total < sched_quanta);
> +
> +	/* push all the internal buffered QEs in port->cq_ring to the
> +	 * worker cores: aka, do the ring transfers batched.
> +	 * The enqueue is issued for every port, even with nothing buffered;
> +	 * it is handed &sw->cq_ring_space[i], presumably to refresh the
> +	 * cached free space - confirm against qe_ring_enqueue_burst().
> +	 */
> +	for (i = 0; i < sw->port_count; i++) {
> +		struct qe_ring *worker = sw->ports[i].cq_worker_ring;
> +		qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
> +				sw->ports[i].cq_buf_count,
> +				&sw->cq_ring_space[i]);
> +		sw->ports[i].cq_buf_count = 0;
> +	}
> +
> +	sw->stats.tx_pkts += out_pkts_total;
> +	sw->stats.rx_pkts += in_pkts_total;
> +
> +	/* count the calls that moved nothing in, or nothing out */
> +	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
> +	sw->sched_no_cq_enqueues += (out_pkts_total == 0);
> +
> +}

There are a couple of line-length issues reported by checkpatch, but the
indentation makes them very difficult to resolve, so I would suggest they're
OK as they are. So,

Acked-by: David Hunt <david.hunt at intel.com>



More information about the dev mailing list