@@ -53,6 +53,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
# library source files
SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
# export include files
SYMLINK-y-include +=
@@ -485,6 +485,10 @@ sw_probe(const char *name, const char *params)
return -EFAULT;
}
dev->dev_ops = &evdev_sw_ops;
+ dev->enqueue = sw_event_enqueue;
+ dev->enqueue_burst = sw_event_enqueue_burst;
+ dev->dequeue = sw_event_dequeue;
+ dev->dequeue_burst = sw_event_dequeue_burst;
sw = dev->data->dev_private;
sw->data = dev->data;
@@ -52,10 +52,35 @@
#define SW_DEQ_STAT_BUCKET_SHIFT 2 /* report dequeue burst sizes in buckets */
#define SCHED_DEQUEUE_BURST_SIZE 32 /* how many packets pulled from port by sched */
#define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
+#define NUM_SAMPLES 64 /* how many data points use for average stats */
/* have a new scheduling type for 1:1 queue to port links */
#define RTE_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
+enum {
+ QE_FLAG_VALID_SHIFT = 0,
+ QE_FLAG_COMPLETE_SHIFT,
+ QE_FLAG_NOT_EOP_SHIFT,
+ _QE_FLAG_COUNT
+};
+
+#define QE_FLAG_VALID (1 << QE_FLAG_VALID_SHIFT) /* set for NEW, FWD, FRAG */
+#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP */
+#define QE_FLAG_NOT_EOP (1 << QE_FLAG_NOT_EOP_SHIFT) /* set for FRAG only */
+
+static const uint8_t sw_qe_flag_map[] = {
+ QE_FLAG_VALID /* NEW Event */,
+ QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */,
+ QE_FLAG_COMPLETE /* RELEASE Event */,
+
+ /* Values which can be used for future support for partial
+ * events, i.e. where one event comes back to the scheduler
+ * as multiple which need to be tracked together
+ */
+ QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP,
+};
+
+
#ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG
#define SW_LOG_INFO(fmt, args...) \
RTE_LOG(INFO, PMD, "[%s] %s() line %u: " fmt "\n", \
@@ -229,4 +254,12 @@ sw_pmd_priv_const(const struct rte_eventdev *eventdev)
extern int sched_quanta;
+uint16_t sw_event_enqueue(void *port, const struct rte_event *ev);
+uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[],
+ uint16_t num);
+
+uint16_t sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
+ uint64_t wait);
+
#endif /* _SW_EVDEV_H_ */
new file mode 100644
@@ -0,0 +1,169 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <rte_atomic.h>
+#include <rte_cycles.h>
+
+#include "sw_evdev.h"
+#include "event_ring.h"
+
+#define PORT_ENQUEUE_MAX_BURST_SIZE 64
+
+static inline void
+sw_event_release(struct sw_port *p, uint8_t index)
+{
+ /*
+ * Drops the next outstanding event in our history. Used on dequeue
+ * to clear any history before dequeuing more events.
+ */
+ RTE_SET_USED(index);
+
+ /* create drop message */
+ struct rte_event ev = {
+ .op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE],
+ };
+
+ uint16_t free_count;
+ qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
+
+ p->outstanding_releases--;
+}
+
+uint16_t
+sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
+{
+ int32_t i;
+ uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE];
+ struct sw_port *p = port;
+ struct sw_evdev *sw = (void *)p->sw;
+
+ uint32_t quantas_avail = *(uint32_t *)(&sw->inflight_quanta);
+ uint32_t credits_avail = quantas_avail * SW_INFLIGHT_QUANTA_SIZE;
+ uint32_t sw_inflights = (sw->nb_events_limit - credits_avail);
+
+ if (p->inflight_max < sw_inflights)
+ return 0;
+ if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
+ num = PORT_ENQUEUE_MAX_BURST_SIZE;
+
+ /* request SW_INFLIGHT_QUANTA_SIZE more credits if needed */
+ while (p->inflight_credits < num) {
+ int failed_to_get = rte_atomic32_dec_and_test(&sw->inflight_quanta);
+ if (failed_to_get)
+ num = p->inflight_credits;
+ else
+ p->inflight_credits += (SW_INFLIGHT_QUANTA_SIZE);
+ }
+
+ for (i = 0; i < num; i++) {
+ int op = ev[i].op;
+ p->inflight_credits -= (op != RTE_EVENT_OP_RELEASE);
+ new_ops[i] = sw_qe_flag_map[op];
+ const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
+ new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
+ if ((new_ops[i] & QE_FLAG_COMPLETE) && p->outstanding_releases)
+ p->outstanding_releases--;
+
+ if (invalid_qid)
+ p->stats.rx_dropped++;
+ }
+
+ /* returns number of events actually enqueued */
+ uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
+ new_ops);
+ if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
+ uint64_t burst_ticks = rte_get_timer_cycles() -
+ p->last_dequeue_ticks;
+ uint64_t burst_pkt_ticks = burst_ticks / p->last_dequeue_burst_sz;
+ p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES;
+ p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
+ p->last_dequeue_ticks = 0;
+ }
+ return enq;
+}
+
+uint16_t
+sw_event_enqueue(void *port, const struct rte_event *ev)
+{
+ return sw_event_enqueue_burst(port, ev, 1);
+}
+
+uint16_t
+sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
+ uint64_t wait)
+{
+ RTE_SET_USED(wait);
+ struct sw_port *p = (void *)port;
+ struct qe_ring *ring = p->cq_worker_ring;
+
+ /* check that all previous dequeues have been released */
+ if (!p->is_directed) {
+ uint16_t out_rels = p->outstanding_releases;
+ uint16_t i;
+ for (i = 0; i < out_rels; i++)
+ sw_event_release(p, i);
+ }
+
+ /* Intel modification: may not be in final API */
+ if (ev == 0)
+ return 0;
+
+ /* returns number of events actually dequeued */
+ uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
+ if (ndeq == 0) {
+ p->outstanding_releases = 0;
+ p->zero_polls++;
+ p->total_polls++;
+ goto end;
+ }
+
+ p->inflight_credits += ndeq;
+ p->outstanding_releases = ndeq;
+ p->last_dequeue_burst_sz = ndeq;
+ p->last_dequeue_ticks = rte_get_timer_cycles();
+ p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
+ p->total_polls++;
+
+end:
+ if (p->inflight_credits >= SW_INFLIGHT_QUANTA_SIZE * 2 &&
+ p->inflight_credits > SW_INFLIGHT_QUANTA_SIZE + ndeq) {
+ rte_atomic32_inc(&p->sw->inflight_quanta);
+ p->inflight_credits -= SW_INFLIGHT_QUANTA_SIZE;
+ }
+ return ndeq;
+}
+
+uint16_t
+sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait)
+{
+ return sw_event_dequeue_burst(port, ev, 1, wait);
+}