[dpdk-dev] [PATCH v6 15/23] eventtimer: add buffering of timer expiry events

Erik Gabriel Carrillo erik.g.carrillo at intel.com
Thu Jan 11 01:21:06 CET 2018


Buffer timer expiry events generated while walking a "run list"
in rte_timer_manage, and burst enqueue them to an event device
to the extent possible.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
---
 lib/librte_eventdev/rte_event_timer_adapter.c | 118 +++++++++++++++++++++++---
 1 file changed, 108 insertions(+), 10 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index 8bd9ebc..176cc5b 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -447,7 +447,93 @@ rte_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
 }
 
 /*
- * Software event timer adapter ops definitions
+ * Software event timer adapter buffer helper functions
+ */
+
+#define EVENT_BUFFER_SZ 1024	/* number of slots in event_buffer.events[] */
+#if EVENT_BUFFER_SZ < 1 || EVENT_BUFFER_SZ > 65536
+#error "EVENT_BUFFER_SZ must be between 1 and 65536"
+#endif	/* bounds keep every slot index representable in uint16_t */
+
+#define EVENT_BUFFER_BATCHSZ 32	/* count at which a batch is reported ready */
+
+struct event_buffer {
+	uint16_t head;	/* index of the next event to flush */
+	uint16_t tail;	/* index of the next slot to fill */
+	uint16_t count;	/* number of events currently buffered */
+	struct rte_event events[EVENT_BUFFER_SZ];	/* circular storage */
+} __rte_cache_aligned;
+
+static inline bool
+event_buffer_full(const struct event_buffer *bufp)
+{
+	return (bufp->tail + 1) % EVENT_BUFFER_SZ == bufp->head; /* one slot stays unused */
+}
+
+static inline bool
+event_buffer_batch_ready(const struct event_buffer *bufp)
+{
+	return bufp->count >= EVENT_BUFFER_BATCHSZ; /* batch threshold reached */
+}
+
+static void
+event_buffer_init(struct event_buffer *bufp)
+{
+	memset(bufp->events, 0, sizeof(bufp->events));	/* clear every slot */
+	bufp->head = bufp->tail = bufp->count = 0;	/* start out empty */
+}
+
+static int
+event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp)
+{
+	const uint16_t slot = bufp->tail;
+
+	/* Refuse the event (return -1) when no free slot remains. */
+	if (event_buffer_full(bufp))
+		return -1;
+
+	/* Copy the event into the tail slot, then advance tail with wrap. */
+	rte_memcpy(&bufp->events[slot], eventp, sizeof(struct rte_event));
+	bufp->tail = (slot + 1) % EVENT_BUFFER_SZ;
+
+	bufp->count++;
+
+	return 0;
+}
+
+static void
+event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
+		   uint16_t *nb_events_flushed)
+{
+	const uint16_t first = bufp->head;
+	const uint16_t end = bufp->tail;
+	uint16_t nb_to_send, nb_done;
+
+	if (first == end) {
+		/* Nothing is buffered. */
+		*nb_events_flushed = 0;
+		return;
+	}
+
+	/* Send only the contiguous run that starts at head: up to tail, or up
+	 * to the end of the array when the buffered events wrap around.
+	 */
+	nb_to_send = (end > first ? end : EVENT_BUFFER_SZ) - first;
+
+	nb_done = rte_event_enqueue_burst(dev_id, port_id,
+					  &bufp->events[first], nb_to_send);
+
+	/* Step past an event rejected as invalid so it cannot stall us. */
+	if (nb_done != nb_to_send && rte_errno == -EINVAL)
+		nb_done++;
+
+	bufp->head = (first + nb_done) % EVENT_BUFFER_SZ;
+	bufp->count -= nb_done;
+	*nb_events_flushed = nb_done;
+}
+
+/*
+ * Software event timer adapter implementation
  */
 
 struct rte_event_timer_adapter_sw_data {
@@ -461,6 +547,8 @@ struct rte_event_timer_adapter_sw_data {
 	struct rte_mempool *msg_pool;
 	/* Mempool containing timer objects */
 	struct rte_mempool *tim_pool;
+	/* Buffered timer expiry events to be enqueued to an event device. */
+	struct event_buffer buffer;
 };
 
 enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
@@ -473,7 +561,8 @@ struct msg {
 static void
 sw_event_timer_cb(struct rte_timer *tim, void *arg)
 {
-	uint16_t n;
+	uint16_t nb_evs_flushed;
+	int ret;
 	struct rte_event_timer *evtim;
 	struct rte_event_timer_adapter *adapter;
 	struct rte_event_timer_adapter_sw_data *sw_data;
@@ -482,14 +571,10 @@ sw_event_timer_cb(struct rte_timer *tim, void *arg)
 	adapter = (struct rte_event_timer_adapter *)evtim->impl_opaque[1];
 	sw_data = adapter->data->adapter_priv;
 
-	n = rte_event_enqueue_burst(adapter->data->event_dev_id,
-				    adapter->data->event_port_id,
-				    &evtim->ev,
-				    1);
-	if (n != 1 && rte_errno == -ENOSPC) {
-		/* If we couldn't enqueue because the event port was
-		 * backpressured, put the timer back in the skiplist with an
-		 * immediate expiry value so we can process it again on the
+	ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
+	if (ret < 0) {
+		/* If event buffer is full, put timer back in list with
+		 * immediate expiry value, so that we process it again on the
 		 * next iteration.
 		 */
 		rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
@@ -500,6 +585,13 @@ sw_event_timer_cb(struct rte_timer *tim, void *arg)
 		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
 		rte_mempool_put(sw_data->tim_pool, (void **)&tim);
 	}
+
+	if (event_buffer_batch_ready(&sw_data->buffer)) {
+		event_buffer_flush(&sw_data->buffer,
+				   adapter->data->event_dev_id,
+				   adapter->data->event_port_id,
+				   &nb_evs_flushed);
+	}
 }
 
 static __rte_always_inline uint64_t
@@ -658,10 +750,14 @@ sw_event_timer_adapter_service_func(void *arg)
 
 	rte_timer_manage();
 
+	event_buffer_flush(&sw_data->buffer, adapter->data->event_dev_id,
+			   adapter->data->event_port_id, &nb_events);
+
 	/* Could use for stats */
 	RTE_SET_USED(nb_events);
 	RTE_SET_USED(ret);
 
+
 	return 0;
 }
 
@@ -726,6 +822,8 @@ sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
 		return -1;
 	}
 
+	event_buffer_init(&sw_data->buffer);
+
 	/* Register a service component */
 	memset(&service, 0, sizeof(service));
 	snprintf(service.name, RTE_SERVICE_NAME_MAX,
-- 
2.6.4



More information about the dev mailing list