[dpdk-dev,RFC,3/6] mempool/bucket: implement bucket mempool manager
Commit Message
From: "Artem V. Andreev" <Artem.Andreev@oktetlabs.ru>
The manager provides a way to allocate a physically and virtually
contiguous set of objects.
Note: due to the way objects are organized in the bucket manager,
get_avail_count may return fewer objects than were enqueued.
That breaks the expectations of the mempool and mempool_perf tests.
Signed-off-by: Artem V. Andreev <Artem.Andreev@oktetlabs.ru>
Signed-off-by: Andrew Rybchenko <arybchenko@solarflare.com>
---
MAINTAINERS | 9 +
config/common_base | 2 +
drivers/mempool/Makefile | 1 +
drivers/mempool/bucket/Makefile | 49 ++
drivers/mempool/bucket/rte_mempool_bucket.c | 521 +++++++++++++++++++++
.../mempool/bucket/rte_mempool_bucket_version.map | 4 +
mk/rte.app.mk | 1 +
7 files changed, 587 insertions(+)
create mode 100644 drivers/mempool/bucket/Makefile
create mode 100644 drivers/mempool/bucket/rte_mempool_bucket.c
create mode 100644 drivers/mempool/bucket/rte_mempool_bucket_version.map
Comments
On Fri, Nov 24, 2017 at 04:06:28PM +0000, Andrew Rybchenko wrote:
> From: "Artem V. Andreev" <Artem.Andreev@oktetlabs.ru>
>
> The manager provides a way to allocate a physically and virtually
> contiguous set of objects.
>
> Note: due to the way objects are organized in the bucket manager,
> get_avail_count may return fewer objects than were enqueued.
> That breaks the expectations of the mempool and mempool_perf tests.
To me, this can be problematic. The driver should respect the
API, or it will trigger hard-to-debug issues in applications. Can't
this be fixed in some way or another?
[...]
> --- a/config/common_base
> +++ b/config/common_base
> @@ -608,6 +608,8 @@ CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n
> #
> # Compile Mempool drivers
> #
> +CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
> +CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=32
> CONFIG_RTE_DRIVER_MEMPOOL_RING=y
> CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
>
Why 32KB?
Why not more, or less?
Can it be a runtime parameter?
I guess it won't work with too large objects.
[...]
> +struct bucket_data {
> + unsigned int header_size;
> + unsigned int chunk_size;
> + unsigned int bucket_size;
> + uintptr_t bucket_page_mask;
> + struct rte_ring *shared_bucket_ring;
> + struct bucket_stack *buckets[RTE_MAX_LCORE];
> + /*
> + * Multi-producer single-consumer ring to hold objects that are
> + * returned to the mempool at a different lcore than initially
> + * dequeued
> + */
> + struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
> + struct rte_ring *shared_orphan_ring;
> + struct rte_mempool *pool;
> +
> +};
I'm seeing per-core structures. Will it work on non-dataplane cores?
For instance, if a control thread wants to allocate a mbuf?
If possible, these fields should be more documented (or just renamed).
For instance, I suggest chunk_size could be called obj_per_bucket, which
better describes the content of the field.
[...]
> +static int
> +bucket_enqueue_single(struct bucket_data *data, void *obj)
> +{
> + int rc = 0;
> + uintptr_t addr = (uintptr_t)obj;
> + struct bucket_header *hdr;
> + unsigned int lcore_id = rte_lcore_id();
> +
> + addr &= data->bucket_page_mask;
> + hdr = (struct bucket_header *)addr;
> +
> + if (likely(hdr->lcore_id == lcore_id)) {
> + if (hdr->fill_cnt < data->bucket_size - 1) {
> + hdr->fill_cnt++;
> + } else {
> + hdr->fill_cnt = 0;
> + /* Stack is big enough to put all buckets */
> + bucket_stack_push(data->buckets[lcore_id], hdr);
> + }
> + } else if (hdr->lcore_id != LCORE_ID_ANY) {
> + struct rte_ring *adopt_ring =
> + data->adoption_buffer_rings[hdr->lcore_id];
> +
> + rc = rte_ring_enqueue(adopt_ring, obj);
> + /* Ring is big enough to put all objects */
> + RTE_ASSERT(rc == 0);
> + } else if (hdr->fill_cnt < data->bucket_size - 1) {
> + hdr->fill_cnt++;
> + } else {
> + hdr->fill_cnt = 0;
> + rc = rte_ring_enqueue(data->shared_bucket_ring, hdr);
> + /* Ring is big enough to put all buckets */
> + RTE_ASSERT(rc == 0);
> + }
> +
> + return rc;
> +}
[...]
> +static int
> +bucket_dequeue_buckets(struct bucket_data *data, void **obj_table,
> + unsigned int n_buckets)
> +{
> + struct bucket_stack *cur_stack = data->buckets[rte_lcore_id()];
> + unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
> + void **obj_table_base = obj_table;
> +
> + n_buckets -= n_buckets_from_stack;
> + while (n_buckets_from_stack-- > 0) {
> + void *obj = bucket_stack_pop_unsafe(cur_stack);
> +
> + obj_table = bucket_fill_obj_table(data, &obj, obj_table,
> + data->bucket_size);
> + }
> + while (n_buckets-- > 0) {
> + struct bucket_header *hdr;
> +
> + if (unlikely(rte_ring_dequeue(data->shared_bucket_ring,
> + (void **)&hdr) != 0)) {
> + /* Return the already-dequeued buffers
> + * back to the mempool
> + */
> + bucket_enqueue(data->pool, obj_table_base,
> + obj_table - obj_table_base);
> + rte_errno = ENOBUFS;
> + return -rte_errno;
> + }
> + hdr->lcore_id = rte_lcore_id();
> + obj_table = bucket_fill_obj_table(data, (void **)&hdr,
> + obj_table, data->bucket_size);
> + }
> +
> + return 0;
> +}
[...]
> +static int
> +bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
> +{
> + struct bucket_data *data = mp->pool_data;
> + unsigned int n_buckets = n / data->bucket_size;
> + unsigned int n_orphans = n - n_buckets * data->bucket_size;
> + int rc = 0;
> +
> + bucket_adopt_orphans(data);
> +
> + if (unlikely(n_orphans > 0)) {
> + rc = bucket_dequeue_orphans(data, obj_table +
> + (n_buckets * data->bucket_size),
> + n_orphans);
> + if (rc != 0)
> + return rc;
> + }
> +
> + if (likely(n_buckets > 0)) {
> + rc = bucket_dequeue_buckets(data, obj_table, n_buckets);
> + if (unlikely(rc != 0) && n_orphans > 0) {
> + rte_ring_enqueue_bulk(data->shared_orphan_ring,
> + obj_table + (n_buckets *
> + data->bucket_size),
> + n_orphans, NULL);
> + }
> + }
> +
> + return rc;
> +}
If my understanding is correct, at initialization, all full buckets will
go to the data->shared_bucket_ring ring, with lcore_id == ANY (this is
done in register_mem).
(note: I feel 'data' is not an ideal name for bucket_data)
If core 0 allocates all the mbufs and then frees them all, they
will be stored in the per-core stack, with hdr->lcore_id == 0. Is that
right?
If yes, can core 1 allocate a mbuf after that?
> +static unsigned int
> +bucket_get_count(const struct rte_mempool *mp)
> +{
> + const struct bucket_data *data = mp->pool_data;
> + const struct bucket_stack *local_bucket_stack =
> + data->buckets[rte_lcore_id()];
> +
> + return data->bucket_size * local_bucket_stack->top +
> + data->bucket_size * rte_ring_count(data->shared_bucket_ring) +
> + rte_ring_count(data->shared_orphan_ring);
> +}
It looks like get_count only relies on the current core stack usage
and ignores the other core stacks.
[...]
> +static int
> +bucket_register_memory_area(__rte_unused const struct rte_mempool *mp,
> + char *vaddr, __rte_unused phys_addr_t paddr,
> + size_t len)
> +{
> + /* mp->pool_data may still be uninitialized at this point */
> + unsigned int chunk_size = mp->header_size + mp->elt_size +
> + mp->trailer_size;
> + unsigned int bucket_mem_size =
> + (BUCKET_MEM_SIZE / chunk_size) * chunk_size;
> + unsigned int bucket_page_sz = rte_align32pow2(bucket_mem_size);
> + uintptr_t align;
> + char *iter;
> +
> + align = RTE_PTR_ALIGN_CEIL(vaddr, bucket_page_sz) - vaddr;
> +
> + for (iter = vaddr + align; iter < vaddr + len; iter += bucket_page_sz) {
> + /* librte_mempool uses the header part for its own bookkeeping,
> + * but the librte_mempool's object header is adjacent to the
> + * data; it is small enough and the header is guaranteed to be
> + * at least CACHE_LINE_SIZE (i.e. 64) bytes, so we do have
> + * plenty of space at the start of the header. So the layout
> + * looks like this:
> + * [bucket_header] ... unused ... [rte_mempool_objhdr] [data...]
> + */
This is not always true.
If a user creates a mempool with the NO_CACHE_ALIGN flag, the header will
be small, without padding.
On 12/14/2017 04:38 PM, Olivier MATZ wrote:
> On Fri, Nov 24, 2017 at 04:06:28PM +0000, Andrew Rybchenko wrote:
>> From: "Artem V. Andreev" <Artem.Andreev@oktetlabs.ru>
>>
>> The manager provides a way to allocate a physically and virtually
>> contiguous set of objects.
>>
>> Note: due to the way objects are organized in the bucket manager,
>> get_avail_count may return fewer objects than were enqueued.
>> That breaks the expectations of the mempool and mempool_perf tests.
> To me, this can be problematic. The driver should respect the
> API, or it will trigger hard-to-debug issues in applications. Can't
> this be fixed in some way or another?
As I understand it, there are no requirements on how fast get_count
works. If so, it is doable and we'll fix it in RFCv2.
> [...]
>
>> --- a/config/common_base
>> +++ b/config/common_base
>> @@ -608,6 +608,8 @@ CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n
>> #
>> # Compile Mempool drivers
>> #
>> +CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
>> +CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=32
>> CONFIG_RTE_DRIVER_MEMPOOL_RING=y
>> CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
>>
> Why 32KB?
> Why not more, or less?
> Can it be a runtime parameter?
> I guess it won't work with too large objects.
We have no good understanding of how driver-specific parameters
should be passed on mempool creation. We've simply kept it for the
future since it looks like a separate task.
If you have ideas, please share - we'll be thankful.
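One possibility we see (just an illustrative sketch, not part of this
patch) is the opaque pool_config pointer that rte_mempool_set_ops_byname()
already takes and stores in mp->pool_config: the application could pass a
requested bucket size there, and bucket_alloc() could fall back to the
compile-time default when it is NULL:

#include <rte_mempool.h>
#include <rte_lcore.h>

/* Application side: request a 64 KB bucket for this particular pool */
static unsigned int req_bucket_mem = 64 * 1024;

static struct rte_mempool *
create_pool_with_bucket_size(void)
{
        struct rte_mempool *mp;

        mp = rte_mempool_create_empty("pkt_pool", 8192, 2048, 256, 0,
                                      rte_socket_id(), 0);
        if (mp == NULL)
                return NULL;
        /* The opaque pointer is remembered in mp->pool_config */
        rte_mempool_set_ops_byname(mp, "bucket", &req_bucket_mem);
        return mp;
}

/* Driver side, at the start of bucket_alloc():
 *
 *      unsigned int bucket_mem = (mp->pool_config != NULL) ?
 *              *(unsigned int *)mp->pool_config : BUCKET_MEM_SIZE;
 */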
> [...]
>
>> +struct bucket_data {
>> + unsigned int header_size;
>> + unsigned int chunk_size;
>> + unsigned int bucket_size;
>> + uintptr_t bucket_page_mask;
>> + struct rte_ring *shared_bucket_ring;
>> + struct bucket_stack *buckets[RTE_MAX_LCORE];
>> + /*
>> + * Multi-producer single-consumer ring to hold objects that are
>> + * returned to the mempool at a different lcore than initially
>> + * dequeued
>> + */
>> + struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
>> + struct rte_ring *shared_orphan_ring;
>> + struct rte_mempool *pool;
>> +
>> +};
> I'm seeing per-core structures. Will it work on non-dataplane cores?
> For instance, if a control thread wants to allocate a mbuf?
Maybe I don't understand something. Does the control thread have a
valid rte_lcore_id()?
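If it does not (IIRC, rte_lcore_id() returns LCORE_ID_ANY in a non-EAL
thread), the per-lcore arrays must not be indexed with it. A possible
fallback - an illustrative sketch only, not in this patch - is to serve
such threads from the shared orphan ring alone:

static int
bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
{
        struct bucket_data *data = mp->pool_data;
        unsigned int lcore_id = rte_lcore_id();

        if (unlikely(lcore_id == LCORE_ID_ANY)) {
                /* Control (non-EAL) thread: bypass the per-lcore state */
                if (rte_ring_dequeue_bulk(data->shared_orphan_ring,
                                          obj_table, n, NULL) != n) {
                        rte_errno = ENOBUFS;
                        return -rte_errno;
                }
                return 0;
        }
        /* ... existing per-lcore fast path ... */
        return 0;
}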
> If possible, these fields should be more documented (or just renamed).
> For instance, I suggest chunk_size could be called obj_per_bucket, which
> better described the content of the field.
Thanks, we'll do.
> [...]
>
>> +static int
>> +bucket_enqueue_single(struct bucket_data *data, void *obj)
>> +{
>> + int rc = 0;
>> + uintptr_t addr = (uintptr_t)obj;
>> + struct bucket_header *hdr;
>> + unsigned int lcore_id = rte_lcore_id();
>> +
>> + addr &= data->bucket_page_mask;
>> + hdr = (struct bucket_header *)addr;
>> +
>> + if (likely(hdr->lcore_id == lcore_id)) {
>> + if (hdr->fill_cnt < data->bucket_size - 1) {
>> + hdr->fill_cnt++;
>> + } else {
>> + hdr->fill_cnt = 0;
>> + /* Stack is big enough to put all buckets */
>> + bucket_stack_push(data->buckets[lcore_id], hdr);
>> + }
>> + } else if (hdr->lcore_id != LCORE_ID_ANY) {
>> + struct rte_ring *adopt_ring =
>> + data->adoption_buffer_rings[hdr->lcore_id];
>> +
>> + rc = rte_ring_enqueue(adopt_ring, obj);
>> + /* Ring is big enough to put all objects */
>> + RTE_ASSERT(rc == 0);
>> + } else if (hdr->fill_cnt < data->bucket_size - 1) {
>> + hdr->fill_cnt++;
>> + } else {
>> + hdr->fill_cnt = 0;
>> + rc = rte_ring_enqueue(data->shared_bucket_ring, hdr);
>> + /* Ring is big enough to put all buckets */
>> + RTE_ASSERT(rc == 0);
>> + }
>> +
>> + return rc;
>> +}
> [...]
>
>> +static int
>> +bucket_dequeue_buckets(struct bucket_data *data, void **obj_table,
>> + unsigned int n_buckets)
>> +{
>> + struct bucket_stack *cur_stack = data->buckets[rte_lcore_id()];
>> + unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
>> + void **obj_table_base = obj_table;
>> +
>> + n_buckets -= n_buckets_from_stack;
>> + while (n_buckets_from_stack-- > 0) {
>> + void *obj = bucket_stack_pop_unsafe(cur_stack);
>> +
>> + obj_table = bucket_fill_obj_table(data, &obj, obj_table,
>> + data->bucket_size);
>> + }
>> + while (n_buckets-- > 0) {
>> + struct bucket_header *hdr;
>> +
>> + if (unlikely(rte_ring_dequeue(data->shared_bucket_ring,
>> + (void **)&hdr) != 0)) {
>> + /* Return the already-dequeued buffers
>> + * back to the mempool
>> + */
>> + bucket_enqueue(data->pool, obj_table_base,
>> + obj_table - obj_table_base);
>> + rte_errno = ENOBUFS;
>> + return -rte_errno;
>> + }
>> + hdr->lcore_id = rte_lcore_id();
>> + obj_table = bucket_fill_obj_table(data, (void **)&hdr,
>> + obj_table, data->bucket_size);
>> + }
>> +
>> + return 0;
>> +}
> [...]
>
>> +static int
>> +bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
>> +{
>> + struct bucket_data *data = mp->pool_data;
>> + unsigned int n_buckets = n / data->bucket_size;
>> + unsigned int n_orphans = n - n_buckets * data->bucket_size;
>> + int rc = 0;
>> +
>> + bucket_adopt_orphans(data);
>> +
>> + if (unlikely(n_orphans > 0)) {
>> + rc = bucket_dequeue_orphans(data, obj_table +
>> + (n_buckets * data->bucket_size),
>> + n_orphans);
>> + if (rc != 0)
>> + return rc;
>> + }
>> +
>> + if (likely(n_buckets > 0)) {
>> + rc = bucket_dequeue_buckets(data, obj_table, n_buckets);
>> + if (unlikely(rc != 0) && n_orphans > 0) {
>> + rte_ring_enqueue_bulk(data->shared_orphan_ring,
>> + obj_table + (n_buckets *
>> + data->bucket_size),
>> + n_orphans, NULL);
>> + }
>> + }
>> +
>> + return rc;
>> +}
> If my understanding is correct, at initialization, all full buckets will
> go to the data->shared_bucket_ring ring, with lcore_id == ANY (this is
> done in register_mem).
>
> (note: I feel 'data' is not an ideal name for bucket_data)
Yes, agree. We'll rename it. It is really too generic.
> If core 0 allocates all the mbufs and then frees them all, they
> will be stored in the per-core stack, with hdr->lcore_id == 0. Is that
> right?
Right.
> If yes, can core 1 allocate a mbuf after that?
We'll add a threshold for the per-core stack. If it is exceeded, buckets
will be flushed into the shared ring.
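Roughly along these lines (a sketch only; flush_threshold would be a new
field of struct bucket_data, and the helper would be called from
bucket_enqueue_single() after a full bucket is pushed onto the stack):

static void
bucket_stack_flush(struct bucket_data *data, unsigned int lcore_id)
{
        struct bucket_stack *stack = data->buckets[lcore_id];

        while (stack->top > data->flush_threshold) {
                struct bucket_header *hdr = bucket_stack_pop_unsafe(stack);

                /* Disown the bucket so that any lcore may take it */
                hdr->lcore_id = LCORE_ID_ANY;
                /* Ring is big enough to hold all buckets */
                rte_ring_enqueue(data->shared_bucket_ring, hdr);
        }
}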
>> +static unsigned int
>> +bucket_get_count(const struct rte_mempool *mp)
>> +{
>> + const struct bucket_data *data = mp->pool_data;
>> + const struct bucket_stack *local_bucket_stack =
>> + data->buckets[rte_lcore_id()];
>> +
>> + return data->bucket_size * local_bucket_stack->top +
>> + data->bucket_size * rte_ring_count(data->shared_bucket_ring) +
>> + rte_ring_count(data->shared_orphan_ring);
>> +}
> It looks like get_count only relies on the current core stack usage
> and ignores the other core stacks.
We'll fix it to provide a more accurate return value, which is required
to pass the self-test and to make it usable for debugging.
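Something like the following (a sketch of the intended direction; the
result is still approximate under concurrent access, and objects sitting
in partially filled buckets would need separate accounting, but at least
the other lcores are no longer ignored):

static unsigned int
bucket_get_count(const struct rte_mempool *mp)
{
        const struct bucket_data *data = mp->pool_data;
        unsigned int count;
        unsigned int i;

        count = data->bucket_size *
                rte_ring_count(data->shared_bucket_ring) +
                rte_ring_count(data->shared_orphan_ring);

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                if (!rte_lcore_is_enabled(i))
                        continue;
                count += data->bucket_size * data->buckets[i]->top +
                        rte_ring_count(data->adoption_buffer_rings[i]);
        }

        return count;
}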
> [...]
>
>> +static int
>> +bucket_register_memory_area(__rte_unused const struct rte_mempool *mp,
>> + char *vaddr, __rte_unused phys_addr_t paddr,
>> + size_t len)
>> +{
>> + /* mp->pool_data may still be uninitialized at this point */
>> + unsigned int chunk_size = mp->header_size + mp->elt_size +
>> + mp->trailer_size;
>> + unsigned int bucket_mem_size =
>> + (BUCKET_MEM_SIZE / chunk_size) * chunk_size;
>> + unsigned int bucket_page_sz = rte_align32pow2(bucket_mem_size);
>> + uintptr_t align;
>> + char *iter;
>> +
>> + align = RTE_PTR_ALIGN_CEIL(vaddr, bucket_page_sz) - vaddr;
>> +
>> + for (iter = vaddr + align; iter < vaddr + len; iter += bucket_page_sz) {
>> + /* librte_mempool uses the header part for its own bookkeeping,
>> + * but the librte_mempool's object header is adjacent to the
>> + * data; it is small enough and the header is guaranteed to be
>> + * at least CACHE_LINE_SIZE (i.e. 64) bytes, so we do have
>> + * plenty of space at the start of the header. So the layout
>> + * looks like this:
>> + * [bucket_header] ... unused ... [rte_mempool_objhdr] [data...]
>> + */
> This is not always true.
> If a user creates a mempool with the NO_CACHE_ALIGN flag, the header will
> be small, without padding.
Thanks. I think it can be handled when the bucket mempool implements its
own callback to populate objects.
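Until then, bucket_alloc() could at least fail gracefully instead of
relying on RTE_VERIFY() - a minimal sketch:

        /* Pools created with MEMPOOL_F_NO_CACHE_ALIGN have no padding
         * before the object header, so there is no room for
         * struct bucket_header in front of it.
         */
        if (mp->header_size < sizeof(struct bucket_header) +
                              sizeof(struct rte_mempool_objhdr)) {
                rte_errno = EINVAL;
                return -rte_errno;
        }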
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -293,6 +293,15 @@ F: test/test/test_event_eth_rx_adapter.c
F: doc/guides/prog_guide/event_ethernet_rx_adapter.rst
+Memory Pool Drivers
+-------------------
+
+Bucket memory pool
+M: Artem V. Andreev <Artem.Andreev@oktetlabs.ru>
+M: Andrew Rybchenko <arybchenko@solarflare.com>
+F: drivers/mempool/bucket/
+
+
Bus Drivers
-----------
--- a/config/common_base
+++ b/config/common_base
@@ -608,6 +608,8 @@ CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n
#
# Compile Mempool drivers
#
+CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
+CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=32
CONFIG_RTE_DRIVER_MEMPOOL_RING=y
CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
--- a/drivers/mempool/Makefile
+++ b/drivers/mempool/Makefile
@@ -30,6 +30,7 @@
include $(RTE_SDK)/mk/rte.vars.mk
+DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += bucket
DIRS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL) += dpaa
DIRS-$(CONFIG_RTE_LIBRTE_DPAA2_MEMPOOL) += dpaa2
DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
new file mode 100644
--- /dev/null
+++ b/drivers/mempool/bucket/Makefile
@@ -0,0 +1,49 @@
+#
+# BSD LICENSE
+#
+# Copyright (c) 2017 Solarflare Communications Inc.
+# All rights reserved.
+#
+# This software was jointly developed between OKTET Labs (under contract
+# for Solarflare) and Solarflare Communications, Inc.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+#
+# library name
+#
+LIB = librte_mempool_bucket.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+LDLIBS += -lrte_eal -lrte_mempool -lrte_ring
+
+EXPORT_MAP := rte_mempool_bucket_version.map
+
+LIBABIVER := 1
+
+SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += rte_mempool_bucket.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
new file mode 100644
--- /dev/null
+++ b/drivers/mempool/bucket/rte_mempool_bucket.c
@@ -0,0 +1,521 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) 2017 Solarflare Communications Inc.
+ * All rights reserved.
+ *
+ * This software was jointly developed between OKTET Labs (under contract
+ * for Solarflare) and Solarflare Communications, Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <rte_errno.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+#include <rte_malloc.h>
+
+/*
+ * The general idea of the bucket mempool driver is as follows.
+ * We keep track of physically contiguous groups (buckets) of objects
+ * of a certain size. Every such group has a counter that is
+ * incremented every time an object from that group is enqueued.
+ * Until the bucket is full, no objects from it are eligible for allocation.
+ * If a request is made to dequeue a multiple of the bucket size, it is
+ * satisfied by returning whole buckets instead of separate objects.
+ */
+
+#define BUCKET_MEM_SIZE (RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024)
+
+struct bucket_header {
+ unsigned int lcore_id;
+ uint8_t fill_cnt;
+};
+
+struct bucket_stack {
+ unsigned int top;
+ unsigned int limit;
+ void *objects[];
+};
+
+struct bucket_data {
+ unsigned int header_size;
+ unsigned int chunk_size;
+ unsigned int bucket_size;
+ uintptr_t bucket_page_mask;
+ struct rte_ring *shared_bucket_ring;
+ struct bucket_stack *buckets[RTE_MAX_LCORE];
+ /*
+ * Multi-producer single-consumer ring to hold objects that are
+ * returned to the mempool at a different lcore than initially
+ * dequeued
+ */
+ struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
+ struct rte_ring *shared_orphan_ring;
+ struct rte_mempool *pool;
+
+};
+
+static struct bucket_stack *
+bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
+{
+ struct bucket_stack *stack;
+
+ stack = rte_zmalloc_socket("bucket_stack",
+ sizeof(struct bucket_stack) +
+ n_elts * sizeof(void *),
+ RTE_CACHE_LINE_SIZE,
+ mp->socket_id);
+ if (stack == NULL)
+ return NULL;
+ stack->limit = n_elts;
+ stack->top = 0;
+
+ return stack;
+}
+
+static void
+bucket_stack_push(struct bucket_stack *stack, void *obj)
+{
+ RTE_ASSERT(stack->top < stack->limit);
+ stack->objects[stack->top++] = obj;
+}
+
+static void *
+bucket_stack_pop_unsafe(struct bucket_stack *stack)
+{
+ RTE_ASSERT(stack->top > 0);
+ return stack->objects[--stack->top];
+}
+
+static void *
+bucket_stack_pop(struct bucket_stack *stack)
+{
+ if (stack->top == 0)
+ return NULL;
+ return bucket_stack_pop_unsafe(stack);
+}
+
+static int
+bucket_enqueue_single(struct bucket_data *data, void *obj)
+{
+ int rc = 0;
+ uintptr_t addr = (uintptr_t)obj;
+ struct bucket_header *hdr;
+ unsigned int lcore_id = rte_lcore_id();
+
+ addr &= data->bucket_page_mask;
+ hdr = (struct bucket_header *)addr;
+
+ if (likely(hdr->lcore_id == lcore_id)) {
+ if (hdr->fill_cnt < data->bucket_size - 1) {
+ hdr->fill_cnt++;
+ } else {
+ hdr->fill_cnt = 0;
+ /* Stack is big enough to put all buckets */
+ bucket_stack_push(data->buckets[lcore_id], hdr);
+ }
+ } else if (hdr->lcore_id != LCORE_ID_ANY) {
+ struct rte_ring *adopt_ring =
+ data->adoption_buffer_rings[hdr->lcore_id];
+
+ rc = rte_ring_enqueue(adopt_ring, obj);
+ /* Ring is big enough to put all objects */
+ RTE_ASSERT(rc == 0);
+ } else if (hdr->fill_cnt < data->bucket_size - 1) {
+ hdr->fill_cnt++;
+ } else {
+ hdr->fill_cnt = 0;
+ rc = rte_ring_enqueue(data->shared_bucket_ring, hdr);
+ /* Ring is big enough to put all buckets */
+ RTE_ASSERT(rc == 0);
+ }
+
+ return rc;
+}
+
+static int
+bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
+ unsigned int n)
+{
+ struct bucket_data *data = mp->pool_data;
+ unsigned int i;
+ int rc = 0;
+
+ for (i = 0; i < n; i++) {
+ rc = bucket_enqueue_single(data, obj_table[i]);
+ RTE_ASSERT(rc == 0);
+ }
+ return rc;
+}
+
+static void **
+bucket_fill_obj_table(const struct bucket_data *data, void **pstart,
+ void **obj_table, unsigned int n)
+{
+ unsigned int i;
+ uint8_t *objptr = *pstart;
+
+ for (objptr += data->header_size, i = 0; i < n; i++,
+ objptr += data->chunk_size)
+ *obj_table++ = objptr;
+ *pstart = objptr;
+ return obj_table;
+}
+
+static int
+bucket_dequeue_orphans(struct bucket_data *data, void **obj_table,
+ unsigned int n_orphans)
+{
+ unsigned int i;
+ int rc;
+ uint8_t *objptr;
+
+ rc = rte_ring_dequeue_bulk(data->shared_orphan_ring, obj_table,
+ n_orphans, NULL);
+ if (unlikely(rc != (int)n_orphans)) {
+ struct bucket_header *hdr;
+
+ objptr = bucket_stack_pop(data->buckets[rte_lcore_id()]);
+ hdr = (struct bucket_header *)objptr;
+
+ if (objptr == NULL) {
+ rc = rte_ring_dequeue(data->shared_bucket_ring,
+ (void **)&objptr);
+ if (rc != 0) {
+ rte_errno = ENOBUFS;
+ return -rte_errno;
+ }
+ hdr = (struct bucket_header *)objptr;
+ hdr->lcore_id = rte_lcore_id();
+ }
+ hdr->fill_cnt = 0;
+ bucket_fill_obj_table(data, (void **)&objptr, obj_table,
+ n_orphans);
+ for (i = n_orphans; i < data->bucket_size; i++,
+ objptr += data->chunk_size) {
+ rc = rte_ring_enqueue(data->shared_orphan_ring,
+ objptr);
+ if (rc != 0) {
+ RTE_ASSERT(0);
+ rte_errno = -rc;
+ return rc;
+ }
+ }
+ }
+
+ return 0;
+}
+
+static int
+bucket_dequeue_buckets(struct bucket_data *data, void **obj_table,
+ unsigned int n_buckets)
+{
+ struct bucket_stack *cur_stack = data->buckets[rte_lcore_id()];
+ unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
+ void **obj_table_base = obj_table;
+
+ n_buckets -= n_buckets_from_stack;
+ while (n_buckets_from_stack-- > 0) {
+ void *obj = bucket_stack_pop_unsafe(cur_stack);
+
+ obj_table = bucket_fill_obj_table(data, &obj, obj_table,
+ data->bucket_size);
+ }
+ while (n_buckets-- > 0) {
+ struct bucket_header *hdr;
+
+ if (unlikely(rte_ring_dequeue(data->shared_bucket_ring,
+ (void **)&hdr) != 0)) {
+ /* Return the already-dequeued buffers
+ * back to the mempool
+ */
+ bucket_enqueue(data->pool, obj_table_base,
+ obj_table - obj_table_base);
+ rte_errno = ENOBUFS;
+ return -rte_errno;
+ }
+ hdr->lcore_id = rte_lcore_id();
+ obj_table = bucket_fill_obj_table(data, (void **)&hdr,
+ obj_table, data->bucket_size);
+ }
+
+ return 0;
+}
+
+static int
+bucket_adopt_orphans(struct bucket_data *data)
+{
+ int rc = 0;
+ struct rte_ring *adopt_ring =
+ data->adoption_buffer_rings[rte_lcore_id()];
+
+ if (unlikely(!rte_ring_empty(adopt_ring))) {
+ void *orphan;
+
+ while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
+ rc = bucket_enqueue_single(data, orphan);
+ RTE_ASSERT(rc == 0);
+ }
+ }
+ return rc;
+}
+
+static int
+bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
+{
+ struct bucket_data *data = mp->pool_data;
+ unsigned int n_buckets = n / data->bucket_size;
+ unsigned int n_orphans = n - n_buckets * data->bucket_size;
+ int rc = 0;
+
+ bucket_adopt_orphans(data);
+
+ if (unlikely(n_orphans > 0)) {
+ rc = bucket_dequeue_orphans(data, obj_table +
+ (n_buckets * data->bucket_size),
+ n_orphans);
+ if (rc != 0)
+ return rc;
+ }
+
+ if (likely(n_buckets > 0)) {
+ rc = bucket_dequeue_buckets(data, obj_table, n_buckets);
+ if (unlikely(rc != 0) && n_orphans > 0) {
+ rte_ring_enqueue_bulk(data->shared_orphan_ring,
+ obj_table + (n_buckets *
+ data->bucket_size),
+ n_orphans, NULL);
+ }
+ }
+
+ return rc;
+}
+
+static unsigned int
+bucket_get_count(const struct rte_mempool *mp)
+{
+ const struct bucket_data *data = mp->pool_data;
+ const struct bucket_stack *local_bucket_stack =
+ data->buckets[rte_lcore_id()];
+
+ return data->bucket_size * local_bucket_stack->top +
+ data->bucket_size * rte_ring_count(data->shared_bucket_ring) +
+ rte_ring_count(data->shared_orphan_ring);
+}
+
+static int
+bucket_alloc(struct rte_mempool *mp)
+{
+ int rg_flags = 0;
+ int rc = 0;
+ char rg_name[RTE_RING_NAMESIZE];
+ struct bucket_data *data;
+ unsigned int i;
+
+ data = rte_zmalloc_socket("bucket_pool", sizeof(*data),
+ RTE_CACHE_LINE_SIZE, mp->socket_id);
+ if (data == NULL) {
+ rc = -ENOMEM;
+ goto no_mem_for_data;
+ }
+ data->pool = mp;
+ data->header_size = mp->header_size;
+ RTE_VERIFY(sizeof(struct bucket_header) +
+ sizeof(struct rte_mempool_objhdr) <= mp->header_size);
+ data->chunk_size = mp->header_size + mp->elt_size + mp->trailer_size;
+ data->bucket_size = BUCKET_MEM_SIZE / data->chunk_size;
+ data->bucket_page_mask = ~(rte_align64pow2(BUCKET_MEM_SIZE) - 1);
+
+ if (mp->flags & MEMPOOL_F_SP_PUT)
+ rg_flags |= RING_F_SP_ENQ;
+ if (mp->flags & MEMPOOL_F_SC_GET)
+ rg_flags |= RING_F_SC_DEQ;
+
+ for (i = 0; i < RTE_MAX_LCORE; i++) {
+ if (!rte_lcore_is_enabled(i))
+ continue;
+ data->buckets[i] =
+ bucket_stack_create(mp, mp->size / data->bucket_size);
+ if (data->buckets[i] == NULL) {
+ rc = -ENOMEM;
+ goto no_mem_for_stacks;
+ }
+ rc = snprintf(rg_name, sizeof(rg_name),
+ RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
+ if (rc < 0 || rc >= (int)sizeof(rg_name)) {
+ rc = -ENAMETOOLONG;
+ goto no_mem_for_stacks;
+ }
+ data->adoption_buffer_rings[i] =
+ rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
+ mp->socket_id,
+ rg_flags | RING_F_SC_DEQ);
+ if (data->adoption_buffer_rings[i] == NULL) {
+ rc = -rte_errno;
+ goto no_mem_for_stacks;
+ }
+ }
+
+ rc = snprintf(rg_name, sizeof(rg_name),
+ RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
+ if (rc < 0 || rc >= (int)sizeof(rg_name)) {
+ rc = -ENAMETOOLONG;
+ goto invalid_shared_orphan_ring;
+ }
+ data->shared_orphan_ring =
+ rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
+ mp->socket_id, rg_flags);
+ if (data->shared_orphan_ring == NULL) {
+ rc = -rte_errno;
+ goto cannot_create_shared_orphan_ring;
+ }
+
+ rc = snprintf(rg_name, sizeof(rg_name),
+ RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
+ if (rc < 0 || rc >= (int)sizeof(rg_name)) {
+ rc = -ENAMETOOLONG;
+ goto invalid_shared_bucket_ring;
+ }
+ data->shared_bucket_ring =
+ rte_ring_create(rg_name,
+ rte_align32pow2((mp->size /
+ data->bucket_size) + 1),
+ mp->socket_id, rg_flags);
+ if (data->shared_bucket_ring == NULL) {
+ rc = -rte_errno;
+ goto cannot_create_shared_bucket_ring;
+ }
+
+ mp->pool_data = data;
+
+ return 0;
+
+cannot_create_shared_bucket_ring:
+invalid_shared_bucket_ring:
+ rte_ring_free(data->shared_orphan_ring);
+cannot_create_shared_orphan_ring:
+invalid_shared_orphan_ring:
+no_mem_for_stacks:
+ for (i = 0; i < RTE_MAX_LCORE; i++) {
+ rte_free(data->buckets[i]);
+ rte_ring_free(data->adoption_buffer_rings[i]);
+ }
+ rte_free(data);
+no_mem_for_data:
+ rte_errno = -rc;
+ return rc;
+}
+
+static void
+bucket_free(struct rte_mempool *mp)
+{
+ unsigned int i;
+ struct bucket_data *data = mp->pool_data;
+
+ if (data == NULL)
+ return;
+
+ for (i = 0; i < RTE_MAX_LCORE; i++) {
+ rte_free(data->buckets[i]);
+ rte_ring_free(data->adoption_buffer_rings[i]);
+ }
+
+ rte_ring_free(data->shared_orphan_ring);
+ rte_ring_free(data->shared_bucket_ring);
+
+ rte_free(data);
+}
+
+static int
+bucket_get_capabilities(__rte_unused const struct rte_mempool *mp,
+ unsigned int *flags)
+{
+ *flags |= MEMPOOL_F_CAPA_PHYS_CONTIG |
+ MEMPOOL_F_CAPA_ALLOCATE_IN_CLUSTERS;
+ return 0;
+}
+
+static int
+bucket_get_info(__rte_unused const struct rte_mempool *mp,
+ struct rte_mempool_info *info)
+{
+ /* mp->pool_data may still be uninitialized at this point */
+ unsigned int chunk_size = mp->header_size + mp->elt_size +
+ mp->trailer_size;
+
+ info->cluster_size = BUCKET_MEM_SIZE / chunk_size;
+ return 0;
+}
+
+static int
+bucket_register_memory_area(__rte_unused const struct rte_mempool *mp,
+ char *vaddr, __rte_unused phys_addr_t paddr,
+ size_t len)
+{
+ /* mp->pool_data may still be uninitialized at this point */
+ unsigned int chunk_size = mp->header_size + mp->elt_size +
+ mp->trailer_size;
+ unsigned int bucket_mem_size =
+ (BUCKET_MEM_SIZE / chunk_size) * chunk_size;
+ unsigned int bucket_page_sz = rte_align32pow2(bucket_mem_size);
+ uintptr_t align;
+ char *iter;
+
+ align = RTE_PTR_ALIGN_CEIL(vaddr, bucket_page_sz) - vaddr;
+
+ for (iter = vaddr + align; iter < vaddr + len; iter += bucket_page_sz) {
+ /* librte_mempool uses the header part for its own bookkeeping,
+ * but the librte_mempool's object header is adjacent to the
+ * data; it is small enough and the header is guaranteed to be
+ * at least CACHE_LINE_SIZE (i.e. 64) bytes, so we do have
+ * plenty of space at the start of the header. So the layout
+ * looks like this:
+ * [bucket_header] ... unused ... [rte_mempool_objhdr] [data...]
+ */
+ struct bucket_header *hdr = (struct bucket_header *)iter;
+
+ hdr->fill_cnt = 0;
+ hdr->lcore_id = LCORE_ID_ANY;
+ }
+
+ return 0;
+}
+
+static const struct rte_mempool_ops ops_bucket = {
+ .name = "bucket",
+ .alloc = bucket_alloc,
+ .free = bucket_free,
+ .enqueue = bucket_enqueue,
+ .dequeue = bucket_dequeue,
+ .get_count = bucket_get_count,
+ .get_capabilities = bucket_get_capabilities,
+ .register_memory_area = bucket_register_memory_area,
+ .get_info = bucket_get_info,
+};
+
+
+MEMPOOL_REGISTER_OPS(ops_bucket);
new file mode 100644
--- /dev/null
+++ b/drivers/mempool/bucket/rte_mempool_bucket_version.map
@@ -0,0 +1,4 @@
+DPDK_18.02 {
+
+ local: *;
+};
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -115,6 +115,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_VDEV_BUS) += -lrte_bus_vdev
ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
# plugins (link only if static libraries)
+_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -lrte_mempool_bucket
_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += -lrte_mempool_stack
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_AF_PACKET) += -lrte_pmd_af_packet