[dts] [PATCH V1] tests:optimize verify encryptio

xiao,qimai qimaix.xiao at intel.com
Fri Mar 22 10:07:13 CET 2019


Signed-off-by: xiao,qimai <qimaix.xiao at intel.com>
---
 tests/TestSuite_inline_ipsec.py | 88 ++++++++++++++++++++++-----------
 1 file changed, 59 insertions(+), 29 deletions(-)

diff --git a/tests/TestSuite_inline_ipsec.py b/tests/TestSuite_inline_ipsec.py
index 1813c08..b6fdaa8 100644
--- a/tests/TestSuite_inline_ipsec.py
+++ b/tests/TestSuite_inline_ipsec.py
@@ -28,7 +28,7 @@
 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
+#-*- coding:utf-8 -*-
 
 """
 DPDK Test suite.
@@ -36,13 +36,11 @@ Test inline_ipsec.
 """
 
 import utils
-import string
 import time
 import re
-import threading
 from test_case import TestCase
-import getopt
-from scapy.all import *
+from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
+import random
 
 ETHER_STANDARD_MTU = 1518
 ETHER_JUMBO_FRAME_MTU = 9000
@@ -51,14 +49,14 @@ ETHER_JUMBO_FRAME_MTU = 9000
 class TestInlineIpsec(TestCase):
     """
     This suite depend PyCryptodome,it provide authenticated encryption modes(GCM)
-    my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex (3.4.7),
-    pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.3.3.dev623)
+    my environment:cryptography (1.7.2), pycryptodome (3.4.7), pycryptodomex (3.4.7),
+    pycryptopp (0.6.0.1206569328141510525648634803928199668821045408958), scapy (2.4.2)
     """
+
     def set_up_all(self):
         """
         Run at the start of each test suite.
         """
-        self.verify(self.nic in ["niantic"], "%s NIC not support" % self.nic)
         self.verify(self.drivername in ["vfio-pci"], "%s drivername not support" % self.drivername)
         self.dut_ports = self.dut.get_ports(self.nic)
         self.verify(len(self.dut_ports) >= 2, "Insufficient ports")
@@ -86,7 +84,7 @@ class TestInlineIpsec(TestCase):
 
         self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
         # add print code in IPSEC app
-        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\printf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
+        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx, portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
         self.dut.send_expect(sedcmd, "#", 60)
 
         # build sample app
@@ -158,14 +156,15 @@ class TestInlineIpsec(TestCase):
 
     def set_cfg(self, filename, cfg):
         """
-        open file and write cfg, scp it to dut base directory  
+        open file and write cfg, scp it to dut base directory
         """
         for i in cfg:
             with open(filename, 'w') as f:
                 f.write(cfg)
         self.dut.session.copy_file_to(filename, self.dut.base_dir)
 
-    def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+    def send_encryption_package(self, intf, paysize=64, do_encrypt=False, send_spi=5, count=1,
+                                inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
         """
         prepare a packet and send
         """
@@ -184,6 +183,8 @@ class TestInlineIpsec(TestCase):
 
         if do_encrypt == True:
             print "send encrypt package"
+            print("before encrypt, the package info is like below: ")
+            p.show()
             e = sa_gcm.encrypt(p)
         else:
             print "send normal package"
@@ -196,42 +197,58 @@ class TestInlineIpsec(TestCase):
             name='send_encryption_package')
         sendp(eth_e, iface=intf, count=count)
         self.tester.destroy_session(session_send)
+        return payload,p.src,p.dst
 
-        return payload
-
-    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5', sa_dst='172.16.2.5'):
+    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32, jumboframe=1518, do_encrypt=False,
+                         verify=True, send_spi=5, receive_spi=1005, count=1, inner_dst='192.168.105.10',
+                         sa_src='172.16.1.5', sa_dst='172.16.2.5'):
         """
         verify Ipsec receive package
         """
-        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
+        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
             self.portpci_0, self.portpci_1, jumboframe, config, file_name)
         self.dut.send_expect(cmd, "IPSEC", 60)
 
         session_receive = self.tester.create_session(
             name='receive_encryption_package')
+
         sa_gcm = r"sa_gcm=SecurityAssociation(ESP,spi=%s,crypt_algo='AES-GCM',crypt_key='\x2b\x7e\x15\x16\x28\xae\xd2\xa6\xab\xf7\x15\x88\x09\xcf\x4f\x3d\xde\xad\xbe\xef',auth_algo='NULL',auth_key=None,tunnel_header=IP(src='172.16.1.5',dst='172.16.2.5'))" % receive_spi
 
         session_receive.send_expect("scapy", ">>>", 10)
         session_receive.send_expect(
-            "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
-        send_package = self.send_encryption_package(
-            txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+            "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "", 10)
 
-        time.sleep(10)
-        out = session_receive.send_expect("pkts", "", 30)
         if do_encrypt:
+            send_package = self.send_encryption_package(
+                txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
+            time.sleep(45)
+            session_receive.send_expect("pkts", "", 30)
             out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
         else:
+            session_receive2 = self.tester.create_session(name='receive_encryption_package2')
+            session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30)
+            send_package = self.send_encryption_package(txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src,
+                                                        sa_dst)
+            time.sleep(45)
+            rev = session_receive2.get_session_before()
+            print(rev)
+            p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+            res = p.search(rev)
+            self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
+            self.tester.destroy_session(session_receive2)
+            session_receive.send_expect("pkts", "", 30)
             session_receive.send_expect(sa_gcm, ">>>", 10)
-            session_receive.send_expect(
-                "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
+            time.sleep(2)
+            session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
             out = session_receive.send_expect("results", ">>>", 10)
 
         if verify:
-            self.verify(send_package in out,
+            print('received packet content is %s'%out)
+            print('send pkt src ip is %s, dst ip is %s, payload is %s'%(send_package[1],send_package[2],send_package[0]))
+            self.verify(send_package[0] in out,
                         "Unreceived package or get other package")
         else:
-            self.verify(send_package not in out,
+            self.verify(send_package[0] not in out,
                         "The function is not in effect")
         session_receive.send_expect("quit()", "#", 10)
         self.tester.destroy_session(session_receive)
@@ -244,6 +261,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(1, ETHER_STANDARD_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
                               self.txItf, self.rxItf, paysize)
+        self.dut.send_expect("^C","#",5)
 
     def test_Ipsec_Encryption_Jumboframe(self):
         """
@@ -253,6 +271,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
                               self.txItf, self.rxItf, paysize, ETHER_JUMBO_FRAME_MTU)
+        self.dut.send_expect("^C","#",5)
 
     def test_Ipsec_Encryption_Rss(self):
         """
@@ -264,6 +283,7 @@ class TestInlineIpsec(TestCase):
         out = self.dut.get_session_output()
         verifycode = "receive 1 packet in rxqueueid=1"
         self.verify(verifycode in out, "rxqueueid error")
+        self.dut.send_expect("^C","#",5)
 
     def test_IPSec_Decryption(self):
         """
@@ -273,6 +293,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(1, ETHER_STANDARD_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
                               self.txItf, paysize, do_encrypt=True, count=2)
+        self.dut.send_expect("^C","#",5)
 
     def test_IPSec_Decryption_Jumboframe(self):
         """
@@ -282,6 +303,7 @@ class TestInlineIpsec(TestCase):
         paysize = random.randint(ETHER_STANDARD_MTU, ETHER_JUMBO_FRAME_MTU)
         self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
                               self.txItf, paysize, ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2)
+        self.dut.send_expect("^C","#",5)
 
     def test_Ipsec_Decryption_Rss(self):
         """
@@ -293,6 +315,7 @@ class TestInlineIpsec(TestCase):
         out = self.dut.get_session_output()
         verifycode = "receive 1 packet in rxqueueid=1"
         self.verify(verifycode in out, "rxqueueid error")
+        self.dut.send_expect("^C","#",5)
 
     def test_Ipsec_Decryption_wrongkey(self):
         """
@@ -304,7 +327,9 @@ class TestInlineIpsec(TestCase):
                               self.txItf, paysize, do_encrypt=True, verify=False, count=2)
         out = self.dut.get_session_output()
         verifycode = "IPSEC_ESP: failed crypto op"
-        self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
+        l=re.findall(verifycode,out)
+        self.verify(len(l)==2, "Ipsec Decryption wrongkey failed")
+        self.dut.send_expect("^C","#",5)
 
     def test_Ipsec_Encryption_Decryption(self):
         """
@@ -328,7 +353,6 @@ class TestInlineIpsec(TestCase):
         session_receive2.send_expect(sa_gcm, ">>>", 60)
         session_receive2.send_expect(
             "pkts=sniff(iface='%s',count=2,timeout=30)" % self.txItf, "", 60)
-
         payload = "test for Ipsec Encryption Decryption simultaneously"
         sa_gcm = SecurityAssociation(ESP, spi=5,
                                      crypt_algo='AES-GCM',
@@ -336,24 +360,30 @@ class TestInlineIpsec(TestCase):
                                      auth_algo='NULL', auth_key=None,
                                      tunnel_header=IP(src='172.16.1.5', dst='172.16.2.5'))
         sa_gcm.crypt_algo.icv_size = 16
-
         p = IP(src='192.168.105.10', dst='192.168.105.10')
         p /= payload
         p = IP(str(p))
-
         e1 = sa_gcm.encrypt(p)
         e2 = p
 
         eth_e1 = Ether() / e1
         eth_e1.src = self.rx_src
         eth_e1.dst = self.tx_dst
+
         eth_e2 = Ether() / e2
         eth_e2.src = self.rx_src
         eth_e2.dst = self.tx_dst
-
+        session_receive3=self.tester.create_session('check_forward_encryption_package')
+        session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30)
+        time.sleep(2)
         sendp(eth_e1, iface=self.rxItf, count=2)
         sendp(eth_e2, iface=self.txItf, count=1)
         time.sleep(30)
+        rev = session_receive3.get_session_before()
+        print(rev)
+        p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
+        res = p.search(rev)
+        self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
         session_receive.send_expect(
             "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
         out = session_receive.send_expect("results", ">>>", 60)
-- 
2.17.2



More information about the dts mailing list