[dpdk-dev] [RFC PATCH 3/6] mempool/bucket: implement bucket mempool manager

Andrew Rybchenko arybchenko at solarflare.com
Fri Nov 24 17:06:28 CET 2017


From: "Artem V. Andreev" <Artem.Andreev at oktetlabs.ru>

The manager provides a way to allocate a physically and virtually
contiguous set of objects.

Note: due to the way objects are organized in the bucket manager,
get_avail_count may return fewer objects than were enqueued.
That breaks the expectations of the mempool and mempool_perf tests.

Signed-off-by: Artem V. Andreev <Artem.Andreev at oktetlabs.ru>
Signed-off-by: Andrew Rybchenko <arybchenko at solarflare.com>
---
 MAINTAINERS                                        |   9 +
 config/common_base                                 |   2 +
 drivers/mempool/Makefile                           |   1 +
 drivers/mempool/bucket/Makefile                    |  49 ++
 drivers/mempool/bucket/rte_mempool_bucket.c        | 521 +++++++++++++++++++++
 .../mempool/bucket/rte_mempool_bucket_version.map  |   4 +
 mk/rte.app.mk                                      |   1 +
 7 files changed, 587 insertions(+)
 create mode 100644 drivers/mempool/bucket/Makefile
 create mode 100644 drivers/mempool/bucket/rte_mempool_bucket.c
 create mode 100644 drivers/mempool/bucket/rte_mempool_bucket_version.map

diff --git a/MAINTAINERS b/MAINTAINERS
index f0baeb4..144fd1d 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -293,6 +293,15 @@ F: test/test/test_event_eth_rx_adapter.c
 F: doc/guides/prog_guide/event_ethernet_rx_adapter.rst
 
 
+Memory Pool Drivers
+-------------------
+
+Bucket memory pool
+M: Artem V. Andreev <Artem.Andreev at oktetlabs.ru>
+M: Andrew Rybchenko <arybchenko at solarflare.com>
+F: drivers/mempool/bucket/
+
+
 Bus Drivers
 -----------
 
diff --git a/config/common_base b/config/common_base
index e74febe..8793699 100644
--- a/config/common_base
+++ b/config/common_base
@@ -608,6 +608,8 @@ CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n
 #
 # Compile Mempool drivers
 #
+CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
+CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=32
 CONFIG_RTE_DRIVER_MEMPOOL_RING=y
 CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
 
diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile
index f656c56..9de0783 100644
--- a/drivers/mempool/Makefile
+++ b/drivers/mempool/Makefile
@@ -30,6 +30,7 @@
 
 include $(RTE_SDK)/mk/rte.vars.mk
 
+DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += bucket
 DIRS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL) += dpaa
 DIRS-$(CONFIG_RTE_LIBRTE_DPAA2_MEMPOOL) += dpaa2
 DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
diff --git a/drivers/mempool/bucket/Makefile b/drivers/mempool/bucket/Makefile
new file mode 100644
index 0000000..06ddd31
--- /dev/null
+++ b/drivers/mempool/bucket/Makefile
@@ -0,0 +1,49 @@
+#
+#   BSD LICENSE
+#
+# Copyright (c) 2017 Solarflare Communications Inc.
+# All rights reserved.
+#
+# This software was jointly developed between OKTET Labs (under contract
+# for Solarflare) and Solarflare Communications, Inc.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+#    this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+#    this list of conditions and the following disclaimer in the documentation
+#    and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+#
+# library name
+#
+LIB = librte_mempool_bucket.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+LDLIBS += -lrte_eal -lrte_mempool -lrte_ring
+
+EXPORT_MAP := rte_mempool_bucket_version.map
+
+LIBABIVER := 1
+
+SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += rte_mempool_bucket.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/mempool/bucket/rte_mempool_bucket.c b/drivers/mempool/bucket/rte_mempool_bucket.c
new file mode 100644
index 0000000..4063d2c
--- /dev/null
+++ b/drivers/mempool/bucket/rte_mempool_bucket.c
@@ -0,0 +1,521 @@
+/*-
+ *   BSD LICENSE
+ *
+ * Copyright (c) 2017 Solarflare Communications Inc.
+ * All rights reserved.
+ *
+ * This software was jointly developed between OKTET Labs (under contract
+ * for Solarflare) and Solarflare Communications, Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <rte_errno.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+#include <rte_malloc.h>
+
+/*
+ * The general idea of the bucket mempool driver is as follows.
+ * We keep track of physically contiguous groups (buckets) of objects
+ * of a certain size. Every such group has a counter that is
+ * incremented every time an object from that group is enqueued.
+ * Until the bucket is full, no objects from it are eligible for allocation.
+ * If a request is made to dequeue a multiple of the bucket size, it is
+ * satisfied by returning whole buckets instead of separate objects.
+ */
+
+#define BUCKET_MEM_SIZE		(RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024)	/* configured KB converted to bytes */
+
+/*
+ * Per-bucket bookkeeping stored at the very start of every bucket, in the
+ * otherwise unused leading bytes of the first object's header.
+ */
+struct bucket_header {
+	/* Owning lcore, or LCORE_ID_ANY while the bucket is unowned */
+	unsigned int lcore_id;
+	/*
+	 * Number of objects returned to the bucket so far. It is compared
+	 * against bucket_data::bucket_size, so it must be able to hold
+	 * bucket_size - 1: an 8-bit counter would wrap, and the bucket would
+	 * never be seen as full, once a bucket holds more than 256 objects
+	 * (e.g. with a larger RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB).
+	 * The structure size is the same as with a padded 8-bit counter.
+	 */
+	unsigned int fill_cnt;
+};
+
+struct bucket_stack {	/* LIFO of pointers with a fixed capacity */
+	unsigned int top;	/* number of entries currently stored */
+	unsigned int limit;	/* capacity of objects[], set at creation */
+	void *objects[];	/* the stored entries, objects[0 .. top - 1] */
+};
+
+struct bucket_data {	/* driver private data, kept in mp->pool_data */
+	unsigned int header_size;	/* copy of mp->header_size */
+	unsigned int chunk_size;	/* header + element + trailer size */
+	unsigned int bucket_size;	/* number of objects per bucket */
+	uintptr_t bucket_page_mask;	/* ANDed with an object address to find its bucket header */
+	struct rte_ring *shared_bucket_ring;	/* full buckets shared by all lcores */
+	struct bucket_stack *buckets[RTE_MAX_LCORE];	/* per-lcore stacks of full buckets */
+	/*
+	 * Multi-producer single-consumer ring to hold objects that are
+	 * returned to the mempool at a different lcore than initially
+	 * dequeued
+	 */
+	struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
+	struct rte_ring *shared_orphan_ring;	/* single objects left over from split buckets */
+	struct rte_mempool *pool;	/* mempool this data belongs to */
+
+};
+
+/*
+ * Allocate a zeroed bucket stack with room for n_elts pointers on the
+ * mempool's socket. Returns NULL when the allocation fails.
+ */
+static struct bucket_stack *
+bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
+{
+	const size_t stack_bytes =
+		sizeof(struct bucket_stack) + n_elts * sizeof(void *);
+	struct bucket_stack *new_stack;
+
+	new_stack = rte_zmalloc_socket("bucket_stack", stack_bytes,
+				       RTE_CACHE_LINE_SIZE, mp->socket_id);
+	if (new_stack != NULL) {
+		new_stack->top = 0;
+		new_stack->limit = n_elts;
+	}
+
+	return new_stack;
+}
+
+/* Store obj on top of the stack; the caller guarantees there is room. */
+static void
+bucket_stack_push(struct bucket_stack *stack, void *obj)
+{
+	RTE_ASSERT(stack->top < stack->limit);
+	stack->objects[stack->top] = obj;
+	stack->top++;
+}
+
+/* Remove and return the top entry; the stack must not be empty. */
+static void *
+bucket_stack_pop_unsafe(struct bucket_stack *stack)
+{
+	RTE_ASSERT(stack->top > 0);
+	stack->top--;
+	return stack->objects[stack->top];
+}
+
+/* Remove and return the top entry, or NULL when the stack is empty. */
+static void *
+bucket_stack_pop(struct bucket_stack *stack)
+{
+	return stack->top != 0 ? bucket_stack_pop_unsafe(stack) : NULL;
+}
+
+/*
+ * Return one object to the pool.
+ *
+ * The bucket header is located by masking the object address. Then:
+ *  - bucket owned by the calling lcore: bump its fill counter and, once
+ *    the last object is back, push the bucket onto this lcore's stack;
+ *  - bucket owned by another lcore: hand the object to that lcore's
+ *    adoption ring;
+ *  - unowned bucket: bump its fill counter and, once full, put the bucket
+ *    on the shared bucket ring.
+ *
+ * Returns 0 on success or the rte_ring_enqueue() error code.
+ */
+static int
+bucket_enqueue_single(struct bucket_data *data, void *obj)
+{
+	int rc = 0;
+	uintptr_t addr = (uintptr_t)obj;
+	struct bucket_header *hdr;
+	unsigned int lcore_id = rte_lcore_id();
+
+	/* The bucket header lives at the start of the bucket page */
+	addr &= data->bucket_page_mask;
+	hdr = (struct bucket_header *)addr;
+
+	/*
+	 * rte_lcore_id() is LCORE_ID_ANY on a non-EAL thread. That value
+	 * compares equal to the owner of an unowned bucket and must never
+	 * be used to index data->buckets[], so such callers are routed to
+	 * the shared paths below.
+	 */
+	if (likely(lcore_id != LCORE_ID_ANY && hdr->lcore_id == lcore_id)) {
+		if (hdr->fill_cnt < data->bucket_size - 1) {
+			hdr->fill_cnt++;
+		} else {
+			hdr->fill_cnt = 0;
+			/* Stack is big enough to put all buckets */
+			bucket_stack_push(data->buckets[lcore_id], hdr);
+		}
+	} else if (hdr->lcore_id != LCORE_ID_ANY) {
+		struct rte_ring *adopt_ring =
+			data->adoption_buffer_rings[hdr->lcore_id];
+
+		rc = rte_ring_enqueue(adopt_ring, obj);
+		/* Ring is big enough to put all objects */
+		RTE_ASSERT(rc == 0);
+	} else if (hdr->fill_cnt < data->bucket_size - 1) {
+		hdr->fill_cnt++;
+	} else {
+		hdr->fill_cnt = 0;
+		rc = rte_ring_enqueue(data->shared_bucket_ring, hdr);
+		/* Ring is big enough to put all buckets */
+		RTE_ASSERT(rc == 0);
+	}
+
+	return rc;
+}
+
+/*
+ * Mempool "enqueue" callback: return the n objects of obj_table to the
+ * pool one at a time. The result is the status of the last object
+ * processed (0 when n is 0).
+ */
+static int
+bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
+	       unsigned int n)
+{
+	struct bucket_data *bd = mp->pool_data;
+	void * const *cur = obj_table;
+	void * const *end = obj_table + n;
+	int status = 0;
+
+	while (cur != end) {
+		status = bucket_enqueue_single(bd, *cur++);
+		RTE_ASSERT(status == 0);
+	}
+	return status;
+}
+
+/*
+ * Write n consecutive object pointers into obj_table.
+ *
+ * The first pointer is *pstart advanced by the header size; every next
+ * one is chunk_size further on. On return *pstart holds the pointer that
+ * would follow the last one written. Returns the first unused slot of
+ * obj_table.
+ */
+static void **
+bucket_fill_obj_table(const struct bucket_data *data, void **pstart,
+		      void **obj_table, unsigned int n)
+{
+	uint8_t *cursor = (uint8_t *)*pstart + data->header_size;
+	void **const table_end = obj_table + n;
+
+	while (obj_table != table_end) {
+		*obj_table++ = cursor;
+		cursor += data->chunk_size;
+	}
+	*pstart = cursor;
+	return obj_table;
+}
+
+/*
+ * Dequeue n_orphans single objects into obj_table.
+ *
+ * The shared orphan ring is tried first. If it cannot supply all of them,
+ * a whole bucket is taken from the calling lcore's stack or, failing that,
+ * from the shared bucket ring (and then claimed for this lcore). Its first
+ * n_orphans objects are handed out and the remaining ones are put on the
+ * shared orphan ring.
+ *
+ * Returns 0 on success, -ENOBUFS (rte_errno set) when no bucket is
+ * available, or the rte_ring_enqueue() error if a leftover object cannot
+ * be queued.
+ */
+static int
+bucket_dequeue_orphans(struct bucket_data *data, void **obj_table,
+		       unsigned int n_orphans)
+{
+	struct bucket_header *hdr;
+	uint8_t *cursor;
+	unsigned int idx;
+	int rc;
+
+	rc = rte_ring_dequeue_bulk(data->shared_orphan_ring, obj_table,
+				   n_orphans, NULL);
+	if (likely(rc == (int)n_orphans))
+		return 0;
+
+	/* Not enough orphans: split a whole bucket instead */
+	cursor = bucket_stack_pop(data->buckets[rte_lcore_id()]);
+	if (cursor == NULL) {
+		rc = rte_ring_dequeue(data->shared_bucket_ring,
+				      (void **)&cursor);
+		if (rc != 0) {
+			rte_errno = ENOBUFS;
+			return -rte_errno;
+		}
+		hdr = (struct bucket_header *)cursor;
+		hdr->lcore_id = rte_lcore_id();
+	} else {
+		hdr = (struct bucket_header *)cursor;
+	}
+	hdr->fill_cnt = 0;
+
+	bucket_fill_obj_table(data, (void **)&cursor, obj_table, n_orphans);
+
+	/* cursor now points at the first object that was not handed out */
+	idx = n_orphans;
+	while (idx < data->bucket_size) {
+		rc = rte_ring_enqueue(data->shared_orphan_ring, cursor);
+		if (rc != 0) {
+			RTE_ASSERT(0);
+			rte_errno = -rc;
+			return rc;
+		}
+		idx++;
+		cursor += data->chunk_size;
+	}
+
+	return 0;
+}
+
+/*
+ * Dequeue n_buckets whole buckets, writing bucket_size object pointers
+ * per bucket into obj_table.
+ *
+ * Buckets come from the calling lcore's stack first and then from the
+ * shared bucket ring; a bucket taken from the ring is claimed for this
+ * lcore. If the ring runs dry, the objects collected so far are enqueued
+ * back to the pool and -ENOBUFS is returned with rte_errno set.
+ * Returns 0 on success.
+ */
+static int
+bucket_dequeue_buckets(struct bucket_data *data, void **obj_table,
+		       unsigned int n_buckets)
+{
+	struct bucket_stack *local_stack = data->buckets[rte_lcore_id()];
+	unsigned int from_stack = RTE_MIN(n_buckets, local_stack->top);
+	unsigned int from_ring = n_buckets - from_stack;
+	void **const table_start = obj_table;
+	unsigned int i;
+
+	for (i = 0; i < from_stack; i++) {
+		void *bucket = bucket_stack_pop_unsafe(local_stack);
+
+		obj_table = bucket_fill_obj_table(data, &bucket, obj_table,
+						  data->bucket_size);
+	}
+
+	for (i = 0; i < from_ring; i++) {
+		struct bucket_header *hdr;
+
+		if (unlikely(rte_ring_dequeue(data->shared_bucket_ring,
+					      (void **)&hdr) != 0)) {
+			/* Give back everything collected so far */
+			bucket_enqueue(data->pool, table_start,
+				       obj_table - table_start);
+			rte_errno = ENOBUFS;
+			return -rte_errno;
+		}
+		hdr->lcore_id = rte_lcore_id();
+		obj_table = bucket_fill_obj_table(data, (void **)&hdr,
+						  obj_table, data->bucket_size);
+	}
+
+	return 0;
+}
+
+/*
+ * Drain the calling lcore's adoption ring, enqueueing every object found
+ * there through bucket_enqueue_single(). Returns the status of the last
+ * object processed, or 0 when the ring was empty.
+ */
+static int
+bucket_adopt_orphans(struct bucket_data *data)
+{
+	struct rte_ring *my_ring = data->adoption_buffer_rings[rte_lcore_id()];
+	void *adopted;
+	int status = 0;
+
+	if (likely(rte_ring_empty(my_ring)))
+		return status;
+
+	while (rte_ring_sc_dequeue(my_ring, &adopted) == 0) {
+		status = bucket_enqueue_single(data, adopted);
+		RTE_ASSERT(status == 0);
+	}
+	return status;
+}
+
+/*
+ * Mempool "dequeue" callback: get n objects into obj_table.
+ *
+ * The request is split into whole buckets, stored at the start of
+ * obj_table, and a remainder of single objects stored right after them.
+ * The remainder is dequeued first; if the whole buckets cannot be obtained
+ * afterwards, the remainder is put back on the shared orphan ring.
+ * Returns 0 on success or a negative error code.
+ */
+static int
+bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
+{
+	struct bucket_data *bd = mp->pool_data;
+	const unsigned int n_buckets = n / bd->bucket_size;
+	const unsigned int n_in_buckets = n_buckets * bd->bucket_size;
+	const unsigned int n_orphans = n - n_in_buckets;
+	void **orphan_table = obj_table + n_in_buckets;
+	int rc;
+
+	/* Take back objects other lcores returned on our behalf */
+	bucket_adopt_orphans(bd);
+
+	if (unlikely(n_orphans > 0)) {
+		rc = bucket_dequeue_orphans(bd, orphan_table, n_orphans);
+		if (rc != 0)
+			return rc;
+	}
+
+	if (unlikely(n_buckets == 0))
+		return 0;
+
+	rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
+	if (unlikely(rc != 0) && n_orphans > 0)
+		rte_ring_enqueue_bulk(bd->shared_orphan_ring, orphan_table,
+				      n_orphans, NULL);
+
+	return rc;
+}
+
+/*
+ * Mempool "get_count" callback: number of objects available in the pool.
+ *
+ * Counts the objects of every full bucket on the shared bucket ring and on
+ * each lcore's stack, plus the single objects on the shared orphan ring
+ * and on each lcore's adoption ring. All lcores are summed rather than
+ * only the calling one, so the result does not depend on the caller and
+ * no array is indexed with rte_lcore_id() (LCORE_ID_ANY on a non-EAL
+ * thread).
+ *
+ * Objects sitting in partially filled buckets are not included, so the
+ * result may still be lower than the number of objects enqueued.
+ */
+static unsigned int
+bucket_get_count(const struct rte_mempool *mp)
+{
+	const struct bucket_data *data = mp->pool_data;
+	unsigned int count;
+	unsigned int i;
+
+	count = data->bucket_size * rte_ring_count(data->shared_bucket_ring) +
+		rte_ring_count(data->shared_orphan_ring);
+
+	for (i = 0; i < RTE_MAX_LCORE; i++) {
+		/* Entries of lcores that were not enabled stay NULL */
+		if (data->buckets[i] != NULL)
+			count += data->bucket_size * data->buckets[i]->top;
+		if (data->adoption_buffer_rings[i] != NULL)
+			count += rte_ring_count(data->adoption_buffer_rings[i]);
+	}
+
+	return count;
+}
+
+/*
+ * Mempool "alloc" callback: allocate and initialise the driver's private
+ * data for mp.
+ *
+ * Creates, on mp's socket, one bucket stack and one adoption ring per
+ * enabled lcore plus the shared orphan ring and the shared bucket ring,
+ * then stores the result in mp->pool_data.
+ *
+ * Returns 0 on success. On failure everything allocated so far is
+ * released, rte_errno is set to the positive error code and the negative
+ * code is returned: -ENOMEM, -ENAMETOOLONG, -EINVAL when a single object
+ * does not fit into a bucket, or the error left by rte_ring_create().
+ */
+static int
+bucket_alloc(struct rte_mempool *mp)
+{
+	int rg_flags = 0;
+	int rc = 0;
+	char rg_name[RTE_RING_NAMESIZE];
+	struct bucket_data *data;
+	unsigned int i;
+
+	data = rte_zmalloc_socket("bucket_pool", sizeof(*data),
+				  RTE_CACHE_LINE_SIZE, mp->socket_id);
+	if (data == NULL) {
+		rc = -ENOMEM;
+		goto no_mem_for_data;
+	}
+	data->pool = mp;
+	data->header_size = mp->header_size;
+	/* The bucket header shares the object header area of the mempool */
+	RTE_VERIFY(sizeof(struct bucket_header) +
+		   sizeof(struct rte_mempool_objhdr) <= mp->header_size);
+	data->chunk_size = mp->header_size + mp->elt_size + mp->trailer_size;
+	data->bucket_size = BUCKET_MEM_SIZE / data->chunk_size;
+	if (data->bucket_size == 0) {
+		/*
+		 * A single object is larger than a bucket. Every size below
+		 * is derived by dividing by bucket_size, so refuse the pool
+		 * instead of dividing by zero.
+		 */
+		rc = -EINVAL;
+		goto invalid_bucket_size;
+	}
+	data->bucket_page_mask = ~(rte_align64pow2(BUCKET_MEM_SIZE) - 1);
+
+	if (mp->flags & MEMPOOL_F_SP_PUT)
+		rg_flags |= RING_F_SP_ENQ;
+	if (mp->flags & MEMPOOL_F_SC_GET)
+		rg_flags |= RING_F_SC_DEQ;
+
+	for (i = 0; i < RTE_MAX_LCORE; i++) {
+		if (!rte_lcore_is_enabled(i))
+			continue;
+		/* Each stack can hold every bucket of the pool */
+		data->buckets[i] =
+			bucket_stack_create(mp, mp->size / data->bucket_size);
+		if (data->buckets[i] == NULL) {
+			rc = -ENOMEM;
+			goto no_mem_for_stacks;
+		}
+		rc = snprintf(rg_name, sizeof(rg_name),
+			      RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
+		if (rc < 0 || rc >= (int)sizeof(rg_name)) {
+			rc = -ENAMETOOLONG;
+			goto no_mem_for_stacks;
+		}
+		/* Only the owning lcore drains its adoption ring */
+		data->adoption_buffer_rings[i] =
+			rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
+					mp->socket_id,
+					rg_flags | RING_F_SC_DEQ);
+		if (data->adoption_buffer_rings[i] == NULL) {
+			rc = -rte_errno;
+			goto no_mem_for_stacks;
+		}
+	}
+
+	rc = snprintf(rg_name, sizeof(rg_name),
+		      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
+	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
+		rc = -ENAMETOOLONG;
+		goto invalid_shared_orphan_ring;
+	}
+	data->shared_orphan_ring =
+		rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
+				mp->socket_id, rg_flags);
+	if (data->shared_orphan_ring == NULL) {
+		rc = -rte_errno;
+		goto cannot_create_shared_orphan_ring;
+	}
+
+	rc = snprintf(rg_name, sizeof(rg_name),
+		       RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
+	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
+		rc = -ENAMETOOLONG;
+		goto invalid_shared_bucket_ring;
+	}
+	data->shared_bucket_ring =
+		rte_ring_create(rg_name,
+				rte_align32pow2((mp->size /
+						 data->bucket_size) + 1),
+				mp->socket_id, rg_flags);
+	if (data->shared_bucket_ring == NULL) {
+		rc = -rte_errno;
+		goto cannot_create_shared_bucket_ring;
+	}
+
+	mp->pool_data = data;
+
+	return 0;
+
+cannot_create_shared_bucket_ring:
+invalid_shared_bucket_ring:
+	rte_ring_free(data->shared_orphan_ring);
+cannot_create_shared_orphan_ring:
+invalid_shared_orphan_ring:
+no_mem_for_stacks:
+	/* data was zero-allocated, so entries never created are NULL */
+	for (i = 0; i < RTE_MAX_LCORE; i++) {
+		rte_free(data->buckets[i]);
+		rte_ring_free(data->adoption_buffer_rings[i]);
+	}
+invalid_bucket_size:
+	rte_free(data);
+no_mem_for_data:
+	rte_errno = -rc;
+	return rc;
+}
+
+/*
+ * Mempool "free" callback: release the per-lcore stacks and adoption
+ * rings, the two shared rings and the private data itself. Does nothing
+ * when mp has no private data.
+ */
+static void
+bucket_free(struct rte_mempool *mp)
+{
+	struct bucket_data *bd = mp->pool_data;
+	unsigned int lcore;
+
+	if (bd != NULL) {
+		for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++) {
+			rte_free(bd->buckets[lcore]);
+			rte_ring_free(bd->adoption_buffer_rings[lcore]);
+		}
+
+		rte_ring_free(bd->shared_orphan_ring);
+		rte_ring_free(bd->shared_bucket_ring);
+
+		rte_free(bd);
+	}
+}
+
+/*
+ * Mempool "get_capabilities" callback: add the physically-contiguous and
+ * allocate-in-clusters capability bits to *flags. Always returns 0.
+ */
+static int
+bucket_get_capabilities(__rte_unused const struct rte_mempool *mp,
+			unsigned int *flags)
+{
+	const unsigned int bucket_caps = MEMPOOL_F_CAPA_PHYS_CONTIG |
+		MEMPOOL_F_CAPA_ALLOCATE_IN_CLUSTERS;
+
+	*flags = *flags | bucket_caps;
+	return 0;
+}
+
+/*
+ * Mempool "get_info" callback: report how many objects make up one
+ * bucket as info->cluster_size. Always returns 0.
+ */
+static int
+bucket_get_info(__rte_unused const struct rte_mempool *mp,
+		struct rte_mempool_info *info)
+{
+	/* Computed from mp itself: mp->pool_data may not be set up yet */
+	unsigned int per_object = mp->header_size;
+
+	per_object += mp->elt_size;
+	per_object += mp->trailer_size;
+
+	info->cluster_size = BUCKET_MEM_SIZE / per_object;
+	return 0;
+}
+
+/*
+ * Mempool "register_memory_area" callback: initialise the bucket header
+ * at the start of every bucket page inside [vaddr, vaddr + len).
+ *
+ * Every header is marked empty (fill_cnt 0) and unowned (LCORE_ID_ANY).
+ * paddr is not used.
+ *
+ * Returns 0 on success, or -EINVAL when the object size is zero or a
+ * single object does not fit into a bucket.
+ */
+static int
+bucket_register_memory_area(const struct rte_mempool *mp,
+			    char *vaddr, __rte_unused phys_addr_t paddr,
+			    size_t len)
+{
+	/* mp->pool_data may be still uninitialized at this point */
+	unsigned int chunk_size = mp->header_size + mp->elt_size +
+		mp->trailer_size;
+	unsigned int bucket_mem_size;
+	unsigned int bucket_page_sz;
+	char *end = vaddr + len;
+	char *iter;
+
+	if (chunk_size == 0)
+		return -EINVAL;
+
+	bucket_mem_size = (BUCKET_MEM_SIZE / chunk_size) * chunk_size;
+	bucket_page_sz = rte_align32pow2(bucket_mem_size);
+	/*
+	 * An object larger than a bucket gives a page size of zero; the
+	 * alignment below would then yield NULL and the loop would never
+	 * advance.
+	 */
+	if (bucket_page_sz == 0)
+		return -EINVAL;
+
+	/*
+	 * Start at the first bucket page boundary and stop as soon as a
+	 * whole header no longer fits, so nothing is written past the area.
+	 */
+	for (iter = RTE_PTR_ALIGN_CEIL(vaddr, bucket_page_sz);
+	     iter < end &&
+	     (size_t)(end - iter) >= sizeof(struct bucket_header);
+	     iter += bucket_page_sz) {
+		/* librte_mempool uses the header part for its own bookkeeping,
+		 * but the librte_mempool's object header is adjacent to the
+		 * data; it is small enough and the header is guaranteed to be
+		 * at least CACHE_LINE_SIZE (i.e. 64) bytes, so we do have
+		 * plenty of space at the start of the header. So the layout
+		 * looks like this:
+		 * [bucket_header] ... unused ... [rte_mempool_objhdr] [data...]
+		 */
+		struct bucket_header *hdr = (struct bucket_header *)iter;
+
+		hdr->fill_cnt = 0;
+		hdr->lcore_id = LCORE_ID_ANY;
+	}
+
+	return 0;
+}
+
+static const struct rte_mempool_ops ops_bucket = {	/* callback table of the "bucket" driver */
+	.name = "bucket",
+	.alloc = bucket_alloc,
+	.free = bucket_free,
+	.enqueue = bucket_enqueue,
+	.dequeue = bucket_dequeue,
+	.get_count = bucket_get_count,
+	.get_capabilities = bucket_get_capabilities,
+	.register_memory_area = bucket_register_memory_area,
+	.get_info = bucket_get_info,
+};
+
+
+MEMPOOL_REGISTER_OPS(ops_bucket);
diff --git a/drivers/mempool/bucket/rte_mempool_bucket_version.map b/drivers/mempool/bucket/rte_mempool_bucket_version.map
new file mode 100644
index 0000000..179140f
--- /dev/null
+++ b/drivers/mempool/bucket/rte_mempool_bucket_version.map
@@ -0,0 +1,4 @@
+DPDK_18.02 {
+
+	local: *;
+};
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 6a6a745..d99181f 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -115,6 +115,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_VDEV_BUS)       += -lrte_bus_vdev
 ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
 # plugins (link only if static libraries)
 
+_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -lrte_mempool_bucket
 _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)  += -lrte_mempool_stack
 
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_AF_PACKET)  += -lrte_pmd_af_packet
-- 
2.7.4



More information about the dev mailing list