[dpdk-dev,v1,2/9] mempool: add op to populate objects using provided memory

Message ID 1520696382-16400-3-git-send-email-arybchenko@solarflare.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Checks

Context               Check    Description
ci/checkpatch         success  coding style OK
ci/Intel-compilation  success  Compilation OK

Commit Message

Andrew Rybchenko March 10, 2018, 3:39 p.m. UTC
  The callback allows customizing how objects are stored in the
memory chunk. A default implementation of the callback, which simply
puts objects one by one, is available.

Signed-off-by: Andrew Rybchenko <arybchenko@solarflare.com>
---
RFCv2 -> v1:
 - advertise ABI changes in release notes
 - use consistent name for default callback:
   rte_mempool_op_<callback>_default()
 - add opaque data pointer to populated object callback
 - move default callback to dedicated file

 doc/guides/rel_notes/deprecation.rst         |  2 +-
 doc/guides/rel_notes/release_18_05.rst       |  2 +
 lib/librte_mempool/rte_mempool.c             | 23 +++----
 lib/librte_mempool/rte_mempool.h             | 90 ++++++++++++++++++++++++++++
 lib/librte_mempool/rte_mempool_ops.c         | 21 +++++++
 lib/librte_mempool/rte_mempool_ops_default.c | 24 ++++++++
 lib/librte_mempool/rte_mempool_version.map   |  1 +
 7 files changed, 148 insertions(+), 15 deletions(-)
  

Comments

Olivier Matz March 19, 2018, 5:04 p.m. UTC | #1
On Sat, Mar 10, 2018 at 03:39:35PM +0000, Andrew Rybchenko wrote:
> The callback allows customizing how objects are stored in the
> memory chunk. A default implementation of the callback, which simply
> puts objects one by one, is available.
> 
> Signed-off-by: Andrew Rybchenko <arybchenko@solarflare.com>

[...]

> --- a/lib/librte_mempool/rte_mempool.c
> +++ b/lib/librte_mempool/rte_mempool.c
> @@ -99,7 +99,8 @@ static unsigned optimize_object_size(unsigned obj_size)
>  }
>  
>  static void
> -mempool_add_elem(struct rte_mempool *mp, void *obj, rte_iova_t iova)
> +mempool_add_elem(struct rte_mempool *mp, __rte_unused void *opaque,
> +		 void *obj, rte_iova_t iova)
>  {
>  	struct rte_mempool_objhdr *hdr;
>  	struct rte_mempool_objtlr *tlr __rte_unused;
> @@ -116,9 +117,6 @@ mempool_add_elem(struct rte_mempool *mp, void *obj, rte_iova_t iova)
>  	tlr = __mempool_get_trailer(obj);
>  	tlr->cookie = RTE_MEMPOOL_TRAILER_COOKIE;
>  #endif
> -
> -	/* enqueue in ring */
> -	rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
>  }
>  
>  /* call obj_cb() for each mempool element */

Before this patch, the purpose of mempool_add_elem() was to add
an object into a mempool:
1- write object header and trailers
2- chain it into the list of objects
3- add it into the ring/stack/whatever (=enqueue)

Now, the enqueue is done in rte_mempool_op_populate_default() or will be
done in the driver. I'm not sure it's a good idea to separate 3- from
2-, because an object that is chained into the list is expected to be
in the ring/stack too.

This risk of mis-synchronization is also reinforced by the fact that
ops->populate() can be provided by the driver and mempool_add_elem() is
passed as a callback pointer.

It's not clear to me why rte_mempool_ops_enqueue_bulk() is
removed from mempool_add_elem().
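
For reference, the pre-patch flow was roughly this (simplified from
the hunk above; the debug cookies are omitted):

	static void
	mempool_add_elem(struct rte_mempool *mp, void *obj, rte_iova_t iova)
	{
		struct rte_mempool_objhdr *hdr;

		/* 1- write the object header */
		hdr = RTE_PTR_SUB(obj, sizeof(*hdr));
		hdr->mp = mp;
		hdr->iova = iova;

		/* 2- chain it into the list of objects */
		STAILQ_INSERT_TAIL(&mp->elt_list, hdr, next);
		mp->populated_size++;

		/* 3- add it into the ring/stack (the step this patch moves out) */
		rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
	}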



> @@ -396,16 +394,13 @@ rte_mempool_populate_iova(struct rte_mempool *mp, char *vaddr,
>  	else
>  		off = RTE_PTR_ALIGN_CEIL(vaddr, RTE_CACHE_LINE_SIZE) - vaddr;
>  
> -	while (off + total_elt_sz <= len && mp->populated_size < mp->size) {
> -		off += mp->header_size;
> -		if (iova == RTE_BAD_IOVA)
> -			mempool_add_elem(mp, (char *)vaddr + off,
> -				RTE_BAD_IOVA);
> -		else
> -			mempool_add_elem(mp, (char *)vaddr + off, iova + off);
> -		off += mp->elt_size + mp->trailer_size;
> -		i++;
> -	}
> +	if (off > len)
> +		return -EINVAL;

I think there is a memory leak here (memhdr), but it's my fault ;)
I introduced similar code in commit 84121f1971:

      if (i == 0)
              return -EINVAL;

I can send a patch for it if you want.
  
Andrew Rybchenko March 21, 2018, 7:05 a.m. UTC | #2
On 03/19/2018 08:04 PM, Olivier Matz wrote:
> On Sat, Mar 10, 2018 at 03:39:35PM +0000, Andrew Rybchenko wrote:
>> The callback allows customizing how objects are stored in the
>> memory chunk. A default implementation of the callback, which simply
>> puts objects one by one, is available.
>>
>> Signed-off-by: Andrew Rybchenko <arybchenko@solarflare.com>
> [...]
>
>> --- a/lib/librte_mempool/rte_mempool.c
>> +++ b/lib/librte_mempool/rte_mempool.c
>> @@ -99,7 +99,8 @@ static unsigned optimize_object_size(unsigned obj_size)
>>   }
>>   
>>   static void
>> -mempool_add_elem(struct rte_mempool *mp, void *obj, rte_iova_t iova)
>> +mempool_add_elem(struct rte_mempool *mp, __rte_unused void *opaque,
>> +		 void *obj, rte_iova_t iova)
>>   {
>>   	struct rte_mempool_objhdr *hdr;
>>   	struct rte_mempool_objtlr *tlr __rte_unused;
>> @@ -116,9 +117,6 @@ mempool_add_elem(struct rte_mempool *mp, void *obj, rte_iova_t iova)
>>   	tlr = __mempool_get_trailer(obj);
>>   	tlr->cookie = RTE_MEMPOOL_TRAILER_COOKIE;
>>   #endif
>> -
>> -	/* enqueue in ring */
>> -	rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
>>   }
>>   
>>   /* call obj_cb() for each mempool element */
> Before this patch, the purpose of mempool_add_elem() was to add
> an object into a mempool:
> 1- write object header and trailers
> 2- chain it into the list of objects
> 3- add it into the ring/stack/whatever (=enqueue)
>
> Now, the enqueue is done in rte_mempool_op_populate_default() or will be
> done in the driver. I'm not sure it's a good idea to separate 3- from
> 2-, because an object that is chained into the list is expected to be
> in the ring/stack too.

When an object is dequeued, it is still chained into the list, but it
is no longer in the ring/stack. The separation lets the callback handle
generic mempool housekeeping, while enqueue stays a driver-specific
operation.

> This risk of mis-synchronization is also reinforced by the fact that
> ops->populate() can be provided by the driver and mempool_add_elem() is
> passed as a callback pointer.
>
> It's not clear to me why rte_mempool_ops_enqueue_bulk() is
> removed from mempool_add_elem().

The idea was that it could be more efficient (and probably the only
way) to do the initial enqueue inside the driver. In theory, the bucket
mempool could init and enqueue full buckets instead of objects one by
one. However, it finally turned out to be easier to reuse the default
populate callback and the enqueue operation.
So now I have no strong opinion and agree with your arguments; that's
why I've tried to highlight it in the rte_mempool_populate_t
description. Even an explicit description does not always help...
So, should I move the enqueue back into the callback, or leave it as
is in my patches?
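
For illustration only, a driver-side populate that does the enqueue in
bulk could look roughly like this (a hypothetical sketch:
bucket_enqueue_block() stands in for whatever bulk primitive a driver
has; nothing like it is part of this patch):

	static int
	custom_populate(struct rte_mempool *mp, unsigned int max_objs,
			void *vaddr, rte_iova_t iova, size_t len,
			rte_mempool_populate_obj_cb_t *obj_cb,
			void *obj_cb_arg)
	{
		size_t total_elt_sz = mp->header_size + mp->elt_size +
				      mp->trailer_size;
		unsigned int i;
		size_t off;

		for (off = 0, i = 0;
		     off + total_elt_sz <= len && i < max_objs; i++) {
			off += mp->header_size;
			/* generic housekeeping: header, list chaining */
			obj_cb(mp, obj_cb_arg, (char *)vaddr + off,
			       (iova == RTE_BAD_IOVA) ?
			       RTE_BAD_IOVA : (iova + off));
			off += mp->elt_size + mp->trailer_size;
		}

		/* driver-specific: hand all i objects to the backend in
		 * one bulk operation instead of one by one */
		bucket_enqueue_block(mp, vaddr, i);

		return i;
	}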

>> @@ -396,16 +394,13 @@ rte_mempool_populate_iova(struct rte_mempool *mp, char *vaddr,
>>   	else
>>   		off = RTE_PTR_ALIGN_CEIL(vaddr, RTE_CACHE_LINE_SIZE) - vaddr;
>>   
>> -	while (off + total_elt_sz <= len && mp->populated_size < mp->size) {
>> -		off += mp->header_size;
>> -		if (iova == RTE_BAD_IOVA)
>> -			mempool_add_elem(mp, (char *)vaddr + off,
>> -				RTE_BAD_IOVA);
>> -		else
>> -			mempool_add_elem(mp, (char *)vaddr + off, iova + off);
>> -		off += mp->elt_size + mp->trailer_size;
>> -		i++;
>> -	}
>> +	if (off > len)
>> +		return -EINVAL;
> I think there is a memory leak here (memhdr), but it's my fault ;)
> I introduced similar code in commit 84121f1971:
>
>        if (i == 0)
>                return -EINVAL;
>
> I can send a patch for it if you want.

This one (the "i == 0" check) is yours; the one above ("off > len") is
mine :)
Don't worry, I'll submit a separate pre-patch to fix it, with the
appropriate Fixes tag and Cc.
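
The fix in rte_mempool_populate_iova() would be along these lines (a
sketch; memhdr is the memory chunk header that rte_zmalloc() allocates
earlier in that function):

	/* not enough room to store one object: don't leak memhdr */
	if (i == 0) {
		rte_free(memhdr);
		return -EINVAL;
	}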
  

Patch

diff --git a/doc/guides/rel_notes/deprecation.rst b/doc/guides/rel_notes/deprecation.rst
index e02d4ca..c06fc67 100644
--- a/doc/guides/rel_notes/deprecation.rst
+++ b/doc/guides/rel_notes/deprecation.rst
@@ -72,7 +72,7 @@  Deprecation Notices
 
   - removal of ``get_capabilities`` mempool ops and related flags.
   - substitute ``register_memory_area`` with ``populate`` ops.
-  - addition of new ops to customize objects population and allocate contiguous
+  - addition of new op to allocate contiguous
     block of objects if underlying driver supports it.
 
 * mbuf: The control mbuf API will be removed in v18.05. The impacted
diff --git a/doc/guides/rel_notes/release_18_05.rst b/doc/guides/rel_notes/release_18_05.rst
index 59583ea..abaefe5 100644
--- a/doc/guides/rel_notes/release_18_05.rst
+++ b/doc/guides/rel_notes/release_18_05.rst
@@ -84,6 +84,8 @@  ABI Changes
 
   A new callback ``calc_mem_size`` has been added to ``rte_mempool_ops``
   to allow to customize required memory size calculation.
+  A new callback ``populate`` has been added to ``rte_mempool_ops``
+  to allow to customize objects population.
 
 
 Removed Items
diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
index 3bfb36e..ed0e982 100644
--- a/lib/librte_mempool/rte_mempool.c
+++ b/lib/librte_mempool/rte_mempool.c
@@ -99,7 +99,8 @@  static unsigned optimize_object_size(unsigned obj_size)
 }
 
 static void
-mempool_add_elem(struct rte_mempool *mp, void *obj, rte_iova_t iova)
+mempool_add_elem(struct rte_mempool *mp, __rte_unused void *opaque,
+		 void *obj, rte_iova_t iova)
 {
 	struct rte_mempool_objhdr *hdr;
 	struct rte_mempool_objtlr *tlr __rte_unused;
@@ -116,9 +117,6 @@  mempool_add_elem(struct rte_mempool *mp, void *obj, rte_iova_t iova)
 	tlr = __mempool_get_trailer(obj);
 	tlr->cookie = RTE_MEMPOOL_TRAILER_COOKIE;
 #endif
-
-	/* enqueue in ring */
-	rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
 }
 
 /* call obj_cb() for each mempool element */
@@ -396,16 +394,13 @@  rte_mempool_populate_iova(struct rte_mempool *mp, char *vaddr,
 	else
 		off = RTE_PTR_ALIGN_CEIL(vaddr, RTE_CACHE_LINE_SIZE) - vaddr;
 
-	while (off + total_elt_sz <= len && mp->populated_size < mp->size) {
-		off += mp->header_size;
-		if (iova == RTE_BAD_IOVA)
-			mempool_add_elem(mp, (char *)vaddr + off,
-				RTE_BAD_IOVA);
-		else
-			mempool_add_elem(mp, (char *)vaddr + off, iova + off);
-		off += mp->elt_size + mp->trailer_size;
-		i++;
-	}
+	if (off > len)
+		return -EINVAL;
+
+	i = rte_mempool_ops_populate(mp, mp->size - mp->populated_size,
+		(char *)vaddr + off,
+		(iova == RTE_BAD_IOVA) ? RTE_BAD_IOVA : (iova + off),
+		len - off, mempool_add_elem, NULL);
 
 	/* not enough room to store one object */
 	if (i == 0)
diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
index 0151f6c..49083bd 100644
--- a/lib/librte_mempool/rte_mempool.h
+++ b/lib/librte_mempool/rte_mempool.h
@@ -449,6 +449,63 @@  ssize_t rte_mempool_op_calc_mem_size_default(const struct rte_mempool *mp,
 		uint32_t obj_num, uint32_t pg_shift,
 		size_t *min_chunk_size, size_t *align);
 
+/**
+ * Function to be called for each populated object.
+ *
+ * @param[in] mp
+ *   A pointer to the mempool structure.
+ * @param[in] opaque
+ *   An opaque pointer passed to iterator.
+ * @param[in] vaddr
+ *   Object virtual address.
+ * @param[in] iova
+ *   IO address of the object or RTE_BAD_IOVA.
+ */
+typedef void (rte_mempool_populate_obj_cb_t)(struct rte_mempool *mp,
+		void *opaque, void *vaddr, rte_iova_t iova);
+
+/**
+ * Populate memory pool objects using provided memory chunk.
+ *
+ * Populated objects should be enqueued to the pool, e.g. using
+ * rte_mempool_ops_enqueue_bulk().
+ *
+ * If the given IO address is unknown (iova = RTE_BAD_IOVA),
+ * the chunk doesn't need to be physically contiguous (only virtually),
+ * and allocated objects may span two pages.
+ *
+ * @param[in] mp
+ *   A pointer to the mempool structure.
+ * @param[in] max_objs
+ *   Maximum number of objects to be populated.
+ * @param[in] vaddr
+ *   The virtual address of memory that should be used to store objects.
+ * @param[in] iova
+ *   The IO address
+ * @param[in] len
+ *   The length of memory in bytes.
+ * @param[in] obj_cb
+ *   Callback function to be executed for each populated object.
+ * @param[in] obj_cb_arg
+ *   An opaque pointer passed to the callback function.
+ * @return
+ *   The number of objects added on success.
+ *   On error, no objects are populated and a negative errno is returned.
+ */
+typedef int (*rte_mempool_populate_t)(struct rte_mempool *mp,
+		unsigned int max_objs,
+		void *vaddr, rte_iova_t iova, size_t len,
+		rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg);
+
+/**
+ * Default way to populate memory pool object using provided memory
+ * chunk: just slice objects one by one.
+ */
+int rte_mempool_op_populate_default(struct rte_mempool *mp,
+		unsigned int max_objs,
+		void *vaddr, rte_iova_t iova, size_t len,
+		rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg);
+
 /** Structure defining mempool operations structure */
 struct rte_mempool_ops {
 	char name[RTE_MEMPOOL_OPS_NAMESIZE]; /**< Name of mempool ops struct. */
@@ -470,6 +527,11 @@  struct rte_mempool_ops {
 	 * store specified number of objects.
 	 */
 	rte_mempool_calc_mem_size_t calc_mem_size;
+	/**
+	 * Optional callback to populate mempool objects using
+	 * provided memory chunk.
+	 */
+	rte_mempool_populate_t populate;
 } __rte_cache_aligned;
 
 #define RTE_MEMPOOL_MAX_OPS_IDX 16  /**< Max registered ops structs */
@@ -642,6 +704,34 @@  ssize_t rte_mempool_ops_calc_mem_size(const struct rte_mempool *mp,
 				      size_t *min_chunk_size, size_t *align);
 
 /**
+ * @internal wrapper for mempool_ops populate callback.
+ *
+ * Populate memory pool objects using provided memory chunk.
+ *
+ * @param[in] mp
+ *   A pointer to the mempool structure.
+ * @param[in] max_objs
+ *   Maximum number of objects to be populated.
+ * @param[in] vaddr
+ *   The virtual address of memory that should be used to store objects.
+ * @param[in] iova
+ *   The IO address
+ * @param[in] len
+ *   The length of memory in bytes.
+ * @param[in] obj_cb
+ *   Callback function to be executed for each populated object.
+ * @param[in] obj_cb_arg
+ *   An opaque pointer passed to the callback function.
+ * @return
+ *   The number of objects added on success.
+ *   On error, no objects are populated and a negative errno is returned.
+ */
+int rte_mempool_ops_populate(struct rte_mempool *mp, unsigned int max_objs,
+			     void *vaddr, rte_iova_t iova, size_t len,
+			     rte_mempool_populate_obj_cb_t *obj_cb,
+			     void *obj_cb_arg);
+
+/**
  * @internal wrapper for mempool_ops free callback.
  *
  * @param mp
diff --git a/lib/librte_mempool/rte_mempool_ops.c b/lib/librte_mempool/rte_mempool_ops.c
index 26908cc..1a7f39f 100644
--- a/lib/librte_mempool/rte_mempool_ops.c
+++ b/lib/librte_mempool/rte_mempool_ops.c
@@ -60,6 +60,7 @@  rte_mempool_register_ops(const struct rte_mempool_ops *h)
 	ops->get_capabilities = h->get_capabilities;
 	ops->register_memory_area = h->register_memory_area;
 	ops->calc_mem_size = h->calc_mem_size;
+	ops->populate = h->populate;
 
 	rte_spinlock_unlock(&rte_mempool_ops_table.sl);
 
@@ -141,6 +142,26 @@  rte_mempool_ops_calc_mem_size(const struct rte_mempool *mp,
 	return ops->calc_mem_size(mp, obj_num, pg_shift, min_chunk_size, align);
 }
 
+/* wrapper to populate memory pool objects using provided memory chunk */
+int
+rte_mempool_ops_populate(struct rte_mempool *mp, unsigned int max_objs,
+				void *vaddr, rte_iova_t iova, size_t len,
+				rte_mempool_populate_obj_cb_t *obj_cb,
+				void *obj_cb_arg)
+{
+	struct rte_mempool_ops *ops;
+
+	ops = rte_mempool_get_ops(mp->ops_index);
+
+	if (ops->populate == NULL)
+		return rte_mempool_op_populate_default(mp, max_objs, vaddr,
+						       iova, len, obj_cb,
+						       obj_cb_arg);
+
+	return ops->populate(mp, max_objs, vaddr, iova, len, obj_cb,
+			     obj_cb_arg);
+}
+
 /* sets mempool ops previously registered by rte_mempool_register_ops. */
 int
 rte_mempool_set_ops_byname(struct rte_mempool *mp, const char *name,
diff --git a/lib/librte_mempool/rte_mempool_ops_default.c b/lib/librte_mempool/rte_mempool_ops_default.c
index 57fe79b..57295f7 100644
--- a/lib/librte_mempool/rte_mempool_ops_default.c
+++ b/lib/librte_mempool/rte_mempool_ops_default.c
@@ -36,3 +36,27 @@  rte_mempool_op_calc_mem_size_default(const struct rte_mempool *mp,
 
 	return mem_size;
 }
+
+int
+rte_mempool_op_populate_default(struct rte_mempool *mp, unsigned int max_objs,
+		void *vaddr, rte_iova_t iova, size_t len,
+		rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
+{
+	size_t total_elt_sz;
+	size_t off;
+	unsigned int i;
+	void *obj;
+
+	total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;
+
+	for (off = 0, i = 0; off + total_elt_sz <= len && i < max_objs; i++) {
+		off += mp->header_size;
+		obj = (char *)vaddr + off;
+		obj_cb(mp, obj_cb_arg, obj,
+		       (iova == RTE_BAD_IOVA) ? RTE_BAD_IOVA : (iova + off));
+		rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
+		off += mp->elt_size + mp->trailer_size;
+	}
+
+	return i;
+}
diff --git a/lib/librte_mempool/rte_mempool_version.map b/lib/librte_mempool/rte_mempool_version.map
index e2a054b..90e79ec 100644
--- a/lib/librte_mempool/rte_mempool_version.map
+++ b/lib/librte_mempool/rte_mempool_version.map
@@ -56,6 +56,7 @@  DPDK_18.05 {
 	global:
 
 	rte_mempool_op_calc_mem_size_default;
+	rte_mempool_op_populate_default;
 
 } DPDK_17.11;
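
With this patch applied, a driver wanting custom object placement
would wire the new callback into its ops structure roughly as follows
(a sketch: the "custom" callbacks are hypothetical, and populate may be
left NULL to fall back to rte_mempool_op_populate_default()):

	static const struct rte_mempool_ops ops_custom = {
		.name = "custom",
		.alloc = custom_alloc,
		.free = custom_free,
		.enqueue = custom_enqueue,
		.dequeue = custom_dequeue,
		.get_count = custom_get_count,
		.calc_mem_size = custom_calc_mem_size,
		/* new in this patch: optional populate callback */
		.populate = custom_populate,
	};

	MEMPOOL_REGISTER_OPS(ops_custom);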