[dts] [PATCH V1] add test suite link_status_interrupt

Liu, Yong yong.liu at intel.com
Mon Feb 20 08:46:02 CET 2017


Thanks gang, applied. But the patch subject is not align with content in 
it. Look like there's some code optimization in this patch.

On 02/10/2017 02:54 PM, xu,gang wrote:
> Signed-off-by: xu,gang <gangx.xu at intel.com>
> ---
>   tests/TestSuite_link_status_interrupt.py | 288 ++++++++++++++-----------------
>   1 file changed, 132 insertions(+), 156 deletions(-)
>
> diff --git a/tests/TestSuite_link_status_interrupt.py b/tests/TestSuite_link_status_interrupt.py
> index c50a99d..ae205cc 100644
> --- a/tests/TestSuite_link_status_interrupt.py
> +++ b/tests/TestSuite_link_status_interrupt.py
> @@ -1,87 +1,77 @@
> -# <COPYRIGHT_TAG>
> +# BSD LICENSE
> +#
> +# Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
> +# All rights reserved.
> +#
> +# Redistribution and use in source and binary forms, with or without
> +# modification, are permitted provided that the following conditions
> +# are met:
> +#
> +#   * Redistributions of source code must retain the above copyright
> +#     notice, this list of conditions and the following disclaimer.
> +#   * Redistributions in binary form must reproduce the above copyright
> +#     notice, this list of conditions and the following disclaimer in
> +#     the documentation and/or other materials provided with the
> +#     distribution.
> +#   * Neither the name of Intel Corporation nor the names of its
> +#     contributors may be used to endorse or promote products derived
> +#     from this software without specific prior written permission.
> +#
> +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> +
>   
>   """
>   DPDK Test suite.
> -
> -Link Status Detection
> -
> +Test link status.
>   """
>   
> -# NOTE: These tests generally won't work in automated mode since the
> -# link doesn't stay down unless the cable is actually removed. The code
> -# is left here for legacy reasons.
> -
> -
>   import utils
> +import string
> +import time
>   import re
> -
> -testPorts = []
> -intr_mode = ['"intr_mode=random"',
> -             '"intr_mode=msix"', '"intr_mode=legacy"', '']
> -intr_mode_output = ['Error: bad parameter - random', 'Use MSIX interrupt',
> -                    'Use legacy interrupt', 'Use MSIX interrupt by default']
> -
> -
>   from test_case import TestCase
> -
> -#
> -#
> -# Test class.
> -#
> +from packet import Packet, sniff_packets, load_sniff_packets
>   
>   
>   class TestLinkStatusInterrupt(TestCase):
>   
> -    #
> -    #
> -    #
> -    # Test cases.
> -    #
> -
>       def set_up_all(self):
>           """
>           Run at the start of each test suite.
> -
> -
> -        Link Status Interrupt Prerequisites
>           """
> +        self.dut_ports = self.dut.get_ports(self.nic)
> +        self.verify(len(self.dut_ports) >= 2, "Insufficient ports")
> +        cores = self.dut.get_core_list("1S/4C/1T")
> +        self.coremask = utils.create_mask(cores)
> +        self.portmask = utils.create_mask(self.dut_ports)
> +
> +        self.path = "./examples/link_status_interrupt/build/link_status_interrupt"
>   
> -        dutPorts = self.dut.get_ports(self.nic)
> -        self.verify(len(dutPorts) > 1, "Insufficient ports for " + self.nic)
> -        for n in range(2):
> -            testPorts.append(dutPorts[n])
> -            inter = self.tester.get_interface(
> -                self.tester.get_local_port(testPorts[n]))
> -            self.tester.send_expect("ifconfig %s up" % inter, "# ", 5)
> -        out = self.dut.send_expect(
> -            "make -C examples/link_status_interrupt", "# ")
> -        self.verify("Error" not in out, "Compilation error 1")
> -        self.verify("No such file" not in out, "Compilation error 2")
> +        # build sample app
> +        out = self.dut.build_dpdk_apps("./examples/link_status_interrupt")
> +        self.verify("Error" not in out, "compilation error 1")
> +        self.verify("No such file" not in out, "compilation error 2")
>   
>       def set_link_status_and_verify(self, dutPort, status):
>           """
> -        In registered callback...
> -        Event type: LSC interrupt
> -        Port 0 Link Up - speed 10000 Mbps - full-duplex
> -
> -        In registered callback...
> -        Event type: LSC interrupt
> -        Port 0 Link Down
> +        set link status verify results
>           """
> -
> -        inter = self.tester.get_interface(self.tester.get_local_port(dutPort))
> -        self.tester.send_expect("ifconfig %s %s" % (inter, status), "# ", 10)
> -        self.dut.send_expect("", "Port %s Link %s" %
> -                             (dutPort, status.capitalize()), 60)
> -        out = self.dut.send_expect("", "Aggregate statistics", 60)
> -        exp = r"Statistics for port (\d+) -+\r\n" + \
> -            "Link status:\s+Link (up|down)\r\n"
> -        pattern = re.compile(exp)
> -        info = pattern.findall(out)
> -        if info[0][0] == repr(dutPort):
> -            self.verify(info[0][1] == status, "Link status change port error")
> -        else:
> -            self.verify(info[1][1] == status, "Link status change hello error")
> +        self.intf = self.tester.get_interface(
> +            self.tester.get_local_port(dutPort))
> +        self.tester.send_expect("ifconfig %s %s" %
> +                                (self.intf, status.lower()), "# ", 10)
> +        verify_point = "Port %s Link %s" % (dutPort, status)
> +        self.dut.send_expect("", verify_point, 60)
>   
>       def set_up(self):
>           """
> @@ -91,111 +81,97 @@ class TestLinkStatusInterrupt(TestCase):
>   
>       def test_link_status_interrupt_change(self):
>           """
> -        Link status change.
> +        Verify Link status change
>           """
> -
> -        memChannel = self.dut.get_memory_channels()
> -        portMask = utils.create_mask(testPorts)
> -        if self.drivername in ["igb_uio"]:
> -            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
> -                memChannel, portMask)
> -        elif self.drivername in ["vfio-pci"]:
> -            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx  -- -q 2 -p %s" % (
> -                memChannel, portMask)
> -        else:
> -            print "unknow driver"
> -        for n in range(len(intr_mode)):
> -            if self.drivername in ["igb_uio"]:
> -                self.dut.send_expect("rmmod igb_uio", "# ")
> +        if self.drivername == "igb_uio":
> +            cmdline = self.path + " -c %s -n %s -- -p %s " % (
> +                self.coremask, self.dut.get_memory_channels(), self.portmask)
> +            for mode in ["legacy", "msix"]:
> +                self.dut.send_expect("rmmod -f igb_uio", "#", 20)
>                   self.dut.send_expect(
> -                    "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
> -                out = self.dut.send_expect(
> -                    "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
> -                self.verify(
> -                    intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
> -                if n == 0:
> -                    continue
> -            self.dut.send_expect(cmdline, "Aggregate statistics", 605)
> -            self.dut.send_expect(
> -                "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
> -            self.set_link_status_and_verify(testPorts[0], 'down')
> -            self.set_link_status_and_verify(testPorts[0], 'up')
> -            self.set_link_status_and_verify(testPorts[1], 'down')
> -            self.set_link_status_and_verify(testPorts[1], 'up')
> -            self.dut.send_expect("^C", "# ")
> +                    'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
> +                self.dut.bind_interfaces_linux()
> +                self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> +                self.set_link_status_and_verify(self.dut_ports[0], 'Down')
> +                self.set_link_status_and_verify(self.dut_ports[0], 'Up')
> +                self.dut.send_expect("^C", "#", 60)
> +        elif self.drivername == "vfio-pci":
> +            for mode in ["legacy", "msi", "msix"]:
> +                cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
> +                    self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
> +                self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> +                self.set_link_status_and_verify(self.dut_ports[0], 'Down')
> +                self.set_link_status_and_verify(self.dut_ports[0], 'Up')
> +                self.dut.send_expect("^C", "#", 60)
>   
>       def test_link_status_interrupt_port_available(self):
>           """
> -        Port available.
> +        interrupt all port link, then link them,
> +        sent packet, check packets received.
>           """
> -
> -        memChannel = self.dut.get_memory_channels()
> -        portMask = utils.create_mask(testPorts)
> -        if self.drivername in ["igb_uio"]:
> -            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s -- -q 2 -p %s" % (
> -                memChannel, portMask)
> -        elif self.drivername in ["vfio-pci"]:
> -            cmdline = "./examples/link_status_interrupt/build/link_status_interrupt -c f -n %s --vfio-intr=intx -- -q 2 -p %s " % (
> -                memChannel, portMask)
> -        else:
> -            print "unknow driver"
> -        for n in range(1, len(intr_mode)):
> -            if self.drivername in ["igb_uio"]:
> -                self.dut.send_expect("rmmod igb_uio", "# ")
> +        if self.drivername == "igb_uio":
> +            cmdline = self.path + " -c %s -n %s -- -p %s " % (
> +                self.coremask, self.dut.get_memory_channels(), self.portmask)
> +            for mode in ["legacy", "msix"]:
> +                self.dut.send_expect("rmmod -f igb_uio", "#", 20)
>                   self.dut.send_expect(
> -                    "insmod %s/kmod/igb_uio.ko %s" % (self.target, intr_mode[n]), "# ")
> -                out = self.dut.send_expect(
> -                    "dmesg -c | grep '\<%s\>'" % (intr_mode_output[n]), "# ")
> -                self.verify(
> -                    intr_mode_output[n] in out, "Fail to insmod igb_uio " + intr_mode[n])
> -            self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> -            self.dut.send_expect(
> -                "", "Port %s Link Up.+\r\n" % (testPorts[1]), 5)
> -            self.set_link_status_and_verify(testPorts[0], 'down')
> -            self.set_link_status_and_verify(testPorts[1], 'down')
> -            self.set_link_status_and_verify(testPorts[0], 'up')
> -            self.set_link_status_and_verify(testPorts[1], 'up')
> -            for m in [0, 1]:
> -                txPort = self.tester.get_local_port(testPorts[m])
> -                rxPort = self.tester.get_local_port(testPorts[1 - m])
> -                txItf = self.tester.get_interface(txPort)
> -                rxItf = self.tester.get_interface(rxPort)
> -                self.tester.scapy_background()
> -                self.tester.scapy_append(
> -                    'p = sniff(iface="%s", count=1)' % rxItf)
> -                self.tester.scapy_append('nr_packets=len(p)')
> -                self.tester.scapy_append('RESULT = str(nr_packets)')
> -                self.tester.scapy_foreground()
> -                self.tester.scapy_append(
> -                    'sendp([Ether()/IP()/UDP()/("X"*46)], iface="%s")' % txItf)
> -                self.tester.scapy_execute()
> -                nr_packets = self.tester.scapy_get_result()
> -                self.verify(nr_packets == "1", "Fail to switch L2 frame")
> -            self.dut.send_expect("^C", "# ")
> -
> -    def test_link_status_interrupt_recovery(self):
> -        """
> -        Recovery.
> -        """
> -        if self.drivername in ["igb_uio"]:
> -            self.dut.send_expect("^C", "# ")
> -            self.dut.send_expect("rmmod igb_uio", "# ")
> -            self.dut.send_expect(
> -                "insmod %s/kmod/igb_uio.ko" % (self.target), "# ")
> -            out = self.dut.send_expect(
> -                "dmesg -c | grep '\<Use MSIX interrupt by default\>'", "# ")
> -            self.verify(
> -                'Use MSIX interrupt by default' in out, "Fail to recovery default igb_uio")
> -        elif self.drivername in ["vfio-pci"]:
> -            self.verify(Ture, "not need run this case, when used vfio driver")
> -        else:
> -            print "unknow driver"
> +                    'insmod %s/kmod/igb_uio.ko "intr_mode=%s"' % (self.target, mode), "# ")
> +                self.dut.bind_interfaces_linux()
> +                self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> +                for port in self.dut_ports:
> +                    self.set_link_status_and_verify(
> +                        self.dut_ports[port], 'Down')
> +                time.sleep(10)
> +                for port in self.dut_ports:
> +                    self.set_link_status_and_verify(self.dut_ports[port], 'Up')
> +                self.scapy_send_packet(1)
> +                out = self.dut.get_session_output(timeout=60)
> +                pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
> +                pkt_received = re.findall(
> +                    "Total packets received:\s*(\d*)", out)
> +                self.verify(pkt_send == pkt_received == '1',
> +                            "Error: sent packets != received packets")
> +                self.dut.send_expect("^C", "#", 60)
> +        elif self.drivername == "vfio-pci":
> +            for mode in ["legacy", "msi", "msix"]:
> +                cmdline = self.path + " -c %s -n %s --vfio-intr=%s -- -p %s" % (
> +                    self.coremask, self.dut.get_memory_channels(), mode, self.portmask)
> +                self.dut.send_expect(cmdline, "Aggregate statistics", 60)
> +                for port in self.dut_ports:
> +                    self.set_link_status_and_verify(
> +                        self.dut_ports[port], 'Down')
> +                time.sleep(10)
> +                for port in self.dut_ports:
> +                    self.set_link_status_and_verify(self.dut_ports[port], 'Up')
> +                self.scapy_send_packet(1)
> +                out = self.dut.get_session_output(timeout=60)
> +                pkt_send = re.findall("Total packets sent:\s*(\d*)", out)
> +                pkt_received = re.findall(
> +                    "Total packets received:\s*(\d*)", out)
> +                self.verify(pkt_send == pkt_received == '1',
> +                            "Error: sent packets != received packets")
> +                self.dut.send_expect("^C", "#", 60)
> +
> +    def scapy_send_packet(self, num=1):
> +        """
> +        Send a packet to port
> +        """
> +        self.dmac = self.dut.get_mac_address(self.dut_ports[0])
> +        txport = self.tester.get_local_port(self.dut_ports[0])
> +        self.txItf = self.tester.get_interface(txport)
> +        pkt = Packet(pkt_type='UDP')
> +        pkt.config_layer('ether', {'dst': self.dmac})
> +        pkt.send_pkt(tx_port=self.txItf, count=num)
>   
>       def tear_down(self):
>           """
>           Run after each test case.
>           """
> -        pass
> +        self.dut.kill_all()
> +        time.sleep(2)
> +        for port in self.dut_ports:
> +            intf = self.tester.get_interface(self.tester.get_local_port(port))
> +            self.tester.send_expect("ifconfig %s up" % intf, "# ", 10)
>   
>       def tear_down_all(self):
>           """



More information about the dts mailing list