[dpdk-dev] [RFC PATCH] eventdev: add buffered enqueue and flush APIs

Eads, Gage gage.eads at intel.com
Tue Dec 6 00:30:46 CET 2016


> On Dec 3, 2016, at 5:18 AM, Jerin Jacob <jerin.jacob at caviumnetworks.com> wrote:
> 
>> On Fri, Dec 02, 2016 at 01:45:56PM -0600, Gage Eads wrote:
>> This commit adds buffered enqueue functionality to the eventdev API.
>> It is conceptually similar to the ethdev API's tx buffering, however
>> with a smaller API surface and no dropping of events.
> 
> Hello Gage,
> Different implementation may have different strategies to hold the buffers.

A benefit of inlining the buffering logic in the header is that we avoid the overhead of entering the PMD for what is a fairly simple operation (common case: add event to an array, increment counter). If we make this implementation-defined (i.e. use PMD callbacks), we lose that benefit.

> and some does not need to hold the buffers if it is DDR backed.

Though DDR-backed hardware doesn't need to buffer in software, doing so would reduce the software overhead of enqueueing. Compared to N individual calls to enqueue, buffering N events then calling enqueue burst once can benefit from amortized (or parallelized) PMD-specific bookkeeping and error-checking across the set of events, and will definitely benefit from the amortized function call overhead and better I-cache behavior. (Essentially this is VPP from the fd.io project.) This should result in higher overall event throughput (agnostic of the underlying device).

I'm skeptical that other buffering strategies would emerge, but I can only speculate on Cavium/NXP/etc. NPU software.

> IHMO, This may not be the candidate for common code. I guess you can move this
> to driver side and abstract under SW driver's enqueue_burst.
> 

I don't think that will work without adding a flush API; otherwise we could have indefinitely buffered events. I see three ways forward:

- The proposed approach
- Add the proposed functions but make them implementation-specific.
- Require the application to write its own buffering logic (i.e. no API change)

Thanks,
Gage

> 
>> 
>> Signed-off-by: Gage Eads <gage.eads at intel.com>
>> ---
>> lib/librte_eventdev/rte_eventdev.c |  29 ++++++++++
>> lib/librte_eventdev/rte_eventdev.h | 106 +++++++++++++++++++++++++++++++++++++
>> 2 files changed, 135 insertions(+)
>> 
>> diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
>> index 17ce5c3..564573f 100644
>> --- a/lib/librte_eventdev/rte_eventdev.c
>> +++ b/lib/librte_eventdev/rte_eventdev.c
>> @@ -219,6 +219,7 @@
>>    uint16_t *links_map;
>>    uint8_t *ports_dequeue_depth;
>>    uint8_t *ports_enqueue_depth;
>> +    struct rte_eventdev_enqueue_buffer *port_buffers;
>>    unsigned int i;
>> 
>>    EDEV_LOG_DEBUG("Setup %d ports on device %u", nb_ports,
>> @@ -272,6 +273,19 @@
>>                    "nb_ports %u", nb_ports);
>>            return -(ENOMEM);
>>        }
>> +
>> +        /* Allocate memory to store port enqueue buffers */
>> +        dev->data->port_buffers =
>> +            rte_zmalloc_socket("eventdev->port_buffers",
>> +            sizeof(dev->data->port_buffers[0]) * nb_ports,
>> +            RTE_CACHE_LINE_SIZE, dev->data->socket_id);
>> +        if (dev->data->port_buffers == NULL) {
>> +            dev->data->nb_ports = 0;
>> +            EDEV_LOG_ERR("failed to get memory for port enq"
>> +                     " buffers, nb_ports %u", nb_ports);
>> +            return -(ENOMEM);
>> +        }
>> +
>>    } else if (dev->data->ports != NULL && nb_ports != 0) {/* re-config */
>>        RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
>> 
>> @@ -279,6 +293,7 @@
>>        ports_dequeue_depth = dev->data->ports_dequeue_depth;
>>        ports_enqueue_depth = dev->data->ports_enqueue_depth;
>>        links_map = dev->data->links_map;
>> +        port_buffers = dev->data->port_buffers;
>> 
>>        for (i = nb_ports; i < old_nb_ports; i++)
>>            (*dev->dev_ops->port_release)(ports[i]);
>> @@ -324,6 +339,17 @@
>>            return -(ENOMEM);
>>        }
>> 
>> +        /* Realloc memory to store port enqueue buffers */
>> +        port_buffers = rte_realloc(dev->data->port_buffers,
>> +            sizeof(dev->data->port_buffers[0]) * nb_ports,
>> +            RTE_CACHE_LINE_SIZE);
>> +        if (port_buffers == NULL) {
>> +            dev->data->nb_ports = 0;
>> +            EDEV_LOG_ERR("failed to realloc mem for port enq"
>> +                     " buffers, nb_ports %u", nb_ports);
>> +            return -(ENOMEM);
>> +        }
>> +
>>        if (nb_ports > old_nb_ports) {
>>            uint8_t new_ps = nb_ports - old_nb_ports;
>> 
>> @@ -336,12 +362,15 @@
>>            memset(links_map +
>>                (old_nb_ports * RTE_EVENT_MAX_QUEUES_PER_DEV),
>>                0, sizeof(ports_enqueue_depth[0]) * new_ps);
>> +            memset(port_buffers + old_nb_ports, 0,
>> +                sizeof(port_buffers[0]) * new_ps);
>>        }
>> 
>>        dev->data->ports = ports;
>>        dev->data->ports_dequeue_depth = ports_dequeue_depth;
>>        dev->data->ports_enqueue_depth = ports_enqueue_depth;
>>        dev->data->links_map = links_map;
>> +        dev->data->port_buffers = port_buffers;
>>    } else if (dev->data->ports != NULL && nb_ports == 0) {
>>        RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
>> 
>> diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
>> index 778d6dc..3f24342 100644
>> --- a/lib/librte_eventdev/rte_eventdev.h
>> +++ b/lib/librte_eventdev/rte_eventdev.h
>> @@ -246,6 +246,7 @@
>> #include <rte_dev.h>
>> #include <rte_memory.h>
>> #include <rte_errno.h>
>> +#include <rte_memcpy.h>
>> 
>> #define EVENTDEV_NAME_SKELETON_PMD event_skeleton
>> /**< Skeleton event device PMD name */
>> @@ -965,6 +966,26 @@ typedef uint16_t (*event_dequeue_burst_t)(void *port, struct rte_event ev[],
>> #define RTE_EVENTDEV_NAME_MAX_LEN    (64)
>> /**< @internal Max length of name of event PMD */
>> 
>> +#define RTE_EVENT_BUF_MAX 16
>> +/**< Maximum number of events in an enqueue buffer. */
>> +
>> +/**
>> + * @internal
>> + * An enqueue buffer for each port.
>> + *
>> + * The reason this struct is in the header is for inlining the function calls
>> + * to enqueue, as doing a function call per packet would incur significant
>> + * performance overhead.
>> + *
>> + * \see rte_event_enqueue_buffer(), rte_event_enqueue_buffer_flush()
>> + */
>> +struct rte_eventdev_enqueue_buffer {
>> +    /** Number of valid events currently held in this buffer */
>> +    uint16_t count;
>> +    /** Buffered events, flushed in FIFO order; only the first *count* entries are valid */
>> +    struct rte_event events[RTE_EVENT_BUF_MAX];
>> +} __rte_cache_aligned;
>> +
>> /**
>>  * @internal
>>  * The data part, with no function pointers, associated with each device.
>> @@ -983,6 +1004,8 @@ struct rte_eventdev_data {
>>    /**< Number of event ports. */
>>    void **ports;
>>    /**< Array of pointers to ports. */
>> +    struct rte_eventdev_enqueue_buffer *port_buffers;
>> +    /**< Array of port enqueue buffers. */
>>    uint8_t *ports_dequeue_depth;
>>    /**< Array of port dequeue depth. */
>>    uint8_t *ports_enqueue_depth;
>> @@ -1132,6 +1155,89 @@ struct rte_eventdev {
>> }
>> 
>> /**
>> + * Flush the enqueue buffer of the event port specified by *port_id*, in the
>> + * event device specified by *dev_id*.
>> + *
>> + * This function attempts to flush as many of the buffered events as possible,
>> + * and returns the number of flushed events. Any unflushed events remain in
>> + * the buffer.
>> + *
>> + * @param dev_id
>> + *   The identifier of the device.
>> + * @param port_id
>> + *   The identifier of the event port.
>> + *
>> + * @return
>> + *   The number of event objects actually flushed to the event device.
>> + *
>> + * \see rte_event_enqueue_buffer(), rte_event_enqueue_burst()
>> + * \see rte_event_port_enqueue_depth()
>> + */
>> +static inline int
>> +rte_event_enqueue_buffer_flush(uint8_t dev_id, uint8_t port_id)
>> +{
>> +    struct rte_eventdev *dev = &rte_eventdevs[dev_id];
>> +    struct rte_eventdev_enqueue_buffer *buf =
>> +        &dev->data->port_buffers[port_id];
>> +    int n;
>> +
>> +    n = rte_event_enqueue_burst(dev_id, port_id, buf->events, buf->count);
>> +
>> +    /* On a partial flush, shift the unflushed events to the front of
>> +     * the buffer. memmove takes a size in bytes, so the remaining event
>> +     * count must be scaled by the event size.
>> +     */
>> +    if (n != buf->count)
>> +        memmove(buf->events, &buf->events[n],
>> +            (buf->count - n) * sizeof(buf->events[0]));
>> +
>> +    buf->count -= n;
>> +
>> +    return n;
>> +}
>> +
>> +/**
>> + * Buffer an event object supplied in *rte_event* structure for future
>> + * enqueueing on an event device designated by its *dev_id* through the event
>> + * port specified by *port_id*.
>> + *
>> + * This function takes a single event and buffers it for later enqueuing to the
>> + * queue specified in the event structure. If the buffer is full, the
>> + * function will attempt to flush the buffer before buffering the event.
>> + * If the flush operation fails, the previously buffered events remain in the
>> + * buffer and an error is returned to the user to indicate that *ev* was not
>> + * buffered.
>> + *
>> + * @param dev_id
>> + *   The identifier of the device.
>> + * @param port_id
>> + *   The identifier of the event port.
>> + * @param ev
>> + *   Pointer to struct rte_event
>> + *
>> + * @return
>> + *  - 0 on success
>> + *  - <0 on failure. Failure can occur if the event port's output queue is
>> + *     backpressured, for instance.
>> + *
>> + * \see rte_event_enqueue_buffer_flush(), rte_event_enqueue_burst()
>> + * \see rte_event_port_enqueue_depth()
>> + */
>> +static inline int
>> +rte_event_enqueue_buffer(uint8_t dev_id, uint8_t port_id, struct rte_event *ev)
>> +{
>> +    struct rte_eventdev *dev = &rte_eventdevs[dev_id];
>> +    struct rte_eventdev_enqueue_buffer *buf =
>> +        &dev->data->port_buffers[port_id];
>> +
>> +    /* A full buffer must first be (at least partially) flushed to make
>> +     * room for ev. If not even one event could be flushed, report that
>> +     * ev was not buffered; the previously buffered events are retained.
>> +     */
>> +    if (buf->count == RTE_EVENT_BUF_MAX &&
>> +        rte_event_enqueue_buffer_flush(dev_id, port_id) == 0)
>> +        return -ENOSPC;
>> +
>> +    rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
>> +    return 0;
>> +}
>> +
>> +/**
>>  * Converts nanoseconds to *wait* value for rte_event_dequeue()
>>  *
>>  * If the device is configured with RTE_EVENT_DEV_CFG_PER_DEQUEUE_WAIT flag then
>> -- 
>> 1.9.1
>> 


More information about the dev mailing list