[dpdk-dev] [PATCH v2 1/2] mempool: allow for user-owned mempool caches

Lazaros Koromilas l at nofutznetworks.com
Tue Apr 5 11:24:03 CEST 2016


Hi all,

I forgot to mention that this series applies on top of:

http://www.dpdk.org/dev/patchwork/patch/10492/

Thanks,
Lazaros.

On Mon, Apr 4, 2016 at 6:43 PM, Lazaros Koromilas <l at nofutznetworks.com> wrote:
> The mempool cache is only available to EAL threads as a per-lcore
> resource. Change this so that the user can create and provide their own
> cache on mempool get and put operations. This works with non-EAL threads
> too. This commit introduces the new API calls:
>
>     rte_mempool_cache_create(size, socket_id)
>     rte_mempool_cache_flush(cache, mp)
>     rte_mempool_cache_free(cache)
>     rte_mempool_default_cache(mp, lcore_id)
>     rte_mempool_generic_put(mp, obj_table, n, cache, is_mp)
>     rte_mempool_generic_get(mp, obj_table, n, cache, is_mc)
>
> It removes the following API calls:
>
>     rte_mempool_sp_put_bulk(mp, obj_table, n)
>     rte_mempool_sc_get_bulk(mp, obj_table, n)
>     rte_mempool_sp_put(mp, obj)
>     rte_mempool_sc_get(mp, obj)
>
> And the remaining API calls use the per-lcore default local cache:
>
>     rte_mempool_put_bulk(mp, obj_table, n)
>     rte_mempool_get_bulk(mp, obj_table, n)
>     rte_mempool_put(mp, obj)
>     rte_mempool_get(mp, obj)
>
> Signed-off-by: Lazaros Koromilas <l at nofutznetworks.com>
> ---
>  app/test/test_mempool.c                |  58 +++++--
>  app/test/test_mempool_perf.c           |  46 +++++-
>  lib/librte_eal/common/eal_common_log.c |   8 +-
>  lib/librte_mempool/rte_mempool.c       |  76 ++++++++-
>  lib/librte_mempool/rte_mempool.h       | 291 +++++++++++++--------------------
>  5 files changed, 275 insertions(+), 204 deletions(-)
>
> diff --git a/app/test/test_mempool.c b/app/test/test_mempool.c
> index 10e1fa4..2dc0cf2 100644
> --- a/app/test/test_mempool.c
> +++ b/app/test/test_mempool.c
> @@ -79,6 +79,7 @@
>
>  static struct rte_mempool *mp;
>  static struct rte_mempool *mp_cache, *mp_nocache;
> +static int use_external_cache;
>
>  static rte_atomic32_t synchro;
>
> @@ -107,19 +108,33 @@ test_mempool_basic(void)
>         char *obj_data;
>         int ret = 0;
>         unsigned i, j;
> +       struct rte_mempool_cache *cache;
> +
> +       if (use_external_cache)
> +               /* Create a user-owned mempool cache. */
> +               cache = rte_mempool_cache_create(RTE_MEMPOOL_CACHE_MAX_SIZE,
> +                                                SOCKET_ID_ANY);
> +       else
> +               cache = rte_mempool_default_cache(mp, rte_lcore_id());
>
>         /* dump the mempool status */
>         rte_mempool_dump(stdout, mp);
>
>         printf("get an object\n");
> -       if (rte_mempool_get(mp, &obj) < 0)
> +       if (rte_mempool_generic_get(mp, &obj, 1, cache, 1) < 0)
>                 return -1;
>         rte_mempool_dump(stdout, mp);
>
>         /* tests that improve coverage */
>         printf("get object count\n");
> -       if (rte_mempool_count(mp) != MEMPOOL_SIZE - 1)
> -               return -1;
> +       if (use_external_cache) {
> +               /* We have to count the extra caches, one in this case. */
> +               if (rte_mempool_count(mp) + cache->len != MEMPOOL_SIZE - 1)
> +                       return -1;
> +       } else {
> +               if (rte_mempool_count(mp) != MEMPOOL_SIZE - 1)
> +                       return -1;
> +       }
>
>         printf("get private data\n");
>         if (rte_mempool_get_priv(mp) != (char *)mp +
> @@ -134,21 +149,21 @@ test_mempool_basic(void)
>                 return -1;
>
>         printf("put the object back\n");
> -       rte_mempool_put(mp, obj);
> +       rte_mempool_generic_put(mp, &obj, 1, cache, 1);
>         rte_mempool_dump(stdout, mp);
>
>         printf("get 2 objects\n");
> -       if (rte_mempool_get(mp, &obj) < 0)
> +       if (rte_mempool_generic_get(mp, &obj, 1, cache, 1) < 0)
>                 return -1;
> -       if (rte_mempool_get(mp, &obj2) < 0) {
> -               rte_mempool_put(mp, obj);
> +       if (rte_mempool_generic_get(mp, &obj2, 1, cache, 1) < 0) {
> +               rte_mempool_generic_put(mp, &obj, 1, cache, 1);
>                 return -1;
>         }
>         rte_mempool_dump(stdout, mp);
>
>         printf("put the objects back\n");
> -       rte_mempool_put(mp, obj);
> -       rte_mempool_put(mp, obj2);
> +       rte_mempool_generic_put(mp, &obj, 1, cache, 1);
> +       rte_mempool_generic_put(mp, &obj2, 1, cache, 1);
>         rte_mempool_dump(stdout, mp);
>
>         /*
> @@ -161,7 +176,7 @@ test_mempool_basic(void)
>         }
>
>         for (i=0; i<MEMPOOL_SIZE; i++) {
> -               if (rte_mempool_get(mp, &objtable[i]) < 0)
> +               if (rte_mempool_generic_get(mp, &objtable[i], 1, cache, 1) < 0)
>                         break;
>         }
>
> @@ -183,13 +198,18 @@ test_mempool_basic(void)
>                                 ret = -1;
>                 }
>
> -               rte_mempool_put(mp, objtable[i]);
> +               rte_mempool_generic_put(mp, &objtable[i], 1, cache, 1);
>         }
>
>         free(objtable);
>         if (ret == -1)
>                 printf("objects were modified!\n");
>
> +       if (use_external_cache) {
> +               rte_mempool_cache_flush(cache, mp);
> +               rte_mempool_cache_free(cache);
> +       }
> +
>         return ret;
>  }
>
> @@ -245,7 +265,7 @@ static int test_mempool_single_producer(void)
>                         printf("test_mempool_single_producer there is an obj not owned by this mempool\n");
>                         return -1;
>                 }
> -               rte_mempool_sp_put(mp_spsc, obj);
> +               rte_mempool_put(mp_spsc, obj);
>                 rte_spinlock_lock(&scsp_spinlock);
>                 scsp_obj_table[i] = NULL;
>                 rte_spinlock_unlock(&scsp_spinlock);
> @@ -278,7 +298,7 @@ static int test_mempool_single_consumer(void)
>                 rte_spinlock_unlock(&scsp_spinlock);
>                 if (i >= MAX_KEEP)
>                         continue;
> -               if (rte_mempool_sc_get(mp_spsc, &obj) < 0)
> +               if (rte_mempool_get(mp_spsc, &obj) < 0)
>                         break;
>                 rte_spinlock_lock(&scsp_spinlock);
>                 scsp_obj_table[i] = obj;
> @@ -371,12 +391,12 @@ test_mempool_basic_ex(struct rte_mempool * mp)
>         }
>
>         for (i = 0; i < MEMPOOL_SIZE; i ++) {
> -               if (rte_mempool_mc_get(mp, &obj[i]) < 0) {
> +               if (rte_mempool_get(mp, &obj[i]) < 0) {
>                         printf("fail_mp_basic_ex fail to get mempool object for [%u]\n", i);
>                         goto fail_mp_basic_ex;
>                 }
>         }
> -       if (rte_mempool_mc_get(mp, &err_obj) == 0) {
> +       if (rte_mempool_get(mp, &err_obj) == 0) {
>                 printf("test_mempool_basic_ex get an impossible obj from mempool\n");
>                 goto fail_mp_basic_ex;
>         }
> @@ -387,7 +407,7 @@ test_mempool_basic_ex(struct rte_mempool * mp)
>         }
>
>         for (i = 0; i < MEMPOOL_SIZE; i ++) {
> -               rte_mempool_mp_put(mp, obj[i]);
> +               rte_mempool_put(mp, obj[i]);
>         }
>         if (rte_mempool_full(mp) != 1) {
>                 printf("test_mempool_basic_ex the mempool is not full but it should be\n");
> @@ -499,6 +519,12 @@ test_mempool(void)
>         if (test_mempool_basic() < 0)
>                 return -1;
>
> +       /* basic tests with user-owned cache */
> +       mp = mp_nocache;
> +       use_external_cache = 1;
> +       if (test_mempool_basic() < 0)
> +               return -1;
> +
>         /* more basic tests without cache */
>         if (test_mempool_basic_ex(mp_nocache) < 0)
>                 return -1;
> diff --git a/app/test/test_mempool_perf.c b/app/test/test_mempool_perf.c
> index cdc02a0..e917f4d 100644
> --- a/app/test/test_mempool_perf.c
> +++ b/app/test/test_mempool_perf.c
> @@ -78,6 +78,9 @@
>   *      - One core without cache
>   *      - Two cores without cache
>   *      - Max. cores without cache
> + *      - One core with user-owned cache
> + *      - Two cores with user-owned cache
> + *      - Max. cores with user-owned cache
>   *
>   *    - Bulk size (*n_get_bulk*, *n_put_bulk*)
>   *
> @@ -98,6 +101,8 @@
>
>  static struct rte_mempool *mp;
>  static struct rte_mempool *mp_cache, *mp_nocache;
> +static int use_external_cache;
> +static unsigned external_cache_size = RTE_MEMPOOL_CACHE_MAX_SIZE;
>
>  static rte_atomic32_t synchro;
>
> @@ -137,6 +142,14 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg)
>         int ret;
>         uint64_t start_cycles, end_cycles;
>         uint64_t time_diff = 0, hz = rte_get_timer_hz();
> +       struct rte_mempool_cache *cache;
> +
> +       if (use_external_cache)
> +               /* Create a user-owned mempool cache. */
> +               cache = rte_mempool_cache_create(external_cache_size,
> +                                                SOCKET_ID_ANY);
> +       else
> +               cache = rte_mempool_default_cache(mp, lcore_id);
>
>         /* n_get_bulk and n_put_bulk must be divisors of n_keep */
>         if (((n_keep / n_get_bulk) * n_get_bulk) != n_keep)
> @@ -157,8 +170,9 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg)
>                         /* get n_keep objects by bulk of n_bulk */
>                         idx = 0;
>                         while (idx < n_keep) {
> -                               ret = rte_mempool_get_bulk(mp, &obj_table[idx],
> -                                                          n_get_bulk);
> +                               ret = rte_mempool_generic_get(mp, &obj_table[idx],
> +                                                             n_get_bulk,
> +                                                             cache, 1);
>                                 if (unlikely(ret < 0)) {
>                                         rte_mempool_dump(stdout, mp);
>                                         rte_ring_dump(stdout, mp->ring);
> @@ -171,8 +185,9 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg)
>                         /* put the objects back */
>                         idx = 0;
>                         while (idx < n_keep) {
> -                               rte_mempool_put_bulk(mp, &obj_table[idx],
> -                                                    n_put_bulk);
> +                               rte_mempool_generic_put(mp, &obj_table[idx],
> +                                                       n_put_bulk,
> +                                                       cache, 1);
>                                 idx += n_put_bulk;
>                         }
>                 }
> @@ -181,6 +196,11 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg)
>                 stats[lcore_id].enq_count += N;
>         }
>
> +       if (use_external_cache) {
> +               rte_mempool_cache_flush(cache, mp);
> +               rte_mempool_cache_free(cache);
> +       }
> +
>         return 0;
>  }
>
> @@ -200,7 +220,9 @@ launch_cores(unsigned cores)
>
>         printf("mempool_autotest cache=%u cores=%u n_get_bulk=%u "
>                "n_put_bulk=%u n_keep=%u ",
> -              (unsigned) mp->cache_size, cores, n_get_bulk, n_put_bulk, n_keep);
> +              use_external_cache ?
> +                  external_cache_size : (unsigned) mp->cache_size,
> +              cores, n_get_bulk, n_put_bulk, n_keep);
>
>         if (rte_mempool_count(mp) != MEMPOOL_SIZE) {
>                 printf("mempool is not full\n");
> @@ -324,6 +346,20 @@ test_mempool_perf(void)
>         if (do_one_mempool_test(rte_lcore_count()) < 0)
>                 return -1;
>
> +       /* performance test with 1, 2 and max cores */
> +       printf("start performance test (with user-owned cache)\n");
> +       mp = mp_nocache;
> +       use_external_cache = 1;
> +
> +       if (do_one_mempool_test(1) < 0)
> +               return -1;
> +
> +       if (do_one_mempool_test(2) < 0)
> +               return -1;
> +
> +       if (do_one_mempool_test(rte_lcore_count()) < 0)
> +               return -1;
> +
>         rte_mempool_list_dump(stdout);
>
>         return 0;
> diff --git a/lib/librte_eal/common/eal_common_log.c b/lib/librte_eal/common/eal_common_log.c
> index 1ae8de7..07aee22 100644
> --- a/lib/librte_eal/common/eal_common_log.c
> +++ b/lib/librte_eal/common/eal_common_log.c
> @@ -125,7 +125,7 @@ rte_log_add_in_history(const char *buf, size_t size)
>                 }
>         }
>         else {
> -               if (rte_mempool_mc_get(log_history_mp, &obj) < 0)
> +               if (rte_mempool_get(log_history_mp, &obj) < 0)
>                         obj = NULL;
>                 hist_buf = obj;
>         }
> @@ -138,7 +138,7 @@ rte_log_add_in_history(const char *buf, size_t size)
>
>         /* not enough room for msg, buffer go back in mempool */
>         if (size >= hist_buf_size) {
> -               rte_mempool_mp_put(log_history_mp, hist_buf);
> +               rte_mempool_put(log_history_mp, hist_buf);
>                 rte_spinlock_unlock(&log_list_lock);
>                 return -ENOBUFS;
>         }
> @@ -252,12 +252,12 @@ rte_log_dump_history(FILE *out)
>
>                 /* write on stdout */
>                 if (fwrite(hist_buf->buf, hist_buf->size, 1, out) == 0) {
> -                       rte_mempool_mp_put(log_history_mp, hist_buf);
> +                       rte_mempool_put(log_history_mp, hist_buf);
>                         break;
>                 }
>
>                 /* put back message structure in pool */
> -               rte_mempool_mp_put(log_history_mp, hist_buf);
> +               rte_mempool_put(log_history_mp, hist_buf);
>         }
>         fflush(out);
>
> diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
> index 73ca770..4d977c1 100644
> --- a/lib/librte_mempool/rte_mempool.c
> +++ b/lib/librte_mempool/rte_mempool.c
> @@ -375,6 +375,63 @@ rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz,
>         return usz;
>  }
>
> +static void
> +mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)
> +{
> +       cache->size = size;
> +       cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);
> +       cache->len = 0;
> +}
> +
> +/*
> + * Create and initialize a cache for objects that are retrieved from and
> + * returned to an underlying mempool. This structure is identical to the
> + * local_cache[lcore_id] pointed to by the mempool structure.
> + */
> +struct rte_mempool_cache *
> +rte_mempool_cache_create(uint32_t size, int socket_id)
> +{
> +       struct rte_mempool_cache *cache;
> +
> +       if (size > RTE_MEMPOOL_CACHE_MAX_SIZE) {
> +               rte_errno = EINVAL;
> +               return NULL;
> +       }
> +
> +       cache = rte_zmalloc_socket("MEMPOOL_CACHE", sizeof(*cache),
> +                                  RTE_CACHE_LINE_SIZE, socket_id);
> +       if (cache == NULL) {
> +               RTE_LOG(ERR, MEMPOOL, "Cannot allocate mempool cache!\n");
> +               rte_errno = ENOMEM;
> +               return NULL;
> +       }
> +
> +       mempool_cache_init(cache, size);
> +
> +       return cache;
> +}
> +
> +/*
> + * Free a cache. It's the responsibility of the user to make sure that any
> + * remaining objects in the cache are flushed to the corresponding
> + * mempool.
> + */
> +void
> +rte_mempool_cache_free(struct rte_mempool_cache *cache)
> +{
> +       rte_free(cache);
> +}
> +
> +/*
> + * Put all objects in the cache to the specified mempool's ring.
> + */
> +void
> +rte_mempool_cache_flush(struct rte_mempool_cache *cache,
> +                       struct rte_mempool *mp)
> +{
> +       rte_ring_enqueue_bulk(mp->ring, cache->objs, cache->len);
> +}
> +
>  #ifndef RTE_LIBRTE_XEN_DOM0
>  /* stub if DOM0 support not configured */
>  struct rte_mempool *
> @@ -448,6 +505,7 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>         struct rte_mempool_objsz objsz;
>         void *startaddr;
>         int page_size = getpagesize();
> +       unsigned lcore_id;
>
>         /* compilation-time checks */
>         RTE_BUILD_BUG_ON((sizeof(struct rte_mempool) &
> @@ -583,8 +641,8 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>         mp->elt_size = objsz.elt_size;
>         mp->header_size = objsz.header_size;
>         mp->trailer_size = objsz.trailer_size;
> +       /* Size of default caches, zero means disabled. */
>         mp->cache_size = cache_size;
> -       mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
>         mp->private_data_size = private_data_size;
>
>         /*
> @@ -594,6 +652,13 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>         mp->local_cache = (struct rte_mempool_cache *)
>                         ((char *)mp + MEMPOOL_HEADER_SIZE(mp, pg_num, 0));
>
> +       /* Init all default caches. */
> +       if (cache_size != 0) {
> +               for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
> +                       mempool_cache_init(&mp->local_cache[lcore_id],
> +                                          cache_size);
> +       }
> +
>         /* calculate address of the first element for continuous mempool. */
>         obj = (char *)mp + MEMPOOL_HEADER_SIZE(mp, pg_num, cache_size) +
>                 private_data_size;
> @@ -672,7 +737,7 @@ rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp)
>         unsigned count = 0;
>         unsigned cache_count;
>
> -       fprintf(f, "  cache infos:\n");
> +       fprintf(f, "  internal cache infos:\n");
>         fprintf(f, "    cache_size=%"PRIu32"\n", mp->cache_size);
>
>         if (mp->cache_size == 0)
> @@ -680,7 +745,8 @@ rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp)
>
>         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
>                 cache_count = mp->local_cache[lcore_id].len;
> -               fprintf(f, "    cache_count[%u]=%u\n", lcore_id, cache_count);
> +               fprintf(f, "    cache_count[%u]=%"PRIu32"\n",
> +                       lcore_id, cache_count);
>                 count += cache_count;
>         }
>         fprintf(f, "    total_cache_count=%u\n", count);
> @@ -760,7 +826,9 @@ mempool_audit_cache(const struct rte_mempool *mp)
>                 return;
>
>         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
> -               if (mp->local_cache[lcore_id].len > mp->cache_flushthresh) {
> +               const struct rte_mempool_cache *cache;
> +               cache = &mp->local_cache[lcore_id];
> +               if (cache->len > cache->flushthresh) {
>                         RTE_LOG(CRIT, MEMPOOL, "badness on cache[%u]\n",
>                                 lcore_id);
>                         rte_panic("MEMPOOL: invalid cache len\n");
> diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
> index 8595e77..21d43e2 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -99,7 +99,9 @@ struct rte_mempool_debug_stats {
>   * A structure that stores a per-core object cache.
>   */
>  struct rte_mempool_cache {
> -       unsigned len; /**< Cache len */
> +       uint32_t size;        /**< Size of the cache */
> +       uint32_t flushthresh; /**< Threshold before we flush excess elements */
> +       uint32_t len;         /**< Current cache count */
>         /*
>          * Cache is allocated to this size to allow it to overflow in certain
>          * cases to avoid needless emptying of cache.
> @@ -182,9 +184,7 @@ struct rte_mempool {
>         phys_addr_t phys_addr;           /**< Phys. addr. of mempool struct. */
>         int flags;                       /**< Flags of the mempool. */
>         uint32_t size;                   /**< Size of the mempool. */
> -       uint32_t cache_size;             /**< Size of per-lcore local cache. */
> -       uint32_t cache_flushthresh;
> -       /**< Threshold before we flush excess elements. */
> +       uint32_t cache_size;             /**< Size of per-lcore default local cache. */
>
>         uint32_t elt_size;               /**< Size of an element. */
>         uint32_t header_size;            /**< Size of header (before elt). */
> @@ -742,6 +742,66 @@ rte_dom0_mempool_create(const char *name, unsigned n, unsigned elt_size,
>  void rte_mempool_dump(FILE *f, const struct rte_mempool *mp);
>
>  /**
> + * Create a user-owned mempool cache.
> + *
> + * This can be used by non-EAL threads to enable caching when they
> + * interact with a mempool.
> + *
> + * @param size
> + *   The size of the mempool cache. See rte_mempool_create()'s cache_size
> + *   parameter description for more information. The same limits and
> + *   considerations apply here too.
> + * @param socket_id
> + *   The socket identifier in the case of NUMA. The value can be
> + *   SOCKET_ID_ANY if there is no NUMA constraint for the reserved zone.
> + */
> +struct rte_mempool_cache *
> +rte_mempool_cache_create(uint32_t size, int socket_id);
> +
> +/**
> + * Free a user-owned mempool cache.
> + *
> + * @param cache
> + *   A pointer to the mempool cache.
> + */
> +void
> +rte_mempool_cache_free(struct rte_mempool_cache *cache);
> +
> +/**
> + * Flush a user-owned mempool cache to the specified mempool.
> + *
> + * @param cache
> + *   A pointer to the mempool cache.
> + * @param mp
> + *   A pointer to the mempool.
> + */
> +void
> +rte_mempool_cache_flush(struct rte_mempool_cache *cache,
> +                       struct rte_mempool *mp);
> +
> +/**
> + * Get a pointer to the per-lcore default mempool cache.
> + *
> + * @param mp
> + *   A pointer to the mempool structure.
> + * @param lcore_id
> + *   The logical core id.
> + * @return
> + *   A pointer to the mempool cache or NULL if disabled or non-EAL thread.
> + */
> +static inline struct rte_mempool_cache *
> +rte_mempool_default_cache(struct rte_mempool *mp, unsigned lcore_id)
> +{
> +       if (mp->cache_size == 0)
> +               return NULL;
> +
> +       if (lcore_id >= RTE_MAX_LCORE)
> +               return NULL;
> +
> +       return &mp->local_cache[lcore_id];
> +}
> +
> +/**
>   * @internal Put several objects back in the mempool; used internally.
>   * @param mp
>   *   A pointer to the mempool structure.
> @@ -750,33 +810,29 @@ void rte_mempool_dump(FILE *f, const struct rte_mempool *mp);
>   * @param n
>   *   The number of objects to store back in the mempool, must be strictly
>   *   positive.
> + * @param cache
> + *   A pointer to a mempool cache structure. May be NULL if not needed.
>   * @param is_mp
>   *   Mono-producer (0) or multi-producers (1).
>   */
>  static inline void __attribute__((always_inline))
> -__mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> -                   unsigned n, int is_mp)
> +__mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
> +                     unsigned n, struct rte_mempool_cache *cache, int is_mp)
>  {
> -       struct rte_mempool_cache *cache;
>         uint32_t index;
>         void **cache_objs;
> -       unsigned lcore_id = rte_lcore_id();
> -       uint32_t cache_size = mp->cache_size;
> -       uint32_t flushthresh = mp->cache_flushthresh;
>
>         /* increment stat now, adding in mempool always success */
>         __MEMPOOL_STAT_ADD(mp, put, n);
>
> -       /* cache is not enabled or single producer or non-EAL thread */
> -       if (unlikely(cache_size == 0 || is_mp == 0 ||
> -                    lcore_id >= RTE_MAX_LCORE))
> +       /* No cache provided or cache is not enabled or single producer */
> +       if (unlikely(cache == NULL || cache->size == 0 || is_mp == 0))
>                 goto ring_enqueue;
>
>         /* Go straight to ring if put would overflow mem allocated for cache */
>         if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE))
>                 goto ring_enqueue;
>
> -       cache = &mp->local_cache[lcore_id];
>         cache_objs = &cache->objs[cache->len];
>
>         /*
> @@ -792,10 +848,10 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>
>         cache->len += n;
>
> -       if (cache->len >= flushthresh) {
> -               rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
> -                               cache->len - cache_size);
> -               cache->len = cache_size;
> +       if (cache->len >= cache->flushthresh) {
> +               rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache->size],
> +                               cache->len - cache->size);
> +               cache->len = cache->size;
>         }
>
>         return;
> @@ -820,41 +876,27 @@ ring_enqueue:
>  #endif
>  }
>
> -
>  /**
> - * Put several objects back in the mempool (multi-producers safe).
> + * Put several objects back in the mempool.
>   *
>   * @param mp
>   *   A pointer to the mempool structure.
>   * @param obj_table
>   *   A pointer to a table of void * pointers (objects).
>   * @param n
> - *   The number of objects to add in the mempool from the obj_table.
> + *   The number of objects to store back in the mempool, must be strictly
> + *   positive.
> + * @param cache
> + *   A pointer to a mempool cache structure. May be NULL if not needed.
> + * @param is_mp
> + *   Mono-producer (0) or multi-producers (1).
>   */
>  static inline void __attribute__((always_inline))
> -rte_mempool_mp_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> -                       unsigned n)
> -{
> -       __mempool_check_cookies(mp, obj_table, n, 0);
> -       __mempool_put_bulk(mp, obj_table, n, 1);
> -}
> -
> -/**
> - * Put several objects back in the mempool (NOT multi-producers safe).
> - *
> - * @param mp
> - *   A pointer to the mempool structure.
> - * @param obj_table
> - *   A pointer to a table of void * pointers (objects).
> - * @param n
> - *   The number of objects to add in the mempool from obj_table.
> - */
> -static inline void
> -rte_mempool_sp_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> -                       unsigned n)
> +rte_mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
> +                       unsigned n, struct rte_mempool_cache *cache, int is_mp)
>  {
>         __mempool_check_cookies(mp, obj_table, n, 0);
> -       __mempool_put_bulk(mp, obj_table, n, 0);
> +       __mempool_generic_put(mp, obj_table, n, cache, is_mp);
>  }
>
>  /**
> @@ -875,36 +917,11 @@ static inline void __attribute__((always_inline))
>  rte_mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>                      unsigned n)
>  {
> -       __mempool_check_cookies(mp, obj_table, n, 0);
> -       __mempool_put_bulk(mp, obj_table, n, !(mp->flags & MEMPOOL_F_SP_PUT));
> -}
> -
> -/**
> - * Put one object in the mempool (multi-producers safe).
> - *
> - * @param mp
> - *   A pointer to the mempool structure.
> - * @param obj
> - *   A pointer to the object to be added.
> - */
> -static inline void __attribute__((always_inline))
> -rte_mempool_mp_put(struct rte_mempool *mp, void *obj)
> -{
> -       rte_mempool_mp_put_bulk(mp, &obj, 1);
> -}
> +       struct rte_mempool_cache *cache;
>
> -/**
> - * Put one object back in the mempool (NOT multi-producers safe).
> - *
> - * @param mp
> - *   A pointer to the mempool structure.
> - * @param obj
> - *   A pointer to the object to be added.
> - */
> -static inline void __attribute__((always_inline))
> -rte_mempool_sp_put(struct rte_mempool *mp, void *obj)
> -{
> -       rte_mempool_sp_put_bulk(mp, &obj, 1);
> +       cache = rte_mempool_default_cache(mp, rte_lcore_id());
> +       rte_mempool_generic_put(mp, obj_table, n, cache,
> +                               !(mp->flags & MEMPOOL_F_SP_PUT));
>  }
>
>  /**
> @@ -933,6 +950,8 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
>   *   A pointer to a table of void * pointers (objects).
>   * @param n
>   *   The number of objects to get, must be strictly positive.
> + * @param cache
> + *   A pointer to a mempool cache structure. May be NULL if not needed.
>   * @param is_mc
>   *   Mono-consumer (0) or multi-consumers (1).
>   * @return
> @@ -940,28 +959,24 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
>   *   - <0: Error; code of ring dequeue function.
>   */
>  static inline int __attribute__((always_inline))
> -__mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
> -                  unsigned n, int is_mc)
> +__mempool_generic_get(struct rte_mempool *mp, void **obj_table,
> +                     unsigned n, struct rte_mempool_cache *cache, int is_mc)
>  {
>         int ret;
> -       struct rte_mempool_cache *cache;
>         uint32_t index, len;
>         void **cache_objs;
> -       unsigned lcore_id = rte_lcore_id();
> -       uint32_t cache_size = mp->cache_size;
>
> -       /* cache is not enabled or single consumer */
> -       if (unlikely(cache_size == 0 || is_mc == 0 ||
> -                    n >= cache_size || lcore_id >= RTE_MAX_LCORE))
> +       /* No cache provided or cache is not enabled or single consumer */
> +       if (unlikely(cache == NULL || cache->size == 0 || is_mc == 0 ||
> +                    n >= cache->size))
>                 goto ring_dequeue;
>
> -       cache = &mp->local_cache[lcore_id];
>         cache_objs = cache->objs;
>
>         /* Can this be satisfied from the cache? */
>         if (cache->len < n) {
>                 /* No. Backfill the cache first, and then fill from it */
> -               uint32_t req = n + (cache_size - cache->len);
> +               uint32_t req = n + (cache->size - cache->len);
>
>                 /* How many do we require i.e. number to fill the cache + the request */
>                 ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
> @@ -1005,57 +1020,28 @@ ring_dequeue:
>  }
>
>  /**
> - * Get several objects from the mempool (multi-consumers safe).
> - *
> - * If cache is enabled, objects will be retrieved first from cache,
> - * subsequently from the common pool. Note that it can return -ENOENT when
> - * the local cache and common pool are empty, even if cache from other
> - * lcores are full.
> - *
> - * @param mp
> - *   A pointer to the mempool structure.
> - * @param obj_table
> - *   A pointer to a table of void * pointers (objects) that will be filled.
> - * @param n
> - *   The number of objects to get from mempool to obj_table.
> - * @return
> - *   - 0: Success; objects taken.
> - *   - -ENOENT: Not enough entries in the mempool; no object is retrieved.
> - */
> -static inline int __attribute__((always_inline))
> -rte_mempool_mc_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
> -{
> -       int ret;
> -       ret = __mempool_get_bulk(mp, obj_table, n, 1);
> -       if (ret == 0)
> -               __mempool_check_cookies(mp, obj_table, n, 1);
> -       return ret;
> -}
> -
> -/**
> - * Get several objects from the mempool (NOT multi-consumers safe).
> - *
> - * If cache is enabled, objects will be retrieved first from cache,
> - * subsequently from the common pool. Note that it can return -ENOENT when
> - * the local cache and common pool are empty, even if cache from other
> - * lcores are full.
> + * Get several objects from the mempool.
>   *
>   * @param mp
>   *   A pointer to the mempool structure.
>   * @param obj_table
> - *   A pointer to a table of void * pointers (objects) that will be filled.
> + *   A pointer to a table of void * pointers (objects).
>   * @param n
> - *   The number of objects to get from the mempool to obj_table.
> + *   The number of objects to get, must be strictly positive.
> + * @param cache
> + *   A pointer to a mempool cache structure. May be NULL if not needed.
> + * @param is_mc
> + *   Mono-consumer (0) or multi-consumers (1).
>   * @return
> - *   - 0: Success; objects taken.
> - *   - -ENOENT: Not enough entries in the mempool; no object is
> - *     retrieved.
> - *   - >=0: Success; number of objects supplied.
> - *   - <0: Error; code of ring dequeue function.
> + *   - 0: Success; objects taken.
> + *   - -ENOENT: Not enough entries in the mempool; no object is retrieved.
>   */
>  static inline int __attribute__((always_inline))
> -rte_mempool_sc_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
> +rte_mempool_generic_get(struct rte_mempool *mp, void **obj_table,
> +                       unsigned n, struct rte_mempool_cache *cache, int is_mc)
>  {
>         int ret;
> -       ret = __mempool_get_bulk(mp, obj_table, n, 0);
> +       ret = __mempool_generic_get(mp, obj_table, n, cache, is_mc);
>         if (ret == 0)
>                 __mempool_check_cookies(mp, obj_table, n, 1);
>         return ret;
> @@ -1086,56 +1072,11 @@ rte_mempool_sc_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
>  static inline int __attribute__((always_inline))
>  rte_mempool_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
>  {
> -       int ret;
> -       ret = __mempool_get_bulk(mp, obj_table, n,
> -                                !(mp->flags & MEMPOOL_F_SC_GET));
> -       if (ret == 0)
> -               __mempool_check_cookies(mp, obj_table, n, 1);
> -       return ret;
> -}
> -
> -/**
> - * Get one object from the mempool (multi-consumers safe).
> - *
> - * If cache is enabled, objects will be retrieved first from cache,
> - * subsequently from the common pool. Note that it can return -ENOENT when
> - * the local cache and common pool are empty, even if cache from other
> - * lcores are full.
> - *
> - * @param mp
> - *   A pointer to the mempool structure.
> - * @param obj_p
> - *   A pointer to a void * pointer (object) that will be filled.
> - * @return
> - *   - 0: Success; objects taken.
> - *   - -ENOENT: Not enough entries in the mempool; no object is retrieved.
> - */
> -static inline int __attribute__((always_inline))
> -rte_mempool_mc_get(struct rte_mempool *mp, void **obj_p)
> -{
> -       return rte_mempool_mc_get_bulk(mp, obj_p, 1);
> -}
> +       struct rte_mempool_cache *cache;
>
> -/**
> - * Get one object from the mempool (NOT multi-consumers safe).
> - *
> - * If cache is enabled, objects will be retrieved first from cache,
> - * subsequently from the common pool. Note that it can return -ENOENT when
> - * the local cache and common pool are empty, even if cache from other
> - * lcores are full.
> - *
> - * @param mp
> - *   A pointer to the mempool structure.
> - * @param obj_p
> - *   A pointer to a void * pointer (object) that will be filled.
> - * @return
> - *   - 0: Success; objects taken.
> - *   - -ENOENT: Not enough entries in the mempool; no object is retrieved.
> - */
> -static inline int __attribute__((always_inline))
> -rte_mempool_sc_get(struct rte_mempool *mp, void **obj_p)
> -{
> -       return rte_mempool_sc_get_bulk(mp, obj_p, 1);
> +       cache = rte_mempool_default_cache(mp, rte_lcore_id());
> +       return rte_mempool_generic_get(mp, obj_table, n, cache,
> +                                      !(mp->flags & MEMPOOL_F_SC_GET));
>  }
>
>  /**
> @@ -1169,7 +1110,7 @@ rte_mempool_get(struct rte_mempool *mp, void **obj_p)
>   *
>   * When cache is enabled, this function has to browse the length of
>   * all lcores, so it should not be used in a data path, but only for
> - * debug purposes.
> + * debug purposes. User-owned mempool caches are not accounted for.
>   *
>   * @param mp
>   *   A pointer to the mempool structure.
> @@ -1188,7 +1129,7 @@ unsigned rte_mempool_count(const struct rte_mempool *mp);
>   *
>   * When cache is enabled, this function has to browse the length of
>   * all lcores, so it should not be used in a data path, but only for
> - * debug purposes.
> + * debug purposes. User-owned mempool caches are not accounted for.
>   *
>   * @param mp
>   *   A pointer to the mempool structure.
> @@ -1206,7 +1147,7 @@ rte_mempool_free_count(const struct rte_mempool *mp)
>   *
>   * When cache is enabled, this function has to browse the length of all
>   * lcores, so it should not be used in a data path, but only for debug
> - * purposes.
> + * purposes. User-owned mempool caches are not accounted for.
>   *
>   * @param mp
>   *   A pointer to the mempool structure.
> @@ -1225,7 +1166,7 @@ rte_mempool_full(const struct rte_mempool *mp)
>   *
>   * When cache is enabled, this function has to browse the length of all
>   * lcores, so it should not be used in a data path, but only for debug
> - * purposes.
> + * purposes. User-owned mempool caches are not accounted for.
>   *
>   * @param mp
>   *   A pointer to the mempool structure.
> --
> 1.9.1
>


More information about the dev mailing list