[dpdk-dev] [PATCH v2 1/2] mempool: allow for user-owned mempool caches

Ananyev, Konstantin konstantin.ananyev at intel.com
Mon Apr 18 15:17:02 CEST 2016


Hi Lazaros,

Looks ok to me in general, few comments below.
One more generic question - did you observe any performance impact 
caused by these changes?
Konstantin

> -----Original Message-----
> From: dev [mailto:dev-bounces at dpdk.org] On Behalf Of Lazaros Koromilas
> Sent: Monday, April 04, 2016 4:43 PM
> To: dev at dpdk.org
> Subject: [dpdk-dev] [PATCH v2 1/2] mempool: allow for user-owned mempool caches
> 
> The mempool cache is only available to EAL threads as a per-lcore
> resource. Change this so that the user can create and provide their own
> cache on mempool get and put operations. This works with non-EAL threads
> too. This commit introduces the new API calls:
> 
>     rte_mempool_cache_create(size, socket_id)
>     rte_mempool_cache_flush(cache, mp)
>     rte_mempool_cache_free(cache)
>     rte_mempool_default_cache(mp, lcore_id)
>     rte_mempool_generic_put(mp, obj_table, n, cache, is_mp)
>     rte_mempool_generic_get(mp, obj_table, n, cache, is_mc)
> 
> Removes the API calls:
> 
>     rte_mempool_sp_put_bulk(mp, obj_table, n)
>     rte_mempool_sc_get_bulk(mp, obj_table, n)
>     rte_mempool_sp_put(mp, obj)
>     rte_mempool_sc_get(mp, obj)


Hmm, shouldn't we deprecate it first for a release before removing completely?
Let say for now you can just make them macros that calls the remaining functions or so.

> 
> And the remaining API calls use the per-lcore default local cache:
> 
>     rte_mempool_put_bulk(mp, obj_table, n)
>     rte_mempool_get_bulk(mp, obj_table, n)
>     rte_mempool_put(mp, obj)
>     rte_mempool_get(mp, obj)
> 
> Signed-off-by: Lazaros Koromilas <l at nofutznetworks.com>
> ---
>  app/test/test_mempool.c                |  58 +++++--
>  app/test/test_mempool_perf.c           |  46 +++++-
>  lib/librte_eal/common/eal_common_log.c |   8 +-
>  lib/librte_mempool/rte_mempool.c       |  76 ++++++++-
>  lib/librte_mempool/rte_mempool.h       | 291 +++++++++++++--------------------
>  5 files changed, 275 insertions(+), 204 deletions(-)
> 
> 
> diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
> index 73ca770..4d977c1 100644
> --- a/lib/librte_mempool/rte_mempool.c
> +++ b/lib/librte_mempool/rte_mempool.c
> @@ -375,6 +375,63 @@ rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz,
>  	return usz;
>  }
> 
> +static void
> +mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)
> +{
> +	cache->size = size;
> +	cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);
> +	cache->len = 0;
> +}
> +
> +/*
> + * Create and initialize a cache for objects that are retrieved from and
> + * returned to an underlying mempool. This structure is identical to the
> + * local_cache[lcore_id] pointed to by the mempool structure.
> + */
> +struct rte_mempool_cache *
> +rte_mempool_cache_create(uint32_t size, int socket_id)
> +{
> +	struct rte_mempool_cache *cache;
> +
> +	if (size > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +
> +	cache = rte_zmalloc_socket("MEMPOOL_CACHE", sizeof(*cache),
> +				   RTE_CACHE_LINE_SIZE, socket_id);
> +	if (cache == NULL) {
> +		RTE_LOG(ERR, MEMPOOL, "Cannot allocate mempool cache!\n");
> +		rte_errno = ENOMEM;
> +		return NULL;
> +	}
> +
> +	mempool_cache_init(cache, size);
> +
> +	return cache;
> +}
> +
> +/*
> + * Free a cache. It's the responsibility of the user to make sure that any
> + * remaining objects in the cache are flushed to the corresponding
> + * mempool.
> + */
> +void
> +rte_mempool_cache_free(struct rte_mempool_cache *cache)
> +{
> +	rte_free(cache);
> +}
> +
> +/*
> + * Put all objects in the cache to the specified mempool's ring.
> + */
> +void
> +rte_mempool_cache_flush(struct rte_mempool_cache *cache,
> +			struct rte_mempool *mp)
> +{
> +	rte_ring_enqueue_bulk(mp->ring, cache->objs, cache->len);

Shouldn't you also reset cache->len too here?
cache->len = 0;
Another thought - might be that function deserved to be inline one.

> +}
> +
>  #ifndef RTE_LIBRTE_XEN_DOM0
>  /* stub if DOM0 support not configured */
>  struct rte_mempool *
> @@ -448,6 +505,7 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  	struct rte_mempool_objsz objsz;
>  	void *startaddr;
>  	int page_size = getpagesize();
> +	unsigned lcore_id;
> 
>  	/* compilation-time checks */
>  	RTE_BUILD_BUG_ON((sizeof(struct rte_mempool) &
> @@ -583,8 +641,8 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  	mp->elt_size = objsz.elt_size;
>  	mp->header_size = objsz.header_size;
>  	mp->trailer_size = objsz.trailer_size;
> +	/* Size of default caches, zero means disabled. */
>  	mp->cache_size = cache_size;
> -	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
>  	mp->private_data_size = private_data_size;
> 
>  	/*
> @@ -594,6 +652,13 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  	mp->local_cache = (struct rte_mempool_cache *)
>  			((char *)mp + MEMPOOL_HEADER_SIZE(mp, pg_num, 0));
> 
> +	/* Init all default caches. */
> +	if (cache_size != 0) {
> +		for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
> +			mempool_cache_init(&mp->local_cache[lcore_id],
> +					   cache_size);
> +	}
> +
>  	/* calculate address of the first element for continuous mempool. */
>  	obj = (char *)mp + MEMPOOL_HEADER_SIZE(mp, pg_num, cache_size) +
>  		private_data_size;
> @@ -672,7 +737,7 @@ rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp)
>  	unsigned count = 0;
>  	unsigned cache_count;
> 
> -	fprintf(f, "  cache infos:\n");
> +	fprintf(f, "  internal cache infos:\n");
>  	fprintf(f, "    cache_size=%"PRIu32"\n", mp->cache_size);
> 
>  	if (mp->cache_size == 0)
> @@ -680,7 +745,8 @@ rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp)
> 
>  	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
>  		cache_count = mp->local_cache[lcore_id].len;
> -		fprintf(f, "    cache_count[%u]=%u\n", lcore_id, cache_count);
> +		fprintf(f, "    cache_count[%u]=%"PRIu32"\n",
> +			lcore_id, cache_count);
>  		count += cache_count;
>  	}
>  	fprintf(f, "    total_cache_count=%u\n", count);
> @@ -760,7 +826,9 @@ mempool_audit_cache(const struct rte_mempool *mp)
>  		return;
> 
>  	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> -		if (mp->local_cache[lcore_id].len > mp->cache_flushthresh) {
> +		const struct rte_mempool_cache *cache;
> +		cache = &mp->local_cache[lcore_id];
> +		if (cache->len > cache->flushthresh) {
>  			RTE_LOG(CRIT, MEMPOOL, "badness on cache[%u]\n",
>  				lcore_id);
>  			rte_panic("MEMPOOL: invalid cache len\n");
> diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
> index 8595e77..21d43e2 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -99,7 +99,9 @@ struct rte_mempool_debug_stats {
>   * A structure that stores a per-core object cache.
>   */
>  struct rte_mempool_cache {
> -	unsigned len; /**< Cache len */
> +	uint32_t size;        /**< Size of the cache */
> +	uint32_t flushthresh; /**< Threshold before we flush excess elements */
> +	uint32_t len;         /**< Current cache count */
>  	/*
>  	 * Cache is allocated to this size to allow it to overflow in certain
>  	 * cases to avoid needless emptying of cache.
> @@ -182,9 +184,7 @@ struct rte_mempool {
>  	phys_addr_t phys_addr;           /**< Phys. addr. of mempool struct. */
>  	int flags;                       /**< Flags of the mempool. */
>  	uint32_t size;                   /**< Size of the mempool. */
> -	uint32_t cache_size;             /**< Size of per-lcore local cache. */
> -	uint32_t cache_flushthresh;
> -	/**< Threshold before we flush excess elements. */
> +	uint32_t cache_size;             /**< Size of per-lcore default local cache. */
> 
>  	uint32_t elt_size;               /**< Size of an element. */
>  	uint32_t header_size;            /**< Size of header (before elt). */
> @@ -742,6 +742,66 @@ rte_dom0_mempool_create(const char *name, unsigned n, unsigned elt_size,
>  void rte_mempool_dump(FILE *f, const struct rte_mempool *mp);
> 
>  /**
> + * Create a user-owned mempool cache.
> + *
> + * This can be used by non-EAL threads to enable caching when they
> + * interact with a mempool.
> + *
> + * @param size
> + *   The size of the mempool cache. See rte_mempool_create()'s cache_size
> + *   parameter description for more information. The same limits and
> + *   considerations apply here too.
> + * @param socket_id
> + *   The socket identifier in the case of NUMA. The value can be
> + *   SOCKET_ID_ANY if there is no NUMA constraint for the reserved zone.
> + */
> +struct rte_mempool_cache *
> +rte_mempool_cache_create(uint32_t size, int socket_id);
> +
> +/**
> + * Free a user-owned mempool cache.
> + *
> + * @param cache
> + *   A pointer to the mempool cache.
> + */
> +void
> +rte_mempool_cache_free(struct rte_mempool_cache *cache);
> +
> +/**
> + * Flush a user-owned mempool cache to the specified mempool.
> + *
> + * @param cache
> + *   A pointer to the mempool cache.
> + * @param mp
> + *   A pointer to the mempool.
> + */
> +void
> +rte_mempool_cache_flush(struct rte_mempool_cache *cache,
> +			struct rte_mempool *mp);
> +
> +/**
> + * Get a pointer to the per-lcore default mempool cache.
> + *
> + * @param mp
> + *   A pointer to the mempool structure.
> + * @param lcore_id
> + *   The logical core id.
> + * @return
> + *   A pointer to the mempool cache or NULL if disabled or non-EAL thread.
> + */
> +static inline struct rte_mempool_cache *
> +rte_mempool_default_cache(struct rte_mempool *mp, unsigned lcore_id)
> +{
> +	if (mp->cache_size == 0)
> +		return NULL;
> +
> +	if (lcore_id >= RTE_MAX_LCORE)
> +		return NULL;
> +
> +	return &mp->local_cache[lcore_id];
> +}
> +
> +/**
>   * @internal Put several objects back in the mempool; used internally.
>   * @param mp
>   *   A pointer to the mempool structure.
> @@ -750,33 +810,29 @@ void rte_mempool_dump(FILE *f, const struct rte_mempool *mp);
>   * @param n
>   *   The number of objects to store back in the mempool, must be strictly
>   *   positive.
> + * @param cache
> + *   A pointer to a mempool cache structure. May be NULL if not needed.
>   * @param is_mp
>   *   Mono-producer (0) or multi-producers (1).
>   */
>  static inline void __attribute__((always_inline))
> -__mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> -		    unsigned n, int is_mp)
> +__mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
> +		      unsigned n, struct rte_mempool_cache *cache, int is_mp)
>  {
> -	struct rte_mempool_cache *cache;
>  	uint32_t index;
>  	void **cache_objs;
> -	unsigned lcore_id = rte_lcore_id();
> -	uint32_t cache_size = mp->cache_size;
> -	uint32_t flushthresh = mp->cache_flushthresh;
> 
>  	/* increment stat now, adding in mempool always success */
>  	__MEMPOOL_STAT_ADD(mp, put, n);
> 
> -	/* cache is not enabled or single producer or non-EAL thread */
> -	if (unlikely(cache_size == 0 || is_mp == 0 ||
> -		     lcore_id >= RTE_MAX_LCORE))
> +	/* No cache provided or cache is not enabled or single producer */
> +	if (unlikely(cache == NULL || cache->size == 0 || is_mp == 0))

Here and in other places do we really need that check: 'cache->size == 0'?
I mean at mempool_create() if mp->cache_size would be zero, then we wouldn't allocate
any local_cache[] ant all and cache would be 0 anyway.
Also in rte_mempool_cache_create() we can check that size > 0 and return -EINVAL otherwise.

>  		goto ring_enqueue;
> 
>  	/* Go straight to ring if put would overflow mem allocated for cache */
>  	if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE))
>  		goto ring_enqueue;
> 
> -	cache = &mp->local_cache[lcore_id];
>  	cache_objs = &cache->objs[cache->len];
> 
>  	/*
> @@ -792,10 +848,10 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> 
>  	cache->len += n;
> 
> -	if (cache->len >= flushthresh) {
> -		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
> -				cache->len - cache_size);
> -		cache->len = cache_size;
> +	if (cache->len >= cache->flushthresh) {
> +		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache->size],
> +				cache->len - cache->size);
> +		cache->len = cache->size;
>  	}
> 
>  	return;
> @@ -820,41 +876,27 @@ ring_enqueue:
>  #endif
>  }
> 


More information about the dev mailing list