[dpdk-dev] [PATCH v3 5/8] stack: add lock-free stack implementation

Honnappa Nagarahalli Honnappa.Nagarahalli at arm.com
Fri Mar 29 00:27:00 CET 2019


<snip>

> diff --git a/lib/librte_stack/rte_stack.c b/lib/librte_stack/rte_stack.c
> index 96dffdf44..8f0361ea1 100644
> --- a/lib/librte_stack/rte_stack.c
> +++ b/lib/librte_stack/rte_stack.c

<snip>

> @@ -63,9 +81,16 @@ rte_stack_create(const char *name, unsigned int count, int socket_id,
>  	unsigned int sz;
>  	int ret;
> 
> -	RTE_SET_USED(flags);
> +#ifdef RTE_ARCH_X86_64
> +	RTE_BUILD_BUG_ON(sizeof(struct rte_stack_lf_head) != 16);
This check should be independent of the platform; only the fallback error path needs to be arch-specific (rough sketch below, after the quoted #endif).

> +#else
> +	if (flags & RTE_STACK_F_LF) {
> +		STACK_LOG_ERR("Lock-free stack is not supported on your platform\n");
> +		return NULL;
> +	}
> +#endif
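
Roughly what I had in mind (untested sketch, keeping your fallback error path):

	/* The lock-free head must be 16 bytes for the 128-bit CAS,
	 * regardless of which platforms provide the implementation.
	 */
	RTE_BUILD_BUG_ON(sizeof(struct rte_stack_lf_head) != 16);

#ifndef RTE_ARCH_X86_64
	if (flags & RTE_STACK_F_LF) {
		STACK_LOG_ERR("Lock-free stack is not supported on your platform\n");
		return NULL;
	}
#endif
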
> 
> -	sz = rte_stack_get_memsize(count);
> +	sz = rte_stack_get_memsize(count, flags);
> 
>  	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
>  		       RTE_STACK_MZ_PREFIX, name);
> @@ -94,7 +119,7 @@ rte_stack_create(const char *name, unsigned int count, int socket_id,
> 
>  	s = mz->addr;
> 
> -	rte_stack_init(s);
> +	rte_stack_init(s, count, flags);
> 
>  	/* Store the name for later lookups */
>  	ret = snprintf(s->name, sizeof(s->name), "%s", name);
> diff --git a/lib/librte_stack/rte_stack.h b/lib/librte_stack/rte_stack.h
> index 7a633deb5..b484313bb 100644
> --- a/lib/librte_stack/rte_stack.h
> +++ b/lib/librte_stack/rte_stack.h
> @@ -30,6 +30,35 @@ extern "C" {

<snip>

> +/**
> + * @internal Push several objects on the lock-free stack (MT-safe).
> + *
> + * @param s
> + *   A pointer to the stack structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to push on the stack from the obj_table.
> + * @return
> + *   Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned int __rte_experimental
This is an internal function. Is the '__rte_experimental' tag required? (applies to other instances in this patch)

> +rte_stack_lf_push(struct rte_stack *s, void * const *obj_table,
> +		  unsigned int n)
> +{
> +	struct rte_stack_lf_elem *tmp, *first, *last = NULL;
> +	unsigned int i;
> +
> +	if (unlikely(n == 0))
> +		return 0;
> +
> +	/* Pop n free elements */
> +	first = __rte_stack_lf_pop(&s->stack_lf.free, n, NULL, &last);
> +	if (unlikely(first == NULL))
> +		return 0;
> +
> +	/* Construct the list elements */
> +	for (tmp = first, i = 0; i < n; i++, tmp = tmp->next)
> +		tmp->data = obj_table[n - i - 1];
> +
> +	/* Push them to the used list */
> +	__rte_stack_lf_push(&s->stack_lf.used, first, last, n);
> +
> +	return n;
> +}
> +
 
<snip>

> 
>  /**
> @@ -225,7 +339,10 @@ rte_stack_free_count(struct rte_stack *s)
>   *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
>   *   constraint for the reserved zone.
>   * @param flags
> - *   Reserved for future use.
> + *   An OR of the following:
> + *    - RTE_STACK_F_LF: If this flag is set, the stack uses lock-free
> + *      variants of the push and pop functions. Otherwise, it achieves
> + *      thread-safety using a lock.
>   * @return
>   *   On success, the pointer to the new allocated stack. NULL on error with
>   *    rte_errno set appropriately. Possible errno values include:
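
It might also be worth adding a usage example for the new flag to the docs. My understanding of the intended usage is roughly this (untested sketch using the public rte_stack API, error handling trimmed):

#include <rte_stack.h>

/* Push and pop a small burst through a lock-free stack. */
static void
lf_stack_example(void * const objs[8])
{
	void *popped[8];
	struct rte_stack *s;
	unsigned int n;

	/* Request the lock-free push/pop variants at creation time */
	s = rte_stack_create("example_lf", 1024, SOCKET_ID_ANY,
			     RTE_STACK_F_LF);
	if (s == NULL)
		return; /* e.g. lock-free not supported on this platform */

	/* The public push/pop API is unchanged regardless of the flag */
	n = rte_stack_push(s, objs, 8);
	n = rte_stack_pop(s, popped, n);
	(void)n;

	rte_stack_free(s);
}
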
> diff --git a/lib/librte_stack/rte_stack_generic.h b/lib/librte_stack/rte_stack_generic.h
> new file mode 100644
> index 000000000..5e4cbc38e
> --- /dev/null
> +++ b/lib/librte_stack/rte_stack_generic.h
The name "...stack_generic.h" is confusing. It is implementing LF algorithm.
IMO, the code should be re-organized differently.
rte_stack.h, rte_stack.c - Contain the APIs (calling std or LF based on the flag) and top level structure definition
rte_stack_std.c, rte_stack_std.h - Contain the standard implementation
rte_stack_lf.c, rte_stack_lf.h - Contain the LF implementation
rte_stack_lf_c11.h - Contain the LF C11 implementation
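
With that split, rte_stack.h itself would just dispatch on the flag, e.g. (rough sketch; the __rte_stack_push_std/__rte_stack_push_lf names and the s->flags field are only illustrative, not taken from this patch):

static __rte_always_inline unsigned int
rte_stack_push(struct rte_stack *s, void * const *obj_table, unsigned int n)
{
	if (s->flags & RTE_STACK_F_LF)
		return __rte_stack_push_lf(s, obj_table, n);

	return __rte_stack_push_std(s, obj_table, n);
}
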

> @@ -0,0 +1,151 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#ifndef _RTE_STACK_GENERIC_H_
> +#define _RTE_STACK_GENERIC_H_
> +
> +#include <rte_branch_prediction.h>
> +#include <rte_prefetch.h>
> +

<snip>

> +
> +static __rte_always_inline struct rte_stack_lf_elem *
> +__rte_stack_lf_pop(struct rte_stack_lf_list *list,
> +		   unsigned int num,
> +		   void **obj_table,
> +		   struct rte_stack_lf_elem **last)
> +{
> +#ifndef RTE_ARCH_X86_64
> +	RTE_SET_USED(obj_table);
> +	RTE_SET_USED(last);
> +	RTE_SET_USED(list);
> +	RTE_SET_USED(num);
> +
> +	return NULL;
> +#else
> +	struct rte_stack_lf_head old_head;
> +	int success;
> +
> +	/* Reserve num elements, if available */
> +	while (1) {
> +		uint64_t len = rte_atomic64_read(&list->len);
> +
> +		/* Does the list contain enough elements? */
> +		if (unlikely(len < num))
> +			return NULL;
> +
> +		if (rte_atomic64_cmpset((volatile uint64_t *)&list->len,
> +					len, len - num))
> +			break;
> +	}
> +
> +	old_head = list->head;
> +
> +	/* Pop num elements */
> +	do {
> +		struct rte_stack_lf_head new_head;
> +		struct rte_stack_lf_elem *tmp;
> +		unsigned int i;
> +
> +		rte_prefetch0(old_head.top);
> +
> +		tmp = old_head.top;
> +
> +		/* Traverse the list to find the new head. A next pointer will
> +		 * either point to another element or NULL; if a thread
> +		 * encounters a pointer that has already been popped, the CAS
> +		 * will fail.
> +		 */
> +		for (i = 0; i < num && tmp != NULL; i++) {
> +			rte_prefetch0(tmp->next);
> +			if (obj_table)
> +				obj_table[i] = tmp->data;
> +			if (last)
> +				*last = tmp;
> +			tmp = tmp->next;
> +		}
> +
> +		/* If NULL was encountered, the list was modified while
> +		 * traversing it. Retry.
> +		 */
> +		if (i != num)
> +			continue;
> +
> +		new_head.top = tmp;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		/* old_head is updated on failure */
> +		success = rte_atomic128_cmp_exchange(
> +				(rte_int128_t *)&list->head,
> +				(rte_int128_t *)&old_head,
> +				(rte_int128_t *)&new_head,
> +				1, __ATOMIC_ACQUIRE,
> +				__ATOMIC_ACQUIRE);
Just wondering if 'rte_atomic128_cmp_exchange' for x86 should have compiler barriers based on the memory order passed?
The C++11 memory model is getting mixed with the barrier-based model here (rte_atomic64_cmpset above vs. the explicit __ATOMIC_* orders). I think this is something that needs to be discussed at a wider level.
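
To illustrate the mixing: in a pure C11 style the length reservation above would look something like this (sketch only, assuming list->len becomes a plain uint64_t rather than rte_atomic64_t):

	uint64_t len = __atomic_load_n(&list->len, __ATOMIC_ACQUIRE);

	/* Reserve num elements, if available */
	while (1) {
		/* Does the list contain enough elements? */
		if (unlikely(len < num))
			return NULL;

		/* On failure, len is updated to the current value */
		if (__atomic_compare_exchange_n(&list->len, &len, len - num,
						0, __ATOMIC_ACQUIRE,
						__ATOMIC_ACQUIRE))
			break;
	}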

> +	} while (success == 0);
> +
> +	return old_head.top;
> +#endif
> +}
> +
> +#endif /* _RTE_STACK_GENERIC_H_ */
> --
> 2.13.6


