[dpdk-dev,v5,07/20] event/sw: add support for event ports
Checks
Commit Message
From: Bruce Richardson <bruce.richardson@intel.com>
Add in the data-structures for the ports used by workers to send
packets to/from the scheduler. Also add in the functions to
create/destroy those ports.
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
---
v5:
- Add inflights in this patch to resolve compilation issue
---
drivers/event/sw/event_ring.h | 185 ++++++++++++++++++++++++++++++++++++++++++
drivers/event/sw/sw_evdev.c | 88 ++++++++++++++++++++
drivers/event/sw/sw_evdev.h | 80 ++++++++++++++++++
3 files changed, 353 insertions(+)
create mode 100644 drivers/event/sw/event_ring.h
Comments
On Fri, Mar 24, 2017 at 04:53:02PM +0000, Harry van Haaren wrote:
> From: Bruce Richardson <bruce.richardson@intel.com>
>
> Add in the data-structures for the ports used by workers to send
> packets to/from the scheduler. Also add in the functions to
> create/destroy those ports.
>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
>
> ---
>
> v5:
> - Add inflights in this patch to resolve compilation issue
> ---
> drivers/event/sw/event_ring.h | 185 ++++++++++++++++++++++++++++++++++++++++++
> drivers/event/sw/sw_evdev.c | 88 ++++++++++++++++++++
> drivers/event/sw/sw_evdev.h | 80 ++++++++++++++++++
> 3 files changed, 353 insertions(+)
> create mode 100644 drivers/event/sw/event_ring.h
>
>
> #define EVENTDEV_NAME_SW_PMD event_sw
> #define NUMA_NODE_ARG "numa_node"
> #define SCHED_QUANTA_ARG "sched_quanta"
> #define CREDIT_QUANTA_ARG "credit_quanta"
>
> +static void
> +sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
> +
> +static int
> +sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
> + const struct rte_event_port_conf *conf)
> +{
> + struct sw_evdev *sw = sw_pmd_priv(dev);
> + struct sw_port *p = &sw->ports[port_id];
> + char buf[QE_RING_NAMESIZE];
> + unsigned int i;
> +
> + struct rte_event_dev_info info;
> + sw_info_get(dev, &info);
> +
> + uint8_t enq_oversize =
> + conf->enqueue_depth > info.max_event_port_enqueue_depth;
> + uint8_t deq_oversize =
> + conf->dequeue_depth > info.max_event_port_dequeue_depth;
> + if (enq_oversize || deq_oversize)
> + return -EINVAL;
I think, implicitly this check is addressed in rte_event_dev_configure()
and rte_event_port_setup() parameters check in common code.
If so, you can remove it.
> +
> +
> + /* detect re-configuring and return credits to instance if needed */
> + if (p->initialized) {
> + /* taking credits from pool is done one quanta at a time, and
> + * credits may be spend (counted in p->inflights) or still
> + * available in the port (p->inflight_credits). We must return
> + * the sum to no leak credits
> + */
> + int possible_inflights = p->inflight_credits + p->inflights;
> + rte_atomic32_sub(&sw->inflights, possible_inflights);
> + }
> +
> + *p = (struct sw_port){0}; /* zero entire structure */
> + p->id = port_id;
> + p->sw = sw;
> +
> + snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
> + "rx_worker_ring");
> + p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
> + dev->data->socket_id);
> + if (p->rx_worker_ring == NULL) {
> + printf("%s %d: error creating RX worker ring\n",
> + __func__, __LINE__);
s/printf/SW_LOG_ERR
> + return -1;
> + }
> +
> + p->inflight_max = conf->new_event_threshold;
> +
> + snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
> + "cq_worker_ring");
> + p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_depth,
> + dev->data->socket_id);
> + if (p->cq_worker_ring == NULL) {
> + qe_ring_destroy(p->rx_worker_ring);
> + printf("%s %d: error creating CQ worker ring\n",
> + __func__, __LINE__);
s/printf/SW_LOG_ERR
> + return -1;
> + }
> + sw->cq_ring_space[port_id] = conf->dequeue_depth;
> +
> + /* set hist list contents to empty */
> + for (i = 0; i < SW_PORT_HIST_LIST; i++) {
> + p->hist_list[i].fid = -1;
> + p->hist_list[i].qid = -1;
> + }
> + dev->data->ports[port_id] = p;
> + p->initialized = 1;
I think, we can add rte_smb_wmb() here to be in _very_ safer side as port
will be used by other cores after the setup().
> +
> + return 0;
> +}
> +
> +static void
> +sw_port_release(void *port)
> +{
> + struct sw_port *p = (void *)port;
> + if (p == NULL)
> + return;
> +
> + qe_ring_destroy(p->rx_worker_ring);
> + qe_ring_destroy(p->cq_worker_ring);
> + memset(p, 0, sizeof(*p));
> +}
> +
> static int32_t
> qid_init(struct sw_evdev *sw, unsigned int idx, int type,
> const struct rte_event_queue_conf *queue_conf)
> @@ -314,6 +400,8 @@ sw_probe(const char *name, const char *params)
> .queue_setup = sw_queue_setup,
> .queue_release = sw_queue_release,
> .port_def_conf = sw_port_def_conf,
> + .port_setup = sw_port_setup,
> + .port_release = sw_port_release,
> };
>
With suggested changes,
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
new file mode 100644
@@ -0,0 +1,185 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Generic ring structure for passing events from one core to another.
+ *
+ * Used by the software scheduler for the producer and consumer rings for
+ * each port, i.e. for passing events from worker cores to scheduler and
+ * vice-versa. Designed for single-producer, single-consumer use with two
+ * cores working on each ring.
+ */
+
+#ifndef _EVENT_RING_
+#define _EVENT_RING_
+
+#include <stdint.h>
+
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_malloc.h>
+
+#define QE_RING_NAMESIZE 32
+
+struct qe_ring {
+ char name[QE_RING_NAMESIZE] __rte_cache_aligned;
+ uint32_t ring_size; /* size of memory block allocated to the ring */
+ uint32_t mask; /* mask for read/write values == ring_size -1 */
+ uint32_t size; /* actual usable space in the ring */
+ volatile uint32_t write_idx __rte_cache_aligned;
+ volatile uint32_t read_idx __rte_cache_aligned;
+
+ struct rte_event ring[0] __rte_cache_aligned;
+};
+
+#ifndef force_inline
+#define force_inline inline __attribute__((always_inline))
+#endif
+
+static inline struct qe_ring *
+qe_ring_create(const char *name, unsigned int size, unsigned int socket_id)
+{
+ struct qe_ring *retval;
+ const uint32_t ring_size = rte_align32pow2(size + 1);
+ size_t memsize = sizeof(*retval) +
+ (ring_size * sizeof(retval->ring[0]));
+
+ retval = rte_zmalloc_socket(NULL, memsize, 0, socket_id);
+ if (retval == NULL)
+ goto end;
+
+ snprintf(retval->name, sizeof(retval->name), "EVDEV_RG_%s", name);
+ retval->ring_size = ring_size;
+ retval->mask = ring_size - 1;
+ retval->size = size;
+end:
+ return retval;
+}
+
+static inline void
+qe_ring_destroy(struct qe_ring *r)
+{
+ rte_free(r);
+}
+
+static force_inline unsigned int
+qe_ring_count(const struct qe_ring *r)
+{
+ return r->write_idx - r->read_idx;
+}
+
+static force_inline unsigned int
+qe_ring_free_count(const struct qe_ring *r)
+{
+ return r->size - qe_ring_count(r);
+}
+
+static force_inline unsigned int
+qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes,
+ unsigned int nb_qes, uint16_t *free_count)
+{
+ const uint32_t size = r->size;
+ const uint32_t mask = r->mask;
+ const uint32_t read = r->read_idx;
+ uint32_t write = r->write_idx;
+ const uint32_t space = read + size - write;
+ uint32_t i;
+
+ if (space < nb_qes)
+ nb_qes = space;
+
+ for (i = 0; i < nb_qes; i++, write++)
+ r->ring[write & mask] = qes[i];
+
+ rte_smp_wmb();
+
+ if (nb_qes != 0)
+ r->write_idx = write;
+
+ *free_count = space - nb_qes;
+
+ return nb_qes;
+}
+
+static force_inline unsigned int
+qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes,
+ unsigned int nb_qes, uint8_t *ops)
+{
+ const uint32_t size = r->size;
+ const uint32_t mask = r->mask;
+ const uint32_t read = r->read_idx;
+ uint32_t write = r->write_idx;
+ const uint32_t space = read + size - write;
+ uint32_t i;
+
+ if (space < nb_qes)
+ nb_qes = space;
+
+ for (i = 0; i < nb_qes; i++, write++) {
+ r->ring[write & mask] = qes[i];
+ r->ring[write & mask].op = ops[i];
+ }
+
+ rte_smp_wmb();
+
+ if (nb_qes != 0)
+ r->write_idx = write;
+
+ return nb_qes;
+}
+
+static force_inline unsigned int
+qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes,
+ unsigned int nb_qes)
+{
+ const uint32_t mask = r->mask;
+ uint32_t read = r->read_idx;
+ const uint32_t write = r->write_idx;
+ const uint32_t items = write - read;
+ uint32_t i;
+
+ if (items < nb_qes)
+ nb_qes = items;
+
+
+ for (i = 0; i < nb_qes; i++, read++)
+ qes[i] = r->ring[read & mask];
+
+ rte_smp_rmb();
+
+ if (nb_qes != 0)
+ r->read_idx += nb_qes;
+
+ return nb_qes;
+}
+
+#endif
@@ -39,12 +39,98 @@
#include "sw_evdev.h"
#include "iq_ring.h"
+#include "event_ring.h"
#define EVENTDEV_NAME_SW_PMD event_sw
#define NUMA_NODE_ARG "numa_node"
#define SCHED_QUANTA_ARG "sched_quanta"
#define CREDIT_QUANTA_ARG "credit_quanta"
+static void
+sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
+
+static int
+sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
+ const struct rte_event_port_conf *conf)
+{
+ struct sw_evdev *sw = sw_pmd_priv(dev);
+ struct sw_port *p = &sw->ports[port_id];
+ char buf[QE_RING_NAMESIZE];
+ unsigned int i;
+
+ struct rte_event_dev_info info;
+ sw_info_get(dev, &info);
+
+ uint8_t enq_oversize =
+ conf->enqueue_depth > info.max_event_port_enqueue_depth;
+ uint8_t deq_oversize =
+ conf->dequeue_depth > info.max_event_port_dequeue_depth;
+ if (enq_oversize || deq_oversize)
+ return -EINVAL;
+
+
+ /* detect re-configuring and return credits to instance if needed */
+ if (p->initialized) {
+ /* taking credits from pool is done one quanta at a time, and
+ * credits may be spend (counted in p->inflights) or still
+ * available in the port (p->inflight_credits). We must return
+ * the sum to no leak credits
+ */
+ int possible_inflights = p->inflight_credits + p->inflights;
+ rte_atomic32_sub(&sw->inflights, possible_inflights);
+ }
+
+ *p = (struct sw_port){0}; /* zero entire structure */
+ p->id = port_id;
+ p->sw = sw;
+
+ snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
+ "rx_worker_ring");
+ p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
+ dev->data->socket_id);
+ if (p->rx_worker_ring == NULL) {
+ printf("%s %d: error creating RX worker ring\n",
+ __func__, __LINE__);
+ return -1;
+ }
+
+ p->inflight_max = conf->new_event_threshold;
+
+ snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
+ "cq_worker_ring");
+ p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_depth,
+ dev->data->socket_id);
+ if (p->cq_worker_ring == NULL) {
+ qe_ring_destroy(p->rx_worker_ring);
+ printf("%s %d: error creating CQ worker ring\n",
+ __func__, __LINE__);
+ return -1;
+ }
+ sw->cq_ring_space[port_id] = conf->dequeue_depth;
+
+ /* set hist list contents to empty */
+ for (i = 0; i < SW_PORT_HIST_LIST; i++) {
+ p->hist_list[i].fid = -1;
+ p->hist_list[i].qid = -1;
+ }
+ dev->data->ports[port_id] = p;
+ p->initialized = 1;
+
+ return 0;
+}
+
+static void
+sw_port_release(void *port)
+{
+ struct sw_port *p = (void *)port;
+ if (p == NULL)
+ return;
+
+ qe_ring_destroy(p->rx_worker_ring);
+ qe_ring_destroy(p->cq_worker_ring);
+ memset(p, 0, sizeof(*p));
+}
+
static int32_t
qid_init(struct sw_evdev *sw, unsigned int idx, int type,
const struct rte_event_queue_conf *queue_conf)
@@ -314,6 +400,8 @@ sw_probe(const char *name, const char *params)
.queue_setup = sw_queue_setup,
.queue_release = sw_queue_release,
.port_def_conf = sw_port_def_conf,
+ .port_setup = sw_port_setup,
+ .port_release = sw_port_release,
};
static const char *const args[] = {
@@ -49,6 +49,13 @@
#define MAX_SW_PROD_Q_DEPTH 4096
#define SW_FRAGMENTS_MAX 16
+/* report dequeue burst sizes in buckets */
+#define SW_DEQ_STAT_BUCKET_SHIFT 2
+/* how many packets pulled from port by sched */
+#define SCHED_DEQUEUE_BURST_SIZE 32
+
+#define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
+
#define EVENTDEV_NAME_SW_PMD event_sw
#define SW_PMD_NAME RTE_STR(event_sw)
@@ -129,12 +136,82 @@ struct sw_qid {
uint8_t priority;
};
+struct sw_hist_list_entry {
+ int32_t qid;
+ int32_t fid;
+ struct reorder_buffer_entry *rob_entry;
+};
+
+struct sw_evdev;
+
+struct sw_port {
+ /* new enqueue / dequeue API doesn't have an instance pointer, only the
+ * pointer to the port being enqueue/dequeued from
+ */
+ struct sw_evdev *sw;
+
+ /* set when the port is initialized */
+ uint8_t initialized;
+ /* A numeric ID for the port */
+ uint8_t id;
+
+ int16_t is_directed; /** Takes from a single directed QID */
+ /**
+ * For loadbalanced we can optimise pulling packets from
+ * producers if there is no reordering involved
+ */
+ int16_t num_ordered_qids;
+
+ /** Ring and buffer for pulling events from workers for scheduling */
+ struct qe_ring *rx_worker_ring __rte_cache_aligned;
+ /** Ring and buffer for pushing packets to workers after scheduling */
+ struct qe_ring *cq_worker_ring;
+
+ /* hole */
+
+ /* num releases yet to be completed on this port */
+ uint16_t outstanding_releases __rte_cache_aligned;
+ uint16_t inflight_max; /* app requested max inflights for this port */
+ uint16_t inflight_credits; /* num credits this port has right now */
+
+ uint16_t last_dequeue_burst_sz; /* how big the burst was */
+ uint64_t last_dequeue_ticks; /* used to track burst processing time */
+ uint64_t avg_pkt_ticks; /* tracks average over NUM_SAMPLES burst */
+ uint64_t total_polls; /* how many polls were counted in stats */
+ uint64_t zero_polls; /* tracks polls returning nothing */
+ uint32_t poll_buckets[MAX_SW_CONS_Q_DEPTH >> SW_DEQ_STAT_BUCKET_SHIFT];
+ /* bucket values in 4s for shorter reporting */
+
+ /* History list structs, containing info on pkts egressed to worker */
+ uint16_t hist_head __rte_cache_aligned;
+ uint16_t hist_tail;
+ uint16_t inflights;
+ struct sw_hist_list_entry hist_list[SW_PORT_HIST_LIST];
+
+ /* track packets in and out of this port */
+ struct sw_point_stats stats;
+
+
+ uint32_t pp_buf_start;
+ uint32_t pp_buf_count;
+ uint16_t cq_buf_count;
+ struct rte_event pp_buf[SCHED_DEQUEUE_BURST_SIZE];
+ struct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH];
+
+ uint8_t num_qids_mapped;
+};
+
struct sw_evdev {
struct rte_eventdev_data *data;
uint32_t port_count;
uint32_t qid_count;
+ /* Contains all ports - load balanced and directed */
+ struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned;
+
+ rte_atomic32_t inflights __rte_cache_aligned;
+
/*
* max events in this instance. Cached here for performance.
* (also available in data->conf.nb_events_limit)
@@ -144,6 +221,9 @@ struct sw_evdev {
/* Internal queues - one per logical queue */
struct sw_qid qids[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;
+ /* Cache how many packets are in each cq */
+ uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned;
+
int32_t sched_quanta;
uint32_t credit_update_quanta;