[dts] [PATCH 2/3] framework: add packet integrity verification function

Yong Liu yong.liu at intel.com
Thu Oct 22 03:02:19 CEST 2015


From: Marvin Liu <yong.liu at intel.com>

Add a new function which supports packet integrity checking with random content.
By default this function sends out 2000 packets at an interval of 0.01 seconds,
then checks that the received packets exactly match the transmitted packets.

Signed-off-by: Marvin Liu <yong.liu at intel.com>

diff --git a/framework/tester.py b/framework/tester.py
index de0bc24..1f4f7ae 100644
--- a/framework/tester.py
+++ b/framework/tester.py
@@ -42,6 +42,7 @@ from net_device import NetDevice
 from etgen import IxiaPacketGenerator, SoftwarePacketGenerator
 from logger import getLogger
 from settings import IXIA
+import random
 
 
 class Tester(Crb):
@@ -444,6 +445,61 @@ class Tester(Crb):
         else:
             return None
 
+    def check_random_pkts(self, portList, pktnum=2000, interval=0.01, allow_miss=True):
+        """
+        Send several random packets and check rx packets matched
+        """
+        # look up the helpers by name in the "packet" module, imported here at call time
+        module = __import__("packet")
+        pkt_c = getattr(module, "Packet")
+        send_f = getattr(module, "send_packets")
+        sniff_f = getattr(module, "sniff_packets")
+        load_f = getattr(module, "load_sniff_packets")
+        compare_f = getattr(module, "compare_pktload")
+        pkts = []  # NOTE(review): created once before the port loop, so it keeps growing across port pairs
+        # each packet's type is picked at random from these four names
+        random_type = ['TCP', 'UDP', 'IPv6_TCP', 'IPv6_UDP']
+        # sniff timeout: pktnum * (interval + 0.01 s slack), truncated to int, plus 2 seconds
+        timeout = int(pktnum * (interval + 0.01)) + 2
+        for txport, rxport in portList:
+            txIntf = self.get_interface(txport)
+            rxIntf = self.get_interface(rxport)
+            for num in range(pktnum):
+                # choose a random type and a random length in 64..1514; ran_payload presumably randomizes the payload
+                pkt_type = random.choice(random_type)
+                pkt = pkt_c(pkt_type=pkt_type,
+                            pkt_len=random.randint(64, 1514),
+                            ran_payload=True)
+                # store the sequence number in the layer-4 source port (wraps at 65536)
+                if "TCP" in pkt_type:
+                    pkt.config_layer('tcp', {'src': num % 65536})
+                else:
+                    pkt.config_layer('udp', {'src': num % 65536})
+                pkts.append(pkt)
+
+            # set up the sniffer on the rx interface first, then send on tx and load what was captured
+            inst = sniff_f(intf=rxIntf, count=pktnum, timeout=timeout)
+            send_f(intf=txIntf, pkts=pkts, interval=interval)
+            recv_pkts = load_f(inst)
+
+            # a count mismatch returns False only when allow_miss is False; otherwise it is just printed
+            if len(pkts) != len(recv_pkts):
+                if allow_miss is False:
+                    return False
+
+                print "Pkt number not matched,%d sent and %d received\n" \
+                       % (len(pkts), len(recv_pkts))
+
+            # look up each received packet's original by its source-port sequence (index is not range-checked)
+            for idx in range(len(recv_pkts)):
+                t_idx = recv_pkts[idx].strip_element_layer4('src')
+                if compare_f(pkts[t_idx], recv_pkts[idx], "L4") is False:
+                    print "Pkt recevied index %d not match original " \
+                          "index %d" % (idx, t_idx)
+                    return False
+
+        return True
+
     def extend_external_packet_generator(self, clazz, instance):
         """
         Update packet generator function, will implement later.
-- 
1.9.3



More information about the dts mailing list