[dpdk-dev] [PATCH 2/2] event/sw: use dynamically-sized IQs

Gage Eads gage.eads at intel.com
Thu Nov 30 04:08:34 CET 2017


This commit introduces dynamically-sized IQs, by switching the underlying
data structure from a fixed-size ring to a linked list of queue 'chunks.'
This has a number of benefits:
- Certain corner cases were observed in which all of a pipeline's flows
  could be pinned to one port for extended periods, effectively turning a
  multi-core pipeline into a single-core one. This was caused by an event
  producer having a larger new_event_threshold than the IQ depth, and
  injecting large numbers of packets that are ultimately backpressured in a
  worker's rx_ring, causing those packets' flows to be scheduled to that
  port.
  The dynamically sized IQ does not have this problem because each IQ can
  grow large enough to store all the system's events, such that
  backpressure will not reach the worker's rx_ring.
- Slight performance improvement (~1-2%) in high throughput scenarios,
  tested with eventdev_pipeline_sw_pmd.

This implementation has a small increase in the queue storage memory
footprint (~70KB). This commit also removes the iq_size xstat, which no
longer applies to this implementation.

Signed-off-by: Gage Eads <gage.eads at intel.com>
---
 drivers/event/sw/iq_chunk.h           | 212 ++++++++++++++++++++++++++++++++++
 drivers/event/sw/iq_ring.h            | 172 ---------------------------
 drivers/event/sw/sw_evdev.c           |  59 ++++++----
 drivers/event/sw/sw_evdev.h           |  16 ++-
 drivers/event/sw/sw_evdev_scheduler.c |  40 +++----
 drivers/event/sw/sw_evdev_xstats.c    |  12 +-
 test/test/test_eventdev_sw.c          |  15 +--
 7 files changed, 288 insertions(+), 238 deletions(-)
 create mode 100644 drivers/event/sw/iq_chunk.h
 delete mode 100644 drivers/event/sw/iq_ring.h

diff --git a/drivers/event/sw/iq_chunk.h b/drivers/event/sw/iq_chunk.h
new file mode 100644
index 0000000..29f5a35
--- /dev/null
+++ b/drivers/event/sw/iq_chunk.h
@@ -0,0 +1,212 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _IQ_CHUNK_H_
+#define _IQ_CHUNK_H_
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <rte_eventdev.h>
+
+#define IQ_ROB_NAMESIZE 12
+
+struct sw_queue_chunk {
+	struct rte_event events[SW_EVS_PER_Q_CHUNK]; /* event storage */
+	struct sw_queue_chunk *next; /* links IQ chunks and the free list */
+} __rte_cache_aligned;
+
+static __rte_always_inline bool
+iq_empty(const struct sw_iq *iq)
+{
+	return iq->count == 0; /* true when no events are queued */
+}
+
+static __rte_always_inline uint16_t
+iq_count(const struct sw_iq *iq)
+{
+	return iq->count; /* number of events currently queued */
+}
+
+static __rte_always_inline struct sw_queue_chunk *
+iq_alloc_chunk(struct sw_evdev *sw)
+{
+	struct sw_queue_chunk *chunk = sw->chunk_list_head; /* no NULL check */
+	sw->chunk_list_head = chunk->next; /* pop it off the free list */
+	chunk->next = NULL;
+	return chunk;
+}
+
+static __rte_always_inline void
+iq_free_chunk(struct sw_evdev *sw, struct sw_queue_chunk *chunk)
+{
+	chunk->next = sw->chunk_list_head; /* push onto the free list */
+	sw->chunk_list_head = chunk;
+}
+
+/* Start iq empty on one fresh chunk; count is reset for re-initialization. */
+static __rte_always_inline void
+iq_init(struct sw_evdev *sw, struct sw_iq *iq)
+{
+	iq->head = iq->tail = iq_alloc_chunk(sw);
+	iq->head_idx = iq->tail_idx = 0;
+	iq->count = 0;
+}
+
+static __rte_always_inline void
+iq_enqueue(struct sw_evdev *sw, struct sw_iq *iq, const struct rte_event *ev)
+{
+	iq->tail->events[iq->tail_idx++] = *ev;
+	iq->count++;
+	if (likely(iq->tail_idx != SW_EVS_PER_Q_CHUNK))
+		return;
+
+	/* Tail chunk is full: chain a fresh one. The chunk pool is sized from
+	 * the total number of inflight events and the number of IQs, so this
+	 * allocation always succeeds.
+	 */
+	struct sw_queue_chunk *fresh = iq_alloc_chunk(sw);
+	iq->tail->next = fresh;
+	iq->tail = fresh;
+	iq->tail_idx = 0;
+}
+
+static __rte_always_inline void
+iq_pop(struct sw_evdev *sw, struct sw_iq *iq)
+{
+	iq->count--;
+	if (likely(++iq->head_idx != SW_EVS_PER_Q_CHUNK))
+		return;
+
+	/* Head chunk is drained: recycle it and move on to the next one */
+	struct sw_queue_chunk *drained = iq->head;
+	iq->head = drained->next;
+	iq->head_idx = 0;
+	iq_free_chunk(sw, drained);
+}
+
+static __rte_always_inline const struct rte_event *
+iq_peek(const struct sw_iq *iq)
+{
+	return &iq->head->events[iq->head_idx]; /* next event to dequeue */
+}
+
+/* Dequeue up to count events (clamped to iq_count()) from the head of iq into
+ * ev, recycling every fully drained chunk. Returns the number dequeued.
+ */
+static __rte_always_inline uint16_t
+iq_dequeue_burst(struct sw_evdev *sw, struct sw_iq *iq, struct rte_event *ev,
+		 uint16_t count)
+{
+	struct sw_queue_chunk *current;
+	uint16_t total, index;
+
+	count = RTE_MIN(count, iq_count(iq));
+	if (unlikely(count == 0))
+		return 0;
+
+	current = iq->head;
+	index = iq->head_idx;
+	total = 0;
+
+	/* Loop over the chunks */
+	while (1) {
+		struct sw_queue_chunk *next;
+		for (; index < SW_EVS_PER_Q_CHUNK;) {
+			ev[total++] = current->events[index++];
+
+			if (unlikely(total == count))
+				goto done;
+		}
+		/* Move to the next chunk */
+		next = current->next;
+		iq_free_chunk(sw, current);
+		current = next;
+		index = 0;
+	}
+
+done:
+	if (unlikely(index == SW_EVS_PER_Q_CHUNK)) {
+		/* Use current, not iq->head: the old head may be freed */
+		struct sw_queue_chunk *next = current->next;
+		iq_free_chunk(sw, current);
+		iq->head = next;
+		iq->head_idx = 0;
+	} else {
+		iq->head = current;
+		iq->head_idx = index;
+	}
+
+	iq->count -= total;
+	return total;
+}
+
+static __rte_always_inline void
+iq_put_back(struct sw_evdev *sw,
+	    struct sw_iq *iq,
+	    struct rte_event *ev,
+	    unsigned int count)
+{
+	/* Events are re-inserted in front of the current head so they are
+	 * dequeued again first and in their original order. The slots below
+	 * head_idx in the head chunk are free; whatever does not fit there
+	 * goes at the end of one new head chunk. The caller must ensure that
+	 * count <= SW_EVS_PER_Q_CHUNK, so that a single new head suffices.
+	 */
+	const uint16_t room = iq->head_idx;
+	uint16_t n;
+
+	iq->count += count;
+
+	if (room >= count) {
+		/* Everything fits below the current head index */
+		iq->head_idx = room - count;
+		for (n = 0; n < count; n++)
+			iq->head->events[iq->head_idx + n] = ev[n];
+		return;
+	}
+
+	/* The last 'room' events fill the start of the current head chunk */
+	const uint16_t overflow = count - room;
+	struct sw_queue_chunk *front;
+
+	for (n = 0; n < room; n++)
+		iq->head->events[n] = ev[overflow + n];
+
+	/* The first 'overflow' events go at the end of a new head chunk */
+	front = iq_alloc_chunk(sw);
+	front->next = iq->head;
+	iq->head = front;
+	iq->head_idx = SW_EVS_PER_Q_CHUNK - overflow;
+	for (n = 0; n < overflow; n++)
+		front->events[iq->head_idx + n] = ev[n];
+}
+
+#endif /* _IQ_CHUNK_H_ */
diff --git a/drivers/event/sw/iq_ring.h b/drivers/event/sw/iq_ring.h
deleted file mode 100644
index 64cf678..0000000
--- a/drivers/event/sw/iq_ring.h
+++ /dev/null
@@ -1,172 +0,0 @@
-/*-
- *   BSD LICENSE
- *
- *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
- *
- *   Redistribution and use in source and binary forms, with or without
- *   modification, are permitted provided that the following conditions
- *   are met:
- *
- *     * Redistributions of source code must retain the above copyright
- *       notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in
- *       the documentation and/or other materials provided with the
- *       distribution.
- *     * Neither the name of Intel Corporation nor the names of its
- *       contributors may be used to endorse or promote products derived
- *       from this software without specific prior written permission.
- *
- *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-/*
- * Ring structure definitions used for the internal ring buffers of the
- * SW eventdev implementation. These are designed for single-core use only.
- */
-#ifndef _IQ_RING_
-#define _IQ_RING_
-
-#include <stdint.h>
-
-#include <rte_common.h>
-#include <rte_memory.h>
-#include <rte_malloc.h>
-#include <rte_eventdev.h>
-
-#define IQ_RING_NAMESIZE 12
-#define QID_IQ_DEPTH 512
-#define QID_IQ_MASK (uint16_t)(QID_IQ_DEPTH - 1)
-
-struct iq_ring {
-	char name[IQ_RING_NAMESIZE] __rte_cache_aligned;
-	uint16_t write_idx;
-	uint16_t read_idx;
-
-	struct rte_event ring[QID_IQ_DEPTH];
-};
-
-static inline struct iq_ring *
-iq_ring_create(const char *name, unsigned int socket_id)
-{
-	struct iq_ring *retval;
-
-	retval = rte_malloc_socket(NULL, sizeof(*retval), 0, socket_id);
-	if (retval == NULL)
-		goto end;
-
-	snprintf(retval->name, sizeof(retval->name), "%s", name);
-	retval->write_idx = retval->read_idx = 0;
-end:
-	return retval;
-}
-
-static inline void
-iq_ring_destroy(struct iq_ring *r)
-{
-	rte_free(r);
-}
-
-static __rte_always_inline uint16_t
-iq_ring_count(const struct iq_ring *r)
-{
-	return r->write_idx - r->read_idx;
-}
-
-static __rte_always_inline uint16_t
-iq_ring_free_count(const struct iq_ring *r)
-{
-	return QID_IQ_MASK - iq_ring_count(r);
-}
-
-static __rte_always_inline uint16_t
-iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
-{
-	const uint16_t read = r->read_idx;
-	uint16_t write = r->write_idx;
-	const uint16_t space = read + QID_IQ_MASK - write;
-	uint16_t i;
-
-	if (space < nb_qes)
-		nb_qes = space;
-
-	for (i = 0; i < nb_qes; i++, write++)
-		r->ring[write & QID_IQ_MASK] = qes[i];
-
-	r->write_idx = write;
-
-	return nb_qes;
-}
-
-static __rte_always_inline uint16_t
-iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
-{
-	uint16_t read = r->read_idx;
-	const uint16_t write = r->write_idx;
-	const uint16_t items = write - read;
-	uint16_t i;
-
-	for (i = 0; i < nb_qes; i++, read++)
-		qes[i] = r->ring[read & QID_IQ_MASK];
-
-	if (items < nb_qes)
-		nb_qes = items;
-
-	r->read_idx += nb_qes;
-
-	return nb_qes;
-}
-
-/* assumes there is space, from a previous dequeue_burst */
-static __rte_always_inline uint16_t
-iq_ring_put_back(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
-{
-	uint16_t i, read = r->read_idx;
-
-	for (i = nb_qes; i-- > 0; )
-		r->ring[--read & QID_IQ_MASK] = qes[i];
-
-	r->read_idx = read;
-	return nb_qes;
-}
-
-static __rte_always_inline const struct rte_event *
-iq_ring_peek(const struct iq_ring *r)
-{
-	return &r->ring[r->read_idx & QID_IQ_MASK];
-}
-
-static __rte_always_inline void
-iq_ring_pop(struct iq_ring *r)
-{
-	r->read_idx++;
-}
-
-static __rte_always_inline int
-iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe)
-{
-	const uint16_t read = r->read_idx;
-	const uint16_t write = r->write_idx;
-	const uint16_t space = read + QID_IQ_MASK - write;
-
-	if (space == 0)
-		return -1;
-
-	r->ring[write & QID_IQ_MASK] = *qe;
-
-	r->write_idx = write + 1;
-
-	return 0;
-}
-
-#endif
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index 5d03b7b..9d91ab8 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -42,7 +42,7 @@
 #include <rte_service_component.h>
 
 #include "sw_evdev.h"
-#include "iq_ring.h"
+#include "iq_chunk.h"
 
 #define EVENTDEV_NAME_SW_PMD event_sw
 #define NUMA_NODE_ARG "numa_node"
@@ -242,17 +242,11 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type,
 	unsigned int i;
 	int dev_id = sw->data->dev_id;
 	int socket_id = sw->data->socket_id;
-	char buf[IQ_RING_NAMESIZE];
+	char buf[IQ_ROB_NAMESIZE];
 	struct sw_qid *qid = &sw->qids[idx];
 
-	for (i = 0; i < SW_IQS_MAX; i++) {
-		snprintf(buf, sizeof(buf), "q_%u_iq_%d", idx, i);
-		qid->iq[i] = iq_ring_create(buf, socket_id);
-		if (!qid->iq[i]) {
-			SW_LOG_DBG("ring create failed");
-			goto cleanup;
-		}
-	}
+	for (i = 0; i < SW_IQS_MAX; i++)
+		iq_init(sw, &qid->iq[i]);
 
 	/* Initialize the FID structures to no pinning (-1), and zero packets */
 	const struct sw_fid_t fid = {.cq = -1, .pcount = 0};
@@ -332,8 +326,8 @@ qid_init(struct sw_evdev *sw, unsigned int idx, int type,
 
 cleanup:
 	for (i = 0; i < SW_IQS_MAX; i++) {
-		if (qid->iq[i])
-			iq_ring_destroy(qid->iq[i]);
+		if (qid->iq[i].head)
+			iq_free_chunk(sw, qid->iq[i].head);
 	}
 
 	if (qid->reorder_buffer) {
@@ -356,8 +350,11 @@ sw_queue_release(struct rte_eventdev *dev, uint8_t id)
 	struct sw_qid *qid = &sw->qids[id];
 	uint32_t i;
 
-	for (i = 0; i < SW_IQS_MAX; i++)
-		iq_ring_destroy(qid->iq[i]);
+	for (i = 0; i < SW_IQS_MAX; i++) {
+		if (!qid->iq[i].head)
+			continue;
+		iq_free_chunk(sw, qid->iq[i].head);
+	}
 
 	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
 		rte_free(qid->reorder_buffer);
@@ -439,12 +436,33 @@ sw_dev_configure(const struct rte_eventdev *dev)
 	struct sw_evdev *sw = sw_pmd_priv(dev);
 	const struct rte_eventdev_data *data = dev->data;
 	const struct rte_event_dev_config *conf = &data->dev_conf;
+	int num_chunks, i;
 
 	sw->qid_count = conf->nb_event_queues;
 	sw->port_count = conf->nb_event_ports;
 	sw->nb_events_limit = conf->nb_events_limit;
 	rte_atomic32_set(&sw->inflights, 0);
 
+	/* Number of chunks sized for worst-case spread of events across IQs */
+	num_chunks = ((SW_INFLIGHT_EVENTS_TOTAL/SW_EVS_PER_Q_CHUNK)+1) +
+			sw->qid_count*SW_IQS_MAX*2;
+
+	/* If this is a reconfiguration, free the previous IQ allocation */
+	if (sw->chunks)
+		rte_free(sw->chunks);
+
+	sw->chunks = rte_malloc_socket(NULL,
+				       sizeof(struct sw_queue_chunk) *
+				       num_chunks,
+				       0,
+				       sw->data->socket_id);
+	if (!sw->chunks)
+		return -ENOMEM;
+
+	sw->chunk_list_head = NULL;
+	for (i = 0; i < num_chunks; i++)
+		iq_free_chunk(sw, &sw->chunks[i]);
+
 	if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
 		return -ENOTSUP;
 
@@ -618,17 +636,16 @@ sw_dump(struct rte_eventdev *dev, FILE *f)
 		uint32_t iq;
 		uint32_t iq_printed = 0;
 		for (iq = 0; iq < SW_IQS_MAX; iq++) {
-			if (!qid->iq[iq]) {
+			if (!qid->iq[iq].head) {
 				fprintf(f, "\tiq %d is not initialized.\n", iq);
 				iq_printed = 1;
 				continue;
 			}
-			uint32_t used = iq_ring_count(qid->iq[iq]);
-			uint32_t free = iq_ring_free_count(qid->iq[iq]);
-			const char *col = (free == 0) ? COL_RED : COL_RESET;
+			uint32_t used = iq_count(&qid->iq[iq]);
+			const char *col = COL_RESET;
 			if (used > 0) {
-				fprintf(f, "\t%siq %d: Used %d\tFree %d"
-					COL_RESET"\n", col, iq, used, free);
+				fprintf(f, "\t%siq %d: Used %d"
+					COL_RESET"\n", col, iq, used);
 				iq_printed = 1;
 			}
 		}
@@ -657,7 +674,7 @@ sw_start(struct rte_eventdev *dev)
 
 	/* check all queues are configured and mapped to ports*/
 	for (i = 0; i < sw->qid_count; i++)
-		if (sw->qids[i].iq[0] == NULL ||
+		if (sw->qids[i].iq[0].head == NULL ||
 				sw->qids[i].cq_num_mapped_cqs == 0) {
 			SW_LOG_ERR("Queue %d not configured\n", i);
 			return -ENOLINK;
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index e0dec91..0859388 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -49,6 +49,10 @@
 #define MAX_SW_PROD_Q_DEPTH 4096
 #define SW_FRAGMENTS_MAX 16
 
+/* Should be power-of-two minus one, to leave room for the next pointer */
+#define SW_EVS_PER_Q_CHUNK 255
+#define SW_Q_CHUNK_SIZE ((SW_EVS_PER_Q_CHUNK + 1) * sizeof(struct rte_event))
+
 /* report dequeue burst sizes in buckets */
 #define SW_DEQ_STAT_BUCKET_SHIFT 2
 /* how many packets pulled from port by sched */
@@ -130,6 +134,14 @@ struct reorder_buffer_entry {
 	struct rte_event fragments[SW_FRAGMENTS_MAX];
 };
 
+struct sw_iq {
+	struct sw_queue_chunk *head;
+	struct sw_queue_chunk *tail;
+	uint16_t head_idx;
+	uint16_t tail_idx;
+	uint16_t count;
+};
+
 struct sw_qid {
 	/* set when the QID has been initialized */
 	uint8_t initialized;
@@ -142,7 +154,7 @@ struct sw_qid {
 	struct sw_point_stats stats;
 
 	/* Internal priority rings for packets */
-	struct iq_ring *iq[SW_IQS_MAX];
+	struct sw_iq iq[SW_IQS_MAX];
 	uint32_t iq_pkt_mask; /* A mask to indicate packets in an IQ */
 	uint64_t iq_pkt_count[SW_IQS_MAX];
 
@@ -253,6 +265,8 @@ struct sw_evdev {
 
 	/* Internal queues - one per logical queue */
 	struct sw_qid qids[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;
+	struct sw_queue_chunk *chunk_list_head;
+	struct sw_queue_chunk *chunks;
 
 	/* Cache how many packets are in each cq */
 	uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned;
diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
index 8a2c9d4..7ea7d0f 100644
--- a/drivers/event/sw/sw_evdev_scheduler.c
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -34,7 +34,7 @@
 #include <rte_hash_crc.h>
 #include <rte_event_ring.h>
 #include "sw_evdev.h"
-#include "iq_ring.h"
+#include "iq_chunk.h"
 
 #define SW_IQS_MASK (SW_IQS_MAX-1)
 
@@ -71,7 +71,7 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 	 */
 	uint32_t qid_id = qid->id;
 
-	iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
+	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
 	for (i = 0; i < count; i++) {
 		const struct rte_event *qe = &qes[i];
 		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
@@ -130,7 +130,7 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 			p->cq_buf_count = 0;
 		}
 	}
-	iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);
+	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);
 
 	return count - nb_blocked;
 }
@@ -156,7 +156,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 				rte_ring_count(qid->reorder_buffer_freelist));
 
 	for (i = 0; i < count; i++) {
-		const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
+		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
 		uint32_t cq_check_count = 0;
 		uint32_t cq;
 
@@ -193,7 +193,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 					(void *)&p->hist_list[head].rob_entry);
 
 		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
-		iq_ring_pop(qid->iq[iq_num]);
+		iq_pop(sw, &qid->iq[iq_num]);
 
 		rte_compiler_barrier();
 		p->inflights++;
@@ -218,8 +218,8 @@ sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		return 0;
 
 	/* burst dequeue from the QID IQ ring */
-	struct iq_ring *ring = qid->iq[iq_num];
-	uint32_t ret = iq_ring_dequeue_burst(ring,
+	struct sw_iq *iq = &qid->iq[iq_num];
+	uint32_t ret = iq_dequeue_burst(sw, iq,
 			&port->cq_buf[port->cq_buf_count], count_free);
 	port->cq_buf_count += ret;
 
@@ -252,7 +252,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
 			continue;
 
 		uint32_t pkts_done = 0;
-		uint32_t count = iq_ring_count(qid->iq[iq_num]);
+		uint32_t count = iq_count(&qid->iq[iq_num]);
 
 		if (count > 0) {
 			if (type == SW_SCHED_TYPE_DIRECT)
@@ -324,22 +324,15 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 					continue;
 				}
 
-				struct sw_qid *dest_qid_ptr =
-					&sw->qids[dest_qid];
-				const struct iq_ring *dest_iq_ptr =
-					dest_qid_ptr->iq[dest_iq];
-				if (iq_ring_free_count(dest_iq_ptr) == 0)
-					break;
-
 				pkts_iter++;
 
 				struct sw_qid *q = &sw->qids[dest_qid];
-				struct iq_ring *r = q->iq[dest_iq];
+				struct sw_iq *iq = &q->iq[dest_iq];
 
 				/* we checked for space above, so enqueue must
 				 * succeed
 				 */
-				iq_ring_enqueue(r, qe);
+				iq_enqueue(sw, iq, qe);
 				q->iq_pkt_mask |= (1 << (dest_iq));
 				q->iq_pkt_count[dest_iq]++;
 				q->stats.rx_pkts++;
@@ -404,10 +397,6 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
 		struct sw_qid *qid = &sw->qids[qe->queue_id];
 
-		if ((flags & QE_FLAG_VALID) &&
-				iq_ring_free_count(qid->iq[iq_num]) == 0)
-			break;
-
 		/* now process based on flags. Note that for directed
 		 * queues, the enqueue_flush masks off all but the
 		 * valid flag. This makes FWD and PARTIAL enqueues just
@@ -471,7 +460,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 			 */
 
 			qid->iq_pkt_mask |= (1 << (iq_num));
-			iq_ring_enqueue(qid->iq[iq_num], qe);
+			iq_enqueue(sw, &qid->iq[iq_num], qe);
 			qid->iq_pkt_count[iq_num]++;
 			qid->stats.rx_pkts++;
 			pkts_iter++;
@@ -516,10 +505,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
 
 		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
 		struct sw_qid *qid = &sw->qids[qe->queue_id];
-		struct iq_ring *iq_ring = qid->iq[iq_num];
-
-		if (iq_ring_free_count(iq_ring) == 0)
-			break; /* move to next port */
+		struct sw_iq *iq = &qid->iq[iq_num];
 
 		port->stats.rx_pkts++;
 
@@ -527,7 +513,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
 		 * into the qid at the right priority
 		 */
 		qid->iq_pkt_mask |= (1 << (iq_num));
-		iq_ring_enqueue(iq_ring, qe);
+		iq_enqueue(sw, iq, qe);
 		qid->iq_pkt_count[iq_num]++;
 		qid->stats.rx_pkts++;
 		pkts_iter++;
diff --git a/drivers/event/sw/sw_evdev_xstats.c b/drivers/event/sw/sw_evdev_xstats.c
index 61a5c33..4917f24 100644
--- a/drivers/event/sw/sw_evdev_xstats.c
+++ b/drivers/event/sw/sw_evdev_xstats.c
@@ -32,7 +32,7 @@
 
 #include <rte_event_ring.h>
 #include "sw_evdev.h"
-#include "iq_ring.h"
+#include "iq_chunk.h"
 
 enum xstats_type {
 	/* common stats */
@@ -53,7 +53,6 @@ enum xstats_type {
 	pkt_cycles,
 	poll_return, /* for zero-count and used also for port bucket loop */
 	/* qid_specific */
-	iq_size,
 	iq_used,
 	/* qid port mapping specific */
 	pinned,
@@ -144,7 +143,6 @@ get_qid_stat(const struct sw_evdev *sw, uint16_t obj_idx,
 			return infl;
 		} while (0);
 		break;
-	case iq_size: return RTE_DIM(qid->iq[0]->ring);
 	default: return -1;
 	}
 }
@@ -157,7 +155,7 @@ get_qid_iq_stat(const struct sw_evdev *sw, uint16_t obj_idx,
 	const int iq_idx = extra_arg;
 
 	switch (type) {
-	case iq_used: return iq_ring_count(qid->iq[iq_idx]);
+	case iq_used: return iq_count(&qid->iq[iq_idx]);
 	default: return -1;
 	}
 }
@@ -236,13 +234,13 @@ sw_xstats_init(struct sw_evdev *sw)
 	/* all bucket dequeues are allowed to be reset, handled in loop below */
 
 	static const char * const qid_stats[] = {"rx", "tx", "drop",
-			"inflight", "iq_size"
+			"inflight"
 	};
 	static const enum xstats_type qid_types[] = { rx, tx, dropped,
-			inflight, iq_size
+			inflight
 	};
 	static const uint8_t qid_reset_allowed[] = {1, 1, 1,
-			0, 0
+			0
 	};
 
 	static const char * const qid_iq_stats[] = { "used" };
diff --git a/test/test/test_eventdev_sw.c b/test/test/test_eventdev_sw.c
index 7219886..2c3a1e2 100644
--- a/test/test/test_eventdev_sw.c
+++ b/test/test/test_eventdev_sw.c
@@ -920,8 +920,8 @@ xstats_tests(struct test *t)
 	ret = rte_event_dev_xstats_names_get(evdev,
 					RTE_EVENT_DEV_XSTATS_QUEUE,
 					0, xstats_names, ids, XSTATS_MAX);
-	if (ret != 17) {
-		printf("%d: expected 17 stats, got return %d\n", __LINE__, ret);
+	if (ret != 16) {
+		printf("%d: expected 16 stats, got return %d\n", __LINE__, ret);
 		return -1;
 	}
 
@@ -937,8 +937,8 @@ xstats_tests(struct test *t)
 	ret = rte_event_dev_xstats_get(evdev,
 					RTE_EVENT_DEV_XSTATS_QUEUE,
 					0, ids, values, ret);
-	if (ret != 17) {
-		printf("%d: expected 17 stats, got return %d\n", __LINE__, ret);
+	if (ret != 16) {
+		printf("%d: expected 16 stats, got return %d\n", __LINE__, ret);
 		return -1;
 	}
 
@@ -1100,7 +1100,6 @@ xstats_tests(struct test *t)
 		3 /* tx */,
 		0 /* drop */,
 		3 /* inflights */,
-		512 /* iq size */,
 		0, 0, 0, 0, /* iq 0, 1, 2, 3 used */
 		/* QID-to-Port: pinned_flows, packets */
 		0, 0,
@@ -1131,7 +1130,6 @@ xstats_tests(struct test *t)
 		0 /* tx */,
 		0 /* drop */,
 		3 /* inflight */,
-		512 /* iq size */,
 		0, 0, 0, 0, /* 4 iq used */
 		/* QID-to-Port: pinned_flows, packets */
 		0, 0,
@@ -1683,7 +1681,7 @@ xstats_id_reset_tests(struct test *t)
 		goto fail;
 
 /* num queue stats */
-#define NUM_Q_STATS 17
+#define NUM_Q_STATS 16
 /* queue offset from start of the devices whole xstats.
  * This will break every time we add a statistic to a device/port/queue
  */
@@ -1708,7 +1706,6 @@ xstats_id_reset_tests(struct test *t)
 		"qid_0_tx",
 		"qid_0_drop",
 		"qid_0_inflight",
-		"qid_0_iq_size",
 		"qid_0_iq_0_used",
 		"qid_0_iq_1_used",
 		"qid_0_iq_2_used",
@@ -1727,7 +1724,6 @@ xstats_id_reset_tests(struct test *t)
 		7, /* tx */
 		0, /* drop */
 		7, /* inflight */
-		512, /* iq size */
 		0, /* iq 0 used */
 		0, /* iq 1 used */
 		0, /* iq 2 used */
@@ -1743,7 +1739,6 @@ xstats_id_reset_tests(struct test *t)
 		0, /* tx */
 		0, /* drop */
 		7, /* inflight */
-		512, /* iq size */
 		0, /* iq 0 used */
 		0, /* iq 1 used */
 		0, /* iq 2 used */
-- 
2.7.4



More information about the dev mailing list