[dpdk-dev] [PATCH v6 11/21] event/sw: add scheduling logic
Hunt, David
david.hunt at intel.com
Thu Mar 30 12:07:11 CEST 2017
On 30/3/2017 12:25 AM, Harry van Haaren wrote:
> From: Bruce Richardson <bruce.richardson at intel.com>
>
> Add in the scheduling function which takes the events from the
> producer queues and buffers them before scheduling them to consumer
> queues. The scheduling logic includes support for atomic, reordered,
> and parallel scheduling of flows.
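
For readers trying this out: a minimal sketch of how an application would
drive this scheduler once the device is started. This is my illustration,
not part of the patch; it assumes one lcore is dedicated to scheduling, an
app-defined exit flag, and the rte_event_schedule() API from the eventdev
library.

#include <stdbool.h>
#include <rte_eventdev.h>

static volatile bool done; /* app-defined exit flag (illustrative) */

/* Dedicated scheduler core: repeatedly invoke the device's schedule
 * callback (which this PMD sets to sw_event_schedule below) so events
 * move from the producer queues through the IQs to the consumer queues.
 */
static int
schedule_loop(void *arg)
{
	uint8_t dev_id = *(uint8_t *)arg;

	while (!done)
		rte_event_schedule(dev_id);
	return 0;
}
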
>
> Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
> Signed-off-by: Gage Eads <gage.eads at intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren at intel.com>
>
> ---
>
> v6:
> - Fix handling of event priority normalization (Jerin)
> ---
> drivers/event/sw/Makefile | 1 +
> drivers/event/sw/sw_evdev.c | 1 +
> drivers/event/sw/sw_evdev.h | 11 +
> drivers/event/sw/sw_evdev_scheduler.c | 601 ++++++++++++++++++++++++++++++++++
> 4 files changed, 614 insertions(+)
> create mode 100644 drivers/event/sw/sw_evdev_scheduler.c
>
> diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
> index b6ecd91..a7f5b3d 100644
> --- a/drivers/event/sw/Makefile
> +++ b/drivers/event/sw/Makefile
> @@ -54,6 +54,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
> # library source files
> SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
> SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_scheduler.c
>
> # export include files
> SYMLINK-y-include +=
> diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
> index 2c28547..f91a04b 100644
> --- a/drivers/event/sw/sw_evdev.c
> +++ b/drivers/event/sw/sw_evdev.c
> @@ -557,6 +557,7 @@ sw_probe(const char *name, const char *params)
> dev->enqueue_burst = sw_event_enqueue_burst;
> dev->dequeue = sw_event_dequeue;
> dev->dequeue_burst = sw_event_dequeue_burst;
> + dev->schedule = sw_event_schedule;
>
> if (rte_eal_process_type() != RTE_PROC_PRIMARY)
> return 0;
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index ab372fd..7c157c7 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -248,8 +248,18 @@ struct sw_evdev {
> /* Cache how many packets are in each cq */
> uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned;
>
> + /* Array of pointers to load-balanced QIDs sorted by priority level */
> + struct sw_qid *qids_prioritized[RTE_EVENT_MAX_QUEUES_PER_DEV];
> +
> + /* Stats */
> + struct sw_point_stats stats __rte_cache_aligned;
> + uint64_t sched_called;
> int32_t sched_quanta;
> + uint64_t sched_no_iq_enqueues;
> + uint64_t sched_no_cq_enqueues;
> + uint64_t sched_cq_qid_called;
>
> + uint8_t started;
> uint32_t credit_update_quanta;
> };
>
> @@ -272,5 +282,6 @@ uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[],
> uint16_t sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
> uint16_t sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
> uint64_t wait);
> +void sw_event_schedule(struct rte_eventdev *dev);
>
> #endif /* _SW_EVDEV_H_ */
> diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
> new file mode 100644
> index 0000000..c0fe6a3
> --- /dev/null
> +++ b/drivers/event/sw/sw_evdev_scheduler.c
> @@ -0,0 +1,601 @@
> +/*-
> + * BSD LICENSE
> + *
> + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
> + *
> + * Redistribution and use in source and binary forms, with or without
> + * modification, are permitted provided that the following conditions
> + * are met:
> + *
> + * * Redistributions of source code must retain the above copyright
> + * notice, this list of conditions and the following disclaimer.
> + * * Redistributions in binary form must reproduce the above copyright
> + * notice, this list of conditions and the following disclaimer in
> + * the documentation and/or other materials provided with the
> + * distribution.
> + * * Neither the name of Intel Corporation nor the names of its
> + * contributors may be used to endorse or promote products derived
> + * from this software without specific prior written permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#include <rte_ring.h>
> +#include <rte_hash_crc.h>
> +#include "sw_evdev.h"
> +#include "iq_ring.h"
> +#include "event_ring.h"
> +
> +#define SW_IQS_MASK (SW_IQS_MAX-1)
> +
> +/* Retrieve the highest-priority IQ, or SW_IQS_MAX if no pkts are
> + * available. Doing the ctz twice is faster than caching the value due
> + * to data dependencies
> + */
> +#define PKT_MASK_TO_IQ(pkts) \
> + (__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
> +
> +#if SW_IQS_MAX != 4
> +#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
> +#endif
> +#define PRIO_TO_IQ(prio) (prio >> 6)
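
Since the v6 change was priority normalization, it's worth spelling out
the mapping these two macros implement. A quick standalone sanity check
(my illustration, with the macro definitions mirrored so it compiles on
its own; not part of the patch):

#include <assert.h>

#define SW_IQS_MAX 4	/* mirrored from sw_evdev.h */
#define PRIO_TO_IQ(prio) (prio >> 6)
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

static void
prio_macro_check(void)
{
	/* the 8-bit priority splits into four bands of 64, one per IQ */
	assert(PRIO_TO_IQ(0) == 0);	/* 0..63    -> IQ 0 (highest) */
	assert(PRIO_TO_IQ(255) == 3);	/* 192..255 -> IQ 3 (lowest) */

	/* ctz picks the lowest set bit, i.e. the highest-priority
	 * non-empty IQ. OR-ing in (1 << SW_IQS_MAX) keeps ctz defined
	 * for an all-empty mask: it returns SW_IQS_MAX instead.
	 */
	assert(PKT_MASK_TO_IQ(0x6) == 1);	/* IQs 1 and 2 occupied */
	assert(PKT_MASK_TO_IQ(0x0) == SW_IQS_MAX);	/* all empty */
}
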
> +
> +#define MAX_PER_IQ_DEQUEUE 48
> +#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
> +
> +static inline uint32_t
> +sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> + uint32_t iq_num, unsigned int count)
> +{
> + struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
> + struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
> + uint32_t nb_blocked = 0;
> + uint32_t i;
> +
> + if (count > MAX_PER_IQ_DEQUEUE)
> + count = MAX_PER_IQ_DEQUEUE;
> +
> + /* Cache the QID's ID. The ID is static, so it can be used to
> + * identify the stage of processing in history lists, etc.
> + */
> + uint32_t qid_id = qid->id;
> +
> + iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
> + for (i = 0; i < count; i++) {
> + const struct rte_event *qe = &qes[i];
> + /* use cheap bit mixing, we only need to lose a few bits */
> + uint32_t flow_id32 = (qes[i].flow_id) ^ (qes[i].flow_id >> 10);
> + const uint16_t flow_id = FLOWID_MASK & flow_id32;
> + struct sw_fid_t *fid = &qid->fids[flow_id];
> + int cq = fid->cq;
> +
> + if (cq < 0) {
> + uint32_t cq_idx = qid->cq_next_tx++;
> + if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
> + qid->cq_next_tx = 0;
> + cq = qid->cq_map[cq_idx];
> +
> + /* find least used */
> + int cq_free_cnt = sw->cq_ring_space[cq];
> + for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
> + cq_idx++) {
> + int test_cq = qid->cq_map[cq_idx];
> + int test_cq_free = sw->cq_ring_space[test_cq];
> + if (test_cq_free > cq_free_cnt) {
> + cq = test_cq;
> + cq_free_cnt = test_cq_free;
> + }
> + }
> +
> + fid->cq = cq; /* this pins early */
> + }
> +
> + if (sw->cq_ring_space[cq] == 0 ||
> + sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
> + blocked_qes[nb_blocked++] = *qe;
> + continue;
> + }
> +
> + struct sw_port *p = &sw->ports[cq];
> +
> + /* at this point we can queue up the packet on the cq_buf */
> + fid->pcount++;
> + p->cq_buf[p->cq_buf_count++] = *qe;
> + p->inflights++;
> + sw->cq_ring_space[cq]--;
> +
> + int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
> + p->hist_list[head].fid = flow_id;
> + p->hist_list[head].qid = qid_id;
> +
> + p->stats.tx_pkts++;
> + qid->stats.tx_pkts++;
> +
> + /* if we just filled in the last slot, flush the buffer */
> + if (sw->cq_ring_space[cq] == 0) {
> + struct qe_ring *worker = p->cq_worker_ring;
> + qe_ring_enqueue_burst(worker, p->cq_buf,
> + p->cq_buf_count,
> + &sw->cq_ring_space[cq]);
> + p->cq_buf_count = 0;
> + }
> + }
> + iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);
> +
> + return count - nb_blocked;
> +}
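
A note on the atomic path above: a flow stays pinned to its chosen CQ for
as long as it has events inflight (the pull path below resets fid->cq to
-1 once fid->pcount drops to zero), which is what provides the atomic
guarantee. The FID table index comes from the cheap bit mixing of the
32-bit flow_id; restated standalone (my illustration, with the table size
assumed from sw_evdev.h):

#include <stdint.h>

#define SW_QID_NUM_FIDS 16384	/* assumed; see sw_evdev.h */
#define FLOWID_MASK (SW_QID_NUM_FIDS - 1)

/* Fold the upper bits of the event flow_id down before masking, so
 * flows that differ only in their high bits don't all collide on the
 * same FID table slot.
 */
static inline uint16_t
flow_id_to_fid(uint32_t flow_id)
{
	uint32_t mixed = flow_id ^ (flow_id >> 10);
	return mixed & FLOWID_MASK;
}
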
> +
> +static inline uint32_t
> +sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> + uint32_t iq_num, unsigned int count, int keep_order)
> +{
> + uint32_t i;
> + uint32_t cq_idx = qid->cq_next_tx;
> +
> + /* Cache the QID's ID. The ID is static, so it can be used to
> + * identify the stage of processing in history lists, etc.
> + */
> + uint32_t qid_id = qid->id;
> +
> + if (count > MAX_PER_IQ_DEQUEUE)
> + count = MAX_PER_IQ_DEQUEUE;
> +
> + if (keep_order)
> + /* only schedule as many as we have reorder buffer entries */
> + count = RTE_MIN(count,
> + rte_ring_count(qid->reorder_buffer_freelist));
> +
> + for (i = 0; i < count; i++) {
> + const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
> + uint32_t cq_check_count = 0;
> + uint32_t cq;
> +
> + /*
> + * for parallel, just send to the next available CQ in
> + * round-robin fashion, so scan for an available CQ. If all
> + * CQs are full, just return and move on to the next QID.
> + */
> + do {
> + if (++cq_check_count > qid->cq_num_mapped_cqs)
> + goto exit;
> + cq = qid->cq_map[cq_idx];
> + if (++cq_idx == qid->cq_num_mapped_cqs)
> + cq_idx = 0;
> + } while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
> + sw->ports[cq].inflights == SW_PORT_HIST_LIST);
> +
> + struct sw_port *p = &sw->ports[cq];
> + if (sw->cq_ring_space[cq] == 0 ||
> + p->inflights == SW_PORT_HIST_LIST)
> + break;
> +
> + sw->cq_ring_space[cq]--;
> +
> + qid->stats.tx_pkts++;
> +
> + const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
> +
> + p->hist_list[head].fid = qe->flow_id;
> + p->hist_list[head].qid = qid_id;
> +
> + if (keep_order)
> + rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
> + (void *)&p->hist_list[head].rob_entry);
> +
> + sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
> + iq_ring_pop(qid->iq[iq_num]);
> +
> + rte_compiler_barrier();
> + p->inflights++;
> + p->stats.tx_pkts++;
> + p->hist_head++;
> + }
> +exit:
> + qid->cq_next_tx = cq_idx;
> + return i;
> +}
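
For ordered QIDs, the clamp at the top of this function is the key
resource rule: every ordered event must own a reorder-buffer entry from
the moment it is scheduled to a CQ until it is re-injected in order, so no
more events are scheduled than there are free ROB entries. Restated
standalone (my illustration, not part of the patch):

#include <rte_common.h>
#include <rte_ring.h>

/* An ordered QID can schedule at most as many events as there are free
 * reorder-buffer entries on its freelist; each inflight ordered event
 * holds one entry until sw_schedule_reorder() drains it back.
 */
static inline unsigned int
ordered_sched_limit(unsigned int count, struct rte_ring *rob_freelist)
{
	return RTE_MIN(count, rte_ring_count(rob_freelist));
}
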
> +
> +static uint32_t
> +sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> + uint32_t iq_num, unsigned int count __rte_unused)
> +{
> + uint32_t cq_id = qid->cq_map[0];
> + struct sw_port *port = &sw->ports[cq_id];
> +
> + /* get max burst enq size for cq_ring */
> + uint32_t count_free = sw->cq_ring_space[cq_id];
> + if (count_free == 0)
> + return 0;
> +
> + /* burst dequeue from the QID IQ ring */
> + struct iq_ring *ring = qid->iq[iq_num];
> + uint32_t ret = iq_ring_dequeue_burst(ring,
> + &port->cq_buf[port->cq_buf_count], count_free);
> + port->cq_buf_count += ret;
> +
> + /* Update QID, Port and Total TX stats */
> + qid->stats.tx_pkts += ret;
> + port->stats.tx_pkts += ret;
> +
> + /* Subtract credits from cached value */
> + sw->cq_ring_space[cq_id] -= ret;
> +
> + return ret;
> +}
> +
> +static uint32_t
> +sw_schedule_qid_to_cq(struct sw_evdev *sw)
> +{
> + uint32_t pkts = 0;
> + uint32_t qid_idx;
> +
> + sw->sched_cq_qid_called++;
> +
> + for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
> + struct sw_qid *qid = sw->qids_prioritized[qid_idx];
> +
> + int type = qid->type;
> + int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
> +
> + /* no packets available in any IQ for this QID, skip it */
> + if (iq_num >= SW_IQS_MAX)
> + continue;
> +
> + uint32_t pkts_done = 0;
> + uint32_t count = iq_ring_count(qid->iq[iq_num]);
> +
> + if (count > 0) {
> + if (type == SW_SCHED_TYPE_DIRECT)
> + pkts_done += sw_schedule_dir_to_cq(sw, qid,
> + iq_num, count);
> + else if (type == RTE_SCHED_TYPE_ATOMIC)
> + pkts_done += sw_schedule_atomic_to_cq(sw, qid,
> + iq_num, count);
> + else
> + pkts_done += sw_schedule_parallel_to_cq(sw, qid,
> + iq_num, count,
> + type == RTE_SCHED_TYPE_ORDERED);
> + }
> +
> + /* Check if the IQ that was polled is now empty, and unset
> + * its bit in the IQ mask if so.
> + */
> + int all_done = (pkts_done == count);
> +
> + qid->iq_pkt_mask &= ~(all_done << (iq_num));
> + pkts += pkts_done;
> + }
> +
> + return pkts;
> +}
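
One subtlety in the loop above: the occupancy bit of the polled IQ is
cleared without a branch. Since all_done is 0 or 1, the shifted value is
either zero (mask unchanged) or exactly that IQ's bit. Standalone check
(my illustration, not part of the patch):

#include <assert.h>
#include <stdint.h>

static void
iq_mask_clear_check(void)
{
	uint32_t iq_pkt_mask = 0x5;	/* IQs 0 and 2 hold packets */
	int iq_num = 0;
	int all_done = 1;		/* IQ 0 fully drained this pass */

	/* all_done == 1 clears bit iq_num; all_done == 0 is a no-op */
	iq_pkt_mask &= ~(all_done << iq_num);
	assert(iq_pkt_mask == 0x4);
}
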
> +
> +/* This function performs the re-ordering of packets, injecting them into
> + * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
> + * contiguous in that array, this function accepts a "range" of QIDs to scan.
> + */
> +static uint16_t
> +sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
> +{
> + /* Perform egress reordering */
> + struct rte_event *qe;
> + uint32_t pkts_iter = 0;
> +
> + for (; qid_start < qid_end; qid_start++) {
> + struct sw_qid *qid = &sw->qids[qid_start];
> + int i, num_entries_in_use;
> +
> + if (qid->type != RTE_SCHED_TYPE_ORDERED)
> + continue;
> +
> + num_entries_in_use = rte_ring_free_count(
> + qid->reorder_buffer_freelist);
> +
> + for (i = 0; i < num_entries_in_use; i++) {
> + struct reorder_buffer_entry *entry;
> + int j;
> +
> + entry = &qid->reorder_buffer[qid->reorder_buffer_index];
> +
> + if (!entry->ready)
> + break;
> +
> + for (j = 0; j < entry->num_fragments; j++) {
> + uint16_t dest_qid;
> + uint16_t dest_iq;
> +
> + int idx = entry->fragment_index + j;
> + qe = &entry->fragments[idx];
> +
> + dest_qid = qe->queue_id;
> + dest_iq = PRIO_TO_IQ(qe->priority);
> +
> + if (dest_qid >= sw->qid_count) {
> + sw->stats.rx_dropped++;
> + continue;
> + }
> +
> + struct sw_qid *dest_qid_ptr =
> + &sw->qids[dest_qid];
> + const struct iq_ring *dest_iq_ptr =
> + dest_qid_ptr->iq[dest_iq];
> + if (iq_ring_free_count(dest_iq_ptr) == 0)
> + break;
> +
> + pkts_iter++;
> +
> + struct sw_qid *q = &sw->qids[dest_qid];
> + struct iq_ring *r = q->iq[dest_iq];
> +
> + /* we checked for space above, so enqueue must
> + * succeed
> + */
> + iq_ring_enqueue(r, qe);
> + q->iq_pkt_mask |= (1 << (dest_iq));
> + q->iq_pkt_count[dest_iq]++;
> + q->stats.rx_pkts++;
> + }
> +
> + entry->ready = (j != entry->num_fragments);
> + entry->num_fragments -= j;
> + entry->fragment_index += j;
> +
> + if (!entry->ready) {
> + entry->fragment_index = 0;
> +
> + rte_ring_sp_enqueue(
> + qid->reorder_buffer_freelist,
> + entry);
> +
> + qid->reorder_buffer_index++;
> + qid->reorder_buffer_index %= qid->window_size;
> + }
> + }
> + }
> + return pkts_iter;
> +}
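
Worth noting about the fragment loop above: if a destination IQ fills up
mid-entry, the inner loop breaks with j < num_fragments, so the entry
stays ready and its counters record the resume point; the remaining
fragments go out on a later schedule call, and the window index advances
only once the entry fully drains. The bookkeeping, restated standalone
(my illustration, with a simplified mirror of the ROB entry):

#include <stdbool.h>
#include <stdint.h>

struct rob_entry_sketch {	/* simplified reorder_buffer_entry */
	bool ready;
	uint16_t fragment_index;
	uint16_t num_fragments;
};

/* After injecting 'done' fragments this pass, keep the entry ready only
 * if it was cut short, so the next pass resumes at fragment_index.
 * Returns false once the entry is drained and can go back on the
 * freelist.
 */
static inline bool
rob_advance(struct rob_entry_sketch *e, uint16_t done)
{
	e->ready = (done != e->num_fragments);
	e->num_fragments -= done;
	e->fragment_index += done;
	return e->ready;
}
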
> +
> +static inline void __attribute__((always_inline))
> +sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
> +{
> + RTE_SET_USED(sw);
> + struct qe_ring *worker = port->rx_worker_ring;
> + port->pp_buf_start = 0;
> + port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
> + RTE_DIM(port->pp_buf));
> +}
> +
> +static inline uint32_t __attribute__((always_inline))
> +__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
> +{
> + static struct reorder_buffer_entry dummy_rob;
> + uint32_t pkts_iter = 0;
> + struct sw_port *port = &sw->ports[port_id];
> +
> + /* If shadow ring has 0 pkts, pull from worker ring */
> + if (port->pp_buf_count == 0)
> + sw_refill_pp_buf(sw, port);
> +
> + while (port->pp_buf_count) {
> + const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> + struct sw_hist_list_entry *hist_entry = NULL;
> + uint8_t flags = qe->op;
> + const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
> + int needs_reorder = 0;
> + /* when not reordering, treat a PARTIAL event as NEW */
> + if (!allow_reorder && !eop)
> + flags = QE_FLAG_VALID;
> +
> + /*
> + * if we don't have space for this packet in an IQ,
> + * then stop and move on to the next port. Technically,
> + * for a packet that needs reordering, we don't need to
> + * check here, but it simplifies things not to
> + * special-case it.
> + */
> + uint32_t iq_num = PRIO_TO_IQ(qe->priority);
> + struct sw_qid *qid = &sw->qids[qe->queue_id];
> +
> + if ((flags & QE_FLAG_VALID) &&
> + iq_ring_free_count(qid->iq[iq_num]) == 0)
> + break;
> +
> + /* now process based on flags. Note that for directed
> + * queues, the enqueue_flush masks off all but the
> + * valid flag. This makes FWD and PARTIAL enqueues just
> + * NEW type, and makes DROPS no-op calls.
> + */
> + if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
> + const uint32_t hist_tail = port->hist_tail &
> + (SW_PORT_HIST_LIST - 1);
> +
> + hist_entry = &port->hist_list[hist_tail];
> + const uint32_t hist_qid = hist_entry->qid;
> + const uint32_t hist_fid = hist_entry->fid;
> +
> + struct sw_fid_t *fid =
> + &sw->qids[hist_qid].fids[hist_fid];
> + fid->pcount -= eop;
> + if (fid->pcount == 0)
> + fid->cq = -1;
> +
> + if (allow_reorder) {
> + /* set reorder ready if an ordered QID */
> + uintptr_t rob_ptr =
> + (uintptr_t)hist_entry->rob_entry;
> + const uintptr_t valid = (rob_ptr != 0);
> + needs_reorder = valid;
> + rob_ptr |=
> + ((valid - 1) & (uintptr_t)&dummy_rob);
> + struct reorder_buffer_entry *tmp_rob_ptr =
> + (struct reorder_buffer_entry *)rob_ptr;
> + tmp_rob_ptr->ready = eop * needs_reorder;
> + }
> +
> + port->inflights -= eop;
> + port->hist_tail += eop;
> + }
> + if (flags & QE_FLAG_VALID) {
> + port->stats.rx_pkts++;
> +
> + if (allow_reorder && needs_reorder) {
> + struct reorder_buffer_entry *rob_entry =
> + hist_entry->rob_entry;
> +
> +				/* Although fragmentation is not currently
> +				 * supported by the eventdev API, we support
> +				 * it here. Open question: how do we alert
> +				 * the user that they've exceeded max frags?
> +				 */
> + int num_frag = rob_entry->num_fragments;
> + if (num_frag == SW_FRAGMENTS_MAX)
> + sw->stats.rx_dropped++;
> + else {
> + int idx = rob_entry->num_fragments++;
> + rob_entry->fragments[idx] = *qe;
> + }
> + goto end_qe;
> + }
> +
> + /* Use the iq_num from above to push the QE
> + * into the qid at the right priority
> + */
> +
> + qid->iq_pkt_mask |= (1 << (iq_num));
> + iq_ring_enqueue(qid->iq[iq_num], qe);
> + qid->iq_pkt_count[iq_num]++;
> + qid->stats.rx_pkts++;
> + pkts_iter++;
> + }
> +
> +end_qe:
> + port->pp_buf_start++;
> + port->pp_buf_count--;
> + } /* while (avail_qes) */
> +
> + return pkts_iter;
> +}
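
The rob_ptr arithmetic in the completion path above is a branch-free
select: valid is 1 when the history entry carries a real ROB pointer
(ordered QID) and 0 otherwise, so (valid - 1) is either 0 or all-ones,
steering the unconditional ->ready write to either the real entry or the
throwaway dummy_rob. Restated standalone (my illustration, not part of
the patch):

#include <stdint.h>

/* Return 'real' when it is non-NULL, otherwise 'sink', without a
 * branch: (valid - 1) is 0 when real != 0, and all-ones when real == 0.
 */
static inline uintptr_t
select_rob(uintptr_t real, uintptr_t sink)
{
	const uintptr_t valid = (real != 0);
	return real | ((valid - 1) & sink);
}
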
> +
> +static uint32_t
> +sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
> +{
> + return __pull_port_lb(sw, port_id, 1);
> +}
> +
> +static uint32_t
> +sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
> +{
> + return __pull_port_lb(sw, port_id, 0);
> +}
> +
> +static uint32_t
> +sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
> +{
> + uint32_t pkts_iter = 0;
> + struct sw_port *port = &sw->ports[port_id];
> +
> + /* If shadow ring has 0 pkts, pull from worker ring */
> + if (port->pp_buf_count == 0)
> + sw_refill_pp_buf(sw, port);
> +
> + while (port->pp_buf_count) {
> + const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> + uint8_t flags = qe->op;
> +
> + if ((flags & QE_FLAG_VALID) == 0)
> + goto end_qe;
> +
> + uint32_t iq_num = PRIO_TO_IQ(qe->priority);
> + struct sw_qid *qid = &sw->qids[qe->queue_id];
> + struct iq_ring *iq_ring = qid->iq[iq_num];
> +
> + if (iq_ring_free_count(iq_ring) == 0)
> + break; /* move to next port */
> +
> + port->stats.rx_pkts++;
> +
> + /* Use the iq_num from above to push the QE
> + * into the qid at the right priority
> + */
> + qid->iq_pkt_mask |= (1 << (iq_num));
> + iq_ring_enqueue(iq_ring, qe);
> + qid->iq_pkt_count[iq_num]++;
> + qid->stats.rx_pkts++;
> + pkts_iter++;
> +
> +end_qe:
> + port->pp_buf_start++;
> + port->pp_buf_count--;
> + } /* while port->pp_buf_count */
> +
> + return pkts_iter;
> +}
> +
> +void
> +sw_event_schedule(struct rte_eventdev *dev)
> +{
> + struct sw_evdev *sw = sw_pmd_priv(dev);
> + uint32_t in_pkts, out_pkts;
> + uint32_t out_pkts_total = 0, in_pkts_total = 0;
> + int32_t sched_quanta = sw->sched_quanta;
> + uint32_t i;
> +
> + sw->sched_called++;
> + if (!sw->started)
> + return;
> +
> + do {
> + uint32_t in_pkts_this_iteration = 0;
> +
> + /* Pull from rx_ring for ports */
> + do {
> + in_pkts = 0;
> + for (i = 0; i < sw->port_count; i++)
> + if (sw->ports[i].is_directed)
> + in_pkts += sw_schedule_pull_port_dir(sw, i);
> + else if (sw->ports[i].num_ordered_qids > 0)
> + in_pkts += sw_schedule_pull_port_lb(sw, i);
> + else
> + in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
> +
> + /* QID scan for re-ordered */
> + in_pkts += sw_schedule_reorder(sw, 0,
> + sw->qid_count);
> + in_pkts_this_iteration += in_pkts;
> + } while (in_pkts > 4 &&
> + (int)in_pkts_this_iteration < sched_quanta);
> +
> + out_pkts = 0;
> + out_pkts += sw_schedule_qid_to_cq(sw);
> + out_pkts_total += out_pkts;
> + in_pkts_total += in_pkts_this_iteration;
> +
> + if (in_pkts == 0 && out_pkts == 0)
> + break;
> + } while ((int)out_pkts_total < sched_quanta);
> +
> + /* push all the internally buffered QEs in port->cq_buf out to the
> + * worker cores, i.e. do the ring transfers in batches.
> + */
> + for (i = 0; i < sw->port_count; i++) {
> + struct qe_ring *worker = sw->ports[i].cq_worker_ring;
> + qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
> + sw->ports[i].cq_buf_count,
> + &sw->cq_ring_space[i]);
> + sw->ports[i].cq_buf_count = 0;
> + }
> +
> + sw->stats.tx_pkts += out_pkts_total;
> + sw->stats.rx_pkts += in_pkts_total;
> +
> + sw->sched_no_iq_enqueues += (in_pkts_total == 0);
> + sw->sched_no_cq_enqueues += (out_pkts_total == 0);
> +
> +}
There are a couple of line-length issues flagged by checkpatch, but the
indentation depth makes them very difficult to resolve, so I would
suggest they're OK as they are. So,
Acked-by: David Hunt <david.hunt at intel.com>