[dpdk-dev] [PATCH v6 15/23] eventtimer: add buffering of timer expiry events
Erik Gabriel Carrillo
erik.g.carrillo at intel.com
Thu Jan 11 01:21:06 CET 2018
Buffer timer expiry events generated while walking a "run list"
in rte_timer_manage, and burst-enqueue the buffered events to an
event device to the extent possible.
Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo at intel.com>
---
lib/librte_eventdev/rte_event_timer_adapter.c | 118 +++++++++++++++++++++++---
1 file changed, 108 insertions(+), 10 deletions(-)
diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index 8bd9ebc..176cc5b 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -447,7 +447,93 @@ rte_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
}
/*
- * Software event timer adapter ops definitions
+ * Software event timer adapter buffer helper functions
+ */
+
+#define EVENT_BUFFER_SZ 1024
+#if EVENT_BUFFER_SZ < 1 || EVENT_BUFFER_SZ > 65536
+#error "EVENT_BUFFER_SZ must be between 1 and 65536"
+#endif
+
+#define EVENT_BUFFER_BATCHSZ 32
+
+struct event_buffer {
+ uint16_t head; /* index of the oldest buffered event (next to flush) */
+ uint16_t tail; /* index of the next free slot to fill */
+ uint16_t count; /* number of events currently buffered */
+ struct rte_event events[EVENT_BUFFER_SZ]; /* ring storage for expiry events */
+} __rte_cache_aligned;
+
+static inline bool
+event_buffer_full(struct event_buffer *bufp)
+{
+ /* One ring slot is kept unused so that a full buffer can be
+  * distinguished from an empty one (head == tail means empty).
+  */
+ uint16_t next_tail = (bufp->tail + 1) % EVENT_BUFFER_SZ;
+
+ return next_tail == bufp->head;
+}
+
+static inline bool
+event_buffer_batch_ready(struct event_buffer *bufp)
+{
+ /* Ready once at least one full batch worth of events is buffered. */
+ return !(bufp->count < EVENT_BUFFER_BATCHSZ);
+}
+
+static void
+event_buffer_init(struct event_buffer *bufp)
+{
+ /* Start with an empty ring and zeroed event storage. */
+ bufp->head = 0;
+ bufp->tail = 0;
+ bufp->count = 0;
+ memset(bufp->events, 0, sizeof(bufp->events));
+}
+
+static int
+event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp)
+{
+ struct rte_event *slot;
+
+ /* Reject the event when no free slot remains. */
+ if (event_buffer_full(bufp))
+ return -1;
+
+ /* Copy the event into the slot at the current tail, then advance
+  * the tail with wrap-around and account for the new entry.
+  */
+ slot = &bufp->events[bufp->tail];
+ rte_memcpy(slot, eventp, sizeof(struct rte_event));
+ bufp->tail = (bufp->tail + 1) % EVENT_BUFFER_SZ;
+ bufp->count++;
+
+ return 0;
+}
+
+/*
+ * Burst-enqueue buffered events to the event device, starting at head.
+ * On wrap-around (tail < head) only the contiguous segment up to the end
+ * of the ring is flushed; the wrapped remainder goes out on a later call.
+ * The number of events consumed from the buffer is returned through
+ * *nb_events_flushed.
+ */
+static void
+event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
+ uint16_t *nb_events_flushed)
+{
+ uint16_t n = 0;
+ uint16_t *headp = &bufp->head;
+ uint16_t *tailp = &bufp->tail;
+ struct rte_event *events = bufp->events;
+
+ /* Length of the contiguous run beginning at head. */
+ if (*tailp > *headp)
+ n = *tailp - *headp;
+ else if (*tailp < *headp)
+ n = EVENT_BUFFER_SZ - *headp;
+ else { /* buffer empty */
+ *nb_events_flushed = 0;
+ return;
+ }
+
+ *nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
+ &events[*headp], n);
+ if (*nb_events_flushed != n && rte_errno == -EINVAL) {
+ /* The port rejected an invalid event (eventdev sets rte_errno
+  * to -EINVAL).  Count it as flushed so head advances past it
+  * and we don't hang retrying the same bad event forever.
+  */
+ (*nb_events_flushed)++;
+ }
+
+ /* Advance head past everything consumed (enqueued or skipped). */
+ *headp = (*headp + *nb_events_flushed) % EVENT_BUFFER_SZ;
+ bufp->count -= *nb_events_flushed;
+}
+
+/*
+ * Software event timer adapter implementation
*/
struct rte_event_timer_adapter_sw_data {
@@ -461,6 +547,8 @@ struct rte_event_timer_adapter_sw_data {
struct rte_mempool *msg_pool;
/* Mempool containing timer objects */
struct rte_mempool *tim_pool;
+ /* Buffered timer expiry events to be enqueued to an event device. */
+ struct event_buffer buffer;
};
enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
@@ -473,7 +561,8 @@ struct msg {
static void
sw_event_timer_cb(struct rte_timer *tim, void *arg)
{
- uint16_t n;
+ uint16_t nb_evs_flushed;
+ int ret;
struct rte_event_timer *evtim;
struct rte_event_timer_adapter *adapter;
struct rte_event_timer_adapter_sw_data *sw_data;
@@ -482,14 +571,10 @@ sw_event_timer_cb(struct rte_timer *tim, void *arg)
adapter = (struct rte_event_timer_adapter *)evtim->impl_opaque[1];
sw_data = adapter->data->adapter_priv;
- n = rte_event_enqueue_burst(adapter->data->event_dev_id,
- adapter->data->event_port_id,
- &evtim->ev,
- 1);
- if (n != 1 && rte_errno == -ENOSPC) {
- /* If we couldn't enqueue because the event port was
- * backpressured, put the timer back in the skiplist with an
- * immediate expiry value so we can process it again on the
+ ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
+ if (ret < 0) {
+ /* If event buffer is full, put timer back in list with
+ * immediate expiry value, so that we process it again on the
* next iteration.
*/
rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
@@ -500,6 +585,13 @@ sw_event_timer_cb(struct rte_timer *tim, void *arg)
evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
rte_mempool_put(sw_data->tim_pool, (void **)&tim);
}
+
+ if (event_buffer_batch_ready(&sw_data->buffer)) {
+ event_buffer_flush(&sw_data->buffer,
+ adapter->data->event_dev_id,
+ adapter->data->event_port_id,
+ &nb_evs_flushed);
+ }
}
static __rte_always_inline uint64_t
@@ -658,10 +750,14 @@ sw_event_timer_adapter_service_func(void *arg)
rte_timer_manage();
+ event_buffer_flush(&sw_data->buffer, adapter->data->event_dev_id,
+ adapter->data->event_port_id, &nb_events);
+
/* Could use for stats */
RTE_SET_USED(nb_events);
RTE_SET_USED(ret);
+
return 0;
}
@@ -726,6 +822,8 @@ sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
return -1;
}
+ event_buffer_init(&sw_data->buffer);
+
/* Register a service component */
memset(&service, 0, sizeof(service));
snprintf(service.name, RTE_SERVICE_NAME_MAX,
--
2.6.4
More information about the dev
mailing list