[dts] [PATCH V3 1/2] add ip_pipeline config_file test code

Xu, HuilongX huilongx.xu at intel.com
Fri Sep 2 03:08:37 CEST 2016


Hi gang,
Pls see my comments as below.

> -----Original Message-----
> From: dts [mailto:dts-bounces at dpdk.org] On Behalf Of xu,gang
> Sent: Monday, August 29, 2016 5:41 PM
> To: dts at dpdk.org
> Cc: Xu, GangX
> Subject: [dts] [PATCH V3 1/2] add ip_pipeline config_file test code
> 
> Signed-off-by: xu,gang <gangx.xu at intel.com>
> ---
>  tests/TestSuite_ip_pipeline_config_file.py | 254
> +++++++++++++++++++++++++++++
>  1 file changed, 254 insertions(+)
>  create mode 100644 tests/TestSuite_ip_pipeline_config_file.py
> 
> diff --git a/tests/TestSuite_ip_pipeline_config_file.py
> b/tests/TestSuite_ip_pipeline_config_file.py
> new file mode 100644
> index 0000000..f2fe1fc
> --- /dev/null
> +++ b/tests/TestSuite_ip_pipeline_config_file.py
> @@ -0,0 +1,254 @@
> +#BSD LICENSE
> +#
> +# Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
> +# All rights reserved.
> +#
> +# Redistribution and use in source and binary forms, with or without
> +# modification, are permitted provided that the following conditions
> +# are met:
> +#
> +#   * Redistributions of source code must retain the above copyright
> +#     notice, this list of conditions and the following disclaimer.
> +#   * Redistributions in binary form must reproduce the above copyright
> +#     notice, this list of conditions and the following disclaimer in
> +#     the documentation and/or other materials provided with the
> +#     distribution.
> +#   * Neither the name of Intel Corporation nor the names of its
> +#     contributors may be used to endorse or promote products derived
> +#     from this software without specific prior written permission.
> +#
> +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> +
> +
> +"""
> +DPDK Test suite.
> +Test ip_pipeline_configuration file.
> +"""
> +
> +import dts
> +import string
> +import time
> +import re
> +import os
> +from test_case import TestCase
> +from plotting import Plotting
> +from settings import HEADER_SIZE
> +from etgen import IxiaPacketGenerator
> +from packet import Packet, sniff_packets, load_sniff_packets
> +
> +class Test_ip_pipeline_configuration_file(TestCase):
> +
> +    def set_up_all(self):
> +        """
> +        Run at the start of each test suite.
> +        """
> +        self.dut_ports = self.dut.get_ports(self.nic)
> +        self.verify(len(self.dut_ports) >= 4, "Insufficient ports")
> +        cores = self.dut.get_core_list("1S/4C/1T")
> +        self.coremask = dts.create_mask(cores)
> +
> +        self.dut.send_expect("chmod 777 /root/","#",30)
           Why you want update /root path permissions? If you must do it, pls use ~ to replace /root
           Because not all os the root path is /root. 
> +        con = self.dut.send_expect("sed -n '510,520p'
> config/common_base","Compile librte_table",30)
> +        self.verify("CONFIG_RTE_PORT_PCAP=y" in con,
> "CONFIG_RTE_PORT_PCAP=n")
           You should change CONFIG_RTE_PORT_PCAP value to "y" in your test suite.
> +
> +        #unpack test configuration
> +        cwd = os.getcwd()
> +        self.tester.send_expect("cd %s/dep/" % cwd,"# ",50)
> +        result = self.tester.send_expect("tar xzvf
> ip_pipeline.tar.gz","# ",50)
> +        self.verify("error" not in result, "ip_pipeline unpack fail ")
> +
           Maybe you can use "update_ip_pipeline_config" for replace "package_ip_pipeline"
           Because you change config file in function package_ip_pipeline.
> +        self.packag_ip_pipeline()
> +
> +        #copy configuration file to dut and unpack
> +        self.dut.session.copy_file_to("dep/ip_pipeline.tar.gz")
> +        put = self.dut.send_expect("tar -zxvf ~/ip_pipeline.tar.gz -C
> ~","#",500)
> +        #self.verify("error" not in put, "ip_pipeline unpack fail")
           A suggest:
                You can get config file list in setup_all function, not in every case.
                Because your case only used different file in different path.
                The config file list as below:
                self.confile_path = '~/ip_pipeline/'
                self.confile_dict = {"ip_pipeline_cfg_postive":["ip_pipeline_cfg/postive/file1"," ip_pipeline_cfg/postive/file2",...,],
                                "ip_pipeline_cfg_negative ":[" ip_pipeline_cfg/negative/file3"," ip_pipeline_cfg/negative/file4",...,],
                                 ...,
                               }
                In case ip_pipeline_cfg you can design it as below:
                     if "ip_pipeline_cfg_postive" in self.confie_dict:
                         for file in  self.confile_dict["ip_pipeline_cfg_postive"]:
                              confile = self.confile_path + file
                              cmdline = self.path + '-f %s' % confile
                              ...
                     else:
                          self. Verify(false, "no ip_pipeline_cfg positive configfile")
> +
> +        self.path = "./examples/ip_pipeline/build/ip_pipeline"
> +
> +        # build sample app
> +        out = self.dut.build_dpdk_apps("./examples/ip_pipeline")
> +        self.verify("Error" not in out, "compilation error 1")
> +        self.verify("No such file" not in out, "compilation error 2")
> +
> +    def set_up(self):
> +        """
> +        Run before each test case.
> +        """
> +        pass
> +
> +    def replace_pci(self,filename,old,new):
> +        """
> +        modify the file by new code
> +        """
> +        try:
> +            lines = open(filename,'r').readlines()
> +            flen = len(lines)
> +            for i in range(flen):
> +                if old in lines[i]:
> +                    lines[i] = lines[i].replace(old,new)
> +            open(filename,'w').writelines(lines)
> +        except Exception,e:
> +            print e
> +
> +    def packag_ip_pipeline(self):
> +        """
> +        modify the ip_pipeline cfg file's pci and pack again
> +        """
> +        po_rootdir =
> "dep/ip_pipeline/ip_pipeline_link_identification/postive/"
> +        for po_parent,po_dirnames,po_filenames in os.walk(po_rootdir):
> +            for i in po_filenames:
> +                filename = po_parent + i
> +                self.replace_pci(filename,"port0","%s" %
> self.dut.ports_info[0]['pci'])
> +                self.replace_pci(filename,"port1","%s" %
> self.dut.ports_info[1]['pci'])
> +                self.replace_pci(filename,"port2","%s" %
> self.dut.ports_info[2]['pci'])
> +                self.replace_pci(filename,"port3","%s" %
> self.dut.ports_info[3]['pci'])
> +        ne_rootdir =	
> "dep/ip_pipeline/ip_pipeline_link_identification/negative/"
> +        for ne_parent,ne_dirnames,ne_filenames in os.walk(ne_rootdir):
> +            for i in ne_filenames:
> +                filename = ne_parent + i
> +                self.replace_pci(filename,"port0","%s" %
> self.dut.ports_info[0]['pci'])
> +                self.replace_pci(filename,"port1","%s" %
> self.dut.ports_info[1]['pci'])
> +                self.replace_pci(filename,"port2","%s" %
> self.dut.ports_info[2]['pci'])
> +                self.replace_pci(filename,"port3","%s" %
> self.dut.ports_info[3]['pci'])
> +        self.tester.send_expect("rm -rf ip_pipeline.tar.gz","#",10)
> +        result = self.tester.send_expect("tar -czvf ip_pipeline.tar.gz
> ip_pipeline/","#",50)
> +        self.verify("error" not in result, "ip_pipeline compression
> fail")
> +        self.tester.send_expect("cd ../","#",10)
> +
> +    def scapy_send_package(self,num):
> +        """
> +        Send a packet to port
> +        """
> +        txport = self.tester.get_local_port(self.dut_ports[0])
> +        mac = self.dut.get_mac_address(self.dut_ports[0])
> +        txItf = self.tester.get_interface(txport)
> +
> self.tester.scapy_append('sendp([Ether(dst="%s")/IP()/UDP()/Raw(\'X\'*18
> )], iface="%s",count=%s)' % (mac, txItf,num))
> +        self.tester.scapy_execute()
> +
> +    def test_ip_pipeline_cfg(self):
> +        cfg_po_rootdir = "dep/ip_pipeline/ip_pipeline_cfg/postive/"
> +        for cfg_po_parent,cfg_po_dirnames,cfg_po_filenames in
> os.walk(cfg_po_rootdir):
> +            for cfg_po_filename in cfg_po_filenames:
> +                cmd = self.path + " -p 0x3 -f
> ~/ip_pipeline/ip_pipeline_cfg/postive/%s" % cfg_po_filename
> +                self.dut.send_expect(cmd,"[PIPELINE1]",50)
> +                out = self.dut.send_expect("quit","# ",10)
> +                self.verify("Bye!" in out, "Test failed on %s" %
> cfg_po_filename)
> +        cfg_ne_rootdir = "dep/ip_pipeline/ip_pipeline_cfg/negative/"
> +        for cfg_ne_parent,cfg_ne_dirnames,cfg_ne_filenames in
> os.walk(cfg_ne_rootdir):
> +            for cfg_ne_filename in cfg_ne_filenames:
> +                cmd = self.path + " -p 0x3 -f
> ~/ip_pipeline/ip_pipeline_cfg/negative/%s" % cfg_ne_filename
> +                out = self.dut.send_expect(cmd,"# ",50)
> +                self.dut.send_expect("quit","# ",10)
> +                self.verify("error" in out, "Test failed on %s" %
> cfg_ne_filename)
> +
> +    def test_ip_pipeline_cpu_utilization(self):
> +        cmd = self.path + " -p 0xf -
> f ./examples/ip_pipeline/config/edge_router_downstream.cfg -
> s ./examples/ip_pipeline/config/edge_router_downstream.sh"
> +        self.dut.send_expect(cmd,"#p 1 route ls",50)
> +        self.scapy_send_package(1000)
> +        self.dut.send_expect("","pipeline> ",5)
> +        out = self.dut.send_expect("t 1 headroom","pipeline> ",10)
> +        self.dut.send_expect("quit","# ",10)
> +        self.verify(out == "100%" , "headroom error")
> +
> +    def test_ip_pipeline_link_identification(self):
> +        link_po_rootdir =
> "dep/ip_pipeline/ip_pipeline_link_identification/postive/"
> +        for link_po_parent,link_po_dirnames,link_po_filenames in
> os.walk(link_po_rootdir):
> +            for link_po_filename in link_po_filenames:
> +                if "_f" in link_po_filename:
> +                    cmd = self.path + " -f
> ~/ip_pipeline/ip_pipeline_link_identification/postive/%s" %
> link_po_filename
> +                    self.dut.send_expect(cmd,"[PIPELINE1]",50)
> +                    out = self.dut.send_expect("quit","# ",10)
> +                    self.verify("Bye!" in out, "Test failed on %s" %
> link_po_filename)
> +                elif "_pf" in link_po_filename:
> +                    cmd = self.path + " -p f -f
> ~/ip_pipeline/ip_pipeline_link_identification/postive/%s" %
> link_po_filename
> +                    self.dut.send_expect(cmd,"[PIPELINE1]",50)
> +                    out = self.dut.send_expect("quit","# ",10)
> +                    self.verify("Bye!" in out, "Test failed on %s" %
> link_po_filename)
> +        link_ne_rootdir =
> "dep/ip_pipeline/ip_pipeline_link_identification/negative/"
> +        for link_ne_parent,link_ne_dirnames,link_ne_filenames in
> os.walk(link_ne_rootdir):
> +            for link_ne_filename in link_ne_filenames:
> +                if "_f" in link_ne_filename:
> +                    cmd = self.path + " -p 0x3 -f
> ~/ip_pipeline/ip_pipeline_link_identification/negative/%s" %
> link_ne_filename
> +                    out = self.dut.send_expect(cmd,"# ",50)
> +                    self.dut.send_expect("quit","# ",10)
> +                    self.verify("error" in out, "Test failed on %s" %
> link_ne_filename)
> +                elif "_1p" in link_ne_filename:
> +                    cmd = self.path + " -p 1 -f
> ~/ip_pipeline/ip_pipeline_link_identification/negative/%s" %
> link_ne_filename
> +                    out = self.dut.send_expect(cmd,"# ",50)
> +                    self.dut.send_expect("quit","# ",10)
> +                    self.verify("error" in out, "Test failed on %s" %
> link_ne_filename)
> +
> +                elif "_3p" in link_ne_filename:
> +                    cmd = self.path + " -p 3 -f
> ~/ip_pipeline/ip_pipeline_link_identification/negative/%s" %
> link_ne_filename
> +                    out = self.dut.send_expect(cmd,"#",50)
> +                    self.dut.send_expect("quit","# ",10)
> +                    self.verify("error" in out, "Test failed on %s" %
> link_ne_filename)
> +
> +
> +    def test_ip_pipeline_parser_cleanup(self):
> +        parser_po_rootdir =
> "dep/ip_pipeline/ip_pipeline_parser_cleanup/postive/"
> +        for parser_po_parent,parser_po_dirnames,parser_po_filenames in
> os.walk(parser_po_rootdir):
> +            for parser_po_filename in parser_po_filenames:
> +                cmd = self.path + " -p 0x3 -f
> ~/ip_pipeline/ip_pipeline_parser_cleanup/postive/%s" %
> parser_po_filename
> +                self.dut.send_expect(cmd,"[PIPELINE1]",50)
> +                out = self.dut.send_expect("quit","# ",10)
> +                self.verify("Bye!" in out, "Test failed on %s" %
> parser_po_filename)
> +
> +        parser_ne_rootdir =
> "dep/ip_pipeline/ip_pipeline_parser_cleanup/negative/"
> +        for parser_ne_parent,parser_ne_dirnames,parser_ne_filenames in
> os.walk(parser_ne_rootdir):
> +            for parser_ne_filename in parser_ne_filenames:
> +                cmd = self.path + " -p 0x3 -f
> ~/ip_pipeline/ip_pipeline_parser_cleanup/negative/%s" %
> parser_ne_filename
> +                out = self.dut.send_expect(cmd,"# ",50)
> +                self.verify("error" in out, "Test failed on %s" %
> parser_ne_filename)
> +
> +    def test_ip_pipeline_source_sink(self):
> +        source_sink_po_rootdir =
> "dep/ip_pipeline/ip_pipeline_source_sink/postive/"
> +        for
> source_sink_po_parent,source_sink_po_dirnames,source_sink_po_filenames
> in os.walk(source_sink_po_rootdir):
> +            for source_sink_po_filename in source_sink_po_filenames:
> +                self.dut.send_expect("cp ~/ip_pipeline/*.pcap ~","#",50)
> +                if "_com" in source_sink_po_filename:
> +                    cmd = self.path + " -f
> ~/ip_pipeline/ip_pipeline_source_sink/postive/%s" %
> source_sink_po_filename
> +                    self.dut.send_expect(cmd,"PORT: Dumped",50)
> +                    time.sleep(10)
> +                    out = self.dut.send_expect("quit","# ",10)
> +                    self.verify("Bye!" in out, "Test failed on %s" %
> source_sink_po_filename)
> +                else:
> +                    cmd = self.path + " -f
> ~/ip_pipeline/ip_pipeline_source_sink/postive/%s" %
> source_sink_po_filename
> +                    self.dut.send_expect(cmd,"[PIPELINE1]",50)
> +                    out = self.dut.send_expect("quit","# ",10)
> +                    self.verify("Bye!" in out, "Test failed on %s" %
> source_sink_po_filename)
> +                self.dut.send_expect("rm -rf ~/*.pcap","#",50)
> +
> +
> +        source_sink_ne_rootdir =
> "dep/ip_pipeline/ip_pipeline_source_sink/negative/"
> +        for
> source_sink_ne_parent,source_sink_ne_dirnames,source_sink_ne_filenames
> in os.walk(source_sink_ne_rootdir):
> +            for source_sink_ne_filename in source_sink_ne_filenames:
> +                self.dut.send_expect("cp ~/ip_pipeline/*.pcap ~","#",50)
> +                cmd = self.path + " -f
> ~/ip_pipeline/ip_pipeline_source_sink/negative/%s" %
> source_sink_ne_filename
> +                out = self.dut.send_expect(cmd,"# ",50)
> +                self.verify("error" in out, "Test failed on %s" %
> source_sink_ne_filename)
> +                self.dut.send_expect("rm -rf ~/*.pcap","#",50)
> +
> +    def tear_down(self):
> +        """
> +        Run after each test case.
> +        """
> +        self.dut.kill_all()
> +        time.sleep(2)
> +
> +    def tear_down_all(self):
> +        """
> +        Run after each test suite.
> +        """
          You should set CONFIG_RTE_PORT_PCAP=n and compile dpdk again, in this function.
> +        pass
> --
> 1.9.3



More information about the dts mailing list