[dpdk-stable] [dpdk-dev] [PATCH v4 2/8] test/distributor: synchronize lcores statistics
Lukasz Wojciechowski
l.wojciechow at partner.samsung.com
Fri Oct 2 13:25:19 CEST 2020
Hi Honnappa,
Many thanks for the review!
I'll write my answers here rather than inline, as I think it's easier
to read them in one place.
So first of all, I agree with you on two things:
1) all uses of the statistics must be atomic, and the lack of that caused
most of the problems
2) it would be better to replace the barrier and memset in
clear_packet_count() with atomic stores, as you suggested
So I will apply both of the above.
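For reference, a minimal sketch of what the atomic clear could look like
(the struct/array names follow the test code, but the RTE_MAX_LCORE
stand-in and the exact shape are my illustration, not the final patch):

```c
#include <assert.h>

#define RTE_MAX_LCORE 128	/* stand-in for the DPDK constant */

struct worker_stats {
	volatile unsigned handled_packets;
};
static struct worker_stats worker_stats[RTE_MAX_LCORE];

/* Clear each counter with an atomic store instead of
 * memset() + rte_atomic_thread_fence(__ATOMIC_RELEASE). */
static inline void
clear_packet_count(void)
{
	unsigned int i;

	for (i = 0; i < RTE_MAX_LCORE; i++)
		__atomic_store_n(&worker_stats[i].handled_packets, 0,
				__ATOMIC_RELAXED);
}
```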
However, I wasn't fully convinced about changing acquire/release to
relaxed. It would be perfectly fine if the code looked like Herb
Sutter's example:
https://youtu.be/KeLBd2EJLOU?t=4170
But in his case the counters are cleared before the worker threads start
and are printed out after they have completed.
In the dpdk distributor tests, both the worker and main cores run at
the same time. In sanity_test, the statistics are cleared and verified
a few times for different hashes of packets. The worker cores are not
stopped at that time and continue their loops in the handle procedure.
The verification done on the main core is therefore an exchange of data,
as the current statistics determine how the test will result.
As I wasn't convinced, I ran some tests with both the relaxed and
acquire/release modes, and both fail :(
The ratio of failures caused by statistics errors over 200000 test runs
was:
for relaxed: 0.000790562
for acq/rel: 0.000091321
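To illustrate why no memory order alone can help, here is my own
single-threaded replay of one losing interleaving (not the test code;
the burst size and names are made up): a burst handled between the main
core's verification and its clear is simply never counted.

```c
#include <assert.h>

/* Racy interleaving serialized for determinism: worker adds a burst,
 * main verifies, worker adds another burst, main clears -- the second
 * burst is handled but never observed by any verification. */
static unsigned
lost_packets(void)
{
	unsigned stat = 0, verified = 0;

	__atomic_fetch_add(&stat, 8, __ATOMIC_RELAXED);		/* worker: burst 1 */
	verified += __atomic_load_n(&stat, __ATOMIC_RELAXED);	/* main: sees 8 */
	__atomic_fetch_add(&stat, 8, __ATOMIC_RELAXED);		/* worker: burst 2 */
	__atomic_store_n(&stat, 0, __ATOMIC_RELAXED);		/* main: clear */
	verified += __atomic_load_n(&stat, __ATOMIC_RELAXED);	/* main: sees 0 */

	return 16 - verified;	/* packets handled but never counted */
}
```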
That's why I'm going to modify the tests so that they:
1) clear statistics
2) launch worker threads
3) run the test
4) wait for the worker procedures to complete
5) check stats, verify results and print them out
This way the main core will use (clear or verify) the stats only when
no worker threads are running. This makes things simpler and allows
focusing on testing the distributor rather than the tests. And of course
relaxed mode will be enough!
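A minimal sketch of that flow with plain pthreads (not the DPDK lcore
API; all names and sizes here are mine): once the main thread has joined
the workers, pthread_join() provides the needed happens-before edge, so
relaxed operations on the counters are sufficient.

```c
#include <assert.h>
#include <pthread.h>

#define NUM_WORKERS 4
#define PKTS_PER_WORKER 1000

static unsigned handled_packets[NUM_WORKERS];

static void *
worker(void *arg)
{
	unsigned id = *(unsigned *)arg;
	int i;

	for (i = 0; i < PKTS_PER_WORKER; i++)
		__atomic_fetch_add(&handled_packets[id], 1, __ATOMIC_RELAXED);
	return NULL;
}

/* 1) clear stats, 2) launch workers, 3)+4) run and wait for them,
 * 5) check stats -- returns the total counted. */
static unsigned
run_test(void)
{
	pthread_t t[NUM_WORKERS];
	unsigned ids[NUM_WORKERS], i, total = 0;

	for (i = 0; i < NUM_WORKERS; i++)
		__atomic_store_n(&handled_packets[i], 0, __ATOMIC_RELAXED);
	for (i = 0; i < NUM_WORKERS; i++) {
		ids[i] = i;
		pthread_create(&t[i], NULL, worker, &ids[i]);
	}
	for (i = 0; i < NUM_WORKERS; i++)
		pthread_join(t[i], NULL);	/* happens-before for the reads below */
	for (i = 0; i < NUM_WORKERS; i++)
		total += __atomic_load_n(&handled_packets[i], __ATOMIC_RELAXED);
	return total;
}
```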
Best regards
Lukasz
On 29.09.2020 at 07:49, Honnappa Nagarahalli wrote:
> <snip>
>
>> Statistics of handled packets are cleared and read on main lcore, while they
>> are increased in workers handlers on different lcores.
>>
>> Without synchronization occasionally showed invalid values.
>> This patch uses atomic acquire/release mechanisms to synchronize.
> In general, load-acquire and store-release memory orderings are required while synchronizing data (that cannot be updated atomically) between threads. In the situation, making counters atomic is enough.
>
>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>> Cc: bruce.richardson at intel.com
>> Cc: stable at dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow at partner.samsung.com>
>> Acked-by: David Hunt <david.hunt at intel.com>
>> ---
>> app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>> 1 file changed, 26 insertions(+), 13 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
>> index 35b25463a..0e49e3714 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -43,7 +43,8 @@ total_packet_count(void) {
>> unsigned i, count = 0;
>> for (i = 0; i < worker_idx; i++)
>> - count += worker_stats[i].handled_packets;
>> + count += __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE);
> RELAXED memory order is sufficient. For ex: the worker threads are not 'releasing' any data that is not atomically updated to the main thread.
>
>> return count;
>> }
>>
>> @@ -52,6 +53,7 @@ static inline void
>> clear_packet_count(void)
>> {
>> memset(&worker_stats, 0, sizeof(worker_stats));
>> + rte_atomic_thread_fence(__ATOMIC_RELEASE);
> Ideally, the counters should be set to 0 atomically rather than using a memset.
>
>> }
>>
>> /* this is the basic worker function for sanity test
>>
>> @@ -72,13 +74,13 @@ handle_work(void *arg)
>> num = rte_distributor_get_pkt(db, id, buf, buf, num);
>> while (!quit) {
>> __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> - __ATOMIC_RELAXED);
>> + __ATOMIC_ACQ_REL);
> Using the __ATOMIC_ACQ_REL order does not mean anything to the main thread. The main thread might still see the updates from different threads in different order.
>
>> count += num;
>> num = rte_distributor_get_pkt(db, id,
>> buf, buf, num);
>> }
>> __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> - __ATOMIC_RELAXED);
>> + __ATOMIC_ACQ_REL);
> Same here, do not see why this change is required.
>
>> count += num;
>> rte_distributor_return_pkt(db, id, buf, num);
>> return 0;
>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough.
>
>> printf("Sanity test with all zero hashes done.\n");
>>
>> /* pick two flows and check they go correctly */
>>
>> @@ -159,7 +162,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(
>> + &worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough
>
>> printf("Sanity test with two hash values done\n");
>> }
>>
>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough
>
>> printf("Sanity test with non-zero hashes done\n");
>>
>> rte_mempool_put_bulk(p, (void *)bufs, BURST);
>>
>> @@ -280,15 +286,17 @@ handle_work_with_free_mbufs(void *arg)
>> buf[i] = NULL;
>> num = rte_distributor_get_pkt(d, id, buf, buf, num);
>> while (!quit) {
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> + __ATOMIC_ACQ_REL);
> IMO, the problem would be the non-atomic update of the statistics. So, __ATOMIC_RELAXED is enough
>
>> for (i = 0; i < num; i++)
>> rte_pktmbuf_free(buf[i]);
>> num = rte_distributor_get_pkt(d,
>> id, buf, buf, num);
>> }
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> + __ATOMIC_ACQ_REL);
> Same here, the problem is non-atomic update of the statistics, __ATOMIC_RELAXED is enough.
> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>
>> rte_distributor_return_pkt(d, id, buf, num);
>> return 0;
>> }
>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>> /* wait for quit single globally, or for worker zero, wait
>> * for zero_quit */
>> while (!quit && !(id == zero_id && zero_quit)) {
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> + __ATOMIC_ACQ_REL);
>> for (i = 0; i < num; i++)
>> rte_pktmbuf_free(buf[i]);
>> num = rte_distributor_get_pkt(d,
>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>> total += num;
>> }
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> returned = rte_distributor_return_pkt(d, id, buf, num);
>>
>> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> + __ATOMIC_ACQ_REL);
>> if (id == zero_id) {
>> /* for worker zero, allow it to restart to pick up last packet
>> * when all workers are shutting down.
>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>> id, buf, buf, num);
>>
>> while (!quit) {
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> rte_pktmbuf_free(pkt);
>> num = rte_distributor_get_pkt(d, id, buf, buf, num);
>> + __atomic_fetch_add(&worker_stats[id].handled_packets,
>> + num, __ATOMIC_ACQ_REL);
>> }
>> returned = rte_distributor_return_pkt(d,
>> id, buf, num);
>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>>
>> if (total_packet_count() != BURST * 2) {
>> printf("Line %d: Error, not all packets flushed. "
>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
>> zero_quit = 0;
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>>
>> if (total_packet_count() != BURST) {
>> printf("Line %d: Error, not all packets flushed. "
>> --
>> 2.17.1
--
Lukasz Wojciechowski
Principal Software Engineer
Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciechow at partner.samsung.com