[dpdk-dev,v6,13/23] eventtimer: add adapter service definition

Message ID 1515630074-29020-14-git-send-email-erik.g.carrillo@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Jerin Jacob
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues

Commit Message

Carrillo, Erik G Jan. 11, 2018, 12:21 a.m. UTC
  Define the callback function for the service that corresponds to an
adapter instance, as well as the callback for expired timers that the
service manages.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
---
 lib/librte_eventdev/rte_event_timer_adapter.c | 198 +++++++++++++++++++++++++-
 lib/librte_eventdev/rte_event_timer_adapter.h |   2 +-
 2 files changed, 198 insertions(+), 2 deletions(-)
  

Comments

Pavan Nikhilesh Jan. 11, 2018, 12:03 p.m. UTC | #1
On Wed, Jan 10, 2018 at 06:21:04PM -0600, Erik Gabriel Carrillo wrote:
> Define the callback function for the service that corresponds to an
> adapter instance, as well as the callback for expired timers that the
> service manages.
>
> Signed-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
> ---
>  lib/librte_eventdev/rte_event_timer_adapter.c | 198 +++++++++++++++++++++++++-
>  lib/librte_eventdev/rte_event_timer_adapter.h |   2 +-
>  2 files changed, 198 insertions(+), 2 deletions(-)
>
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index 38e52cb..0266ad5 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -40,8 +40,10 @@
>  #include <rte_malloc.h>
>  #include <rte_ring.h>
>  #include <rte_mempool.h>
> +#include <rte_common.h>
>  #include <rte_timer.h>
>  #include <rte_service_component.h>
> +#include <rte_cycles.h>
>
>  #include "rte_eventdev.h"
>  #include "rte_eventdev_pmd.h"
> @@ -460,10 +462,198 @@ struct msg {
>  	struct rte_event_timer *evtim;
>  };
<snip>
> +	if (n != 1 && rte_errno == -ENOSPC) {
> +		/* If we couldn't enqueue because the event port was
> +		 * backpressured, put the timer back in the skiplist with an
> +		 * immediate expiry value so we can process it again on the
> +		 * next iteration.
> +		 */
> +		rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
> +				     sw_event_timer_cb, evtim);
> +	} else {
> +		sw_data->nb_armed_evtims--;
> +		rte_wmb();

Any reason for using barrier here?. IMO smp_wmb() would be more than sufficient
or use atomics.

> +		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
> +		rte_mempool_put(sw_data->tim_pool, (void **)&tim);
> +	}
> +}
> +
> +static __rte_always_inline uint64_t
> +get_timeout_cycles(struct rte_event_timer *evtim,
> +		   struct rte_event_timer_adapter *adapter)
> +{
> +	uint64_t timeout_ns;
> +
> +	timeout_ns = evtim->timeout_ticks * adapter->data->conf.timer_tick_ns;
> +#define NSECPERSEC 1E9
> +	return timeout_ns * rte_get_timer_hz() / NSECPERSEC;
> +
> +}
> +
> +/* Check that event timer timeout value is in range */
> +static __rte_always_inline int
> +check_timeout(struct rte_event_timer *evtim,
> +	      const struct rte_event_timer_adapter *adapter)
> +{
> +	uint64_t tmo_nsec = evtim->timeout_ticks *
> +		adapter->data->conf.timer_tick_ns;
> +
> +	return  (tmo_nsec > adapter->data->conf.max_tmo_ns) ? -1
> +		: (tmo_nsec < adapter->data->conf.timer_tick_ns) ? -2
> +		: 0;

Consider simplifying this for readability.

> +}
> +
> +/* Check that event timer event queue sched type matches destination event queue
> + * sched type
> + */
> +static __rte_always_inline int
> +check_destination_event_queue(struct rte_event_timer *evtim,
> +			      const struct rte_event_timer_adapter *adapter)

<snip>
> +
> +#define NB_OBJS 32
>  static int
>  sw_event_timer_adapter_service_func(void *arg)
>  {
> -	RTE_SET_USED(arg);
> +	int i, num_msgs, ret;
> +	uint64_t cycles;
> +	uint16_t nb_events;
> +	struct rte_event_timer_adapter *adapter;
> +	struct rte_event_timer_adapter_sw_data *sw_data;
> +	struct rte_event_timer *evtim = NULL;
> +	struct rte_timer *tim = NULL;
> +	struct msg *msg, *msgs[NB_OBJS];
> +
> +	adapter = arg;
> +	sw_data = adapter->data->adapter_priv;
> +
> +	while (!rte_ring_empty(sw_data->msg_ring)) {
> +		num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
> +						  (void **)msgs, NB_OBJS, NULL);
> +
> +		for (i = 0; i < num_msgs; i++) {
> +			msg = msgs[i];
> +			evtim = msg->evtim;
> +
> +			tim = (struct rte_timer *)evtim->impl_opaque[0];
> +			RTE_ASSERT(tim != NULL);
> +
> +			switch (msg->type) {
> +			case MSG_TYPE_ARM:
> +				if (validate_event_timer(evtim, adapter) < 0) {
> +					rte_mempool_put(sw_data->tim_pool,
> +							(void **)&tim);
> +					continue;
> +				}
> +
> +				/* Checks passed; set an rte_timer */
> +				cycles = get_timeout_cycles(msg->evtim,
> +							    adapter);
> +				rte_timer_reset_sync(tim, cycles, SINGLE,
> +						     rte_lcore_id(),
> +						     sw_event_timer_cb,
> +						     msg->evtim);
> +
> +				sw_data->nb_armed_evtims++;
> +				rte_wmb();

Same as above comment.

> +				evtim->state = RTE_EVENT_TIMER_ARMED;
> +				break;
> +			case MSG_TYPE_CANCEL:
> +				/* The event timer was either not armed or it
> +				 * fired after this cancel request was queued
> +				 * and before the request was processed.
> +				 */
> +				if (evtim->state != RTE_EVENT_TIMER_ARMED)
> +					continue;
> +
> +				rte_timer_stop_sync(tim);
> +				rte_mempool_put(sw_data->tim_pool,
> +						(void **)&tim);
> +				sw_data->nb_armed_evtims--;
> +				rte_wmb();

Same as above comment.

> +				msg->evtim->state = RTE_EVENT_TIMER_CANCELED;
> +				break;
> +			}
> +		}
> +
> +		rte_mempool_put_bulk(sw_data->msg_pool, (void **)msgs,
> +				     num_msgs);
> +	}
> +
> +	rte_timer_manage();

Consider calling rte_timer_manage() before ARM new set of timers also, poll it
based on the timeout interval configured.

> +
> +	/* Could use for stats */
> +	RTE_SET_USED(nb_events);
> +	RTE_SET_USED(ret);
> +
>  	return 0;
>  }
>
<snip>
  

Patch

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index 38e52cb..0266ad5 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -40,8 +40,10 @@ 
 #include <rte_malloc.h>
 #include <rte_ring.h>
 #include <rte_mempool.h>
+#include <rte_common.h>
 #include <rte_timer.h>
 #include <rte_service_component.h>
+#include <rte_cycles.h>
 
 #include "rte_eventdev.h"
 #include "rte_eventdev_pmd.h"
@@ -460,10 +462,198 @@  struct msg {
 	struct rte_event_timer *evtim;
 };
 
+static void
+sw_event_timer_cb(struct rte_timer *tim, void *arg)
+{
+	uint16_t n;
+	struct rte_event_timer *evtim;
+	struct rte_event_timer_adapter *adapter;
+	struct rte_event_timer_adapter_sw_data *sw_data;
+
+	evtim = arg;
+	adapter = (struct rte_event_timer_adapter *)evtim->impl_opaque[1];
+	sw_data = adapter->data->adapter_priv;
+
+	n = rte_event_enqueue_burst(adapter->data->event_dev_id,
+				    adapter->data->event_port_id,
+				    &evtim->ev,
+				    1);
+	if (n != 1 && rte_errno == -ENOSPC) {
+		/* If we couldn't enqueue because the event port was
+		 * backpressured, put the timer back in the skiplist with an
+		 * immediate expiry value so we can process it again on the
+		 * next iteration.
+		 */
+		rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
+				     sw_event_timer_cb, evtim);
+	} else {
+		sw_data->nb_armed_evtims--;
+		rte_wmb();
+		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
+		rte_mempool_put(sw_data->tim_pool, (void **)&tim);
+	}
+}
+
+static __rte_always_inline uint64_t
+get_timeout_cycles(struct rte_event_timer *evtim,
+		   struct rte_event_timer_adapter *adapter)
+{
+	uint64_t timeout_ns;
+
+	timeout_ns = evtim->timeout_ticks * adapter->data->conf.timer_tick_ns;
+#define NSECPERSEC 1E9
+	return timeout_ns * rte_get_timer_hz() / NSECPERSEC;
+
+}
+
+/* Check that event timer timeout value is in range */
+static __rte_always_inline int
+check_timeout(struct rte_event_timer *evtim,
+	      const struct rte_event_timer_adapter *adapter)
+{
+	uint64_t tmo_nsec = evtim->timeout_ticks *
+		adapter->data->conf.timer_tick_ns;
+
+	return  (tmo_nsec > adapter->data->conf.max_tmo_ns) ? -1
+		: (tmo_nsec < adapter->data->conf.timer_tick_ns) ? -2
+		: 0;
+}
+
+/* Check that event timer event queue sched type matches destination event queue
+ * sched type
+ */
+static __rte_always_inline int
+check_destination_event_queue(struct rte_event_timer *evtim,
+			      const struct rte_event_timer_adapter *adapter)
+{
+	int ret;
+	uint32_t sched_type;
+
+	ret = rte_event_queue_attr_get(adapter->data->event_dev_id,
+				       evtim->ev.queue_id,
+				       RTE_EVENT_QUEUE_ATTR_SCHEDULE_TYPE,
+				       &sched_type);
+
+	if (ret < 0 || evtim->ev.sched_type != sched_type)
+		return -1;
+
+	return 0;
+}
+
+/* We can't correctly block on the state of a timer that is currently armed,
+ * so disallow it.
+ */
+static __rte_always_inline int
+check_state_for_arm(struct rte_event_timer *evtim)
+{
+	return evtim->state != RTE_EVENT_TIMER_ARMED ? 0 : -1;
+}
+
+static inline int
+validate_event_timer(struct rte_event_timer *evtim,
+		     struct rte_event_timer_adapter *adapter)
+{
+	int ret;
+
+	if (check_state_for_arm(evtim) < 0) {
+		evtim->state = RTE_EVENT_TIMER_ERROR;
+		return -1;
+	}
+
+	ret = check_timeout(evtim, adapter);
+	switch (ret) {
+	case -1:
+		evtim->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+		return -1;
+	case -2:
+		evtim->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
+		return -1;
+	}
+
+	if (check_destination_event_queue(evtim, adapter) < 0) {
+		evtim->state = RTE_EVENT_TIMER_ERROR;
+		return -1;
+	}
+
+	return 0;
+}
+
+
+#define NB_OBJS 32
 static int
 sw_event_timer_adapter_service_func(void *arg)
 {
-	RTE_SET_USED(arg);
+	int i, num_msgs, ret;
+	uint64_t cycles;
+	uint16_t nb_events;
+	struct rte_event_timer_adapter *adapter;
+	struct rte_event_timer_adapter_sw_data *sw_data;
+	struct rte_event_timer *evtim = NULL;
+	struct rte_timer *tim = NULL;
+	struct msg *msg, *msgs[NB_OBJS];
+
+	adapter = arg;
+	sw_data = adapter->data->adapter_priv;
+
+	while (!rte_ring_empty(sw_data->msg_ring)) {
+		num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
+						  (void **)msgs, NB_OBJS, NULL);
+
+		for (i = 0; i < num_msgs; i++) {
+			msg = msgs[i];
+			evtim = msg->evtim;
+
+			tim = (struct rte_timer *)evtim->impl_opaque[0];
+			RTE_ASSERT(tim != NULL);
+
+			switch (msg->type) {
+			case MSG_TYPE_ARM:
+				if (validate_event_timer(evtim, adapter) < 0) {
+					rte_mempool_put(sw_data->tim_pool,
+							(void **)&tim);
+					continue;
+				}
+
+				/* Checks passed; set an rte_timer */
+				cycles = get_timeout_cycles(msg->evtim,
+							    adapter);
+				rte_timer_reset_sync(tim, cycles, SINGLE,
+						     rte_lcore_id(),
+						     sw_event_timer_cb,
+						     msg->evtim);
+
+				sw_data->nb_armed_evtims++;
+				rte_wmb();
+				evtim->state = RTE_EVENT_TIMER_ARMED;
+				break;
+			case MSG_TYPE_CANCEL:
+				/* The event timer was either not armed or it
+				 * fired after this cancel request was queued
+				 * and before the request was processed.
+				 */
+				if (evtim->state != RTE_EVENT_TIMER_ARMED)
+					continue;
+
+				rte_timer_stop_sync(tim);
+				rte_mempool_put(sw_data->tim_pool,
+						(void **)&tim);
+				sw_data->nb_armed_evtims--;
+				rte_wmb();
+				msg->evtim->state = RTE_EVENT_TIMER_CANCELED;
+				break;
+			}
+		}
+
+		rte_mempool_put_bulk(sw_data->msg_pool, (void **)msgs,
+				     num_msgs);
+	}
+
+	rte_timer_manage();
+
+	/* Could use for stats */
+	RTE_SET_USED(nb_events);
+	RTE_SET_USED(ret);
+
 	return 0;
 }
 
@@ -474,6 +664,7 @@  sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
 	struct rte_event_timer_adapter_sw_data *sw_data;
 	uint64_t nb_timers;
 	struct rte_service_spec service;
+	static bool timer_subsystem_inited; // static initialized to false
 
 	/* Allocate storage for SW implementation data */
 	char priv_data_name[RTE_RING_NAMESIZE];
@@ -541,6 +732,11 @@  sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
 	adapter->data->service_id = sw_data->service_id;
 	adapter->data->service_inited = 1;
 
+	if (!timer_subsystem_inited) {
+		rte_timer_subsystem_init();
+		timer_subsystem_inited = true;
+	}
+
 	return 0;
 }
 
diff --git a/lib/librte_eventdev/rte_event_timer_adapter.h b/lib/librte_eventdev/rte_event_timer_adapter.h
index 8d29cfc..bbbe7b9 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.h
+++ b/lib/librte_eventdev/rte_event_timer_adapter.h
@@ -461,7 +461,7 @@  struct rte_event_timer {
 	 *  - op: RTE_EVENT_OP_NEW
 	 *  - event_type: RTE_EVENT_TYPE_TIMER
 	 */
-	enum rte_event_timer_state state;
+	volatile enum rte_event_timer_state state;
 	/**< State of the event timer. */
 	uint64_t timeout_ticks;
 	/**< Expiry timer ticks expressed in number of *timer_ticks_ns* from