[dts] [PATCH]tests: add VLAN ethertype script

Lijuan Tu lijuanx.a.tu at intel.com
Mon Jun 13 10:54:41 CEST 2016


Signed-off-by: Lijuan Tu <lijuanx.a.tu at intel.com>
---
 tests/TestSuite_vlan_ethertype_config.py | 293 +++++++++++++++++++++++++++++++
 1 file changed, 293 insertions(+)
 create mode 100644 tests/TestSuite_vlan_ethertype_config.py

diff --git a/tests/TestSuite_vlan_ethertype_config.py b/tests/TestSuite_vlan_ethertype_config.py
new file mode 100644
index 0000000..99d7752
--- /dev/null
+++ b/tests/TestSuite_vlan_ethertype_config.py
@@ -0,0 +1,293 @@
+# BSD LICENSE
+#
+# Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+#   * Redistributions of source code must retain the above copyright
+#     notice, this list of conditions and the following disclaimer.
+#   * Redistributions in binary form must reproduce the above copyright
+#     notice, this list of conditions and the following disclaimer in
+#     the documentation and/or other materials provided with the
+#     distribution.
+#   * Neither the name of Intel Corporation nor the names of its
+#     contributors may be used to endorse or promote products derived
+#     from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+"""
+DPDK Test suite.
+
+Test the support of VLAN Offload Features by Poll Mode Drivers.
+
+"""
+
+import dts
+import time
+
+
+from test_case import TestCase
+from pmd_output import PmdOutput
+from packet import Packet, sniff_packets, load_sniff_packets
+from scapy.utils import struct, socket, wrpcap, rdpcap
+#from scapy.all import *
+class TestVlan(TestCase):
+
+    def set_up_all(self):
+        """
+        Run at the start of each test suite.
+
+
+        Vlan Prerequistites
+        """
+        global dutRxPortId
+        global dutTxPortId
+
+        # Based on h/w type, choose how many ports to use
+        ports = self.dut.get_ports()
+
+        # Verify that enough ports are available
+        self.verify(len(ports) >= 2, "Insufficient ports")
+
+        valports = [_ for _ in ports if self.tester.get_local_port(_) != -1]
+        dutRxPortId = valports[0]
+        dutTxPortId = valports[1]
+        port = self.tester.get_local_port(dutTxPortId)
+        self.rxItf = self.tester.get_interface(port)
+
+        self.portmask = dts.create_mask(valports[:2])
+
+
+    def start_tcpdump(self, rxItf):
+
+        self.tester.alt_session.send_expect("rm -rf /tmp/getPkgByTcpdump_%s.cap" % rxItf, "#")
+        self.tester.alt_session.send_expect("tcpdump -i %s -w /tmp/getPkgByTcpdump_%s.cap" % (rxItf, rxItf), "listening on")
+
+    def get_tcpdump_package(self, rxItf):
+        out = self.tester.alt_session.send_expect("^C", "#")
+        out = self.tester.send_expect("tcpdump -nn -e -v -r /tmp/getPkgByTcpdump_%s.cap 2> /dev/null" % rxItf, "#")
+        return out
+
+    def vlan_send_packet(self, vid, tpid="8100", num=1):
+        """
+        Send $num of packet to portid, if vid is -1, it means send pakcage not include vlan id.
+        """
+        # The package stream : testTxPort->dutRxPort->dutTxport->testRxPort
+        port = self.tester.get_local_port(dutRxPortId)
+        self.txItf = self.tester.get_interface(port)
+        self.smac = self.tester.get_mac(port)
+
+        port = self.tester.get_local_port(dutTxPortId)
+        self.rxItf = self.tester.get_interface(port)
+
+        # the package dect mac must is dut tx port id when the port promisc is off
+        self.dmac = self.dut.get_mac_address(dutRxPortId)
+
+        self.inst = sniff_packets(self.rxItf)
+        # FIXME  send a burst with only num packet
+        if vid == -1:
+            pkt = Packet(pkt_type='UDP')
+            pkt.config_layer('ether', {'dst': self.dmac, 'src': self.smac})
+        else:
+            pkt = Packet(pkt_type='VLAN_UDP')
+            pkt.config_layer('ether', {'dst': self.dmac, 'src': self.smac})
+            pkt.config_layer('vlan', {'vlan': vid})
+            if tpid != "8100":
+                self.tpid_ori_file = "/tmp/tpid_ori.pcap"
+                self.tpid_new_file = "/tmp/tpid_new.pcap"
+                pkt.pktgen.write_pcap("%s" % self.tpid_ori_file)
+                fmt = '1/1 "%02x"'
+                out = self.tester.send_expect("hexdump -ve '%s' '%s' |sed 's/8100/%s/' |xxd -r -p > '%s'" % (fmt, self.tpid_ori_file, tpid, self.tpid_new_file),"# ")
+                pkt.pktgen.pkt = pkt.pktgen.read_pcap("%s" % self.tpid_new_file)
+        
+        pkt.send_pkt(tx_port=self.txItf)
+
+    def set_up(self):
+        """
+        Run before each test case.
+        """
+        self.vlan = 51
+        self.tpid = "8100"
+        self.pmdout = PmdOutput(self.dut)
+        self.pmdout.start_testpmd("Default", "--portmask=%s" % self.portmask)
+        self.dut.send_expect("vlan set outer tpid 0x%s %s" % (self.tpid, dutRxPortId), "testpmd> ")
+
+    def test_vlan_change_tpid(self):
+        """
+        Test Case 1: change VLAN TPID
+        """
+        if self.kdriver == "fm10k":
+            print dts.RED("fm10k not support this case\n")
+            return
+        self.dut.send_expect("set fwd rxonly", "testpmd> ")
+        self.dut.send_expect("set verbose 1", "testpmd> ")
+        self.dut.send_expect("start", "testpmd> ")
+        self.dut.send_expect("vlan set filter off %s" % dutRxPortId, "testpmd> ")
+        self.dut.send_expect("vlan set strip on %s" % dutRxPortId, "testpmd> ", 20)
+        self.tpid = "a100"
+        self.dut.send_expect("vlan set outer tpid 0x%s %s" % (self.tpid, dutRxPortId), "testpmd> ")
+        self.vlan_send_packet(self.vlan, self.tpid)
+        out = self.dut.get_session_output()
+        self.verify("PKT_RX_VLAN_PKT" in out, "Wrong vlan:" + str(out)) 
+
+    def test_vlan_filter_on_off(self):
+        """
+        Disable receipt of VLAN packets
+        """
+        self.dut.send_expect("set fwd mac", "testpmd> ")
+        self.dut.send_expect("start", "testpmd> ")
+        self.dut.send_expect("vlan set strip off %s" % dutRxPortId, "testpmd> ", 20)
+        # test vlan filter on
+        self.dut.send_expect("vlan set filter on  %s" % dutRxPortId, "testpmd> ")
+        
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(self.vlan, self.tpid)
+        out = self.get_tcpdump_package(self.rxItf)
+        self.verify(self.tpid not in out, "Wrong vlan:" + str(out))
+
+        # test vlan filter off
+        self.dut.send_expect("vlan set filter off  %s" % dutRxPortId, "testpmd> ")
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(self.vlan, self.tpid)
+        out = self.get_tcpdump_package(self.rxItf)
+        self.verify("%s" %self.tpid in out, "Wrong vlan:" + str(out))
+
+    def test_vlan_add_vlan_tag(self):
+        """
+        test adding VLAN Tag Identifier with changing VLAN TPID
+        """
+        self.dut.send_expect("set fwd mac", "testpmd> ")
+        self.dut.send_expect("vlan set filter on  %s" % dutRxPortId, "testpmd> ")
+        self.dut.send_expect("rx_vlan add %d %s" % (self.vlan, dutRxPortId), "testpmd> ")
+        self.dut.send_expect("vlan set strip off %s" % dutRxPortId, "testpmd> ", 20)
+        self.dut.send_expect("start", "testpmd> ")
+
+        self.tpid = "8100"
+        self.dut.send_expect("vlan set outer tpid 0x%s %s" % (self.tpid, dutRxPortId), "testpmd> ")
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(self.vlan, self.tpid)
+        out = self.get_tcpdump_package(self.rxItf)
+
+        self.verify("%s" %self.vlan in out, "Vlan not found:" + str(out))
+        self.verify("%s" %self.tpid in out, "Wrong vlan:" + str(out))
+
+        self.tpid = "a100"
+        self.dut.send_expect("vlan set outer tpid 0x%s %s" % (self.tpid, dutRxPortId), "testpmd> ")
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(self.vlan, self.tpid)
+        out = self.get_tcpdump_package(self.rxItf)
+        self.verify("%s" %self.tpid in out, "Wrong vlan:" + str(out))
+        self.verify("%x" %self.vlan in out, "Vlan not found:" + str(out))
+        self.dut.send_expect("rx_vlan rm %d %d" % (self.vlan, dutRxPortId), "testpmd> ", 30)
+        self.dut.send_expect("stop", "testpmd> ", 30)
+
+
+    def test_vlan_strip(self):
+        """
+        Test Case 4: test VLAN header striping with changing VLAN TPID
+        """
+        self.dut.send_expect("set fwd mac", "testpmd> ")
+        self.dut.send_expect("vlan set filter off %s" % dutRxPortId, "testpmd> ")
+        self.dut.send_expect("vlan set strip on %s" % dutRxPortId, "testpmd> ", 20)
+        self.dut.send_expect("start", "testpmd> ", 20)
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(self.vlan, self.tpid)
+        out = self.get_tcpdump_package(self.rxItf)
+        self.verify(self.tpid not in out, "Wrong vlan:" + str(out))
+
+        self.tpid = "a100"
+        self.dut.send_expect("vlan set outer tpid 0x%s %s" % (self.tpid, dutRxPortId), "testpmd> ")
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(self.vlan, self.tpid)
+        out = self.get_tcpdump_package(self.rxItf)
+        self.verify(self.tpid not in out, "Wrong vlan:" + str(out))
+
+        self.dut.send_expect("vlan set strip off %s" % dutRxPortId, "testpmd> ", 20)
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(self.vlan, self.tpid)
+        out = self.get_tcpdump_package(self.rxItf)
+        self.verify("%x" %self.vlan in out, "Vlan not found:" + str(out))
+
+    def test_vlan_enable_vlan_insertion(self):
+        """
+        test VLAN header inserting with changing VLAN TPID
+        """
+        self.dut.send_expect("set fwd mac", "testpmd> ")
+        self.dut.send_expect("vlan set filter off %s" % dutRxPortId, "testpmd> ")
+        self.dut.send_expect("vlan set strip off %s" % dutRxPortId, "testpmd> ", 20)
+        self.dut.send_expect("start", "testpmd> ")
+        self.dut.send_expect("tx_vlan set %s %d" % (dutTxPortId, self.vlan), "testpmd> ")
+        
+        self.tpid = "8100"
+        self.dut.send_expect("vlan set outer tpid 0x%s %s" % (self.tpid, dutRxPortId), "testpmd> ")
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(-1)
+        out = self.get_tcpdump_package(self.rxItf)
+        self.verify(self.tpid in out, "Wrong vlan:" + str(out))
+        self.verify("%s" % self.vlan in out, "Vlan not found:" + str(out))
+
+        self.tpid = "a100"
+        self.dut.send_expect("vlan set outer tpid 0x%s %s" % (self.tpid, dutRxPortId), "testpmd> ")
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(-1)
+        out = self.get_tcpdump_package(self.rxItf)
+        self.verify(self.tpid in out, "Wrong vlan:" + str(out))
+        self.verify("%x" % self.vlan in out, "Vlan not found:" + str(out))
+
+        self.dut.send_expect("tx_vlan reset %s" % dutTxPortId, "testpmd> ", 30)
+        self.start_tcpdump(self.rxItf)
+        self.vlan_send_packet(-1)
+        out = self.get_tcpdump_package(self.rxItf)
+        self.verify(self.tpid not in out, "Wrong vlan:" + str(out))
+
+        if self.kdriver == "fm10k":
+            netobj = self.dut.ports_info[dutTxPortId]['port']
+            # not delete vlan for self.vlan will used later
+            netobj.delete_txvlan(vlan_id = self.vlan)
+
+    def _test_vlan_qinq_tpid(self):
+        """
+        Test Case 6: Change S-Tag and C-Tag within QinQ
+        """
+        self.dut.send_expect("vlan set qinq on %d" %dutTxPortId, "testpmd> ", 20)
+        self.dut.send_expect("set verbose 1", "testpmd> ")
+        self.dut.send_expect("set fwd rxonly", "testpmd> ")
+        self.dut.send_expect("start", "testpmd> ")
+        self.dut.send_expect("rx_vlan add %d %s" % (self.vlan, dutRxPortId), "testpmd> ")
+        self.dut.send_expect("vlan set outer tpid 0x%s %s" % (self.tpid, dutRxPortId), "testpmd> ")
+        self.dut.send_expect("vlan set filter off  %s" % dutRxPortId, "testpmd> ")
+        self.dut.send_expect("vlan set strip off %s" % dutRxPortId, "testpmd> ", 20)
+ 
+
+    def tear_down(self):
+        """
+        Run after each test case.
+        """
+        self.dut.send_expect("stop", "testpmd> ", 30)
+        self.dut.send_expect("quit", "# ", 30)
+        pass
+
+    def tear_down_all(self):
+        """
+        Run after each test suite.
+        """
+        self.dut.kill_all()
+        if self.kdriver == "fm10k":
+            netobj = self.dut.ports_info[dutRxPortId]['port']
+            netobj.delete_txvlan(vlan_id = self.vlan)
+            netobj.delete_vlan(vlan_id = self.vlan)
-- 
1.9.3



More information about the dts mailing list