[dts] [PATCH V1 3/3][scapy vxlan] tests: update vxlan name from Vxlan to VXLAN

Zhu, ShuaiX shuaix.zhu at intel.com
Tue Jul 23 11:14:12 CEST 2019


Tested-by: Zhu, ShuaiX <shuaix.zhu at intel.com>

> -----Original Message-----
> From: dts [mailto:dts-bounces at dpdk.org] On Behalf Of lihong
> Sent: Tuesday, July 23, 2019 8:51 AM
> To: dts at dpdk.org
> Cc: Ma, LihongX <lihongx.ma at intel.com>
> Subject: [dts] [PATCH V1 3/3][scapy vxlan] tests: update vxlan name from Vxlan
> to VXLAN
> 
> depend on dep/vxlan.py, update Vxlan to VXLAN
> 
> Signed-off-by: lihong <lihongx.ma at intel.com>
> ---
>  tests/TestSuite_cloud_filter.py              |  8 +++----
>  tests/TestSuite_generic_flow_api.py          | 16 ++++++-------
>  tests/TestSuite_tso.py                       |  4 ++--
>  tests/TestSuite_vxlan.py                     | 34
> ++++++++++++++--------------
>  tests/TestSuite_vxlan_gpe_support_in_i40e.py | 12 +++++++---
>  tests/TestSuite_vxlan_sample.py              | 20 ++++++++--------
>  6 files changed, 50 insertions(+), 44 deletions(-)
> 
> diff --git a/tests/TestSuite_cloud_filter.py b/tests/TestSuite_cloud_filter.py
> index d4fe129..74b3be7 100644
> --- a/tests/TestSuite_cloud_filter.py
> +++ b/tests/TestSuite_cloud_filter.py
> @@ -17,12 +17,12 @@ from packet import Packet, load_pcapfile  from
> scapy.layers.inet import UDP, IP  from scapy.packet import split_layers,
> bind_layers
> 
> -from vxlan import Vxlan
> +from vxlan import VXLAN
>  from vxlan import VXLAN_PORT
> 
>  CLOUD_PORT = 8472
> -split_layers(UDP, Vxlan, dport=VXLAN_PORT) -bind_layers(UDP, Vxlan,
> dport=CLOUD_PORT)
> +split_layers(UDP, VXLAN, dport=VXLAN_PORT) bind_layers(UDP, VXLAN,
> +dport=CLOUD_PORT)
> 
>  #
>  #
> @@ -350,7 +350,7 @@ class TestCloudFilter(TestCase):
>                  dport = cap_pkt[UDP].dport
>                  self.verify(dport == CLOUD_PORT,
>                              "Captured packet is not vxlan packet")
> -                inner_ip = cap_pkt[Vxlan][IP].dst
> +                inner_ip = cap_pkt['VXLAN'][IP].dst
>                  self.verify(inner_ip == cloud_cfg.cf_rule['iip'],
>                              "Inner ip not matched")
>              except:
> diff --git a/tests/TestSuite_generic_flow_api.py
> b/tests/TestSuite_generic_flow_api.py
> index a7810fd..3daf77f 100644
> --- a/tests/TestSuite_generic_flow_api.py
> +++ b/tests/TestSuite_generic_flow_api.py
> @@ -154,7 +154,7 @@ class TestGeneric_flow_api(TestCase):
>          dir_module = cwd + r'/' + 'dep'
>          self.tester.scapy_append('sys.path.append("%s")' % dir_module)
>          if module == "vxlan":
> -            self.tester.scapy_append("from vxlan import Vxlan")
> +            self.tester.scapy_append("from vxlan import VXLAN")
>          elif module == "nvgre":
>              self.tester.scapy_append('from nvgre import NVGRE')
> 
> @@ -312,7 +312,7 @@ class TestGeneric_flow_api(TestCase):
>                  rule_created = 1
> 
>                  # Enable vxlan packet sending
> -                if "Vxlan" in flow_pkt:
> +                if "VXLAN" in flow_pkt:
>                      self.load_module("vxlan")
>                  elif "NVGRE" in flow_pkt:
>                      self.load_module("nvgre") @@ -569,10 +569,10 @@
> class TestGeneric_flow_api(TestCase):
>                  if 'vni' in flows:
>                      vni = self.generate_random_int(0, MAX_VLAN)
>                      flow_str += "vni is %d " % vni
> -                    pkt += "/Vxlan(vni=%d)" % vni
> +                    pkt += "/VXLAN(vni=%d)" % vni
>                      extrapacket['vni'] = str(vni)
>                  else:
> -                    pkt += "/Vxlan()"
> +                    pkt += "/VXLAN()"
>              elif flow_type == "nvgre":
>                  flow_str += "/ nvgre "
>                  if 'tni' in flows:
> @@ -1515,22 +1515,22 @@ class TestGeneric_flow_api(TestCase):
> 
>          self.load_module("vxlan")
>          self.tester.scapy_append(
> -
> 'sendp([Ether(dst="%s")/IP()/UDP()/Vxlan()/Ether(dst="%s")/Dot1Q(vlan=11)/I
> P()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac, self.inner_mac,
> self.tester_itf))
> +
> + 'sendp([Ether(dst="%s")/IP()/UDP()/VXLAN()/Ether(dst="%s")/Dot1Q(vlan=
> + 11)/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac,
> + self.inner_mac, self.tester_itf))
>          self.verify_result("pf", expect_rxpkts="1",
> expect_queue=extrapkt_rulenum['queue'][0], verify_mac=self.outer_mac)
> 
>          self.load_module("vxlan")
>          self.tester.scapy_append(
> -
> 'sendp([Ether(dst="%s")/IP()/UDP()/Vxlan(vni=5)/Ether(dst="%s")/IP()/TCP()/R
> aw("x" * 20)], iface="%s")' % (self.outer_mac, self.wrong_mac, self.tester_itf))
> +
> + 'sendp([Ether(dst="%s")/IP()/UDP()/VXLAN(vni=5)/Ether(dst="%s")/IP()/T
> + CP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac, self.wrong_mac,
> + self.tester_itf))
>          self.verify_result("pf", expect_rxpkts="1", expect_queue="0",
> verify_mac=self.outer_mac)
> 
>          self.load_module("vxlan")
>          self.tester.scapy_append(
> -
> 'sendp([Ether(dst="%s")/IP()/UDP()/Vxlan(vni=%s)/Ether(dst="%s")/Dot1Q(vlan
> =%s)/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac,
> extra_packet[5]['vni'], self.wrong_mac, extra_packet[5]['invlan'], self.tester_itf))
> +
> + 'sendp([Ether(dst="%s")/IP()/UDP()/VXLAN(vni=%s)/Ether(dst="%s")/Dot1Q
> + (vlan=%s)/IP()/TCP()/Raw("x" * 20)], iface="%s")' % (self.outer_mac,
> + extra_packet[5]['vni'], self.wrong_mac, extra_packet[5]['invlan'],
> + self.tester_itf))
>          self.verify_result("vf0", expect_rxpkts="1", expect_queue="0",
> verify_mac=self.outer_mac)
> 
>          self.load_module("vxlan")
>          self.tester.scapy_append(
> -
> 'sendp([Ether(dst="%s")/IP()/UDP()/Vxlan(vni=%s)/Ether(dst="%s")/IP()/TCP()/
> Raw("x" * 20)], iface="%s")' % (self.wrong_mac, extra_packet[6]['vni'],
> self.inner_mac, self.tester_itf))
> +
> + 'sendp([Ether(dst="%s")/IP()/UDP()/VXLAN(vni=%s)/Ether(dst="%s")/IP()/
> + TCP()/Raw("x" * 20)], iface="%s")' % (self.wrong_mac,
> + extra_packet[6]['vni'], self.inner_mac, self.tester_itf))
>          self.verify_result("vf1", expect_rxpkts="0", expect_queue="NULL",
> verify_mac=self.wrong_mac)
>          rule_num = extrapkt_rulenum['rulenum']
>          self.verify_rulenum(rule_num)
> diff --git a/tests/TestSuite_tso.py b/tests/TestSuite_tso.py index
> 34294cc..36e00fe 100644
> --- a/tests/TestSuite_tso.py
> +++ b/tests/TestSuite_tso.py
> @@ -104,7 +104,7 @@ class TestTSO(TestCase):
>          cwd = os.getcwd()
>          dir_module = cwd + r'/' + 'dep'
>          self.tester.scapy_append('sys.path.append("%s")' % dir_module)
> -        self.tester.scapy_append("from vxlan import Vxlan")
> +        self.tester.scapy_append("from vxlan import VXLAN")
>          self.tester.scapy_append('from nvgre import NVGRE')
> 
>      def tcpdump_start_sniffing(self, ifaces=[]):
> @@ -300,7 +300,7 @@ class TestTSO(TestCase):
>              self.tcpdump_start_sniffing([tx_interface, rx_interface])
>              self.load_module()
>              out = self.dut.send_expect("clear port info all", "testpmd> ", 120)
> -
> self.tester.scapy_append('sendp([Ether(dst="%s",src="52:00:00:00:00:00")/IP(sr
> c="192.168.1.1",dst="192.168.1.2")/UDP(sport=1021,dport=4789)/Vxlan()/Ether(
> dst="%s",src="52:00:00:00:00:00")/IP(src="192.168.1.1",dst="192.168.1.2")/TCP(s
> port=1021,dport=1021)/("X"*%s)], iface="%s")' % (mac, mac, loading_size,
> tx_interface))
> +
> + self.tester.scapy_append('sendp([Ether(dst="%s",src="52:00:00:00:00:00
> + ")/IP(src="192.168.1.1",dst="192.168.1.2")/UDP(sport=1021,dport=4789)/
> + VXLAN()/Ether(dst="%s",src="52:00:00:00:00:00")/IP(src="192.168.1.1",d
> + st="192.168.1.2")/TCP(sport=1021,dport=1021)/("X"*%s)], iface="%s")' %
> + (mac, mac, loading_size, tx_interface))
>              out = self.tester.scapy_execute()
>              out = self.dut.send_expect("show port stats all", "testpmd> ",
> 120)
>              print out
> diff --git a/tests/TestSuite_vxlan.py b/tests/TestSuite_vxlan.py index
> 12ab458..4183169 100644
> --- a/tests/TestSuite_vxlan.py
> +++ b/tests/TestSuite_vxlan.py
> @@ -17,7 +17,7 @@ from scapy.utils import wrpcap, rdpcap  from
> scapy.layers.inet import Ether, IP, TCP, UDP  from scapy.layers.inet6 import IPv6
> from scapy.layers.l2 import Dot1Q -from vxlan import Vxlan
> +from vxlan import VXLAN
>  from scapy.layers.sctp import SCTP, SCTPChunkData  from scapy.sendrecv
> import sniff  from scapy.config import conf @@ -178,7 +178,7 @@ class
> VxlanTestConfig(object):
>              outer[UDP].chksum = 1
> 
>          if self.outer_udp_dst == VXLAN_PORT:
> -            self.pkt = outer / Vxlan(vni=self.vni) / inner
> +            self.pkt = outer / VXLAN(vni=self.vni) / inner
>          else:
>              self.pkt = outer / ("X" * self.payload_size)
> 
> @@ -207,8 +207,8 @@ class VxlanTestConfig(object):
>          if payload.guess_payload_class(payload).name == "IP":
>              chk_sums['outer_ip'] = hex(payload[IP].chksum)
> 
> -        if pkts[0].haslayer(Vxlan) == 1:
> -            inner = pkts[0][Vxlan]
> +        if pkts[0].haslayer('VXLAN') == 1:
> +            inner = pkts[0]['VXLAN']
>              if inner.haslayer(IP) == 1:
>                  chk_sums['inner_ip'] = hex(inner[IP].chksum)
>                  if inner[IP].proto == 6:
> @@ -240,7 +240,7 @@ class VxlanTestConfig(object):
>          cwd = os.getcwd()
>          dir_vxlan_module = cwd + r'/' + FOLDERS['Depends']
>          self.test_case.tester.scapy_append("sys.path.append('%s')" %
> dir_vxlan_module)
> -        self.test_case.tester.scapy_append("from vxlan import Vxlan")
> +        self.test_case.tester.scapy_append("from vxlan import VXLAN")
>          self.test_case.tester.scapy_append(
>              'pcap = rdpcap("%s")' % self.pcap_file)
>          self.test_case.tester.scapy_append(
> @@ -341,29 +341,29 @@ class TestVxlan(TestCase, IxiaPacketGenerator):
>          self.tunnel_perf = [
>              {'Packet': 'Normal', 'tunnel_filter': 'None',
>                  'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'None',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'None',
>                  'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-ivlan',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-ivlan',
>                  'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-ivlan-tenid',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-ivlan-tenid',
>                  'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-tenid',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-tenid',
>                  'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'imac',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'imac',
>                  'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'omac-imac-tenid',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'omac-imac-tenid',
>                  'recvqueue': 'Single', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'None',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'None',
>                  'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-ivlan',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-ivlan',
>                  'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-ivlan-tenid',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-ivlan-tenid',
>                  'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'imac-tenid',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'imac-tenid',
>                  'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter': 'imac',
> +            {'Packet': 'VXLAN', 'tunnel_filter': 'imac',
>                  'recvqueue': 'Multi', 'Mpps': {}, 'pct': {}},
> -            {'Packet': 'Vxlan', 'tunnel_filter':
> +            {'Packet': 'VXLAN', 'tunnel_filter':
>                  'omac-imac-tenid', 'recvqueue': 'Multi'}
>          ]
> 
> diff --git a/tests/TestSuite_vxlan_gpe_support_in_i40e.py
> b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
> index f87c541..aa3ee82 100644
> --- a/tests/TestSuite_vxlan_gpe_support_in_i40e.py
> +++ b/tests/TestSuite_vxlan_gpe_support_in_i40e.py
> @@ -43,7 +43,7 @@ from pmd_output import PmdOutput  from scapy.utils
> import wrpcap, rdpcap  from scapy.layers.inet import Ether, IP, UDP  from
> scapy.layers.l2 import Dot1Q -from vxlan import Vxlan
> +from vxlan import VXLAN
>  from scapy.config import conf
>  from test_case import TestCase
>  from settings import FOLDERS
> @@ -110,7 +110,7 @@ class VxlanGpeTestConfig(object):
>          outer[UDP].dport = self.outer_udp_dst
> 
>          if self.outer_udp_dst == VXLAN_GPE_PORT:
> -            self.pkt = outer / Vxlan(vni=self.vni) / inner
> +            self.pkt = outer / VXLAN(vni=self.vni) / inner
>          else:
>              self.pkt = outer / ("X" * self.payload_size)
> 
> @@ -124,7 +124,7 @@ class VxlanGpeTestConfig(object):
>          cwd = os.getcwd()
>          dir_vxlan_module = cwd + r'/' + FOLDERS['Depends']
>          self.test_case.tester.scapy_append("sys.path.append('%s')" %
> dir_vxlan_module)
> -        self.test_case.tester.scapy_append("from vxlan import Vxlan")
> +        self.test_case.tester.scapy_append("from vxlan import VXLAN")
>          self.test_case.tester.scapy_append(
>              'pcap = rdpcap("%s")' % self.pcap_file)
>          self.test_case.tester.scapy_append(
> @@ -231,6 +231,10 @@ class TestVxlanGpeSupportInI40e(TestCase):
>          # send one VXLAN-GPE type packet
>          packet = 'sendp([Ether(dst="%s")/IP(src="18.0.0.1")/UDP(dport=%d,
> sport=43)/' % (mac, VXLAN_GPE_PORT) + \
>                   'VXLAN(flags=12)/IP(src="10.0.0.1")], iface="%s", count=1)' %
> self.tester_iface
> +        cwd = os.getcwd()
> +        dir_vxlan_module = cwd + r'/' + FOLDERS['Depends']
> +        self.tester.scapy_append("sys.path.append('%s')" %
> dir_vxlan_module)
> +        self.tester.scapy_append("from vxlan import VXLAN")
>          self.tester.scapy_append(packet)
>          self.tester.scapy_execute()
>          out = self.dut.get_session_output(timeout=5)
> @@ -239,6 +243,8 @@ class TestVxlanGpeSupportInI40e(TestCase):
> 
>          # delete the VXLAN-GPE packet type, testpmd should treat the packet
> as a normal UDP packet
>          self.pmdout.execute_cmd('port config 0 udp_tunnel_port rm
> vxlan-gpe %s' % VXLAN_GPE_PORT)
> +        self.tester.scapy_append("sys.path.append('%s')" %
> dir_vxlan_module)
> +        self.tester.scapy_append("from vxlan import VXLAN")
>          self.tester.scapy_append(packet)
>          self.tester.scapy_execute()
>          out = self.dut.get_session_output(timeout=5)
> diff --git a/tests/TestSuite_vxlan_sample.py b/tests/TestSuite_vxlan_sample.py
> index 2033866..5bc9269 100644
> --- a/tests/TestSuite_vxlan_sample.py
> +++ b/tests/TestSuite_vxlan_sample.py
> @@ -54,7 +54,7 @@ from scapy.utils import wrpcap, rdpcap  from
> scapy.layers.inet import Ether, IP, TCP, UDP  from scapy.layers.inet6 import IPv6
> from scapy.layers.l2 import Dot1Q -from vxlan import Vxlan
> +from vxlan import VXLAN
>  from scapy.layers.sctp import SCTP, SCTPChunkData  from scapy.sendrecv
> import sniff  from scapy.config import conf @@ -310,10 +310,10 @@ class
> TestVxlanSample(TestCase):
> 
>              pkts = self.transfer_capture_file()
>              self.verify(len(pkts) >= 1, "Failed to capture packets")
> -            self.verify(pkts[0].haslayer(Vxlan) == 1,
> +            self.verify(pkts[0].haslayer('VXLAN') == 1,
>                          "Packet not encapsulated")
>              try:
> -                payload = str(pkts[0][UDP][Vxlan][UDP].payload)
> +                payload = str(pkts[0][UDP]['VXLAN'][UDP].payload)
>                  for i in range(18):
>                      self.verify(ord(payload[i]) == 88, "Check udp data
> failed")
>              except:
> @@ -344,7 +344,7 @@ class TestVxlanSample(TestCase):
>              pkts = self.transfer_capture_file()
>              # check packet number and payload
>              self.verify(len(pkts) >= 1, "Failed to capture packets")
> -            self.verify(pkts[0].haslayer(Vxlan) == 0,
> +            self.verify(pkts[0].haslayer('VXLAN') == 0,
>                          "Packet not de-encapsulated")
> 
>              try:
> @@ -380,10 +380,10 @@ class TestVxlanSample(TestCase):
> 
>              # check packet number and payload
>              self.verify(len(pkts) >= 1, "Failed to capture packets")
> -            self.verify(pkts[0].haslayer(Vxlan) == 1,
> +            self.verify(pkts[0].haslayer('VXLAN') == 1,
>                          "Packet not encapsulated")
>              try:
> -                payload = str(pkts[0][UDP][Vxlan][UDP].payload)
> +                payload = str(pkts[0][UDP]['VXLAN'][UDP].payload)
>                  for i in range(18):
>                      self.verify(ord(payload[i]) == 88, "Check udp data
> failed")
>              except:
> @@ -428,7 +428,7 @@ class TestVxlanSample(TestCase):
>              pkts = self.transfer_capture_file()
>              # check packet number and payload
>              self.verify(len(pkts) >= 1, "Failed to capture packets")
> -            self.verify(pkts[0].haslayer(Vxlan) == 1,
> +            self.verify(pkts[0].haslayer('VXLAN') == 1,
>                          "Packet not encapsulated")
>              chksums = vxlan_pkt.get_chksums(pcap='vxlan_cap.pcap')
>              print utils.GREEN("Checksum : %s" % chksums) @@ -466,7 +466,7
> @@ class TestVxlanSample(TestCase):
> 
>              # calculation  checksum, and check it
>              for pkt in pkts:
> -                inner = pkt[Vxlan]
> +                inner = pkt['VXLAN']
>                  inner_ip_chksum = inner[IP].chksum
>                  del inner.chksum
>                  inner[IP] = inner[IP].__class__(str(inner[IP]))
> @@ -484,10 +484,10 @@ class TestVxlanSample(TestCase):
> 
>              length = 0
>              for pkt in pkts:
> -                self.verify(pkt.haslayer(Vxlan) == 1,
> +                self.verify(pkt.haslayer('VXLAN') == 1,
>                              "Packet not encapsulated")
>                  try:
> -                    payload = str(pkt[UDP][Vxlan][TCP].payload)
> +                    payload = str(pkt[UDP]['VXLAN'][TCP].payload)
>                      self.verify(len(payload) <= self.def_mss,
>                                  "TCP payload oversized")
>                      length += len(payload)
> --
> 2.7.4



More information about the dts mailing list