[DTS][PATCH V1 1/5] tests/vf_offload: Improve checksum_validate method.
Ke Xu
ke1.xu at intel.com
Tue Dec 27 11:21:29 CET 2022
Use the packet show2(dump=True) method and direct packet parsing to
validate packets in a faster, more general and more
stable way.
This allows packets other than plain packets to
be verified by the checksum_validate method.
Signed-off-by: Ke Xu <ke1.xu at intel.com>
---
tests/TestSuite_vf_offload.py | 89 +++++++++++++----------------------
1 file changed, 34 insertions(+), 55 deletions(-)
diff --git a/tests/TestSuite_vf_offload.py b/tests/TestSuite_vf_offload.py
index 62e74c33..009da148 100644
--- a/tests/TestSuite_vf_offload.py
+++ b/tests/TestSuite_vf_offload.py
@@ -253,6 +253,16 @@ class TestVfOffload(TestCase):
dut.send_expect("tunnel_tso set 800 %d" % port, "testpmd>")
dut.send_expect("port start %d" % port, "testpmd>")
+ def filter_packets(self, packets):
+ return [
+ p
+ for p in packets
+ if len(p.layers()) >= 3
+ and p.layers()[1] in {IP, IPv6}
+ and p.layers()[2] in {IP, IPv6, UDP, TCP, SCTP, GRE, MPLS}
+ and Raw in p
+ ]
+
def checksum_validate(self, packets_sent, packets_expected):
"""
Validate the checksum.
@@ -266,17 +276,16 @@ class TestVfOffload(TestCase):
sniff_src = self.vm0_testpmd.get_port_mac(0)
checksum_pattern = re.compile("chksum.*=.*(0x[0-9a-z]+)")
sniff_src = "52:00:00:00:00:00"
- chksum = dict()
+ expected_chksum_list = dict()
result = dict()
-
self.tester.send_expect("scapy", ">>> ")
-
+ self.tester.send_expect("from scapy.contrib.gtp import GTP_U_Header", ">>>")
for packet_type in list(packets_expected.keys()):
self.tester.send_expect("p = %s" % packets_expected[packet_type], ">>>")
out = self.tester.send_expect("p.show2()", ">>>")
- chksums = checksum_pattern.findall(out)
- chksum[packet_type] = chksums
- print(packet_type, ": ", chksums)
+ chksum = checksum_pattern.findall(out)
+ expected_chksum_list[packet_type] = chksum
+ print(packet_type, ": ", chksum)
self.tester.send_expect("exit()", "#")
@@ -289,7 +298,7 @@ class TestVfOffload(TestCase):
# Send packet.
self.tester.scapy_foreground()
-
+ self.tester.scapy_append("from scapy.contrib.gtp import GTP_U_Header")
for packet_type in list(packets_sent.keys()):
self.tester.scapy_append(
'sendp([%s], iface="%s")' % (packets_sent[packet_type], tx_interface)
@@ -297,58 +306,28 @@ class TestVfOffload(TestCase):
self.tester.scapy_execute()
out = self.tester.scapy_get_result()
-
- p = self.tester.load_tcpdump_sniff_packets(inst)
- nr_packets = len(p)
- print(p)
- packets_received = [
- p[i].sprintf("%IP.chksum%;%TCP.chksum%;%UDP.chksum%;%SCTP.chksum%")
- for i in range(nr_packets)
- ]
+ packets_received = self.filter_packets(self.tester.load_tcpdump_sniff_packets(inst))
+ print(list(packets_received))
self.verify(
len(packets_sent) == len(packets_received), "Unexpected Packets Drop"
)
-
- for packet_received in packets_received:
- (
- ip_checksum,
- tcp_checksum,
- udp_checksum,
- sctp_checksum,
- ) = packet_received.split(";")
- print(
- "ip_checksum: ",
- ip_checksum,
- "tcp_checksum:, ",
- tcp_checksum,
- "udp_checksum: ",
- udp_checksum,
- "sctp_checksum: ",
- sctp_checksum,
- )
-
- packet_type = ""
- l4_checksum = ""
- if tcp_checksum != "??":
- packet_type = "TCP"
- l4_checksum = tcp_checksum
- elif udp_checksum != "??":
- packet_type = "UDP"
- l4_checksum = udp_checksum
- elif sctp_checksum != "??":
- packet_type = "SCTP"
- l4_checksum = sctp_checksum
-
- if ip_checksum != "??":
- packet_type = "IP/" + packet_type
- if chksum[packet_type] != [ip_checksum, l4_checksum]:
- result[packet_type] = packet_type + " checksum error"
- else:
- packet_type = "IPv6/" + packet_type
- if chksum[packet_type] != [l4_checksum]:
- result[packet_type] = packet_type + " checksum error"
-
+ for i in range(len(packets_sent)):
+ packet_type = list(packets_sent.keys())[i]
+ checksum_received = checksum_pattern.findall(packets_received[i].show2(dump=True))
+ checksum_expected = expected_chksum_list[packets_sent.keys()[i]]
+ self.logger.debug(f"checksum_received: {checksum_received}")
+ self.logger.debug(f"checksum_expected: {checksum_expected}")
+ if not len(checksum_expected) == len(checksum_received):
+ result[packet_type] = (
+ packet_type + " Failed:"
+ + f"The chksum type {packet_type} length of the actual result is inconsistent with the expected length!"
+ )
+ elif not (checksum_received == checksum_expected):
+ result[packet_type] = (
+ packet_type + " Failed:"
+ + f"The actually received chksum {packet_type} is inconsistent with the expectation"
+ )
return result
def test_checksum_offload_enable(self):
--
2.25.1
More information about the dts
mailing list