[dts] [DTS][PATCH V1 2/2] creat a pcap file include 2000 random packet, used scapy send the pcap, and check dut not loss packet

xu,huilong huilongx.xu at intel.com
Thu Feb 4 04:22:02 CET 2016


Signed-off-by: xu,huilong <huilongx.xu at intel.com>
---
 tests/TestSuite_pmd.py | 55 +++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 50 insertions(+), 5 deletions(-)

diff --git a/tests/TestSuite_pmd.py b/tests/TestSuite_pmd.py
index eeec53f..7a8df89 100644
--- a/tests/TestSuite_pmd.py
+++ b/tests/TestSuite_pmd.py
@@ -1,6 +1,6 @@
 # BSD LICENSE
 #
-# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+# Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -37,12 +37,14 @@ Test userland 10Gb PMD
 import dts
 import re
 import time
+import random
 from test_case import TestCase
 from plotting import Plotting
 from time import sleep
 from settings import HEADER_SIZE
 from pmd_output import PmdOutput
 from etgen import IxiaPacketGenerator
+from scapy.all import *
 
 class TestPmd(TestCase,IxiaPacketGenerator):
 
@@ -81,6 +83,8 @@ class TestPmd(TestCase,IxiaPacketGenerator):
         self.frame_sizes = [64, 65, 128, 256, 512, 1024, 1280, 1518]
 
         self.rxfreet_values = [0, 8, 16, 32, 64, 128]
+        
+        self.total_packets = 2000
 
         self.test_cycles = [{'cores': '1S/1C/1T', 'Mpps': {}, 'pct': {}},
                             {'cores': '1S/2C/1T', 'Mpps': {}, 'pct': {}},
@@ -113,7 +117,37 @@ class TestPmd(TestCase,IxiaPacketGenerator):
         Run before each test case.
         """
         pass
-
+    def random_ip(self, iptype):
+        tmp = []
+        if 'ipv4' == iptype:
+            for i in range(4):
+                tmp.append(str(random.randint(10, 254)))
+            return '.'.join(tmp)
+        if 'ipv6' == iptype:
+            for i in range(8):
+                tmp.append(str(hex(random.randint(0, 65535)))[2:])
+            return ':'.join(tmp)
+        else:
+            self.logger.warning("invalid ip type: %s" % iptype)    
+    def creat_pcap(self, filename, dst_mac, tx_intf):
+        packets = []
+        packet_type = [Ether(dst="%s" % dst_mac, src=get_if_hwaddr(tx_intf))/IP(src="%s" % self.random_ip("ipv4"), dst="%s" % self.random_ip("ipv4"))/("X"), #ipv4 packet
+                       Ether(dst="%s" % dst_mac, src=get_if_hwaddr(tx_intf))/IP(src="%s" % self.random_ip("ipv4"), dst="%s" % self.random_ip("ipv4"))/TCP()/("X"), #ipv4 tcp packet
+                       Ether(dst="%s" % dst_mac, src=get_if_hwaddr(tx_intf))/IP(src="%s" % self.random_ip("ipv4"), dst="%s" % self.random_ip("ipv4"))/UDP()/("X"), #ipv4 udp packet
+                       Ether(dst="%s" % dst_mac, src=get_if_hwaddr(tx_intf))/IP(src="%s" % self.random_ip("ipv4"), dst="%s" % self.random_ip("ipv4"))/SCTP(tag=1)/("X"),#ipv4 sctp packet
+                       Ether(dst="%s" % dst_mac, src=get_if_hwaddr(tx_intf))/("X"), # l2 packet
+                       Ether(dst="%s" % dst_mac, src=get_if_hwaddr(tx_intf))/IPv6(src="%s" % self.random_ip("ipv6"), dst="%s" % self.random_ip("ipv6"))/("X"), # ipv6 packet
+                       Ether(dst="%s" % dst_mac, src=get_if_hwaddr(tx_intf))/IPv6(src="%s" % self.random_ip("ipv6"), dst="%s" % self.random_ip("ipv6"))/UDP()/("X"), #ipv6 udp packet
+                       Ether(dst="%s" % dst_mac, src=get_if_hwaddr(tx_intf))/IPv6(src="%s" % self.random_ip("ipv6"), dst="%s" % self.random_ip("ipv6"))/TCP()/("X"), # ipv6 tcp packet
+                       Ether(dst="%s" % dst_mac, src=get_if_hwaddr(tx_intf))/IPv6(src="%s" % self.random_ip("ipv6"), dst="%s" % self.random_ip("ipv6"))/SCTP(tag=1)/("X"), # ipv6 sctp packet
+                      ]
+        for packet in packet_type:
+            for i in range(self.total_packets / len(packet_type)): 
+                packets.append(packet)
+        for i in range(self.total_packets % len(packet_type)):
+            packets.append(packet_type[random.randint(0, len(packet_type) - 1)])
+        wrpcap(filename, packets)
+        self.tester.session.copy_file_to(filename)
     def test_perf_pmd_performance_4ports(self):
         """
         PMD Performance Benchmarking with 4 ports.
@@ -322,11 +356,22 @@ class TestPmd(TestCase,IxiaPacketGenerator):
         port_mask = dts.create_mask([self.dut_ports[0], self.dut_ports[1]])
 
         self.pmdout.start_testpmd("1S/2C/1T", "--portmask=%s" % port_mask, socket=self.ports_socket)
+       
+        self.dut.send_expect("set fwd mac", "testpmd> ")
         self.dut.send_expect("start", "testpmd> ")
-        for size in self.frame_sizes:
-            self.send_packet(size)
+        interface = self.tester.get_interface(
+            self.tester.get_local_port(self.dut_ports[0]))
+        mac = self.dut.get_mac_address(self.dut_ports[0])
+        self.creat_pcap("random_packet.pcap", mac, interface)
+        self.tester.scapy_append('pcap = rdpcap("random_packet.pcap")')
+        self.tester.scapy_append('sendp(pcap, iface="%s")' % interface)
+        self.tester.scapy_execute(120)
+
+        #self.dut.send_expect("stop", "testpmd> ")
+        dut_rxpackets = self.get_stats(self.dut_ports[0])['RX-packets']
+        dut_txpackets = self.get_stats(self.dut_ports[1])['TX-packets']
+        self.verify(dut_txpackets == self.total_packets and dut_rxpackets == dut_txpackets, "loss packets,tester send %d packets, dut rx packets %s, dut tx packets %s" %(self.total_packets, dut_rxpackets, dut_txpackets) )
 
-        self.dut.send_expect("stop", "testpmd> ")
         self.dut.send_expect("quit", "# ", 30)
         sleep(5)
 
-- 
1.9.3



More information about the dts mailing list