[dpdk-dev] [RFC PATCH] eventdev: add buffered enqueue and flush APIs

Gage Eads gage.eads at intel.com
Fri Dec 2 20:45:56 CET 2016


This commit adds buffered enqueue functionality to the eventdev API.
It is conceptually similar to the ethdev API's tx buffering, however
with a smaller API surface and no dropping of events.

Signed-off-by: Gage Eads <gage.eads at intel.com>
---
 lib/librte_eventdev/rte_eventdev.c |  29 ++++++++++
 lib/librte_eventdev/rte_eventdev.h | 106 +++++++++++++++++++++++++++++++++++++
 2 files changed, 135 insertions(+)

diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index 17ce5c3..564573f 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -219,6 +219,7 @@
 	uint16_t *links_map;
 	uint8_t *ports_dequeue_depth;
 	uint8_t *ports_enqueue_depth;
+	struct rte_eventdev_enqueue_buffer *port_buffers;
 	unsigned int i;
 
 	EDEV_LOG_DEBUG("Setup %d ports on device %u", nb_ports,
@@ -272,6 +273,19 @@
 					"nb_ports %u", nb_ports);
 			return -(ENOMEM);
 		}
+
+		/* Allocate memory to store port enqueue buffers */
+		dev->data->port_buffers =
+			rte_zmalloc_socket("eventdev->port_buffers",
+			sizeof(dev->data->port_buffers[0]) * nb_ports,
+			RTE_CACHE_LINE_SIZE, dev->data->socket_id);
+		if (dev->data->port_buffers == NULL) {
+			dev->data->nb_ports = 0;
+			EDEV_LOG_ERR("failed to get memory for port enq"
+				     " buffers, nb_ports %u", nb_ports);
+			return -(ENOMEM);
+		}
+
 	} else if (dev->data->ports != NULL && nb_ports != 0) {/* re-config */
 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
 
@@ -279,6 +293,7 @@
 		ports_dequeue_depth = dev->data->ports_dequeue_depth;
 		ports_enqueue_depth = dev->data->ports_enqueue_depth;
 		links_map = dev->data->links_map;
+		port_buffers = dev->data->port_buffers;
 
 		for (i = nb_ports; i < old_nb_ports; i++)
 			(*dev->dev_ops->port_release)(ports[i]);
@@ -324,6 +339,17 @@
 			return -(ENOMEM);
 		}
 
+		/* Realloc memory to store port enqueue buffers */
+		port_buffers = rte_realloc(dev->data->port_buffers,
+			sizeof(dev->data->port_buffers[0]) * nb_ports,
+			RTE_CACHE_LINE_SIZE);
+		if (port_buffers == NULL) {
+			dev->data->nb_ports = 0;
+			EDEV_LOG_ERR("failed to realloc mem for port enq"
+				     " buffers, nb_ports %u", nb_ports);
+			return -(ENOMEM);
+		}
+
 		if (nb_ports > old_nb_ports) {
 			uint8_t new_ps = nb_ports - old_nb_ports;
 
@@ -336,12 +362,15 @@
 			memset(links_map +
 				(old_nb_ports * RTE_EVENT_MAX_QUEUES_PER_DEV),
 				0, sizeof(ports_enqueue_depth[0]) * new_ps);
+			memset(port_buffers + old_nb_ports, 0,
+				sizeof(port_buffers[0]) * new_ps);
 		}
 
 		dev->data->ports = ports;
 		dev->data->ports_dequeue_depth = ports_dequeue_depth;
 		dev->data->ports_enqueue_depth = ports_enqueue_depth;
 		dev->data->links_map = links_map;
+		dev->data->port_buffers = port_buffers;
 	} else if (dev->data->ports != NULL && nb_ports == 0) {
 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
 
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index 778d6dc..3f24342 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -246,6 +246,7 @@
 #include <rte_dev.h>
 #include <rte_memory.h>
 #include <rte_errno.h>
+#include <rte_memcpy.h>
 
 #define EVENTDEV_NAME_SKELETON_PMD event_skeleton
 /**< Skeleton event device PMD name */
@@ -965,6 +966,26 @@ typedef uint16_t (*event_dequeue_burst_t)(void *port, struct rte_event ev[],
 #define RTE_EVENTDEV_NAME_MAX_LEN	(64)
 /**< @internal Max length of name of event PMD */
 
+#define RTE_EVENT_BUF_MAX 16
+/**< Maximum number of events in an enqueue buffer. */
+
+/**
+ * @internal
+ * An enqueue buffer for each port.
+ *
+ * The reason this struct is in the header is for inlining the function calls
+ * to enqueue, as doing a function call per packet would incur significant
+ * performance overhead.
+ *
+ * \see rte_event_enqueue_buffer(), rte_event_enqueue_buffer_flush()
+ */
+struct rte_eventdev_enqueue_buffer {
+	/**> Count of events in this buffer */
+	uint16_t count;
+	/**> Array of events in this buffer */
+	struct rte_event events[RTE_EVENT_BUF_MAX];
+} __rte_cache_aligned;
+
 /**
  * @internal
  * The data part, with no function pointers, associated with each device.
@@ -983,6 +1004,8 @@ struct rte_eventdev_data {
 	/**< Number of event ports. */
 	void **ports;
 	/**< Array of pointers to ports. */
+	struct rte_eventdev_enqueue_buffer *port_buffers;
+	/**< Array of port enqueue buffers. */
 	uint8_t *ports_dequeue_depth;
 	/**< Array of port dequeue depth. */
 	uint8_t *ports_enqueue_depth;
@@ -1132,6 +1155,89 @@ struct rte_eventdev {
 }
 
 /**
+ * Flush the enqueue buffer of the event port specified by *port_id*, in the
+ * event device specified by *dev_id*.
+ *
+ * This function attempts to flush as many of the buffered events as possible,
+ * and returns the number of flushed events. Any unflushed events remain in
+ * the buffer.
+ *
+ * @param dev_id
+ *   The identifier of the device.
+ * @param port_id
+ *   The identifier of the event port.
+ *
+ * @return
+ *   The number of event objects actually flushed to the event device.
+ *
+ * \see rte_event_enqueue_buffer(), rte_event_enqueue_burst()
+ * \see rte_event_port_enqueue_depth()
+ */
+static inline int
+rte_event_enqueue_buffer_flush(uint8_t dev_id, uint8_t port_id)
+{
+	struct rte_eventdev *dev = &rte_eventdevs[dev_id];
+	struct rte_eventdev_enqueue_buffer *buf =
+		&dev->data->port_buffers[port_id];
+	int n;
+
+	n = rte_event_enqueue_burst(dev_id, port_id, buf->events, buf->count);
+
+	if (n != buf->count)
+		memmove(buf->events, &buf->events[n], buf->count - n);
+
+	buf->count -= n;
+
+	return n;
+}
+
+/**
+ * Buffer an event object supplied in *rte_event* structure for future
+ * enqueueing on an event device designated by its *dev_id* through the event
+ * port specified by *port_id*.
+ *
+ * This function takes a single event and buffers it for later enqueuing to the
+ * queue specified in the event structure. If the buffer is full, the
+ * function will attempt to flush the buffer before buffering the event.
+ * If the flush operation fails, the previously buffered events remain in the
+ * buffer and an error is returned to the user to indicate that *ev* was not
+ * buffered.
+ *
+ * @param dev_id
+ *   The identifier of the device.
+ * @param port_id
+ *   The identifier of the event port.
+ * @param ev
+ *   Pointer to struct rte_event
+ *
+ * @return
+ *  - 0 on success
+ *  - <0 on failure. Failure can occur if the event port's output queue is
+ *     backpressured, for instance.
+ *
+ * \see rte_event_enqueue_buffer_flush(), rte_event_enqueue_burst()
+ * \see rte_event_port_enqueue_depth()
+ */
+static inline int
+rte_event_enqueue_buffer(uint8_t dev_id, uint8_t port_id, struct rte_event *ev)
+{
+	struct rte_eventdev *dev = &rte_eventdevs[dev_id];
+	struct rte_eventdev_enqueue_buffer *buf =
+		&dev->data->port_buffers[port_id];
+	int ret;
+
+	/* If necessary, flush the enqueue buffer to make space for ev. */
+	if (buf->count == RTE_EVENT_BUF_MAX) {
+		ret = rte_event_enqueue_buffer_flush(dev_id, port_id);
+		if (ret == 0)
+			return -ENOSPC;
+	}
+
+	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
+	return 0;
+}
+
+/**
  * Converts nanoseconds to *wait* value for rte_event_dequeue()
  *
  * If the device is configured with RTE_EVENT_DEV_CFG_PER_DEQUEUE_WAIT flag then
-- 
1.9.1



More information about the dev mailing list