[dts] [PATCH V1 3/3][scapy vxlan] tests: update vxlan name from Vxlan to VXLAN

lihong lihongx.ma at intel.com
Tue Jul 23 02:50:53 CEST 2019


These test suites depend on dep/vxlan.py; update the layer name they use from Vxlan to VXLAN.

Signed-off-by: lihong <lihongx.ma at intel.com>
---
 tests/TestSuite_cloud_filter.py              |  8 +++----
 tests/TestSuite_generic_flow_api.py          | 16 ++++++-------
 tests/TestSuite_tso.py                       |  4 ++--
 tests/TestSuite_vxlan.py                     | 34 ++++++++++++++--------------
 tests/TestSuite_vxlan_gpe_support_in_i40e.py | 12 +++++++---
 tests/TestSuite_vxlan_sample.py              | 20 ++++++++--------
 6 files changed, 50 insertions(+), 44 deletions(-)

diff --git a/tests/TestSuite_cloud_filter.py b/tests/TestSuite_cloud_filter.py
index d4fe129..74b3be7 100644
--- a/tests/TestSuite_cloud_filter.py
+++ b/tests/TestSuite_cloud_filter.py
@@ -17,12 +17,12 @@ from packet import Packet, load_pcapfile
 from scapy.layers.inet import UDP, IP
 from scapy.packet import split_layers, bind_layers
 
-from vxlan import Vxlan
+from vxlan import VXLAN
 from vxlan import VXLAN_PORT
 
 CLOUD_PORT = 8472
-split_layers(UDP, Vxlan, dport=VXLAN_PORT)
-bind_layers(UDP, Vxlan, dport=CLOUD_PORT)
+split_layers(UDP, VXLAN, dport=VXLAN_PORT)
+bind_layers(UDP, VXLAN, dport=CLOUD_PORT)
 
 #
 #
@@ -350,7 +350,7 @@ class TestCloudFilter(TestCase):
                 dport = cap_pkt[UDP].dport
                 self.verify(dport == CLOUD_PORT,
                             "Captured packet is not vxlan packet")
-                inner_ip = cap_pkt[Vxlan][IP].dst
+                inner_ip = cap_pkt['VXLAN'][IP].dst
                 self.verify(inner_ip == cloud_cfg.cf_rule['iip'],
                             "Inner ip not matched")
             except:
diff --git a/tests/TestSuite_generic_flow_api.py b/tests/TestSuite_generic_flow_api.py
index a7810fd..3daf77f 100644
--- a/tests/TestSuite_generic_flow_api.py
+++ b/tests/TestSuite_generic_flow_api.py
@@ -154,7 +154,7 @@ class TestGeneric_flow_api(TestCase):
         dir_module = cwd + r'/' + 'dep'
         self.tester.scapy_append('sys.path.append("%s")' % dir_module)
         if module == "vxlan":
-            self.tester.scapy_append("from vxlan import Vxlan")
+            self.tester.scapy_append("from vxlan import VXLAN")
         elif module == "nvgre":
             self.tester.scapy_append('from nvgre import NVGRE')
 
@@ -312,7 +312,7 @@ class TestGeneric_flow_api(TestCase):
                 rule_created = 1
 
                 # Enable vxlan packet sending
-                if "Vxlan" in flow_pkt:
+                if "VXLAN" in flow_pkt:
                     self.load_module("vxlan")
                 elif "NVGRE" in flow_pkt:
                     self.load_module("nvgre")
@@ -569,10 +569,10 @@ class TestGeneric_flow_api(TestCase):
                 if 'vni' in flows:
                     vni = self.generate_random_int(0, MAX_VLAN)
                     flow_str += "vni is %d " % vni
-                    pkt += "/Vxlan(vni=%d)" % vni
+                    pkt += "/VXLAN(vni=%d)" % vni
                     extrapacket['vni'] = str(vni)
                 else:
-                    pkt += "/Vxlan()"
+                    pkt += "/VXLAN()"
             elif flow_type == "nvgre":
                 flow_str += "/ nvgre "
                 if 'tni' in flows:
@@ -1515,22 +1515,22 @@ class TestGeneric_flow_api(TestCase):
 
         self.load_module("vxlan")
         self.tester.scapy_append(
-            'sendp([Ether(dst="%s")/IP()/UDP()/Vxlan()/Ether(dst="%s")/Dot1Q(vlan=11)/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac, self.inner_mac, self.tester_itf))
+            'sendp([Ether(dst="%s")/IP()/UDP()/VXLAN()/Ether(dst="%s")/Dot1Q(vlan=11)/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac, self.inner_mac, self.tester_itf))
         self.verify_result("pf", expect_rxpkts="1", expect_queue=extrapkt_rulenum['queue'][0], verify_mac=self.outer_mac)
 
         self.load_module("vxlan")
         self.tester.scapy_append(
-            'sendp([Ether(dst="%s")/IP()/UDP()/Vxlan(vni=5)/Ether(dst="%s")/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac, self.wrong_mac, self.tester_itf))
+            'sendp([Ether(dst="%s")/IP()/UDP()/VXLAN(vni=5)/Ether(dst="%s")/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac, self.wrong_mac, self.tester_itf))
         self.verify_result("pf", expect_rxpkts="1", expect_queue="0", verify_mac=self.outer_mac)
 
         self.load_module("vxlan")
         self.tester.scapy_append(
-            'sendp([Ether(dst="%s")/IP()/UDP()/Vxlan(vni=%s)/Ether(dst="%s")/Dot1Q(vlan=%s)/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac, extra_packet[5]['vni'], self.wrong_mac, extra_packet[5]['invlan'], self.tester_itf))
+            'sendp([Ether(dst="%s")/IP()/UDP()/VXLAN(vni=%s)/Ether(dst="%s")/Dot1Q(vlan=%s)/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac, extra_packet[5]['vni'], self.wrong_mac, extra_packet[5]['invlan'], self.tester_itf))
         self.verify_result("vf0", expect_rxpkts="1", expect_queue="0", verify_mac=self.outer_mac)
 
         self.load_module("vxlan")
         self.tester.scapy_append(
-            'sendp([Ether(dst="%s")/IP()/UDP()/Vxlan(vni=%s)/Ether(dst="%s")/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.wrong_mac, extra_packet[6]['vni'], self.inner_mac, self.tester_itf))
+            'sendp([Ether(dst="%s")/IP()/UDP()/VXLAN(vni=%s)/Ether(dst="%s")/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.wrong_mac, extra_packet[6]['vni'], self.inner_mac, self.tester_itf))
         self.verify_result("vf1", expect_rxpkts="0", expect_queue="NULL", verify_mac=self.wrong_mac)
         rule_num = extrapkt_rulenum['rulenum']
         self.verify_rulenum(rule_num)
diff --git a/tests/TestSuite_tso.py b/tests/TestSuite_tso.py
index 34294cc..36e00fe 100644
--- a/tests/TestSuite_tso.py
+++ b/tests/TestSuite_tso.py
@@ -104,7 +104,7 @@ class TestTSO(TestCase):
         cwd = os.getcwd()
         dir_module = cwd + r'/' + 'dep'
         self.tester.scapy_append('sys.path.append("%s")' % dir_module)
-        self.tester.scapy_append("from vxlan import Vxlan")
+        self.tester.scapy_append("from vxlan import VXLAN")
         self.tester.scapy_append('from nvgre import NVGRE')
 
     def tcpdump_start_sniffing(self, ifaces=[]):
@@ -300,7 +300,7 @@ class TestTSO(TestCase):
             self.tcpdump_start_sniffing([tx_interface, rx_interface])
             self.load_module()
             out = self.dut.send_expect("clear port info all", "testpmd> ", 120)
-            self.tester.scapy_append('sendp([Ether(dst="%s",src="52:00:00:00:00:00")/IP(src="192.168.1.1",dst="192.168.1.2")/UDP(sport=1021,dport=4789)/Vxlan()/Ether(dst="%s",src="52:00:00:00:00:00")/IP(src="192.168.1.1",dst="192.168.1.2")/TCP(sport=1021,dport=1021)/("X"*%s)], iface="%s")' % (mac, mac, loading_size, tx_interface))
+            self.tester.scapy_append('sendp([Ether(dst="%s",src="52:00:00:00:00:00")/IP(src="192.168.1.1",dst="192.168.1.2")/UDP(sport=1021,dport=4789)/VXLAN()/Ether(dst="%s",src="52:00:00:00:00:00")/IP(src="192.168.1.1",dst="192.168.1.2")/TCP(sport=1021,dport=1021)/("X"*%s)], iface="%s")' % (mac, mac, loading_size, tx_interface))
             out = self.tester.scapy_execute()
             out = self.dut.send_expect("show port stats all", "testpmd> ", 120)
             print out
diff --git a/tests/TestSuite_vxlan.py b/tests/TestSuite_vxlan.py
index 12ab458..4183169 100644
--- a/tests/TestSuite_vxlan.py
+++ b/tests/TestSuite_vxlan.py
@@ -17,7 +17,7 @@ from scapy.utils import wrpcap, rdpcap
 from scapy.layers.inet import Ether, IP, TCP, UDP
 from scapy.layers.inet6 import IPv6
 from scapy.layers.l2 import Dot1Q
-from vxlan import Vxlan
+from vxlan import VXLAN
 from scapy.layers.sctp import SCTP, SCTPChunkData
 from scapy.sendrecv import sniff
 from scapy.config import conf
@@ -178,7 +178,7 @@ class VxlanTestConfig(object):
             outer[UDP].chksum = 1
 
         if self.outer_udp_dst == VXLAN_PORT:
-            self.pkt = outer / Vxlan(vni=self.vni) / inner
+            self.pkt = outer / VXLAN(vni=self.vni) / inner
         else:
             self.pkt = outer / ("X" * self.payload_size)
 
@@ -207,8 +207,8 @@ class VxlanTestConfig(object):
         if payload.guess_payload_class(payload).name == "IP":
             chk_sums['outer_ip'] = hex(payload[IP].chksum)
 
-        if pkts[0].haslayer(Vxlan) == 1:
-            inner = pkts[0][Vxlan]
+        if pkts[0].haslayer('VXLAN') == 1:
+            inner = pkts[0]['VXLAN']
             if inner.haslayer(IP) == 1:
                 chk_sums['inner_ip'] = hex(inner[IP].chksum)
                 if inner[IP].proto == 6:
@@ -240,7 +240,7 @@ class VxlanTestConfig(object):
         cwd = os.getcwd()
         dir_vxlan_module = cwd + r'/' + FOLDERS['Depends']
         self.test_case.tester.scapy_append("sys.path.append('%s')" % dir_vxlan_module)
-        self.test_case.tester.scapy_append("from vxlan import Vxlan")
+        self.test_case.tester.scapy_append("from vxlan import VXLAN")
         self.test_case.tester.scapy_append(
             'pcap = rdpcap("%s")' % self.pcap_file)
         self.test_case.tester.scapy_append(
@@ -341,29 +341,29 @@ class TestVxlan(TestCase, IxiaPacketGenerator):
         self.tunnel_perf = [
             {'Packet': 'Normal', 'tunnel_filter': 'None',
                 'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'None',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'None',
                 'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-ivlan',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-ivlan',
                 'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-ivlan-tenid',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-ivlan-tenid',
                 'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-tenid',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-tenid',
                 'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'imac',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'imac',
                 'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'omac-imac-tenid',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'omac-imac-tenid',
                 'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'None',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'None',
                 'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-ivlan',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-ivlan',
                 'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-ivlan-tenid',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-ivlan-tenid',
                 'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-tenid',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-tenid',
                 'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter': 'imac',
+            {'Packet': 'VXLAN', 'tunnel_filter': 'imac',
                 'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
-            {'Packet': 'Vxlan', 'tunnel_filter':
+            {'Packet': 'VXLAN', 'tunnel_filter':
                 'omac-imac-tenid', 'recvqueue': 'Multi'}
         ]
 
diff --git a/tests/TestSuite_vxlan_gpe_support_in_i40e.py b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
index f87c541..aa3ee82 100644
--- a/tests/TestSuite_vxlan_gpe_support_in_i40e.py
+++ b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
@@ -43,7 +43,7 @@ from pmd_output import PmdOutput
 from scapy.utils import wrpcap, rdpcap
 from scapy.layers.inet import Ether, IP, UDP
 from scapy.layers.l2 import Dot1Q
-from vxlan import Vxlan
+from vxlan import VXLAN
 from scapy.config import conf
 from test_case import TestCase
 from settings import FOLDERS
@@ -110,7 +110,7 @@ class VxlanGpeTestConfig(object):
         outer[UDP].dport = self.outer_udp_dst
 
         if self.outer_udp_dst == VXLAN_GPE_PORT:
-            self.pkt = outer / Vxlan(vni=self.vni) / inner
+            self.pkt = outer / VXLAN(vni=self.vni) / inner
         else:
             self.pkt = outer / ("X" * self.payload_size)
 
@@ -124,7 +124,7 @@ class VxlanGpeTestConfig(object):
         cwd = os.getcwd()
         dir_vxlan_module = cwd + r'/' + FOLDERS['Depends']
         self.test_case.tester.scapy_append("sys.path.append('%s')" % dir_vxlan_module)
-        self.test_case.tester.scapy_append("from vxlan import Vxlan")
+        self.test_case.tester.scapy_append("from vxlan import VXLAN")
         self.test_case.tester.scapy_append(
             'pcap = rdpcap("%s")' % self.pcap_file)
         self.test_case.tester.scapy_append(
@@ -231,6 +231,10 @@ class TestVxlanGpeSupportInI40e(TestCase):
         # send one VXLAN-GPE type packet
         packet = 'sendp([Ether(dst="%s")/IP(src="18.0.0.1")/UDP(dport=%d, sport=43)/' % (mac, VXLAN_GPE_PORT) + \
                  'VXLAN(flags=12)/IP(src="10.0.0.1")], iface="%s", count=1)' % self.tester_iface
+        cwd = os.getcwd()
+        dir_vxlan_module = cwd + r'/' + FOLDERS['Depends']
+        self.tester.scapy_append("sys.path.append('%s')" % dir_vxlan_module)
+        self.tester.scapy_append("from vxlan import VXLAN")
         self.tester.scapy_append(packet)
         self.tester.scapy_execute()
         out = self.dut.get_session_output(timeout=5)
@@ -239,6 +243,8 @@ class TestVxlanGpeSupportInI40e(TestCase):
 
         # delete the VXLAN-GPE packet type, testpmd should treat the packet as a normal UDP packet
         self.pmdout.execute_cmd('port config 0 udp_tunnel_port rm vxlan-gpe %s' % VXLAN_GPE_PORT)
+        self.tester.scapy_append("sys.path.append('%s')" % dir_vxlan_module)
+        self.tester.scapy_append("from vxlan import VXLAN")
         self.tester.scapy_append(packet)
         self.tester.scapy_execute()
         out = self.dut.get_session_output(timeout=5)
diff --git a/tests/TestSuite_vxlan_sample.py b/tests/TestSuite_vxlan_sample.py
index 2033866..5bc9269 100644
--- a/tests/TestSuite_vxlan_sample.py
+++ b/tests/TestSuite_vxlan_sample.py
@@ -54,7 +54,7 @@ from scapy.utils import wrpcap, rdpcap
 from scapy.layers.inet import Ether, IP, TCP, UDP
 from scapy.layers.inet6 import IPv6
 from scapy.layers.l2 import Dot1Q
-from vxlan import Vxlan
+from vxlan import VXLAN
 from scapy.layers.sctp import SCTP, SCTPChunkData
 from scapy.sendrecv import sniff
 from scapy.config import conf
@@ -310,10 +310,10 @@ class TestVxlanSample(TestCase):
 
             pkts = self.transfer_capture_file()
             self.verify(len(pkts) >= 1, "Failed to capture packets")
-            self.verify(pkts[0].haslayer(Vxlan) == 1,
+            self.verify(pkts[0].haslayer('VXLAN') == 1,
                         "Packet not encapsulated")
             try:
-                payload = str(pkts[0][UDP][Vxlan][UDP].payload)
+                payload = str(pkts[0][UDP]['VXLAN'][UDP].payload)
                 for i in range(18):
                     self.verify(ord(payload[i]) == 88, "Check udp data failed")
             except:
@@ -344,7 +344,7 @@ class TestVxlanSample(TestCase):
             pkts = self.transfer_capture_file()
             # check packet number and payload
             self.verify(len(pkts) >= 1, "Failed to capture packets")
-            self.verify(pkts[0].haslayer(Vxlan) == 0,
+            self.verify(pkts[0].haslayer('VXLAN') == 0,
                         "Packet not de-encapsulated")
 
             try:
@@ -380,10 +380,10 @@ class TestVxlanSample(TestCase):
 
             # check packet number and payload
             self.verify(len(pkts) >= 1, "Failed to capture packets")
-            self.verify(pkts[0].haslayer(Vxlan) == 1,
+            self.verify(pkts[0].haslayer('VXLAN') == 1,
                         "Packet not encapsulated")
             try:
-                payload = str(pkts[0][UDP][Vxlan][UDP].payload)
+                payload = str(pkts[0][UDP]['VXLAN'][UDP].payload)
                 for i in range(18):
                     self.verify(ord(payload[i]) == 88, "Check udp data failed")
             except:
@@ -428,7 +428,7 @@ class TestVxlanSample(TestCase):
             pkts = self.transfer_capture_file()
             # check packet number and payload
             self.verify(len(pkts) >= 1, "Failed to capture packets")
-            self.verify(pkts[0].haslayer(Vxlan) == 1,
+            self.verify(pkts[0].haslayer('VXLAN') == 1,
                         "Packet not encapsulated")
             chksums = vxlan_pkt.get_chksums(pcap='vxlan_cap.pcap')
             print utils.GREEN("Checksum : %s" % chksums)
@@ -466,7 +466,7 @@ class TestVxlanSample(TestCase):
 
             # calculation  checksum, and check it
             for pkt in pkts:
-                inner = pkt[Vxlan]
+                inner = pkt['VXLAN']
                 inner_ip_chksum = inner[IP].chksum
                 del inner.chksum
                 inner[IP] = inner[IP].__class__(str(inner[IP]))
@@ -484,10 +484,10 @@ class TestVxlanSample(TestCase):
 
             length = 0
             for pkt in pkts:
-                self.verify(pkt.haslayer(Vxlan) == 1,
+                self.verify(pkt.haslayer('VXLAN') == 1,
                             "Packet not encapsulated")
                 try:
-                    payload = str(pkt[UDP][Vxlan][TCP].payload)
+                    payload = str(pkt[UDP]['VXLAN'][TCP].payload)
                     self.verify(len(payload) <= self.def_mss,
                                 "TCP payload oversized")
                     length += len(payload)
-- 
2.7.4



More information about the dts mailing list