[dts] [PATCH] tests: add test_rx_checksum_valid_flags

Liu, Yong yong.liu at intel.com
Tue Feb 21 04:47:23 CET 2017


Yuan, some comments below.

Thanks,
Marvin

On 02/08/2017 07:09 PM, Peng Yuan wrote:
> Signed-off-by: Peng Yuan <yuan.peng at intel.com>
>
> diff --git a/tests/TestSuite_checksum_offload.py b/tests/TestSuite_checksum_offload.py
> index 7989c14..36f3e48 100644
> --- a/tests/TestSuite_checksum_offload.py
> +++ b/tests/TestSuite_checksum_offload.py
> @@ -44,6 +44,7 @@ import utils
>   from test_case import TestCase
>   from pmd_output import PmdOutput
>   
> +
>   class TestChecksumOffload(TestCase):
>   
>       def set_up_all(self):
> @@ -81,6 +82,89 @@ class TestChecksumOffload(TestCase):
>               self.dut.send_expect("csum set tcp sw %d" % port, "testpmd>")
>               self.dut.send_expect("csum set sctp sw %d" % port, "testpmd>")
>   
> +    def get_chksum_values(self, packets_expected):
> +        """
> +        Validate the checksum flags.
> +        """
> +        tx_interface = self.tester.get_interface(self.tester.get_local_port(self.dut_ports[0]))
> +        rx_interface = self.tester.get_interface(self.tester.get_local_port(self.dut_ports[0]))
> +
> +        sniff_src = self.dut.get_mac_address(self.dut_ports[0])
These three variables (tx_interface/rx_interface/sniff_src) look like 
useless here.

> +        checksum_pattern = re.compile("chksum.*=.*(0x[0-9a-z]+)")
> +
> +        chksum = dict()
> +
> +        self.tester.send_expect("scapy", ">>> ")
> +
> +        for packet_type in packets_expected.keys():
> +            self.tester.send_expect("p = %s" % packets_expected[packet_type], ">>>")
> +            out = self.tester.send_expect("p.show2()", ">>>")
> +            chksums = checksum_pattern.findall(out)
> +            chksum[packet_type] = chksums
> +
> +        self.tester.send_expect("exit()", "#")
> +
> +        return chksum
> +
> +    def checksum_valid_flags(self, chksum_expected, packets_sent, flag):
> +        """
> +        Sends packets and check the checksum valid-flags.
> +        """
This logic maybe redundant for check function, for we only need to know 
how many packet checksum is right or wrong.
String match function still can fulfill the requirement and more easily 
to implemented.

> +        self.tester.scapy_foreground()
> +        self.dut.send_expect("start", "testpmd>")
> +        mac = self.dut.get_mac_address(self.dut_ports[0])
> +        tx_interface = self.tester.get_interface(self.tester.get_local_port(self.dut_ports[0]))
> +        self.tester.scapy_foreground()
> +
> +        for packet_type in packets_sent.keys():
> +            self.tester.scapy_append('sendp([%s], iface="%s")' % (packets_sent[packet_type], tx_interface))
> +            self.tester.scapy_execute()
> +            out = self.dut.get_session_output(timeout=1)
> +            lines = out.split("\r\n")
> +
> +            # collect the checksum result
> +            for line in lines:
> +                line = line.strip()
> +                if len(line) != 0 and line.startswith("rx"):
> +                    # IPv6 don't be checksum, so always show "GOOD"
> +                    if packet_type.startswith("IPv6"):
> +                        if "PKT_RX_L4_CKSUM" not in line:
> +                            self.verify(0, "There is no checksum flags appeared!")
> +                        else:
> +                            for item in line.split():
> +                                item = item.strip()
> +                                if "PKT_RX_L4_CKSUM" in item:
> +                                    flags, l4_chsum = item.split("=", 1)
> +                                    name, value = l4_chsum.split("_CKSUM_", 1)
> +                                    if (flag == 1):
> +                                        self.verify(value == "GOOD", "Packet Rx L4 checksum valid-flags error!")
> +                                    else:
> +                                        self.verify(value == "BAD", "Packet Rx L4 checksum valid-flags error!")
> +                    else:
> +                        if "PKT_RX_L4_CKSUM" not in line:
> +                            self.verify(0, "There is no L4 checksum flags appeared!")
> +                        elif "PKT_RX_IP_CKSUM" not in line:
> +                            self.verify(0, "There is no IP checksum flags appeared!")
> +                        else:
> +                            for item in line.split():
> +                                item = item.strip()
> +                                if "PKT_RX_L4_CKSUM" in item:
> +                                    flags, l4_chsum = item.split("=", 1)
> +                                    name, value = l4_chsum.split("_CKSUM_", 1)
> +                                    if (flag == 1):
> +                                        self.verify(value == "GOOD", "Packet Rx L4 checksum valid-flags error!")
> +                                    else:
> +                                        self.verify(value == "BAD", "Packet Rx L4 checksum valid-flags error!")
> +
> +                                elif(item.startswith("PKT_RX_IP_CKSUM")):
> +                                    name, value = item.split("_CKSUM_", 1)
> +                                    if (flag == 1):
> +                                        self.verify(value == "GOOD", "Packet Rx IP checksum valid-flags error!")
> +                                    else:
> +                                        self.verify(value == "BAD", "Packet Rx IP checksum valid-flags error!")
> +
> +        self.dut.send_expect("stop", "testpmd>")
> +
>       def checksum_validate(self, packets_sent, packets_expected):
>           """
>           Validate the checksum.
> @@ -123,16 +207,16 @@ class TestChecksumOffload(TestCase):
>           self.verify(len(packets_sent) == len(packets_received), "Unexpected Packets Drop")
>   
>           for packet_received in packets_received:
> -            ip_checksum, tcp_checksum, udp_checksup, sctp_checksum = packet_received.split(';')
> +            ip_checksum, tcp_checksum, udp_checksum, sctp_checksum = packet_received.split(';')
>   
>               packet_type = ''
>               l4_checksum = ''
>               if tcp_checksum != '??':
>                   packet_type = 'TCP'
>                   l4_checksum = tcp_checksum
> -            elif udp_checksup != '??':
> +            elif udp_checksum != '??':
>                   packet_type = 'UDP'
> -                l4_checksum = udp_checksup
> +                l4_checksum = udp_checksum
>               elif sctp_checksum != '??':
>                   packet_type = 'SCTP'
>                   l4_checksum = sctp_checksum
> @@ -157,16 +241,16 @@ class TestChecksumOffload(TestCase):
>           mac = self.dut.get_mac_address(self.dut_ports[0])
>   
>           pktsChkErr = {'IP/UDP': 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IP(chksum=0x0)/UDP(chksum=0xf)/("X"*46)' % mac,
> -                'IP/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IP(chksum=0x0)/TCP(chksum=0xf)/("X"*46)' % mac,
> -                'IP/SCTP': 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IP(chksum=0x0)/SCTP(chksum=0xf)/("X"*48)' % mac,
> -                'IPv6/UDP': 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IPv6(src="::1")/UDP(chksum=0xf)/("X"*46)' % mac,
> -                'IPv6/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IPv6(src="::1")/TCP(chksum=0xf)/("X"*46)' % mac}
> +                      'IP/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IP(chksum=0x0)/TCP(chksum=0xf)/("X"*46)' % mac,
> +                      'IP/SCTP': 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IP(chksum=0x0)/SCTP(chksum=0xf)/("X"*48)' % mac,
> +                      'IPv6/UDP': 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IPv6(src="::1")/UDP(chksum=0xf)/("X"*46)' % mac,
> +                      'IPv6/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/Dot1Q(vlan=1)/IPv6(src="::1")/TCP(chksum=0xf)/("X"*46)' % mac}
>   
>           pkts = {'IP/UDP': 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IP(src="127.0.0.1")/UDP()/("X"*46)' % mac,
> -                    'IP/TCP': 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IP(src="127.0.0.1")/TCP()/("X"*46)' % mac,
> -                    'IP/SCTP': 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IP(src="127.0.0.1")/SCTP()/("X"*48)' % mac,
> -                    'IPv6/UDP': 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IPv6(src="::1")/UDP()/("X"*46)' % mac,
> -                    'IPv6/TCP': 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IPv6(src="::1")/TCP()/("X"*46)' % mac}
> +                'IP/TCP': 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IP(src="127.0.0.1")/TCP()/("X"*46)' % mac,
> +                'IP/SCTP': 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IP(src="127.0.0.1")/SCTP()/("X"*48)' % mac,
> +                'IPv6/UDP': 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IPv6(src="::1")/UDP()/("X"*46)' % mac,
> +                'IPv6/TCP': 'Ether(dst="02:00:00:00:00:00", src="%s")/Dot1Q(vlan=1)/IPv6(src="::1")/TCP()/("X"*46)' % mac}
>   
>           if self.kdriver == "fm10k":
>               del pktsChkErr['IP/SCTP']
> @@ -177,7 +261,50 @@ class TestChecksumOffload(TestCase):
>           result = self.checksum_validate(pktsChkErr, pkts)
>           self.dut.send_expect("stop", "testpmd>")
>           self.verify(len(result) == 0, string.join(result.values(), ","))
> -
> +
> +    def test_rx_checksum_valid_flags(self):
> +        """
> +        Insert right and wrong IPv4/IPv6 UDP/TCP/SCTP checksum on the
> +        transmit packet.Enable Checksum offload.
> +        Verify the checksum valid-flags.

This case need to be updated into test plan.
> +        """
> +        mac = self.dut.get_mac_address(self.dut_ports[0])
> +
> +        pkts_ref = {'IP/UDP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IP()/UDP()/("X"*46)' % mac,
> +                    'IP/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1")/TCP()/("X"*46)' % mac,
> +                    'IP/SCTP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1")/SCTP()/("X"*48)' % mac,
> +                    'IPv6/UDP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/UDP()/("X"*46)' % mac,
> +                    'IPv6/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/TCP()/("X"*46)' % mac}
> +
> +        result = dict()
> +        if self.kdriver == "fm10k":
> +            del pkts['IP/SCTP']
> +            del pkts_ref['IP/SCTP']
> +
> +        self.checksum_enablehw(self.dut_ports[0])
> +
> +        # get the packet checksum value
> +        result = self.get_chksum_values(pkts_ref)
> +
> +        # set the expected checksum values same with the actual values
> +        pkts_good = {'IP/UDP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(chksum=int(%s))/UDP(chksum=int(%s))/("X"*46)' % (mac, result['IP/UDP'][0], result['IP/UDP'][1]),
> +                     'IP/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1",chksum=int(%s))/TCP(chksum=int(%s))/("X"*46)' % (mac, result['IP/TCP'][0], result['IP/TCP'][1]),
> +                     'IP/SCTP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1",chksum=int(%s))/SCTP(chksum=int(%s))/("X"*48)' % (mac, result['IP/SCTP'][0], result['IP/SCTP'][1]),
> +                     'IPv6/UDP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/UDP(chksum=int(%s))/("X"*46)' % (mac, result['IPv6/UDP'][0]),
> +                     'IPv6/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/TCP(chksum=int(%s))/("X"*46)' % (mac, result['IPv6/TCP'][0])}
> +
> +        # set the expected checksum values different from the actual values
> +        pkts_bad = {'IP/UDP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(chksum=0x0)/UDP(chksum=0xf)/("X"*46)' % mac,
> +                    'IP/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1",chksum=0x0)/TCP(chksum=0xf)/("X"*46)' % mac,
> +                    'IP/SCTP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IP(src="10.0.0.1",chksum=0x0)/SCTP(chksum=0xf)/("X"*48)' % mac,
> +                    'IPv6/UDP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/UDP(chksum=0xf)/("X"*46)' % mac,
> +                    'IPv6/TCP': 'Ether(dst="%s", src="52:00:00:00:00:00")/IPv6(src="::1")/TCP(chksum=0xf)/("X"*46)' % mac}
> +
> +        # send the packet checksum value same with the expected value
> +        self.checksum_valid_flags(result, pkts_good, 1)
> +        # send the packet checksum value different from the expected value
> +        self.checksum_valid_flags(result, pkts_bad, 0)
> +
>       def test_checksum_offload_enable(self):
>           """
>           Insert IPv4/IPv6 UDP/TCP/SCTP checksum on the transmit packet.



More information about the dts mailing list