[dpdk-stable] [dpdk-dev] [PATCH v7 08/16] test/distributor: fix freeing mbufs

Lukasz Wojciechowski l.wojciechow at partner.samsung.com
Sat Oct 17 05:28:45 CEST 2020


Hi Honnappa,

W dniu 16.10.2020 o 07:12, Honnappa Nagarahalli pisze:
> <snip>
>
>> Sanity tests with mbuf alloc and shutdown tests assume that mbufs passed
>> to worker cores are freed in handlers.
>> Such packets should not be returned to the distributor's main core. The only
>> packets that should be returned are the packets sent after completion of
>> the tests in the quit_workers() function.
>>
>> This patch stops returning mbufs to distributor's core.
>> In case of shutdown tests it is impossible to determine how worker and
>> distributor threads would synchronize.
>> Packets used by tests should be freed and packets used during
>> quit_workers() shouldn't. That's why returning mbufs to the mempool is
>> moved from the worker threads to the test procedure run on the
>> distributor thread.
>>
>> Additionally this patch cleans up unused variables.
>>
>> Fixes: c0de0eb82e40 ("distributor: switch over to new API")
>> Cc: david.hunt at intel.com
>> Cc: stable at dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow at partner.samsung.com>
>> Acked-by: David Hunt <david.hunt at intel.com>
>> ---
>>   app/test/test_distributor.c | 96 ++++++++++++++++++-------------------
>>   1 file changed, 47 insertions(+), 49 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
>> 838459392..06e01ff9d 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -44,7 +44,7 @@ total_packet_count(void)
>>   	unsigned i, count = 0;
>>   	for (i = 0; i < worker_idx; i++)
>>   		count +=
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> -				__ATOMIC_ACQUIRE);
>> +				__ATOMIC_RELAXED);
> I think it is better to make this and other statistics changes below in commit 6/16. It will be in line with the commit log as well.
I changed the order of patches to avoid duplicated changes in the code.
>
>>   	return count;
>>   }
>>
>> @@ -55,7 +55,7 @@ clear_packet_count(void)
>>   	unsigned int i;
>>   	for (i = 0; i < RTE_MAX_LCORE; i++)
>>   		__atomic_store_n(&worker_stats[i].handled_packets, 0,
>> -			__ATOMIC_RELEASE);
>> +			__ATOMIC_RELAXED);
>>   }
>>
>>   /* this is the basic worker function for sanity test @@ -67,20 +67,18 @@
>> handle_work(void *arg)
>>   	struct rte_mbuf *buf[8] __rte_cache_aligned;
>>   	struct worker_params *wp = arg;
>>   	struct rte_distributor *db = wp->dist;
>> -	unsigned int count = 0, num;
>> +	unsigned int num;
>>   	unsigned int id = __atomic_fetch_add(&worker_idx, 1,
>> __ATOMIC_RELAXED);
>>
>>   	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
>>   	while (!quit) {
>>   		__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> -				__ATOMIC_ACQ_REL);
>> -		count += num;
>> +				__ATOMIC_RELAXED);
>>   		num = rte_distributor_get_pkt(db, id,
>>   				buf, buf, num);
>>   	}
>>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -			__ATOMIC_ACQ_REL);
>> -	count += num;
>> +			__ATOMIC_RELAXED);
>>   	rte_distributor_return_pkt(db, id, buf, num);
>>   	return 0;
>>   }
>> @@ -136,7 +134,7 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>>   			__atomic_load_n(&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>   	printf("Sanity test with all zero hashes done.\n");
>>
>>   	/* pick two flows and check they go correctly */ @@ -163,7 +161,7
>> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>   			printf("Worker %u handled %u packets\n", i,
>>   				__atomic_load_n(
>>   					&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>   		printf("Sanity test with two hash values done\n");
>>   	}
>>
>> @@ -190,7 +188,7 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>>   			__atomic_load_n(&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>   	printf("Sanity test with non-zero hashes done\n");
>>
>>   	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -276,23
>> +274,20 @@ handle_work_with_free_mbufs(void *arg)
>>   	struct rte_mbuf *buf[8] __rte_cache_aligned;
>>   	struct worker_params *wp = arg;
>>   	struct rte_distributor *d = wp->dist;
>> -	unsigned int count = 0;
>>   	unsigned int i;
>>   	unsigned int num;
>>   	unsigned int id = __atomic_fetch_add(&worker_idx, 1,
>> __ATOMIC_RELAXED);
>>
>>   	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>   	while (!quit) {
>> -		count += num;
>>   		__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> -				__ATOMIC_ACQ_REL);
>> +				__ATOMIC_RELAXED);
>>   		for (i = 0; i < num; i++)
>>   			rte_pktmbuf_free(buf[i]);
>>   		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>   	}
>> -	count += num;
>>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -			__ATOMIC_ACQ_REL);
>> +			__ATOMIC_RELAXED);
>>   	rte_distributor_return_pkt(d, id, buf, num);
>>   	return 0;
>>   }
>> @@ -318,7 +313,6 @@ sanity_test_with_mbuf_alloc(struct worker_params
>> *wp, struct rte_mempool *p)
>>   			rte_distributor_process(d, NULL, 0);
>>   		for (j = 0; j < BURST; j++) {
>>   			bufs[j]->hash.usr = (i+j) << 1;
>> -			rte_mbuf_refcnt_set(bufs[j], 1);
>>   		}
>>
>>   		rte_distributor_process(d, bufs, BURST); @@ -342,15 +336,10
>> @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct
>> rte_mempool *p)  static int  handle_work_for_shutdown_test(void *arg)  {
>> -	struct rte_mbuf *pkt = NULL;
>>   	struct rte_mbuf *buf[8] __rte_cache_aligned;
>>   	struct worker_params *wp = arg;
>>   	struct rte_distributor *d = wp->dist;
>> -	unsigned int count = 0;
>>   	unsigned int num;
>> -	unsigned int total = 0;
>> -	unsigned int i;
>> -	unsigned int returned = 0;
>>   	unsigned int zero_id = 0;
>>   	unsigned int zero_unset;
>>   	const unsigned int id = __atomic_fetch_add(&worker_idx, 1, @@ -
>> 368,11 +357,8 @@ handle_work_for_shutdown_test(void *arg)
>>   	/* wait for quit single globally, or for worker zero, wait
>>   	 * for zero_quit */
>>   	while (!quit && !(id == zero_id && zero_quit)) {
>> -		count += num;
>>   		__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> -				__ATOMIC_ACQ_REL);
>> -		for (i = 0; i < num; i++)
>> -			rte_pktmbuf_free(buf[i]);
>> +				__ATOMIC_RELAXED);
>>   		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>
>>   		if (num > 0) {
>> @@ -381,15 +367,12 @@ handle_work_for_shutdown_test(void *arg)
>>   				false, __ATOMIC_ACQ_REL,
>> __ATOMIC_ACQUIRE);
>>   		}
>>   		zero_id = __atomic_load_n(&zero_idx,
>> __ATOMIC_ACQUIRE);
>> -
>> -		total += num;
>>   	}
>> -	count += num;
>> -	returned = rte_distributor_return_pkt(d, id, buf, num);
>> -
>>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -			__ATOMIC_ACQ_REL);
>> +			__ATOMIC_RELAXED);
>>   	if (id == zero_id) {
>> +		rte_distributor_return_pkt(d, id, NULL, 0);
>> +
>>   		/* for worker zero, allow it to restart to pick up last packet
>>   		 * when all workers are shutting down.
>>   		 */
>> @@ -400,15 +383,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>>   		while (!quit) {
>>
>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>> -					num, __ATOMIC_ACQ_REL);
>> -			count += num;
>> -			rte_pktmbuf_free(pkt);
>> +					num, __ATOMIC_RELAXED);
>>   			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>   		}
>> -		returned = rte_distributor_return_pkt(d,
>> -				id, buf, num);
>> -		printf("Num returned = %d\n", returned);
>>   	}
>> +	rte_distributor_return_pkt(d, id, buf, num);
>>   	return 0;
>>   }
>>
>> @@ -424,7 +403,9 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,  {
>>   	struct rte_distributor *d = wp->dist;
>>   	struct rte_mbuf *bufs[BURST];
>> -	unsigned i;
>> +	struct rte_mbuf *bufs2[BURST];
>> +	unsigned int i;
>> +	unsigned int failed = 0;
>>
>>   	printf("=== Sanity test of worker shutdown ===\n");
>>
>> @@ -450,16 +431,17 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>>   	 */
>>
>>   	/* get more buffers to queue up, again setting them to the same
>> flow */
>> -	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
>> +	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
>>   		printf("line %d: Error getting mbufs from pool\n", __LINE__);
>> +		rte_mempool_put_bulk(p, (void *)bufs, BURST);
>>   		return -1;
>>   	}
>>   	for (i = 0; i < BURST; i++)
>> -		bufs[i]->hash.usr = 1;
>> +		bufs2[i]->hash.usr = 1;
>>
>>   	/* get worker zero to quit */
>>   	zero_quit = 1;
>> -	rte_distributor_process(d, bufs, BURST);
>> +	rte_distributor_process(d, bufs2, BURST);
>>
>>   	/* flush the distributor */
>>   	rte_distributor_flush(d);
>> @@ -468,15 +450,21 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>>   			__atomic_load_n(&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>
>>   	if (total_packet_count() != BURST * 2) {
>>   		printf("Line %d: Error, not all packets flushed. "
>>   				"Expected %u, got %u\n",
>>   				__LINE__, BURST * 2, total_packet_count());
>> -		return -1;
>> +		failed = 1;
>>   	}
>>
>> +	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> +	rte_mempool_put_bulk(p, (void *)bufs2, BURST);
>> +
>> +	if (failed)
>> +		return -1;
>> +
>>   	printf("Sanity test with worker shutdown passed\n\n");
>>   	return 0;
>>   }
>> @@ -490,7 +478,8 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp,  {
>>   	struct rte_distributor *d = wp->dist;
>>   	struct rte_mbuf *bufs[BURST];
>> -	unsigned i;
>> +	unsigned int i;
>> +	unsigned int failed = 0;
>>
>>   	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp-
>>> name);
>> @@ -522,15 +511,20 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp,
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>>   			__atomic_load_n(&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>
>>   	if (total_packet_count() != BURST) {
>>   		printf("Line %d: Error, not all packets flushed. "
>>   				"Expected %u, got %u\n",
>>   				__LINE__, BURST, total_packet_count());
>> -		return -1;
>> +		failed = 1;
>>   	}
>>
>> +	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> +
>> +	if (failed)
>> +		return -1;
>> +
>>   	printf("Flush test with worker shutdown passed\n\n");
>>   	return 0;
>>   }
>> @@ -596,7 +590,10 @@ quit_workers(struct worker_params *wp, struct
>> rte_mempool *p)
>>   	const unsigned num_workers = rte_lcore_count() - 1;
>>   	unsigned i;
>>   	struct rte_mbuf *bufs[RTE_MAX_LCORE];
>> -	rte_mempool_get_bulk(p, (void *)bufs, num_workers);
>> +	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
>> +		printf("line %d: Error getting mbufs from pool\n", __LINE__);
>> +		return;
>> +	}
>>
>>   	zero_quit = 0;
>>   	quit = 1;
>> @@ -604,11 +601,12 @@ quit_workers(struct worker_params *wp, struct
>> rte_mempool *p)
>>   		bufs[i]->hash.usr = i << 1;
>>   	rte_distributor_process(d, bufs, num_workers);
>>
>> -	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
>> -
>>   	rte_distributor_process(d, NULL, 0);
>>   	rte_distributor_flush(d);
>>   	rte_eal_mp_wait_lcore();
>> +
>> +	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
>> +
>>   	quit = 0;
>>   	worker_idx = 0;
>>   	zero_idx = RTE_MAX_LCORE;
>> --
>> 2.17.1

-- 
Lukasz Wojciechowski
Principal Software Engineer

Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciechow at partner.samsung.com



More information about the stable mailing list