new file mode 100644
@@ -0,0 +1,59 @@
+# BSD LICENSE
+#
+# Copyright(c) 2016 Intel Corporation. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+
+# library name
+LIB = librte_pmd_evdev_sw.a
+
+# build flags
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+# library version
+LIBABIVER := 1
+
+# versioning export map
+EXPORT_MAP := rte_pmd_evdev_sw_version.map
+
+# library source files
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += sw_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += sw_evdev_worker.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += sw_evdev_scheduler.c
+
+# export include files
+SYMLINK-y-include +=
+
+# library dependencies
+DEPDIRS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += lib/librte_eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += lib/librte_eventdev
+
+include $(RTE_SDK)/mk/rte.lib.mk
new file mode 100644
@@ -0,0 +1,142 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef _EVENT_RING_
+#define _EVENT_RING_
+
+#include <stdint.h>
+#include <x86intrin.h>
+
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_malloc.h>
+#include <rte_eventdev.h>
+
+#define QE_RING_NAMESIZE 32
+
+struct qe_ring {
+ char name[QE_RING_NAMESIZE] __rte_cache_aligned;
+ uint32_t ring_size; /* size of memory block allocated to the ring */
+ uint32_t mask; /* mask for read/write values == ring_size -1 */
+ uint32_t size; /* actual usable space in the ring */
+ volatile uint32_t write_idx __rte_cache_aligned;
+ volatile uint32_t read_idx __rte_cache_aligned;
+
+ struct rte_event ring[0] __rte_cache_aligned;
+};
+
+#ifndef force_inline
+#define force_inline inline __attribute__((always_inline))
+#endif
+
+static inline struct qe_ring * __attribute__((cold))
+qe_ring_create(const char *name, unsigned int size, unsigned socket_id)
+{
+ struct qe_ring *retval;
+ const uint32_t ring_size = rte_align32pow2(size + 1);
+ size_t memsize = sizeof(*retval) +
+ (ring_size * sizeof(retval->ring[0]));
+
+ retval = rte_zmalloc_socket(NULL, memsize, 0, socket_id);
+ if (retval == NULL)
+ goto end;
+
+ snprintf(retval->name, sizeof(retval->name), "EVDEV_RG_%s", name);
+ retval->ring_size = ring_size;
+ retval->mask = ring_size - 1;
+ retval->size = size;
+end:
+ return retval;
+}
+
+static inline void
+qe_ring_destroy(struct qe_ring *r)
+{
+ rte_free(r);
+}
+
+static force_inline unsigned int
+qe_ring_count(const struct qe_ring *r)
+{
+ return r->write_idx - r->read_idx;
+}
+
+static force_inline unsigned int
+qe_ring_free_count(const struct qe_ring *r)
+{
+ return r->size - qe_ring_count(r);
+}
+
+static force_inline unsigned int
+qe_ring_enqueue_burst(struct qe_ring *r, struct rte_event *qes,
+ unsigned int nb_qes, uint16_t *free_count)
+{
+ const uint32_t size = r->size;
+ const uint32_t mask = r->mask;
+ const uint32_t read = r->read_idx;
+ uint32_t write = r->write_idx;
+ const uint32_t space = read + size - write;
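+	/* Editorial note (not part of the patch): 'space' is the number of
+	 * free slots. The indices are free-running and only masked on access,
+	 * so e.g. with size == 128, read == 1000 and write == 1100 there are
+	 * 100 events in flight and space == 1000 + 128 - 1100 == 28.
+	 */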
+ uint32_t i;
+
+ if (space < nb_qes)
+ nb_qes = space;
+
+ for (i = 0; i < nb_qes; i++, write++)
+ r->ring[write & mask] = qes[i];
+
+ r->write_idx = write;
+
+ *free_count = space - nb_qes;
+
+ return nb_qes;
+}
+
+static force_inline unsigned int
+qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes,
+ unsigned int nb_qes)
+{
+ const uint32_t mask = r->mask;
+ uint32_t read = r->read_idx;
+ const uint32_t write = r->write_idx;
+ const uint32_t items = write - read;
+ uint32_t i;
+
+ if (items < nb_qes)
+ nb_qes = items;
+
+ for (i = 0; i < nb_qes; i++, read++)
+ qes[i] = r->ring[read & mask];
+
+ r->read_idx += nb_qes;
+
+ return nb_qes;
+}
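+
+/* Minimal usage sketch (editorial note, not part of the patch). Assumes the
+ * EAL has been initialised so rte_zmalloc_socket() can allocate memory:
+ *
+ *	uint16_t free_count;
+ *	struct rte_event in = { .flow_id = 7 }, out;
+ *	struct qe_ring *r = qe_ring_create("demo", 128, 0);
+ *
+ *	if (r == NULL)
+ *		return -1;
+ *	qe_ring_enqueue_burst(r, &in, 1, &free_count);	returns 1
+ *	qe_ring_dequeue_burst(r, &out, 1);		returns 1, fills 'out'
+ *	qe_ring_destroy(r);
+ */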
+
+#endif
new file mode 100644
@@ -0,0 +1,160 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _IQ_RING_
+#define _IQ_RING_
+
+#include <stdint.h>
+#include <x86intrin.h>
+
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_malloc.h>
+#include <rte_eventdev.h>
+
+#define IQ_RING_NAMESIZE 12
+#define QID_IQ_DEPTH 128
+#define QID_IQ_MASK (uint16_t)(QID_IQ_DEPTH - 1)
+
+struct iq_ring {
+ char name[IQ_RING_NAMESIZE] __rte_cache_aligned;
+ uint16_t write_idx;
+ uint16_t read_idx;
+
+ struct rte_event ring[QID_IQ_DEPTH];
+};
+
+#ifndef force_inline
+#define force_inline inline __attribute__((always_inline))
+#endif
+
+static inline struct iq_ring * __attribute__((cold))
+iq_ring_create(const char *name, unsigned socket_id)
+{
+ struct iq_ring *retval;
+
+ retval = rte_malloc_socket(NULL, sizeof(*retval), 0, socket_id);
+ if (retval == NULL)
+ goto end;
+
+ snprintf(retval->name, sizeof(retval->name), "%s", name);
+ retval->write_idx = retval->read_idx = 0;
+end:
+ return retval;
+}
+
+static inline void
+iq_ring_destroy(struct iq_ring *r)
+{
+ rte_free(r);
+}
+
+static force_inline uint16_t
+iq_ring_count(const struct iq_ring *r)
+{
+ return r->write_idx - r->read_idx;
+}
+
+static force_inline uint16_t
+iq_ring_free_count(const struct iq_ring *r)
+{
+ return QID_IQ_MASK - iq_ring_count(r);
+}
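+
+/* Editorial note: because free space is computed against QID_IQ_MASK rather
+ * than QID_IQ_DEPTH (here and in the enqueue paths below), the usable
+ * capacity of the ring is QID_IQ_DEPTH - 1 events; one slot is always left
+ * unused.
+ */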
+
+static force_inline uint16_t
+iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
+{
+ const uint16_t read = r->read_idx;
+ uint16_t write = r->write_idx;
+ const uint16_t space = read + QID_IQ_MASK - write;
+ uint16_t i;
+
+ if (space < nb_qes)
+ nb_qes = space;
+
+ for (i = 0; i < nb_qes; i++, write++)
+ r->ring[write & QID_IQ_MASK] = qes[i];
+
+ r->write_idx = write;
+
+ return nb_qes;
+}
+
+static force_inline uint16_t
+iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
+{
+ uint16_t read = r->read_idx;
+ const uint16_t write = r->write_idx;
+ const uint16_t items = write - read;
+ uint16_t i;
+
+	if (items < nb_qes)
+		nb_qes = items;
+
+	for (i = 0; i < nb_qes; i++, read++)
+		qes[i] = r->ring[read & QID_IQ_MASK];
+
+ r->read_idx += nb_qes;
+
+ return nb_qes;
+}
+
+static force_inline const struct rte_event *
+iq_ring_peek(const struct iq_ring *r)
+{
+ return &r->ring[r->read_idx & QID_IQ_MASK];
+}
+
+static force_inline void
+iq_ring_pop(struct iq_ring *r)
+{
+ r->read_idx++;
+}
+
+static force_inline int
+iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe)
+{
+ const uint16_t read = r->read_idx;
+ const uint16_t write = r->write_idx;
+ const uint16_t space = read + QID_IQ_MASK - write;
+
+ if (space == 0)
+ return -1;
+
+ r->ring[write & QID_IQ_MASK] = *qe;
+
+ r->write_idx = write + 1;
+
+ return 0;
+}
+
+#endif
new file mode 100644
@@ -0,0 +1,3 @@
+DPDK_17.02 {
+ local: *;
+};
new file mode 100644
@@ -0,0 +1,619 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <string.h>
+
+#include <rte_vdev.h>
+#include <rte_memzone.h>
+#include <rte_kvargs.h>
+#include <rte_ring.h>
+#include <rte_eventdev_pmd.h>
+
+#include "sw_evdev.h"
+#include "iq_ring.h"
+
+#define NUMA_NODE_ARG "numa_node"
+
+static int
+sw_dev_stats_get(const struct rte_event_dev *dev,
+ struct rte_event_dev_stats *stats)
+{
+ const struct sw_evdev *sw = (const void *)dev;
+ unsigned int i;
+
+ if (dev == NULL || stats == NULL)
+ return -EINVAL;
+
+ memset(stats, 0, sizeof(*stats));
+
+ stats->rx_pkts = sw->stats.rx_pkts;
+ stats->rx_dropped = sw->stats.rx_dropped;
+ stats->tx_pkts = sw->stats.tx_pkts;
+
+ for (i = 0; i < sw->port_count; i++) {
+ stats->port_rx_pkts[i] = sw->ports[i].stats.rx_pkts;
+ stats->port_rx_dropped[i] = sw->ports[i].stats.rx_dropped;
+ stats->port_inflight[i] = sw->ports[i].inflights;
+ stats->port_tx_pkts[i] = sw->ports[i].stats.tx_pkts;
+ }
+
+ for (i = 0; i < sw->qid_count; i++) {
+ stats->queue_rx_pkts[i] = sw->qids[i].stats.rx_pkts;
+ stats->queue_rx_dropped[i] = sw->qids[i].stats.rx_dropped;
+ stats->queue_tx_pkts[i] = sw->qids[i].stats.tx_pkts;
+ }
+ return 0;
+}
+
+static int
+sw_port_link(struct rte_event_dev *dev, uint8_t port_id,
+ struct rte_event_queue_link link[], int num)
+{
+ struct sw_evdev *sw = (void *)dev;
+ struct sw_port *p = &sw->ports[port_id];
+ int i;
+
+ if (link == NULL) {
+ /* TODO: map all queues */
+ rte_errno = -EDQUOT;
+ return 0;
+ }
+	if (port_id >= sw->port_count) {
+ rte_errno = -EINVAL;
+ return 0;
+ }
+
+ for (i = 0; i < num; i++) {
+ struct sw_qid *q;
+ uint32_t qid = link[i].queue_id;
+ if (qid >= sw->qid_count) {
+ break; /* error - invalid QIDs */
+ }
+ q = &sw->qids[qid];
+
+ /* check for qid map overflow */
+ if (q->cq_num_mapped_cqs >= RTE_DIM(q->cq_map))
+ break;
+
+ if (p->is_directed && p->num_qids_mapped > 0)
+ break;
+
+ if (q->type == RTE_SCHED_TYPE_DIRECT) {
+ /* check directed qids only map to one port */
+ if (p->num_qids_mapped > 0)
+ break;
+ /* check port only takes a directed flow */
+ if (num > 1)
+ break;
+
+ p->is_directed = 1;
+ p->num_qids_mapped = 1;
+ } else if (q->type == RTE_SCHED_TYPE_ORDERED) {
+ p->num_ordered_qids++;
+ p->num_qids_mapped++;
+ } else if (q->type == RTE_SCHED_TYPE_ATOMIC) {
+ p->num_qids_mapped++;
+ }
+
+ q->cq_map[q->cq_num_mapped_cqs++] = port_id;
+ }
+ return i;
+}
+
+static void
+sw_dump(FILE *f, const struct rte_event_dev *dev)
+{
+	static const char *q_type_strings[] = {"Ordered", "Atomic",
+ "Parallel", "Directed"
+ };
+ uint32_t i;
+ const struct sw_evdev *sw = (const void *)dev;
+ fprintf(f, "EventDev %s: ports %d, qids %d\n", sw->dev.name,
+ sw->port_count, sw->qid_count);
+
+ fprintf(f, "\trx %"PRIu64"\n\tdrop %"PRIu64"\n\ttx %"PRIu64"\n",
+ sw->stats.rx_pkts, sw->stats.rx_dropped, sw->stats.tx_pkts);
+ fprintf(f, "\tsched calls: %"PRIu64"\n", sw->sched_called);
+ fprintf(f, "\tsched cq/qid call: %"PRIu64"\n", sw->sched_cq_qid_called);
+ fprintf(f, "\tsched no IQ enq: %"PRIu64"\n", sw->sched_no_iq_enqueues);
+ fprintf(f, "\tsched no CQ enq: %"PRIu64"\n", sw->sched_no_cq_enqueues);
+ fprintf(f, "\toverloads %"PRIu64"\t%s\n", sw->sched_overload_counter,
+ sw->overloaded ? " [OVERLOADED NOW]" : "");
+
+#define COL_RED "\x1b[31m"
+#define COL_RESET "\x1b[0m"
+
+ for (i = 0; i < sw->port_count; i++) {
+ const struct sw_port *p = &sw->ports[i];
+ fprintf(f, " Port %d %s %s\n", i,
+ p->is_directed ? " (SingleCons)" : "",
+ p->overloaded ? " ["COL_RED"OVERLOAD"COL_RESET"]" : "");
+ fprintf(f, "\trx %"PRIu64"\n\tdrop %"PRIu64"\n\ttx %"PRIu64"\n"
+ "\tinf %d\n", sw->ports[i].stats.rx_pkts,
+ sw->ports[i].stats.rx_dropped,
+ sw->ports[i].stats.tx_pkts, sw->ports[i].inflights);
+
+ uint64_t rx_used = qe_ring_count(p->rx_worker_ring);
+ uint64_t rx_free = qe_ring_free_count(p->rx_worker_ring);
+ const char *rxcol = (rx_free == 0) ? COL_RED : COL_RESET;
+		fprintf(f, "\t%srx ring used: %"PRIu64"\tfree: %"PRIu64
+				COL_RESET"\n", rxcol, rx_used, rx_free);
+
+ uint64_t tx_used = qe_ring_count(p->cq_worker_ring);
+ uint64_t tx_free = qe_ring_free_count(p->cq_worker_ring);
+ const char *txcol = (tx_free == 0) ? COL_RED : COL_RESET;
+		fprintf(f, "\t%scq ring used: %"PRIu64"\tfree: %"PRIu64
+				COL_RESET"\n", txcol, tx_used, tx_free);
+ }
+
+ for (i = 0; i < sw->qid_count; i++) {
+ fprintf(f, " Queue %d (%s)\n", i, q_type_strings[sw->qids[i].type]);
+ fprintf(f, "\trx %"PRIu64"\n\tdrop %"PRIu64"\n\ttx %"PRIu64"\n",
+ sw->qids[i].stats.rx_pkts, sw->qids[i].stats.rx_dropped,
+ sw->qids[i].stats.tx_pkts);
+ uint32_t iq;
+		for (iq = 0; iq < SW_IQS_MAX; iq++) {
+ uint32_t used = iq_ring_count(sw->qids[i].iq[iq]);
+ uint32_t free = iq_ring_free_count(sw->qids[i].iq[iq]);
+ const char *col = (free == 0) ? COL_RED : COL_RESET;
+ fprintf(f, "\t%siq %d: Used %d\tFree %d"COL_RESET"\n",
+ col, iq, used, free);
+ }
+ }
+}
+
+static int
+sw_port_setup(struct rte_event_dev *dev, uint8_t port_id,
+ const struct rte_event_port_conf *conf)
+{
+ struct sw_evdev *sw = (void *)dev;
+ struct sw_port *p = &sw->ports[port_id];
+ char buf[QE_RING_NAMESIZE];
+ unsigned i;
+
+ if (conf->enqueue_queue_depth >
+ dev->info.max_event_port_enqueue_queue_depth ||
+ conf->dequeue_queue_depth >
+ dev->info.max_event_port_dequeue_queue_depth){
+ rte_errno = EINVAL;
+ return -1;
+ }
+
+ *p = (struct sw_port){0}; /* zero entire structure */
+ p->id = port_id;
+
+ /* TODO: how do we work with an overload scheme here?
+ * For now, still use a huge buffer, with per-port thresholds.
+ * When it fills beyond the configured max size, we throttle.
+ */
+ snprintf(buf, sizeof(buf), "%s_%s", dev->name, "rx_worker_ring");
+ p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
+ dev->socket_id);
+ if (p->rx_worker_ring == NULL)
+ return -1;
+
+ /* threshold is number of free spaces that are left in ring
+ * before overload should kick in. QE ring returns free_count,
+ * so storing this way makes more sense than actual depth
+ */
+ uint32_t requested = MAX_SW_PROD_Q_DEPTH - conf->new_event_threshold;
+ p->overload_threshold = requested > 255 ? 255 : requested;
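+	/* Worked example (editorial): with MAX_SW_PROD_Q_DEPTH == 4096 and
+	 * new_event_threshold == 3900, 'requested' is 196 and the threshold
+	 * stays 196; a new_event_threshold of 3000 requests 1096 free slots,
+	 * which is clamped to 255 as overload_threshold is a uint8_t.
+	 */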
+
+ snprintf(buf, sizeof(buf), "%s_%s", dev->name, "cq_worker_ring");
+ p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_queue_depth,
+ dev->socket_id);
+ if (p->cq_worker_ring == NULL) {
+ qe_ring_destroy(p->rx_worker_ring);
+ return -1;
+ }
+ sw->cq_ring_space[port_id] = conf->dequeue_queue_depth;
+
+ /* set hist list contents to empty */
+ for (i = 0; i < SW_PORT_HIST_LIST; i++) {
+ p->hist_list[i].fid = -1;
+ p->hist_list[i].qid = -1;
+ }
+
+ return 0;
+}
+
+static int
+sw_port_cleanup(struct sw_evdev *sw, uint8_t port_id)
+{
+ struct sw_port *p = &sw->ports[port_id];
+
+ qe_ring_destroy(p->rx_worker_ring);
+ qe_ring_destroy(p->cq_worker_ring);
+ memset(p, 0, sizeof(*p));
+
+ return 0;
+}
+
+static uint8_t
+sw_port_count(struct rte_event_dev *dev)
+{
+ struct sw_evdev *sw = (void *)dev;
+ return sw->port_count;
+}
+
+
+static uint16_t
+sw_queue_count(struct rte_event_dev *dev)
+{
+ struct sw_evdev *sw = (void *)dev;
+ return sw->qid_count;
+}
+
+static int32_t
+qid_cleanup(struct sw_evdev *sw, uint32_t idx)
+{
+ struct sw_qid *qid = &sw->qids[idx];
+ uint32_t i;
+
+ for (i = 0; i < SW_IQS_MAX; i++) {
+ iq_ring_destroy(qid->iq[i]);
+ }
+
+ if (qid->type == RTE_SCHED_TYPE_ORDERED) {
+ rte_free(qid->reorder_buffer);
+ rte_ring_free(qid->reorder_buffer_freelist);
+ }
+ memset(qid, 0, sizeof(*qid));
+
+ return 0;
+}
+
+static int32_t
+qid_init(struct sw_evdev *sw, unsigned idx, int type,
+ const struct rte_event_queue_conf *queue_conf)
+{
+ int i;
+ int socket_id = sw->dev.socket_id;
+ char buf[IQ_RING_NAMESIZE];
+ struct sw_qid *qid = &sw->qids[idx];
+
+ for (i = 0; i < SW_IQS_MAX; i++) {
+ snprintf(buf, sizeof(buf), "q_%u_iq_%d", idx, i);
+ qid->iq[i] = iq_ring_create(buf, socket_id);
+ if (!qid->iq[i]) {
+ SW_LOG_DBG("ring create failed");
+ goto cleanup;
+ }
+ }
+
+	/* Initialize the iq packet mask to 1, as __builtin_ctz() is undefined
+ * if the value passed in is zero.
+ */
+ qid->iq_pkt_mask = 1;
+
+ /* Initialize the FID structures to no pinning (-1), and zero packets */
+ struct sw_fid_t fid = {.cq = -1, .count = 0};
+ for (i = 0; i < SW_QID_NUM_FIDS; i++)
+ qid->fids[i] = fid;
+
+ qid->id = idx;
+ qid->type = type;
+ qid->priority = queue_conf->priority;
+
+ if (qid->type == RTE_SCHED_TYPE_ORDERED) {
+ uint32_t window_size;
+
+		/* rte_ring and window_size_mask require window_size to
+ * be a power-of-2.
+ */
+ window_size = rte_align32pow2(
+ queue_conf->nb_atomic_order_sequences);
+
+ qid->window_size = window_size - 1;
+
+ if (!window_size) {
+ SW_LOG_DBG("invalid reorder_window_size for ordered queue\n");
+ goto cleanup;
+ }
+
+		snprintf(buf, sizeof(buf), "%s_iq_%u_rob", sw->dev.name, idx);
+ qid->reorder_buffer = rte_zmalloc_socket(buf,
+ window_size * sizeof(qid->reorder_buffer[0]),
+ 0, socket_id);
+ if (!qid->reorder_buffer) {
+ SW_LOG_DBG("reorder_buffer malloc failed\n");
+ goto cleanup;
+ }
+
+ memset(&qid->reorder_buffer[0],
+ 0,
+ window_size * sizeof(qid->reorder_buffer[0]));
+
+		snprintf(buf, sizeof(buf), "%s_iq_%u_freelist", sw->dev.name, idx);
+ qid->reorder_buffer_freelist = rte_ring_create(buf,
+ window_size,
+ socket_id,
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
+ if (!qid->reorder_buffer_freelist) {
+ SW_LOG_DBG("freelist ring create failed");
+ goto cleanup;
+ }
+
+ /* Populate the freelist with reorder buffer entries. Enqueue
+ * 'window_size - 1' entries because the rte_ring holds only
+ * that many.
+ */
+ for (i = 0; i < (int) window_size - 1; i++) {
+ if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist,
+ &qid->reorder_buffer[i]) < 0)
+ goto cleanup;
+ }
+
+ qid->reorder_buffer_index = 0;
+ qid->cq_next_tx = 0;
+ }
+
+ return 0;
+
+cleanup:
+ for (i = 0; i < SW_IQS_MAX; i++) {
+ if (qid->iq[i])
+ iq_ring_destroy(qid->iq[i]);
+ }
+
+ if (qid->reorder_buffer) {
+ rte_free(qid->reorder_buffer);
+ qid->reorder_buffer = NULL;
+ }
+
+ if (qid->reorder_buffer_freelist) {
+ rte_ring_free(qid->reorder_buffer_freelist);
+ qid->reorder_buffer_freelist = NULL;
+ }
+
+ return -EINVAL;
+}
+
+static int
+sw_queue_setup(struct rte_event_dev *dev,
+ uint8_t queue_id,
+ const struct rte_event_queue_conf *conf)
+{
+ int type;
+	if (conf->nb_atomic_flows > 0 &&
+			conf->nb_atomic_order_sequences > 0)
+ return -1;
+
+ if (conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER)
+ type = RTE_SCHED_TYPE_DIRECT;
+ else if (conf->nb_atomic_flows > 0)
+ type = RTE_SCHED_TYPE_ATOMIC;
+ else if (conf->nb_atomic_order_sequences > 0)
+ type = RTE_SCHED_TYPE_ORDERED;
+ else
+ type = RTE_SCHED_TYPE_PARALLEL;
+
+ return qid_init((void *)dev, queue_id, type, conf);
+}
+
+static int
+sw_dev_configure(struct rte_event_dev *dev,
+ struct rte_event_dev_config *config)
+{
+ struct sw_evdev *se = (void *)dev;
+
+ if (config->nb_event_queues > dev->info.max_event_queues ||
+ config->nb_event_ports > dev->info.max_event_ports)
+ return -1;
+
+ se->qid_count = config->nb_event_queues;
+ se->port_count = config->nb_event_ports;
+ return 0;
+}
+
+static int
+assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)
+{
+ int *socket_id = opaque;
+ *socket_id = atoi(value);
+	if (*socket_id < 0 || *socket_id >= RTE_MAX_NUMA_NODES)
+ return -1;
+ return 0;
+}
+
+static inline void
+swap_ptr(struct sw_qid **a, struct sw_qid **b)
+{
+	struct sw_qid *tmp = *a;
+	*a = *b;
+	*b = tmp;
+}
+
+static int
+sw_start(struct rte_event_dev *dev)
+{
+ unsigned int i, j;
+ struct sw_evdev *sw = (void *)dev;
+ /* check all ports are set up */
+ for (i = 0; i < sw->port_count; i++)
+ if (sw->ports[i].rx_worker_ring == NULL)
+ return -1;
+
+ /* check all queues are configured and mapped to ports*/
+ for (i = 0; i < sw->qid_count; i++)
+ if (sw->qids[i].iq[0] == NULL ||
+ sw->qids[i].cq_num_mapped_cqs == 0)
+ return -1;
+
+ /* build up our prioritized array of qids */
+ /* We don't use qsort here, as if all/multiple entries have the same
+ * priority, the result is non-deterministic. From "man 3 qsort":
+ * "If two members compare as equal, their order in the sorted
+ * array is undefined."
+ */
+ for (i = 0; i < sw->qid_count; i++) {
+ sw->qids_prioritized[i] = &sw->qids[i];
+ for (j = i; j > 0; j--)
+ if (sw->qids_prioritized[j]->priority <
+ sw->qids_prioritized[j-1]->priority)
+				swap_ptr(&sw->qids_prioritized[j],
+						&sw->qids_prioritized[j-1]);
+ }
+ sw->started = 1;
+ return 0;
+}
+
+static void
+sw_stop(struct rte_event_dev *dev)
+{
+ struct sw_evdev *sw = (void *)dev;
+ sw->started = 0;
+}
+
+static int
+sw_close(struct rte_event_dev *dev)
+{
+ struct sw_evdev *sw = (void *)dev;
+ uint32_t i;
+
+	for (i = 0; i < sw->qid_count; i++) {
+ qid_cleanup(sw, i);
+ }
+ sw->qid_count = 0;
+
+ for (i = 0; i < sw->port_count; i++) {
+ sw_port_cleanup(sw, i);
+ }
+ sw->port_count = 0;
+
+ memset(&sw->stats, 0, sizeof(sw->stats));
+
+ return 0;
+}
+
+static int
+sw_probe(const char *name, const char *params)
+{
+ static const struct rte_event_dev_ops evdev_sw_ops = {
+ .configure = sw_dev_configure,
+ .queue_setup = sw_queue_setup,
+ .queue_count = sw_queue_count,
+ .port_setup = sw_port_setup,
+ .port_link = sw_port_link,
+ .port_count = sw_port_count,
+ .start = sw_start,
+ .stop = sw_stop,
+ .close = sw_close,
+ .stats_get = sw_dev_stats_get,
+ .dump = sw_dump,
+
+ .enqueue = sw_event_enqueue,
+ .enqueue_burst = sw_event_enqueue_burst,
+ .dequeue = sw_event_dequeue,
+ .dequeue_burst = sw_event_dequeue_burst,
+ .release = sw_event_release,
+ .schedule = sw_event_schedule,
+ };
+ static const char *args[] = { NUMA_NODE_ARG, NULL };
+ const struct rte_memzone *mz;
+ struct sw_evdev *se;
+ struct rte_event_dev_info evdev_sw_info = {
+ .driver_name = PMD_NAME,
+ .max_event_queues = SW_QIDS_MAX,
+ .max_event_queue_flows = SW_QID_NUM_FIDS,
+ .max_event_queue_priority_levels = SW_Q_PRIORITY_MAX,
+ .max_event_priority_levels = SW_IQS_MAX,
+ .max_event_ports = SW_PORTS_MAX,
+ .max_event_port_dequeue_queue_depth = MAX_SW_CONS_Q_DEPTH,
+ .max_event_port_enqueue_queue_depth = MAX_SW_PROD_Q_DEPTH,
+ /* for event limits, there is no hard limit, but it
+ * depends on number of Queues configured and depth of
+ * producer/consumer queues
+ */
+ .max_num_events = -1,
+ .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
+ RTE_EVENT_DEV_CAP_EVENT_QOS),
+ };
+ int socket_id = 0;
+
+ if (params != NULL && params[0] != '\0') {
+ struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
+
+ if (!kvlist) {
+ RTE_LOG(INFO, PMD,
+ "Ignoring unsupported parameters when creating device '%s'\n",
+ name);
+ } else {
+ int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
+ assign_numa_node, &socket_id);
+ rte_kvargs_free(kvlist);
+
+ if (ret != 0) {
+ RTE_LOG(ERR, PMD,
+					"%s: Error parsing numa node parameter\n",
+ name);
+ return ret;
+ }
+ }
+ }
+
+ RTE_LOG(INFO, PMD, "Creating eventdev sw device %s, on numa node %d\n",
+ name, socket_id);
+
+ mz = rte_memzone_reserve(name, sizeof(*se), socket_id, 0);
+ if (mz == NULL)
+ return -1; /* memzone_reserve sets rte_errno on error */
+
+ se = mz->addr;
+ se->mz = mz;
+ snprintf(se->dev.name, sizeof(se->dev.name), "%s", name);
+ se->dev.configured = false;
+ se->dev.info = evdev_sw_info;
+ se->dev.ops = &evdev_sw_ops;
+ se->dev.socket_id = socket_id;
+
+ return rte_event_dev_register(&se->dev);
+}
+
+static int
+sw_remove(const char *name)
+{
+ if (name == NULL)
+ return -EINVAL;
+
+ RTE_LOG(INFO, PMD, "Closing eventdev sw device %s\n", name);
+ /* TODO unregister eventdev and release memzone */
+
+ return 0;
+}
+
+static struct rte_vdev_driver evdev_sw_pmd_drv = {
+ .probe = sw_probe,
+ .remove = sw_remove
+};
+
+RTE_PMD_REGISTER_VDEV(evdev_sw, evdev_sw_pmd_drv);
+RTE_PMD_REGISTER_PARAM_STRING(evdev_sw, "numa_node=<int>");
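+
+/* Usage sketch (editorial, not part of the patch): the device can be created
+ * at start-up via an EAL --vdev argument; exact instance naming follows the
+ * vdev conventions of the DPDK version in use, e.g.:
+ *
+ *	./app --vdev="evdev_sw0,numa_node=1"
+ *
+ * numa_node is parsed by assign_numa_node() above and selects the socket on
+ * which the device memzone is reserved.
+ */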
new file mode 100644
@@ -0,0 +1,234 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _SW_EVDEV_H_
+#define _SW_EVDEV_H_
+
+#include <rte_eventdev.h>
+#include <rte_eventdev_pmd.h>
+#include "event_ring.h"
+
+#define PMD_NAME "evdev_sw"
+
+#define SW_QIDS_MAX 128
+#define SW_QID_NUM_FIDS 16384
+#define SW_IQS_MAX 4
+#define SW_Q_PRIORITY_MAX 255
+#define SW_PORTS_MAX 128
+#define MAX_SW_CONS_Q_DEPTH 255
+
+/* allow for lots of over-provisioning */
+#define MAX_SW_PROD_Q_DEPTH 4096
+
+#define SW_FRAGMENTS_MAX 16
+#define PORT_DEQUEUE_BURST_SIZE 16
+#define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH + (MAX_SW_CONS_Q_DEPTH*2))
+
+#define SW_PORT_OVERLOAD_THRES (512)
+
+#define RTE_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
+
+#ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG
+#define SW_LOG_INFO(fmt, args...) \
+ RTE_LOG(INFO, PMD, "[%s] %s() line %u: " fmt "\n", \
+ PMD_NAME, \
+ __func__, __LINE__, ## args)
+
+#define SW_LOG_DBG(fmt, args...) \
+ RTE_LOG(DEBUG, PMD, "[%s] %s() line %u: " fmt "\n", \
+ PMD_NAME, \
+ __func__, __LINE__, ## args)
+#else
+#define SW_LOG_INFO(fmt, args...)
+#define SW_LOG_DBG(fmt, args...)
+#endif
+
+enum {
+ QE_FLAG_VALID_SHIFT = 0,
+ QE_FLAG_COMPLETE_SHIFT,
+ QE_FLAG_NOT_EOP_SHIFT,
+ _QE_FLAG_COUNT
+};
+
+#define QE_FLAG_VALID (1 << QE_FLAG_VALID_SHIFT) /* set for NEW, FWD, FRAG */
+#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP */
+#define QE_FLAG_NOT_EOP (1 << QE_FLAG_NOT_EOP_SHIFT) /* set for FRAG only */
+
+static const uint8_t sw_qe_flag_map[] = {
+ QE_FLAG_VALID /* RTE_QEENT_OP_NEW */,
+ QE_FLAG_VALID | QE_FLAG_COMPLETE /* RTE_QEENT_OP_FWD */,
+ QE_FLAG_COMPLETE /* RTE_QEENT_OP_DROP */,
+ QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP,
+};
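+
+/* Illustrative expansion of the table above (editorial note): assuming
+ * RTE_EVENT_OP_NEW, _FWD and _DROP index the array as 0, 1 and 2, as the
+ * comments suggest, the mapping is:
+ *
+ *	NEW      -> VALID			(schedule, nothing prior to complete)
+ *	FWD      -> VALID | COMPLETE		(prior event done, schedule this one)
+ *	DROP     -> COMPLETE			(prior event done, nothing new)
+ *	fragment -> VALID | COMPLETE | NOT_EOP	(more fragments follow)
+ */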
+
+/* Records basic event stats at a given point. Used in port and qid structs */
+struct sw_point_stats {
+ uint64_t rx_pkts;
+ uint64_t rx_dropped;
+ uint64_t tx_pkts;
+};
+
+struct reorder_buffer_entry {
+ uint16_t num_fragments; /**< Number of packet fragments */
+ uint16_t fragment_index; /**< Points to the oldest valid frag */
+ uint8_t ready; /**< Entry is ready to be reordered */
+ struct rte_event fragments[SW_FRAGMENTS_MAX];
+};
+
+struct sw_hist_list_entry {
+ int32_t qid;
+ int32_t fid;
+ struct reorder_buffer_entry *rob_entry;
+};
+
+struct sw_port {
+ /* A numeric ID for the port. This should be used to access the
+ * statistics as returned by *rte_event_dev_stats_get*, and in other
+ * places where the API requires accessing a port by integer. It is not
+ * valid to assume that ports will be allocated in a linear sequence.
+ */
+ uint8_t id;
+
+ /** Indicates if this port is overloaded, and we need to throttle input */
+ uint8_t overloaded;
+ uint8_t overload_threshold;
+
+	int16_t is_directed; /**< Takes from a single directed QID */
+	/**
+	 * For load-balanced ports we can optimise pulling packets from
+	 * producers if there is no reordering involved
+	 */
+	int16_t num_ordered_qids;
+
+ /* track packets in and out of this port */
+ struct sw_point_stats stats;
+
+ /** Ring and buffer for pulling events from workers for scheduling */
+ struct qe_ring *rx_worker_ring __rte_cache_aligned;
+ uint32_t pp_buf_start;
+ uint32_t pp_buf_count;
+ struct rte_event pp_buf[PORT_DEQUEUE_BURST_SIZE];
+
+
+ /** Ring and buffer for pushing packets to workers after scheduling */
+ struct qe_ring *cq_worker_ring __rte_cache_aligned;
+ uint16_t cq_buf_count;
+ uint16_t outstanding_releases; /* num releases yet to be completed */
+ struct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH];
+
+ /* History list structs, containing info on pkts egressed to worker */
+ uint16_t hist_head __rte_cache_aligned;
+ uint16_t hist_tail;
+ uint16_t inflights;
+ struct sw_hist_list_entry hist_list[SW_PORT_HIST_LIST];
+
+ uint8_t num_qids_mapped;
+};
+
+struct sw_fid_t {
+ /* which CQ this FID is currently pinned to */
+ uint32_t cq;
+ /* number of packets gone to the CQ with this FID */
+ uint32_t count;
+};
+
+struct sw_qid {
+ /* The type of this QID */
+ int type;
+ /* Integer ID representing the queue. This is used in history lists,
+ * to identify the stage of processing. */
+ uint32_t id;
+ struct sw_point_stats stats;
+
+ /* Internal priority rings for packets */
+ struct iq_ring *iq[SW_IQS_MAX];
+ uint32_t iq_pkt_mask; /* A mask to indicate packets in an IQ */
+ uint64_t iq_pkt_count[SW_IQS_MAX];
+
+ /* Information on what CQs are polling this IQ */
+ uint32_t cq_num_mapped_cqs;
+ uint32_t cq_next_tx; /* cq to write next (non-atomic) packet */
+ uint32_t cq_map[SW_PORTS_MAX];
+
+ /* Track flow ids for atomic load balancing */
+ struct sw_fid_t fids[SW_QID_NUM_FIDS];
+
+ /* Track packet order for reordering when needed */
+ struct reorder_buffer_entry *reorder_buffer; /* packets awaiting reordering */
+ struct rte_ring *reorder_buffer_freelist; /* available reorder slots */
+ uint32_t reorder_buffer_index; /* oldest valid reorder buffer entry */
+ uint32_t window_size; /* Used to wrap reorder_buffer_index */
+
+ uint8_t priority;
+};
+
+struct sw_evdev {
+ /* must be the first item in the private dev struct */
+ struct rte_event_dev dev;
+
+ const struct rte_memzone *mz;
+
+ /* Contains all ports - load balanced and directed */
+ struct sw_port ports[SW_PORTS_MAX];
+ uint32_t port_count;
+	uint16_t cq_ring_space[SW_PORTS_MAX]; /* Free space left in each cq */
+
+ /* All qids - allocated in one slab for vectorization */
+ struct sw_qid qids[SW_QIDS_MAX];
+ uint32_t qid_count;
+
+ /* Array of pointers to load-balanced QIDs sorted by priority level */
+ struct sw_qid *qids_prioritized[SW_QIDS_MAX];
+
+ /* Stats */
+ struct sw_point_stats stats __rte_cache_aligned;
+ uint64_t sched_called;
+ uint64_t sched_no_iq_enqueues;
+ uint64_t sched_no_cq_enqueues;
+ uint64_t sched_cq_qid_called;
+ uint64_t sched_overload_counter;
+
+ uint8_t started;
+
+ uint32_t overloaded __rte_cache_aligned;
+};
+
+int sw_event_enqueue(struct rte_event_dev *dev, uint8_t port_id,
+ struct rte_event *ev, bool pin_event);
+int sw_event_enqueue_burst(struct rte_event_dev *dev, uint8_t port_id,
+ struct rte_event ev[], int num, bool pin_event);
+bool sw_event_dequeue(struct rte_event_dev *dev, uint8_t port_id,
+ struct rte_event *ev, uint64_t wait);
+int sw_event_dequeue_burst(struct rte_event_dev *dev, uint8_t port_id,
+ struct rte_event *ev, int num, uint64_t wait);
+void sw_event_release(struct rte_event_dev *dev, uint8_t port_id, uint8_t index);
+int sw_event_schedule(struct rte_event_dev *dev);
+
+#endif /* _SW_EVDEV_H_ */
new file mode 100644
@@ -0,0 +1,660 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <rte_ring.h>
+#include "sw_evdev.h"
+#include "iq_ring.h"
+
+#define SW_IQS_MASK (SW_IQS_MAX-1)
+
+/* Retrieve the highest priority IQ, or SW_IQS_MAX if no pkts are available.
+ * Doing the CTZ twice is faster than caching the value due to data
+ * dependencies
+ */
+#define PKT_MASK_TO_IQ(pkts) \
+ (__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
+
+/* Clamp the highest priorities to the max value as allowed by
+ * the mask. Assumes MASK is (powerOfTwo - 1). Priority 0 (highest) maps to
+ * IQ 0, the lowest-numbered bit, so that ctz() finds it first on dequeue
+ */
+#define PRIO_TO_IQ(prio) (prio > SW_IQS_MASK ? SW_IQS_MASK : prio)
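+
+/* Worked example (editorial): with SW_IQS_MAX == 4 and iq_pkt_mask == 0x6
+ * (IQ 1 and IQ 2 non-empty), PKT_MASK_TO_IQ() is ctz(0x16) == 1, the highest
+ * priority non-empty IQ. An all-empty mask of 0 yields ctz(0x10) == 4 ==
+ * SW_IQS_MAX, which callers treat as "nothing to schedule". PRIO_TO_IQ(0)
+ * is 0 and any priority above 3 clamps to IQ 3.
+ */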
+
+static inline uint32_t
+sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
+ uint32_t iq_num, unsigned int count)
+{
+ uint32_t i;
+
+	if (count == 0)
+ return 0;
+
+ /* This is the QID ID. The QID ID is static, hence it can be
+ * used to identify the stage of processing in history lists etc */
+ uint32_t qid_id = qid->id;
+
+ for (i = 0; i < count; i++) {
+ const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
+ struct sw_fid_t *fid = &qid->fids[qe->flow_id];
+ int cq = fid->cq;
+
+ /* If no CQ is assigned, pick one */
+ if (cq < 0) {
+			/* select a CQ for this flow: start from the
+			 * round-robin position, then prefer the mapped CQ
+			 * with the most free space in its ring
+			 */
+ uint32_t cq_idx = qid->cq_next_tx++;
+ if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
+ qid->cq_next_tx = 0;
+ cq = qid->cq_map[cq_idx];
+ int cq_free_cnt = sw->cq_ring_space[cq];
+
+ for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs; cq_idx++) {
+ int test_cq = qid->cq_map[cq_idx];
+ int test_cq_free = sw->cq_ring_space[test_cq];
+
+ if (test_cq_free > cq_free_cnt)
+ cq = test_cq, cq_free_cnt = test_cq_free;
+ }
+ }
+
+ struct sw_port *p = &sw->ports[cq];
+
+ /* If the destination CQ or its history list is full, move on
+ * to the next queue.
+ */
+ if (sw->cq_ring_space[cq] == 0 ||
+ p->inflights == SW_PORT_HIST_LIST) {
+ struct qe_ring *worker = sw->ports[cq].cq_worker_ring;
+ qe_ring_enqueue_burst(worker, sw->ports[cq].cq_buf,
+ sw->ports[cq].cq_buf_count,
+ &sw->cq_ring_space[cq]);
+ sw->ports[cq].cq_buf_count = 0;
+			if (sw->cq_ring_space[cq] == 0)
+ break;
+ }
+
+ sw->cq_ring_space[cq]--;
+
+ /* store which CQ this FID is active on,
+ * for future pkts of the same flow
+ */
+ fid->cq = cq;
+ fid->count++;
+
+ qid->stats.tx_pkts++;
+ sw->ports[cq].inflights++;
+
+ int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
+
+ p->hist_list[head].fid = qe->flow_id;
+ p->hist_list[head].qid = qid_id;
+
+ p->hist_head++;
+ p->stats.tx_pkts++;
+ sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
+ iq_ring_pop(qid->iq[iq_num]);
+ }
+ return i;
+}
+
+static inline uint32_t
+sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
+ uint32_t iq_num, unsigned int count, int keep_order)
+{
+ uint32_t i;
+ uint32_t cq_idx = qid->cq_next_tx;
+
+ /* This is the QID ID. The QID ID is static, hence it can be
+ * used to identify the stage of processing in history lists etc */
+ uint32_t qid_id = qid->id;
+
+
+ if (keep_order)
+ /* only schedule as many as we have reorder buffer entries */
+ count = RTE_MIN(count, rte_ring_count(qid->reorder_buffer_freelist));
+
+ for (i = 0; i < count; i++) {
+ const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
+ uint32_t cq_check_count = 0;
+ uint32_t cq;
+
+ /*
+ * for parallel, just send to next available CQ in round-robin
+ * fashion. So scan for an available CQ. If all CQs are full
+ * just return and move on to next QID
+ */
+ do {
+ if (++cq_check_count > qid->cq_num_mapped_cqs)
+ goto exit;
+ cq = qid->cq_map[cq_idx];
+ if (++cq_idx == qid->cq_num_mapped_cqs)
+ cq_idx = 0;
+ } while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
+ sw->ports[cq].inflights == SW_PORT_HIST_LIST);
+
+ struct sw_port *p = &sw->ports[cq];
+ if (sw->cq_ring_space[cq] == 0 ||
+ p->inflights == SW_PORT_HIST_LIST)
+ break;
+
+ sw->cq_ring_space[cq]--;
+
+ qid->stats.tx_pkts++;
+
+ const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
+
+ p->hist_list[head].fid = qe->flow_id;
+ p->hist_list[head].qid = qid_id;
+
+ if (keep_order)
+ rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
+ (void *)&p->hist_list[head].rob_entry);
+
+ sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
+ iq_ring_pop(qid->iq[iq_num]);
+
+ rte_compiler_barrier();
+ p->inflights++;
+ p->stats.tx_pkts++;
+ p->hist_head++;
+ }
+exit:
+ qid->cq_next_tx = cq_idx;
+ return i;
+}
+
+static uint32_t
+sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
+ uint32_t iq_num, unsigned int count)
+{
+ uint32_t cq_id = qid->cq_map[0];
+ struct sw_port *port = &sw->ports[cq_id];
+
+ /* get max burst enq size for cq_ring */
+ uint32_t count_free = sw->cq_ring_space[cq_id];
+ if (count == 0 || count_free == 0)
+ return 0;
+
+ /* burst dequeue from the QID IQ ring */
+ struct iq_ring *ring = qid->iq[iq_num];
+ uint32_t ret = iq_ring_dequeue_burst(ring,
+ &port->cq_buf[port->cq_buf_count], count_free);
+ port->cq_buf_count += ret;
+
+ /* Update QID, Port and Total TX stats */
+ qid->stats.tx_pkts += ret;
+ port->stats.tx_pkts += ret;
+
+ /* Subtract credits from cached value */
+ sw->cq_ring_space[cq_id] -= ret;
+
+ return ret;
+}
+
+static uint32_t
+sw_schedule_qid_to_cq(struct sw_evdev *sw)
+{
+ uint32_t pkts = 0;
+ uint32_t qid_idx;
+
+ sw->sched_cq_qid_called++;
+
+ for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
+ /* make the QID lookup here be based on priority of the QID */
+ struct sw_qid *qid = sw->qids_prioritized[qid_idx];
+
+ int type = qid->type;
+ int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
+
+		/* no packets in any IQ of this QID; move on to the next */
+ if (iq_num >= SW_IQS_MAX)
+ continue;
+
+ unsigned int count = iq_ring_count(qid->iq[iq_num]);
+ uint32_t pkts_done = 0;
+
+ if (type == RTE_SCHED_TYPE_DIRECT)
+ pkts_done += sw_schedule_dir_to_cq(sw, qid,
+ iq_num, count);
+ else if (type == RTE_SCHED_TYPE_ATOMIC)
+ pkts_done += sw_schedule_atomic_to_cq(sw, qid,
+ iq_num, count);
+ else
+ pkts_done += sw_schedule_parallel_to_cq(sw, qid,
+ iq_num, count,
+ (type == RTE_SCHED_TYPE_ORDERED));
+
+ /* Check if the IQ that was polled is now empty, and unset it
+		 * in the IQ mask if it is empty.
+ */
+ int all_done = (pkts_done == count);
+
+ qid->iq_pkt_mask &= ~(all_done << (iq_num));
+ pkts += pkts_done;
+ }
+
+ return pkts;
+}
+
+/* This function will perform re-ordering of packets, and injecting into
+ * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
+ * contiguous in that array, this function accepts a "range" of QIDs to scan.
+ */
+static uint16_t
+sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
+{
+ /* Perform egress reordering */
+ struct rte_event *qe;
+ uint32_t pkts_iter = 0;
+
+ for (; qid_start < qid_end; qid_start++) {
+ struct sw_qid *qid = &sw->qids[qid_start];
+ int i, num_entries_in_use;
+
+ if (qid->type != RTE_SCHED_TYPE_ORDERED)
+ continue;
+
+ num_entries_in_use = rte_ring_free_count(
+ qid->reorder_buffer_freelist);
+
+ for (i = 0; i < num_entries_in_use; i++) {
+ struct reorder_buffer_entry *entry;
+ int j;
+
+ entry = &qid->reorder_buffer[qid->reorder_buffer_index];
+
+ if (!entry->ready)
+ break;
+
+ for (j = 0; j < entry->num_fragments; j++) {
+ uint16_t dest_qid;
+ uint16_t dest_iq;
+
+ qe = &entry->fragments[entry->fragment_index + j];
+
+ dest_qid = qe->flow_id;
+ dest_iq = PRIO_TO_IQ(qe->priority);
+
+				if (dest_qid >= sw->qid_count) {
+ sw->stats.rx_dropped++;
+ continue;
+ }
+
+ struct sw_qid *dest_qid_ptr = &sw->qids[dest_qid];
+ const struct iq_ring *dest_iq_ptr = dest_qid_ptr->iq[dest_iq];
+ if (iq_ring_free_count(dest_iq_ptr) == 0)
+ break;
+
+ pkts_iter++;
+
+ struct sw_qid *q = &sw->qids[dest_qid];
+ struct iq_ring *r = q->iq[dest_iq];
+
+ /* we checked for space above, so enqueue must
+ * succeed
+ */
+ iq_ring_enqueue(r, qe);
+ q->iq_pkt_mask |= (1 << (dest_iq));
+ q->iq_pkt_count[dest_iq]++;
+ q->stats.rx_pkts++;
+ }
+
+ entry->ready = (j != entry->num_fragments);
+ entry->num_fragments -= j;
+ entry->fragment_index += j;
+
+ if (!entry->ready) {
+ entry->fragment_index = 0;
+
+ rte_ring_sp_enqueue(qid->reorder_buffer_freelist,
+ entry);
+
+ qid->reorder_buffer_index++;
+ qid->reorder_buffer_index %= qid->window_size;
+ }
+ }
+ }
+ return pkts_iter;
+}
+
+static uint32_t
+sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
+{
+ uint32_t pkts_iter = 0;
+ struct sw_port *port = &sw->ports[port_id];
+ struct qe_ring *worker = port->rx_worker_ring;
+
+ /* If shadow ring has 0 pkts, pull from worker ring */
+	if (port->pp_buf_count == 0) {
+ port->pp_buf_start = 0;
+ port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
+ RTE_DIM(port->pp_buf));
+
+ if (port->overloaded &&
+ qe_ring_count(worker) < SW_PORT_OVERLOAD_THRES/2) {
+ port->overloaded = 0;
+ sw->sched_overload_counter++;
+ rte_atomic32_dec((void *)&sw->overloaded);
+ }
+ }
+
+ while (port->pp_buf_count) {
+ const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
+ struct sw_hist_list_entry *hist_entry = NULL;
+ uint8_t flags = qe->operation;
+ const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
+ int needs_reorder = 0;
+
+		/* scratch entry, written to when no reorder entry is in use */
+		static struct reorder_buffer_entry dummy_rob;
+
+ /*
+ * if we don't have space for this packet in an IQ,
+ * then move on to next queue. Technically, for a
+ * packet that needs reordering, we don't need to check
+ * here, but it simplifies things not to special-case
+ */
+ uint32_t iq_num = PRIO_TO_IQ(qe->priority);
+ struct sw_qid *qid = &sw->qids[qe->queue_id];
+ struct iq_ring *iq_ring = qid->iq[iq_num];
+
+ if ((flags & QE_FLAG_VALID) &&
+ iq_ring_free_count(iq_ring) == 0)
+ break;
+
+ /* now process based on flags. Note that for directed
+ * queues, the enqueue_flush masks off all but the
+ * valid flag. This makes FWD and partial enqueues just
+ * NEW type, and makes DROPS no-op calls.
+ */
+ if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
+ const uint32_t hist_tail = port->hist_tail &
+ (SW_PORT_HIST_LIST - 1);
+
+ hist_entry = &port->hist_list[hist_tail];
+ const uint32_t hist_qid = hist_entry->qid;
+ const uint32_t hist_fid = hist_entry->fid;
+
+ struct sw_fid_t *fid = &sw->qids[hist_qid].fids[hist_fid];
+ fid->count -= eop;
+ if (fid->count == 0)
+ fid->cq = -1;
+
+ /* set reorder ready if an ordered QID */
+ uintptr_t rob_ptr = (uintptr_t)hist_entry->rob_entry;
+ const uintptr_t valid = (rob_ptr != 0);
+ needs_reorder = valid;
+ rob_ptr |= ((valid - 1) & (uintptr_t)&dummy_rob);
+ ((struct reorder_buffer_entry*)rob_ptr)->ready =
+ eop * needs_reorder;
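+			/* Editorial note on the branch-free write above: when
+			 * rob_ptr is NULL, 'valid' is 0, (valid - 1) is all
+			 * ones and rob_ptr becomes &dummy_rob, so the (zero)
+			 * ready value lands in the scratch entry instead of
+			 * dereferencing NULL; a non-NULL rob_ptr is left
+			 * unchanged and ready is set to eop.
+			 */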
+
+ port->inflights -= eop;
+ port->hist_tail += eop;
+ }
+ if (flags & QE_FLAG_VALID) {
+ port->stats.rx_pkts++;
+
+ if (needs_reorder) {
+ struct reorder_buffer_entry *rob_entry =
+ hist_entry->rob_entry;
+
+				/* TODO: how do we alert the user that they
+				 * have exceeded max frags?
+				 */
+ if (rob_entry->num_fragments == SW_FRAGMENTS_MAX)
+ sw->stats.rx_dropped++;
+ else
+ rob_entry->fragments[rob_entry->num_fragments++] = *qe;
+ goto end_qe;
+ }
+
+ /* Use the iq_num from above to push the QE
+ * into the qid at the right priority
+ */
+
+ qid->iq_pkt_mask |= (1 << (iq_num));
+ iq_ring_enqueue(iq_ring, qe);
+ qid->iq_pkt_count[iq_num]++;
+ qid->stats.rx_pkts++;
+ pkts_iter++;
+ }
+
+ end_qe:
+ port->pp_buf_start++;
+ port->pp_buf_count--;
+ } /* while (avail_qes) */
+
+ return pkts_iter;
+}
+
+static uint32_t
+sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
+{
+ uint32_t pkts_iter = 0;
+ struct sw_port *port = &sw->ports[port_id];
+ struct qe_ring *worker = port->rx_worker_ring;
+
+ /* If shadow ring has 0 pkts, pull from worker ring */
+ if (port->pp_buf_count == 0) {
+ port->pp_buf_start = 0;
+ port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
+ RTE_DIM(port->pp_buf));
+
+ if (port->overloaded &&
+ qe_ring_count(worker) < SW_PORT_OVERLOAD_THRES/2) {
+ port->overloaded = 0;
+ sw->sched_overload_counter++;
+ rte_atomic32_dec((void *)&sw->overloaded);
+ }
+ }
+
+ while (port->pp_buf_count) {
+ const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
+ uint8_t flags = qe->operation;
+
+ if ((flags & QE_FLAG_VALID) == 0)
+ goto end_qe;
+
+ uint32_t iq_num = PRIO_TO_IQ(qe->priority);
+ struct sw_qid *qid = &sw->qids[qe->queue_id];
+ struct iq_ring *iq_ring = qid->iq[iq_num];
+
+ if (iq_ring_free_count(iq_ring) == 0)
+ break; /* move to next port */
+
+ port->stats.rx_pkts++;
+
+ /* Use the iq_num from above to push the QE
+ * into the qid at the right priority
+ */
+ qid->iq_pkt_mask |= (1 << (iq_num));
+ iq_ring_enqueue(iq_ring, qe);
+ qid->iq_pkt_count[iq_num]++;
+ qid->stats.rx_pkts++;
+ pkts_iter++;
+
+ end_qe:
+ port->pp_buf_start++;
+ port->pp_buf_count--;
+ } /* while port->pp_buf_count */
+
+ return pkts_iter;
+}
+
+static uint32_t
+sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
+{
+ uint32_t pkts_iter = 0;
+ struct sw_port *port = &sw->ports[port_id];
+ struct qe_ring *worker = port->rx_worker_ring;
+
+ if (port->pp_buf_count == 0) {
+ port->pp_buf_start = 0;
+ port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
+ RTE_DIM(port->pp_buf));
+
+ if (port->overloaded &&
+ qe_ring_count(worker) < SW_PORT_OVERLOAD_THRES/2) {
+ port->overloaded = 0;
+ sw->sched_overload_counter++;
+ rte_atomic32_dec((void *)&sw->overloaded);
+ }
+ }
+
+ while (port->pp_buf_count) {
+ const struct rte_event *ev = &port->pp_buf[port->pp_buf_start];
+ struct sw_hist_list_entry *hist_entry = NULL;
+ uint8_t flags = ev->operation;
+
+		/* for fragments, ignore completion.
+		 * NOTE: if the not_eop flag is set, the completion flag must
+		 * also be set, so the xor below clears completion for frags
+		 */
+		flags ^= (flags & QE_FLAG_NOT_EOP) >>
+			(QE_FLAG_NOT_EOP_SHIFT - QE_FLAG_COMPLETE_SHIFT);
+
+ /*
+ * if we don't have space for this packet in an IQ,
+ * then move on to next queue.
+ */
+ uint32_t iq_num = PRIO_TO_IQ(ev->priority);
+ struct sw_qid *qid = &sw->qids[ev->queue_id];
+ struct iq_ring *iq_ring = qid->iq[iq_num];
+
+ if ((flags & QE_FLAG_VALID) &&
+ iq_ring_free_count(iq_ring) == 0)
+ break;
+
+ /* now process based on flags. Note that for directed
+ * queues, the enqueue_flush masks off all but the
+ * valid flag. This makes FWD and partial enqueues just
+ * NEW type, and makes DROPS no-op calls.
+ */
+ if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
+ const uint32_t hist_tail = port->hist_tail &
+ (SW_PORT_HIST_LIST - 1);
+
+ hist_entry = &port->hist_list[hist_tail];
+ const uint32_t hist_qid = hist_entry->qid;
+ const uint32_t hist_fid = hist_entry->fid;
+
+ struct sw_fid_t *fid = &sw->qids[hist_qid].fids[hist_fid];
+ fid->count--;
+ if (fid->count == 0)
+ fid->cq = -1;
+
+			port->inflights--;
+			port->hist_tail++;
+ }
+ if (flags & QE_FLAG_VALID) {
+ port->stats.rx_pkts++;
+
+ /* Use the iq_num from above to push the QE
+ * into the qid at the right priority
+ */
+
+ qid->iq_pkt_mask |= (1 << (iq_num));
+ iq_ring_enqueue(iq_ring, ev);
+ qid->iq_pkt_count[iq_num]++;
+ qid->stats.rx_pkts++;
+ pkts_iter++;
+ }
+
+ port->pp_buf_start++;
+ port->pp_buf_count--;
+ } /* while (avail_qes) */
+
+ return pkts_iter;
+}
+
+int
+sw_event_schedule(struct rte_event_dev *dev)
+{
+ static const uint32_t num_pkts = 256;
+ struct sw_evdev *sw = (struct sw_evdev *)dev;
+ uint32_t in_pkts, out_pkts;
+ uint32_t out_pkts_total = 0, in_pkts_total = 0;
+ uint32_t i;
+
+ sw->sched_called++;
+ if (!sw->started)
+ return -1;
+
+ do {
+ uint32_t in_pkts_this_iteration = 0;
+
+ /* Pull from rx_ring for ports */
+ do {
+ in_pkts = 0;
+ for (i = 0; i < sw->port_count; i++)
+ /* TODO: use a function pointer in the port itself */
+ if (sw->ports[i].is_directed)
+ in_pkts += sw_schedule_pull_port_dir(sw, i);
+ else if (sw->ports[i].num_ordered_qids > 0)
+ in_pkts += sw_schedule_pull_port_lb(sw, i);
+ else
+ in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
+
+ /* QID scan for re-ordered */
+ in_pkts += sw_schedule_reorder(sw, 0,
+ sw->qid_count);
+ in_pkts_this_iteration += in_pkts;
+ } while (in_pkts > 0 && in_pkts_this_iteration < num_pkts);
+
+ out_pkts = 0;
+ out_pkts += sw_schedule_qid_to_cq(sw);
+ out_pkts_total += out_pkts;
+ in_pkts_total += in_pkts_this_iteration;
+
+ if (in_pkts == 0 && out_pkts == 0)
+ break;
+ } while (out_pkts_total < num_pkts);
+
+ /* push all the internal buffered QEs in port->cq_ring to the
+ * worker cores: aka, do the ring transfers batched.
+ */
+	for (i = 0; i < sw->port_count; i++) {
+ struct qe_ring *worker = sw->ports[i].cq_worker_ring;
+ qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
+ sw->ports[i].cq_buf_count,
+ &sw->cq_ring_space[i]);
+ sw->ports[i].cq_buf_count = 0;
+ }
+
+ sw->stats.tx_pkts += out_pkts_total;
+ sw->stats.rx_pkts += in_pkts_total;
+
+ sw->sched_no_iq_enqueues += (in_pkts_total == 0);
+ sw->sched_no_cq_enqueues += (out_pkts_total == 0);
+
+ return out_pkts_total;
+}
new file mode 100644
@@ -0,0 +1,218 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "sw_evdev.h"
+
+#include <rte_atomic.h>
+#include <rte_hash_crc.h>
+
+#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
+
+static inline void
+sw_overload_check_and_set(struct sw_evdev *sw, struct sw_port *p,
+ uint16_t free_count)
+{
+ if (!p->overloaded &&
+ free_count < MAX_SW_PROD_Q_DEPTH - p->overload_threshold) {
+ p->overloaded = 1;
+ rte_atomic32_inc((void *)&sw->overloaded);
+ }
+}
+
+int
+sw_event_enqueue(struct rte_event_dev *dev, uint8_t port_id, struct rte_event *ev,
+ bool pin_event)
+{
+ RTE_SET_USED(pin_event);
+ uint16_t free_count;
+ struct sw_evdev *sw = (void *)dev;
+
+	if (port_id >= sw->port_count)
+		return -1;
+
+	struct sw_port *p = &sw->ports[port_id];
+	/* TODO: Consider optimization: keep port overloaded status in a flat
+	 * array in the sw instance, do a lookup and just one return branch
+	 * together with the port_id check above */
+	if (sw->overloaded && ev->operation == RTE_EVENT_OP_NEW)
+ return -ENOSPC;
+
+ ev->operation = sw_qe_flag_map[ev->operation];
+ const uint8_t invalid_qid = (ev[0].queue_id >= sw->qid_count);
+ ev[0].operation &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
+ /* mask flowID to valid range after a crc to jumble bits */
+ ev[0].flow_id = FLOWID_MASK & rte_hash_crc_4byte(ev[0].flow_id, -1);
+
+	if (invalid_qid)
+		p->stats.rx_dropped++;
+
+ unsigned int num_enq = qe_ring_enqueue_burst(p->rx_worker_ring,
+ ev, 1, &free_count);
+
+ sw_overload_check_and_set(sw, p, free_count);
+
+ /* TODO: Discuss on ML and fix this inconsistency in API:
+ * num_enq is the number of packet enqueued, so
+ * 0 = no packets
+ * 1 = got a packet
+ * This is different to how currently documented in API.
+ */
+ return num_enq;
+}
+
+int
+sw_event_enqueue_burst(struct rte_event_dev *dev, uint8_t port_id,
+ struct rte_event ev[], int num, bool pin_event)
+{
+ /* TODO: change enqueue API to uint32_t for num? */
+ int32_t i;
+ uint16_t free_count;
+ struct sw_evdev *sw = (void *)dev;
+
+	if (port_id >= sw->port_count)
+ return 0;
+
+ struct sw_port *p = &sw->ports[port_id];
+ RTE_SET_USED(pin_event);
+
+ for (i = 0; i < num; i++) {
+ /* optimize to two loops, with and without overload */
+		if (sw->overloaded && ev[i].operation == RTE_EVENT_OP_NEW)
+ return -ENOSPC;
+
+ ev[i].operation = sw_qe_flag_map[ev[i].operation];
+ const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
+ ev[i].operation &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
+ ev[i].flow_id = FLOWID_MASK & rte_hash_crc_4byte(ev[i].flow_id, -1);
+
+		if (invalid_qid)
+			p->stats.rx_dropped++;
+ }
+
+	/* returns number of events actually enqueued */
+	uint32_t enq = qe_ring_enqueue_burst(p->rx_worker_ring, ev, num,
+			&free_count);
+	sw_overload_check_and_set(sw, p, free_count);
+	return enq;
+}
+
+bool
+sw_event_dequeue(struct rte_event_dev *dev, uint8_t port_id,
+ struct rte_event *ev, uint64_t wait)
+{
+ RTE_SET_USED(wait);
+ struct sw_evdev *sw = (void *)dev;
+
+	if (port_id >= sw->port_count)
+ return 0;
+
+ struct sw_port *p = &sw->ports[port_id];
+ struct qe_ring *ring = p->cq_worker_ring;
+
+	/* check that all previous dequeues have been released */
+	uint16_t out_rels = p->outstanding_releases;
+	uint16_t i;
+	for (i = 0; i < out_rels; i++)
+		sw_event_release(dev, port_id, i);
+
+ /* Intel modification: may not be in final API */
+	if (ev == NULL)
+ return 0;
+
+ /* returns number of events actually dequeued, after storing */
+ uint32_t ndeq = qe_ring_dequeue_burst(ring, ev, 1);
+ p->outstanding_releases = ndeq;
+ return ndeq;
+}
+
+int
+sw_event_dequeue_burst(struct rte_event_dev *dev, uint8_t port_id,
+ struct rte_event *ev, int num, uint64_t wait)
+{
+ RTE_SET_USED(wait);
+ struct sw_evdev *sw = (void *)dev;
+
+	if (port_id >= sw->port_count)
+ return 0;
+
+ struct sw_port *p = &sw->ports[port_id];
+ struct qe_ring *ring = p->cq_worker_ring;
+
+	/* check that all previous dequeues have been released */
+	if (!p->is_directed) {
+		uint16_t out_rels = p->outstanding_releases;
+		uint16_t i;
+		for (i = 0; i < out_rels; i++)
+			sw_event_release(dev, port_id, i);
+ }
+
+ /* Intel modification: may not be in final API */
+	if (ev == NULL)
+ return 0;
+
+ /* returns number of events actually dequeued */
+ uint32_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
+ p->outstanding_releases = ndeq;
+ return ndeq;
+}
+
+void
+sw_event_release(struct rte_event_dev *dev, uint8_t port_id, uint8_t index)
+{
+ struct sw_evdev *sw = (void *)dev;
+ struct sw_port *p = &sw->ports[port_id];
+	RTE_SET_USED(index);
+
+ /* This function "hints" the scheduler that packet *index* of the
+ * previous burst:
+	 * (Atomic) has completed its critical section
+ * (Ordered) is ready for egress
+ *
+ * It is not mandatory to implement this functionality, but it may
+ * improve load-balancing / parallelism in the packet flows.
+ */
+
+ /* create drop message */
+ struct rte_event ev = {
+ .operation = sw_qe_flag_map[RTE_EVENT_OP_DROP],
+ };
+
+ uint16_t free_count;
+ qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
+
+ p->outstanding_releases--;
+}