[dpdk-dev] [PATCH 3/4] port: fix full burst checks in f_tx_bulk ops

Dumitrescu, Cristian cristian.dumitrescu at intel.com
Thu Mar 31 17:41:10 CEST 2016



> -----Original Message-----
> From: Robert Sanford [mailto:rsanford2 at gmail.com]
> Sent: Monday, March 28, 2016 9:52 PM
> To: dev at dpdk.org; Dumitrescu, Cristian <cristian.dumitrescu at intel.com>
> Subject: [PATCH 3/4] port: fix full burst checks in f_tx_bulk ops
> 
> For several f_tx_bulk functions in rte_port_{ethdev,ring,sched}.c,
> it appears that the intent of the bsz_mask logic is to test whether
> pkts_mask contains a full burst (i.e., the <tx_burst_sz> least
> significant bits are set).
> 
> There are two problems with the bsz_mask code: 1) It truncates
> by using the wrong size for local variable uint32_t bsz_mask, and

This is indeed a bug: although port->bsz_mask is always defined as uint64_t, there are several places where we cache it in a local variable that is mistakenly declared as 32-bit: uint32_t bsz_mask = p->bsz_mask. Thanks, Robert!
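
To make the truncation concrete, here is a minimal standalone sketch (not the librte_port code itself; the helper names old_bsz_mask, full_burst_u32 and full_burst_u64 are made up for illustration). It evaluates the existing expression once with the mask cached in a uint32_t and once with it kept as uint64_t:

```c
#include <assert.h>
#include <stdint.h>

/* Mask as computed by the existing create functions:
 * only bit (tx_burst_sz - 1) is set. */
static uint64_t old_bsz_mask(uint32_t tx_burst_sz)
{
	assert(tx_burst_sz >= 1 && tx_burst_sz <= 64);
	return 1LLU << (tx_burst_sz - 1);
}

/* Existing check with the mask cached in a 32-bit local (the bug). */
static int full_burst_u32(uint64_t pkts_mask, uint64_t port_bsz_mask)
{
	uint32_t bsz_mask = port_bsz_mask; /* upper 32 bits are lost here */
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
			((pkts_mask & bsz_mask) ^ bsz_mask);

	return expr == 0;
}

/* Same check with the local widened to 64 bits. */
static int full_burst_u64(uint64_t pkts_mask, uint64_t port_bsz_mask)
{
	uint64_t bsz_mask = port_bsz_mask;
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
			((pkts_mask & bsz_mask) ^ bsz_mask);

	return expr == 0;
}
```

With tx_burst_sz = 64 the cached 32-bit mask becomes 0, so full_burst_u32(0x1, old_bsz_mask(64)) accepts a single packet as a full burst, while full_burst_u64 rejects it. For tx_burst_sz up to 32 the mask bit still fits in 32 bits and both variants agree.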

> 2) We may pass oversized bursts to the underlying ethdev/ring/sched,
> e.g., tx_burst_sz=16, bsz_mask=0x8000, and pkts_mask=0x1ffff
> (17 packets), results in expr==0, and we send a burst larger than
> desired (and non-power-of-2) to the underlying tx burst interface.
> 

As stated in another related email, this is done by design, with the key assumption being that larger TX burst sizes will always be beneficial. So tx_burst_sz is, by design, a requirement for the *minimal* value of the TX burst size rather than the *exact* value of the burst size.
As an example, when a TX burst size of 32 is set, larger bursts of 33, 34, ..., 40, 41, ..., 48, ..., 64 packets are welcomed and sent out as a single burst rather than being broken into multiple fixed-size 32-packet bursts.
For PMDs, the burst size (at most 64) is typically much smaller than the TX ring size (typical value for IXGBE: 512). The same holds for rte_ring.

So what we are debating here is which of the following two approaches is better:
Approach 1: Consider tx_burst_sz as the minimal burst size, welcome larger bursts and send them as a single burst (i.e. do not break them into fixed tx_burst_sz bursts). This is the existing approach used consistently everywhere in librte_port.
Approach 2: Consider tx_burst_sz as an exact burst size requirement; any larger incoming burst is broken into fixed-size bursts of exactly tx_burst_sz packets before being sent. This is the approach suggested by Robert.
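
For reference, the two approaches boil down to two different predicates on pkts_mask. The sketch below is only an illustration with made-up function names (is_burst_at_least, is_burst_exact); the expressions themselves are taken from the patch, with the mask kept at 64 bits:

```c
#include <stdint.h>

/*
 * Approach 1 (existing): accept any mask that is contiguous from bit 0
 * and has at least tx_burst_sz bits set.
 * min_mask has only bit (tx_burst_sz - 1) set.
 */
static int is_burst_at_least(uint64_t pkts_mask, uint64_t min_mask)
{
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
			((pkts_mask & min_mask) ^ min_mask);

	return expr == 0;
}

/*
 * Approach 2 (proposed): accept only a burst of exactly tx_burst_sz
 * packets. full_mask has the tx_burst_sz least significant bits set.
 */
static int is_burst_exact(uint64_t pkts_mask, uint64_t full_mask)
{
	return pkts_mask == full_mask;
}
```

Taking Robert's numbers (tx_burst_sz = 16, so min_mask = 0x8000 and full_mask = 0xffff), the 17-packet mask 0x1ffff passes is_burst_at_least() but fails is_burst_exact(), so with Approach 2 it would no longer take the fast path shown in the diff.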

I think we should go for the approach that gives the best performance. Personally, I think Approach 1 (existing) does this, but I would like to get more fact-based opinions from the people on this mailing list (CC-ing a few key folks), especially PMD and ring maintainers. What is your experience, guys?

> We propose to effectively set bsz_mask = (1 << tx_burst_sz) - 1
> (while avoiding truncation for tx_burst_sz=64), to cache the mask
> value of a full burst, and then do a simple compare with pkts_mask
> in each f_tx_bulk.
> 
> Signed-off-by: Robert Sanford <rsanford at akamai.com>
> ---
>  lib/librte_port/rte_port_ethdev.c |   15 ++++-----------
>  lib/librte_port/rte_port_ring.c   |   16 ++++------------
>  lib/librte_port/rte_port_sched.c  |    7 ++-----
>  3 files changed, 10 insertions(+), 28 deletions(-)
> 
> diff --git a/lib/librte_port/rte_port_ethdev.c
> b/lib/librte_port/rte_port_ethdev.c
> index 1c34602..3fb4947 100644
> --- a/lib/librte_port/rte_port_ethdev.c
> +++ b/lib/librte_port/rte_port_ethdev.c
> @@ -188,7 +188,7 @@ rte_port_ethdev_writer_create(void *params, int
> socket_id)
>  	port->queue_id = conf->queue_id;
>  	port->tx_burst_sz = conf->tx_burst_sz;
>  	port->tx_buf_count = 0;
> -	port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
> +	port->bsz_mask = UINT64_MAX >> (64 - conf->tx_burst_sz);

Another way to write this is: port->bsz_mask = RTE_LEN2MASK(conf->tx_burst_sz, uint64_t);
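
A small sketch of what that would compute, assuming RTE_LEN2MASK expands roughly as it does in rte_common.h; the LEN2MASK macro and the full_burst_mask() helper below are local stand-ins so that the snippet builds without the DPDK headers:

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Local stand-in for RTE_LEN2MASK (assumed definition, see above). */
#define LEN2MASK(ln, tp) \
	((tp)((uint64_t)-1 >> (sizeof(uint64_t) * CHAR_BIT - (ln))))

/* Mask with the tx_burst_sz least significant bits set. */
static uint64_t full_burst_mask(uint32_t tx_burst_sz)
{
	/* tx_burst_sz == 0 would give a shift count of 64, which is
	 * undefined behavior in C, so reject it here. */
	assert(tx_burst_sz >= 1 && tx_burst_sz <= 64);
	return LEN2MASK(tx_burst_sz, uint64_t);
}
```

Shifting an all-ones value right avoids the overflow that (1 << tx_burst_sz) - 1 would hit for tx_burst_sz = 64, and it yields the same value as UINT64_MAX >> (64 - tx_burst_sz) in the patch.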

> 
>  	return port;
>  }
> @@ -229,12 +229,9 @@ rte_port_ethdev_writer_tx_bulk(void *port,
>  {
>  	struct rte_port_ethdev_writer *p =
>  		(struct rte_port_ethdev_writer *) port;
> -	uint32_t bsz_mask = p->bsz_mask;
>  	uint32_t tx_buf_count = p->tx_buf_count;
> -	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
> -			((pkts_mask & bsz_mask) ^ bsz_mask);
> 
> -	if (expr == 0) {
> +	if (pkts_mask == p->bsz_mask) {
>  		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
>  		uint32_t n_pkts_ok;
> 
> @@ -369,7 +366,7 @@ rte_port_ethdev_writer_nodrop_create(void
> *params, int socket_id)
>  	port->queue_id = conf->queue_id;
>  	port->tx_burst_sz = conf->tx_burst_sz;
>  	port->tx_buf_count = 0;
> -	port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
> +	port->bsz_mask = UINT64_MAX >> (64 - conf->tx_burst_sz);
> 
>  	/*
>  	 * When n_retries is 0 it means that we should wait for every packet
> to
> @@ -435,13 +432,9 @@ rte_port_ethdev_writer_nodrop_tx_bulk(void
> *port,
>  {
>  	struct rte_port_ethdev_writer_nodrop *p =
>  		(struct rte_port_ethdev_writer_nodrop *) port;
> -
> -	uint32_t bsz_mask = p->bsz_mask;
>  	uint32_t tx_buf_count = p->tx_buf_count;
> -	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
> -			((pkts_mask & bsz_mask) ^ bsz_mask);
> 
> -	if (expr == 0) {
> +	if (pkts_mask == p->bsz_mask) {
>  		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
>  		uint32_t n_pkts_ok;
> 
> diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
> index 765ecc5..b36e4ce 100644
> --- a/lib/librte_port/rte_port_ring.c
> +++ b/lib/librte_port/rte_port_ring.c
> @@ -217,7 +217,7 @@ rte_port_ring_writer_create_internal(void *params,
> int socket_id,
>  	port->ring = conf->ring;
>  	port->tx_burst_sz = conf->tx_burst_sz;
>  	port->tx_buf_count = 0;
> -	port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
> +	port->bsz_mask = UINT64_MAX >> (64 - conf->tx_burst_sz);
>  	port->is_multi = is_multi;
> 
>  	return port;
> @@ -299,13 +299,9 @@ rte_port_ring_writer_tx_bulk_internal(void *port,
>  {
>  	struct rte_port_ring_writer *p =
>  		(struct rte_port_ring_writer *) port;
> -
> -	uint32_t bsz_mask = p->bsz_mask;
>  	uint32_t tx_buf_count = p->tx_buf_count;
> -	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
> -			((pkts_mask & bsz_mask) ^ bsz_mask);
> 
> -	if (expr == 0) {
> +	if (pkts_mask == p->bsz_mask) {
>  		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
>  		uint32_t n_pkts_ok;
> 
> @@ -486,7 +482,7 @@ rte_port_ring_writer_nodrop_create_internal(void
> *params, int socket_id,
>  	port->ring = conf->ring;
>  	port->tx_burst_sz = conf->tx_burst_sz;
>  	port->tx_buf_count = 0;
> -	port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
> +	port->bsz_mask = UINT64_MAX >> (64 - conf->tx_burst_sz);
>  	port->is_multi = is_multi;
> 
>  	/*
> @@ -613,13 +609,9 @@ rte_port_ring_writer_nodrop_tx_bulk_internal(void
> *port,
>  {
>  	struct rte_port_ring_writer_nodrop *p =
>  		(struct rte_port_ring_writer_nodrop *) port;
> -
> -	uint32_t bsz_mask = p->bsz_mask;
>  	uint32_t tx_buf_count = p->tx_buf_count;
> -	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
> -			((pkts_mask & bsz_mask) ^ bsz_mask);
> 
> -	if (expr == 0) {
> +	if (pkts_mask == p->bsz_mask) {
>  		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
>  		uint32_t n_pkts_ok;
> 
> diff --git a/lib/librte_port/rte_port_sched.c
> b/lib/librte_port/rte_port_sched.c
> index c5ff8ab..5b6afc4 100644
> --- a/lib/librte_port/rte_port_sched.c
> +++ b/lib/librte_port/rte_port_sched.c
> @@ -185,7 +185,7 @@ rte_port_sched_writer_create(void *params, int
> socket_id)
>  	port->sched = conf->sched;
>  	port->tx_burst_sz = conf->tx_burst_sz;
>  	port->tx_buf_count = 0;
> -	port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
> +	port->bsz_mask = UINT64_MAX >> (64 - conf->tx_burst_sz);
> 
>  	return port;
>  }
> @@ -214,12 +214,9 @@ rte_port_sched_writer_tx_bulk(void *port,
>  		uint64_t pkts_mask)
>  {
>  	struct rte_port_sched_writer *p = (struct rte_port_sched_writer *)
> port;
> -	uint32_t bsz_mask = p->bsz_mask;
>  	uint32_t tx_buf_count = p->tx_buf_count;
> -	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
> -			((pkts_mask & bsz_mask) ^ bsz_mask);
> 
> -	if (expr == 0) {
> +	if (pkts_mask == p->bsz_mask) {
>  		__rte_unused uint32_t nb_tx;
>  		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
> 
> --
> 1.7.1


