[dts] [PATCH V1] tests/vf_port_start_stop: use Packet to send pkt
Tu, Lijuan
lijuan.tu at intel.com
Tue Oct 22 11:06:33 CEST 2019
Applied, thanks
> -----Original Message-----
> From: dts [mailto:dts-bounces at dpdk.org] On Behalf Of lihong
> Sent: Tuesday, October 15, 2019 9:25 AM
> To: dts at dpdk.org
> Cc: Ma, LihongX <lihongx.ma at intel.com>
> Subject: [dts] [PATCH V1] tests/vf_port_start_stop: use Packet to send pkt
>
> Signed-off-by: lihong <lihongx.ma at intel.com>
> ---
> tests/TestSuite_vf_port_start_stop.py | 44 +++++++++++------------------------
> 1 file changed, 14 insertions(+), 30 deletions(-)
>
> diff --git a/tests/TestSuite_vf_port_start_stop.py
> b/tests/TestSuite_vf_port_start_stop.py
> index dcd799b..945c671 100644
> --- a/tests/TestSuite_vf_port_start_stop.py
> +++ b/tests/TestSuite_vf_port_start_stop.py
> @@ -6,11 +6,7 @@ import time
> from virt_common import VM
> from test_case import TestCase
> from pmd_output import PmdOutput
> -from utils import RED, GREEN
> -from net_device import NetDevice
> -from crb import Crb
> -from scapy.all import *
> -from scapy.layers.sctp import SCTP, SCTPChunkData
> +from packet import Packet
> VM_CORES_MASK = 'all'
>
> class TestVfPortStartStop(TestCase):
> @@ -22,10 +18,8 @@ class TestVfPortStartStop(TestCase):
> self.dut_ports = self.dut.get_ports(self.nic)
> self.verify(len(self.dut_ports) >= 1, "Insufficient ports")
> self.vm0 = None
> - self.filename = "/tmp/vf.pcap"
> self.tester_tx_port = self.tester.get_local_port(self.dut_ports[0])
> self.tester_tintf = self.tester.get_interface(self.tester_tx_port)
> - self.send_pks_session = None
> # set vf assign method and vf driver
> self.vf_driver = self.get_suite_cfg()['vf_driver']
> if self.vf_driver is None:
> @@ -40,6 +34,8 @@ class TestVfPortStartStop(TestCase):
> def set_up(self):
>
> self.setup_1pf_2vf_1vm_env_flag = 0
> + self.send_pks_session = None
> + self.pkts = Packet()
>
> def send_and_verify(self, dst_mac, testpmd):
> """
> @@ -50,33 +46,23 @@ class TestVfPortStartStop(TestCase):
> src_mac = self.tester.get_mac(self.tester_tx_port)
> if src_mac == 'N/A':
> src_mac = "02:00:00:00:01"
> - self.send_pkts(self.filename, dst_mac, src_mac)
> + self.send_pkts(dst_mac, src_mac)
> time.sleep(1)
> self.check_port_start_stop(testpmd)
> - self.tester.send_expect('killall -s INT scapy', '# ')
> - self.tester.destroy_session(self.send_pks_session)
> - self.send_pks_session = None
>
> - def send_pkts(self, filename, dst_mac, src_mac):
> + def send_pkts(self, dst_mac, src_mac):
> """
> Generates a valid PCAP file with the given configuration.
> """
> - def_pkts = {'IP/UDP': Ether(dst="%s" % dst_mac, src="%s" %
> src_mac)/IP(src="127.0.0.2")/UDP()/("X"*46),
> - 'IP/TCP': Ether(dst="%s" % dst_mac, src="%s" %
> src_mac)/IP(src="127.0.0.2")/TCP()/("X"*46),
> - 'IP/SCTP': Ether(dst="%s" % dst_mac, src="%s" %
> src_mac)/IP(src="127.0.0.2")/SCTP()/("X"*48),
> - 'IPv6/UDP': Ether(dst="%s" % dst_mac, src="%s" %
> src_mac)/IPv6(src="::2")/UDP()/("X"*46),
> - 'IPv6/TCP': Ether(dst="%s" % dst_mac, src="%s" %
> src_mac)/IPv6(src="::2")/TCP()/("X"*46),}
> + def_pkts = {'IP/UDP': 'Ether(dst="%s",
> src="%s")/IP(src="127.0.0.2")/UDP()/("X"*46)' % (dst_mac, src_mac),
> + 'IP/TCP': 'Ether(dst="%s",
> src="%s")/IP(src="127.0.0.2")/TCP()/("X"*46)' % (dst_mac, src_mac),
> + 'IP/SCTP': 'Ether(dst="%s",
> src="%s")/IP(src="127.0.0.2")/SCTP()/("X"*48)' % (dst_mac, src_mac),
> + 'IPv6/UDP': 'Ether(dst="%s",
> src="%s")/IPv6(src="::2")/UDP()/("X"*46)' % (dst_mac, src_mac),
> + 'IPv6/TCP': 'Ether(dst="%s",
> src="%s")/IPv6(src="::2")/TCP()/("X"*46)' % (dst_mac, src_mac),}
>
> - pkts = []
> for key in def_pkts.keys():
> - pkts.append(def_pkts[key])
> - wrpcap(filename, pkts)
> -
> - sendp_fmt = "sendp(pk, iface='%s', loop=1)" % (self.tester_tintf)
> - self.send_pks_session = self.tester.create_session("scapy1")
> - self.send_pks_session.send_expect("scapy", ">>>")
> - self.send_pks_session.send_expect("pk=rdpcap('%s')" % filename,
> ">>>")
> - self.send_pks_session.send_command(sendp_fmt)
> + self.pkts.append_pkt(def_pkts[key])
> + self.send_pks_session = self.pkts.send_pkt_bg(self.tester,
> self.tester_tintf)
>
> def testpmd_reset_status(self, testpmd):
> """
> @@ -195,15 +181,13 @@ class TestVfPortStartStop(TestCase):
>
> def tear_down(self):
>
> + if self.send_pks_session:
> + self.pkts.stop_send_pkt_bg(self.tester, self.send_pks_session)
> if self.setup_1pf_2vf_1vm_env_flag == 1:
> self.destroy_1pf_2vf_1vm_env()
>
> def tear_down_all(self):
>
> - if self.send_pks_session:
> - self.tester.send_expect('killall -s INT scapy', '# ')
> - self.tester.destroy_session(self.send_pks_session)
> -
> if getattr(self, 'vm0', None):
> self.vm0.stop()
>
> --
> 2.7.4
More information about the dts mailing list