[dpdk-stable] [dpdk-dev] [PATCH v7 08/16] test/distributor: fix freeing mbufs
Lukasz Wojciechowski
l.wojciechow at partner.samsung.com
Sat Oct 17 05:28:45 CEST 2020
Hi Honnappa,
W dniu 16.10.2020 o 07:12, Honnappa Nagarahalli pisze:
> <snip>
>
>> Sanity tests with mbuf alloc and shutdown tests assume that mbufs passed
>> to worker cores are freed in handlers.
>> Such packets should not be returned to the distributor's main core. The only
>> packets that should be returned are the packets sent after completion of
>> the tests in quit_workers function.
>>
>> This patch stops returning mbufs to distributor's core.
>> In case of shutdown tests it is impossible to determine how worker and
>> distributor threads would synchronize.
>> Packets used by tests should be freed and packets used during
>> quit_workers() shouldn't. That's why returning mbufs to the mempool is
>> moved from the worker threads to the test procedure run on the
>> distributor thread.
>>
>> Additionally this patch cleans up unused variables.
>>
>> Fixes: c0de0eb82e40 ("distributor: switch over to new API")
>> Cc: david.hunt at intel.com
>> Cc: stable at dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow at partner.samsung.com>
>> Acked-by: David Hunt <david.hunt at intel.com>
>> ---
>> app/test/test_distributor.c | 96 ++++++++++++++++++-------------------
>> 1 file changed, 47 insertions(+), 49 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
>> index 838459392..06e01ff9d 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -44,7 +44,7 @@ total_packet_count(void)
>> unsigned i, count = 0;
>> for (i = 0; i < worker_idx; i++)
>> count +=
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> - __ATOMIC_ACQUIRE);
>> + __ATOMIC_RELAXED);
> I think it is better to make this and other statistics changes below in commit 6/16. It will be in line with the commit log as well.
I changed the order of patches to avoid duplicated changes in the code.
>
>> return count;
>> }
>>
>> @@ -55,7 +55,7 @@ clear_packet_count(void)
>> unsigned int i;
>> for (i = 0; i < RTE_MAX_LCORE; i++)
>> __atomic_store_n(&worker_stats[i].handled_packets, 0,
>> - __ATOMIC_RELEASE);
>> + __ATOMIC_RELAXED);
>> }
>>
>> /* this is the basic worker function for sanity test
>> @@ -67,20 +67,18 @@ handle_work(void *arg)
>> struct rte_mbuf *buf[8] __rte_cache_aligned;
>> struct worker_params *wp = arg;
>> struct rte_distributor *db = wp->dist;
>> - unsigned int count = 0, num;
>> + unsigned int num;
>> unsigned int id = __atomic_fetch_add(&worker_idx, 1,
>> __ATOMIC_RELAXED);
>>
>> num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
>> while (!quit) {
>> __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> - __ATOMIC_ACQ_REL);
>> - count += num;
>> + __ATOMIC_RELAXED);
>> num = rte_distributor_get_pkt(db, id,
>> buf, buf, num);
>> }
>> __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> - __ATOMIC_ACQ_REL);
>> - count += num;
>> + __ATOMIC_RELAXED);
>> rte_distributor_return_pkt(db, id, buf, num);
>> return 0;
>> }
>> @@ -136,7 +134,7 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> - __ATOMIC_ACQUIRE));
>> + __ATOMIC_RELAXED));
>> printf("Sanity test with all zero hashes done.\n");
>>
>> /* pick two flows and check they go correctly */
>> @@ -163,7 +161,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>> printf("Worker %u handled %u packets\n", i,
>> __atomic_load_n(
>> &worker_stats[i].handled_packets,
>> - __ATOMIC_ACQUIRE));
>> + __ATOMIC_RELAXED));
>> printf("Sanity test with two hash values done\n");
>> }
>>
>> @@ -190,7 +188,7 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> - __ATOMIC_ACQUIRE));
>> + __ATOMIC_RELAXED));
>> printf("Sanity test with non-zero hashes done\n");
>>
>> rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> @@ -276,23 +274,20 @@ handle_work_with_free_mbufs(void *arg)
>> struct rte_mbuf *buf[8] __rte_cache_aligned;
>> struct worker_params *wp = arg;
>> struct rte_distributor *d = wp->dist;
>> - unsigned int count = 0;
>> unsigned int i;
>> unsigned int num;
>> unsigned int id = __atomic_fetch_add(&worker_idx, 1,
>> __ATOMIC_RELAXED);
>>
>> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>> while (!quit) {
>> - count += num;
>> __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> - __ATOMIC_ACQ_REL);
>> + __ATOMIC_RELAXED);
>> for (i = 0; i < num; i++)
>> rte_pktmbuf_free(buf[i]);
>> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>> }
>> - count += num;
>> __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> - __ATOMIC_ACQ_REL);
>> + __ATOMIC_RELAXED);
>> rte_distributor_return_pkt(d, id, buf, num);
>> return 0;
>> }
>> @@ -318,7 +313,6 @@ sanity_test_with_mbuf_alloc(struct worker_params
>> *wp, struct rte_mempool *p)
>> rte_distributor_process(d, NULL, 0);
>> for (j = 0; j < BURST; j++) {
>> bufs[j]->hash.usr = (i+j) << 1;
>> - rte_mbuf_refcnt_set(bufs[j], 1);
>> }
>>
>> rte_distributor_process(d, bufs, BURST);
>> @@ -342,15 +336,10 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
>> static int
>> handle_work_for_shutdown_test(void *arg)
>> {
>> - struct rte_mbuf *pkt = NULL;
>> struct rte_mbuf *buf[8] __rte_cache_aligned;
>> struct worker_params *wp = arg;
>> struct rte_distributor *d = wp->dist;
>> - unsigned int count = 0;
>> unsigned int num;
>> - unsigned int total = 0;
>> - unsigned int i;
>> - unsigned int returned = 0;
>> unsigned int zero_id = 0;
>> unsigned int zero_unset;
>> const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
>> @@ -368,11 +357,8 @@ handle_work_for_shutdown_test(void *arg)
>> /* wait for quit single globally, or for worker zero, wait
>> * for zero_quit */
>> while (!quit && !(id == zero_id && zero_quit)) {
>> - count += num;
>> __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> - __ATOMIC_ACQ_REL);
>> - for (i = 0; i < num; i++)
>> - rte_pktmbuf_free(buf[i]);
>> + __ATOMIC_RELAXED);
>> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>
>> if (num > 0) {
>> @@ -381,15 +367,12 @@ handle_work_for_shutdown_test(void *arg)
>> false, __ATOMIC_ACQ_REL,
>> __ATOMIC_ACQUIRE);
>> }
>> zero_id = __atomic_load_n(&zero_idx,
>> __ATOMIC_ACQUIRE);
>> -
>> - total += num;
>> }
>> - count += num;
>> - returned = rte_distributor_return_pkt(d, id, buf, num);
>> -
>> __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> - __ATOMIC_ACQ_REL);
>> + __ATOMIC_RELAXED);
>> if (id == zero_id) {
>> + rte_distributor_return_pkt(d, id, NULL, 0);
>> +
>> /* for worker zero, allow it to restart to pick up last packet
>> * when all workers are shutting down.
>> */
>> @@ -400,15 +383,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>> while (!quit) {
>>
>> __atomic_fetch_add(&worker_stats[id].handled_packets,
>> - num, __ATOMIC_ACQ_REL);
>> - count += num;
>> - rte_pktmbuf_free(pkt);
>> + num, __ATOMIC_RELAXED);
>> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>> }
>> - returned = rte_distributor_return_pkt(d,
>> - id, buf, num);
>> - printf("Num returned = %d\n", returned);
>> }
>> + rte_distributor_return_pkt(d, id, buf, num);
>> return 0;
>> }
>>
>> @@ -424,7 +403,9 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp, {
>> struct rte_distributor *d = wp->dist;
>> struct rte_mbuf *bufs[BURST];
>> - unsigned i;
>> + struct rte_mbuf *bufs2[BURST];
>> + unsigned int i;
>> + unsigned int failed = 0;
>>
>> printf("=== Sanity test of worker shutdown ===\n");
>>
>> @@ -450,16 +431,17 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>> */
>>
>> /* get more buffers to queue up, again setting them to the same
>> flow */
>> - if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
>> + if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
>> printf("line %d: Error getting mbufs from pool\n", __LINE__);
>> + rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> return -1;
>> }
>> for (i = 0; i < BURST; i++)
>> - bufs[i]->hash.usr = 1;
>> + bufs2[i]->hash.usr = 1;
>>
>> /* get worker zero to quit */
>> zero_quit = 1;
>> - rte_distributor_process(d, bufs, BURST);
>> + rte_distributor_process(d, bufs2, BURST);
>>
>> /* flush the distributor */
>> rte_distributor_flush(d);
>> @@ -468,15 +450,21 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> - __ATOMIC_ACQUIRE));
>> + __ATOMIC_RELAXED));
>>
>> if (total_packet_count() != BURST * 2) {
>> printf("Line %d: Error, not all packets flushed. "
>> "Expected %u, got %u\n",
>> __LINE__, BURST * 2, total_packet_count());
>> - return -1;
>> + failed = 1;
>> }
>>
>> + rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> + rte_mempool_put_bulk(p, (void *)bufs2, BURST);
>> +
>> + if (failed)
>> + return -1;
>> +
>> printf("Sanity test with worker shutdown passed\n\n");
>> return 0;
>> }
>> @@ -490,7 +478,8 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp, {
>> struct rte_distributor *d = wp->dist;
>> struct rte_mbuf *bufs[BURST];
>> - unsigned i;
>> + unsigned int i;
>> + unsigned int failed = 0;
>>
>> printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);
>> @@ -522,15 +511,20 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp,
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> - __ATOMIC_ACQUIRE));
>> + __ATOMIC_RELAXED));
>>
>> if (total_packet_count() != BURST) {
>> printf("Line %d: Error, not all packets flushed. "
>> "Expected %u, got %u\n",
>> __LINE__, BURST, total_packet_count());
>> - return -1;
>> + failed = 1;
>> }
>>
>> + rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> +
>> + if (failed)
>> + return -1;
>> +
>> printf("Flush test with worker shutdown passed\n\n");
>> return 0;
>> }
>> @@ -596,7 +590,10 @@ quit_workers(struct worker_params *wp, struct
>> rte_mempool *p)
>> const unsigned num_workers = rte_lcore_count() - 1;
>> unsigned i;
>> struct rte_mbuf *bufs[RTE_MAX_LCORE];
>> - rte_mempool_get_bulk(p, (void *)bufs, num_workers);
>> + if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
>> + printf("line %d: Error getting mbufs from pool\n", __LINE__);
>> + return;
>> + }
>>
>> zero_quit = 0;
>> quit = 1;
>> @@ -604,11 +601,12 @@ quit_workers(struct worker_params *wp, struct
>> rte_mempool *p)
>> bufs[i]->hash.usr = i << 1;
>> rte_distributor_process(d, bufs, num_workers);
>>
>> - rte_mempool_put_bulk(p, (void *)bufs, num_workers);
>> -
>> rte_distributor_process(d, NULL, 0);
>> rte_distributor_flush(d);
>> rte_eal_mp_wait_lcore();
>> +
>> + rte_mempool_put_bulk(p, (void *)bufs, num_workers);
>> +
>> quit = 0;
>> worker_idx = 0;
>> zero_idx = RTE_MAX_LCORE;
>> --
>> 2.17.1
--
Lukasz Wojciechowski
Principal Software Engineer
Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciechow at partner.samsung.com
More information about the stable
mailing list