[dts] [PATCH V1] update inline_ipsec to test dpdk19.05

Xiao, QimaiX qimaix.xiao at intel.com
Wed May 22 11:32:15 CEST 2019


Hi, wenjie:
This case depend on PyCryptodome, ,it provide authenticated encryption modes(GCM).
And due to scapy lower than 2.3.3 lack of some crypto modules, therefor this case can only run on tester with scapy 2.3.3 or later.
I've submitted a patch to update its test plan to specify the dependences info.

Best Regards,
Xiao,qiami

-----Original Message-----
From: Li, WenjieX A 
Sent: Tuesday, May 7, 2019 3:19 PM
To: Xiao, QimaiX <qimaix.xiao at intel.com>; dts at dpdk.org
Cc: Xiao, QimaiX <qimaix.xiao at intel.com>
Subject: RE: [dts] [PATCH V1] update inline_ipsec to test dpdk19.05

Hi Qimai,

1. Please do not change script like this:
> -import utils
> -import time
...
> +import time
> +import utils

2. You are using scapy 2.4.2. Does this script still work with scapy 2.2.* or 2.3.*?  Scapy 2.4.2 can cause below error with current dts(dep/Dot1BR.py).

"dts: name 'ETHER_TYPES' is not defined
Traceback (most recent call last):
  File "./main.py", line 162, in <module>
    args.debug, args.debugcase, args.re_run, args.commands)
  File "/home/autoregression/dts/framework/dts.py", line 563, in run_all
    dts_run_target(duts, tester, targets, test_suites)
  File "/home/autoregression/dts/framework/dts.py", line 381, in dts_run_target
    dts_run_suite(duts, tester, test_suites, target)
  File "/home/autoregression/dts/framework/dts.py", line 435, in dts_run_suite
    suite_obj.execute_tear_downall()
UnboundLocalError: local variable 'suite_obj' referenced before assignment
dts: DTS ended
"

Thank you!

BR,
Wenjie

> -----Original Message-----
> From: dts [mailto:dts-bounces at dpdk.org] On Behalf Of xiao,qimai
> Sent: Friday, April 26, 2019 4:58 PM
> To: dts at dpdk.org
> Cc: Xiao, QimaiX <qimaix.xiao at intel.com>
> Subject: [dts] [PATCH V1] update inline_ipsec to test dpdk19.05
> 
> 1.remove unused imports
> 2.add some steps to verfiy encrypt package 3.upadte verify message of 
> decrypt with wrong key
> 
> Signed-off-by: xiao,qimai <qimaix.xiao at intel.com>
> ---
>  tests/TestSuite_inline_ipsec.py | 86 
> +++++++++++++++++++++++----------
>  1 file changed, 60 insertions(+), 26 deletions(-)
> 
> diff --git a/tests/TestSuite_inline_ipsec.py 
> b/tests/TestSuite_inline_ipsec.py index 1813c08..369d4f0 100644
> --- a/tests/TestSuite_inline_ipsec.py
> +++ b/tests/TestSuite_inline_ipsec.py
> @@ -35,14 +35,13 @@ DPDK Test suite.
>  Test inline_ipsec.
>  """
> 
> -import utils
> -import string
> -import time
> +import random
>  import re
> -import threading
> +import time
> +
> +import utils
> +from scapy.all import ESP, IP, Ether, sendp, SecurityAssociation
>  from test_case import TestCase
> -import getopt
> -from scapy.all import *
> 
>  ETHER_STANDARD_MTU = 1518
>  ETHER_JUMBO_FRAME_MTU = 9000
> @@ -51,9 +50,10 @@ ETHER_JUMBO_FRAME_MTU = 9000  class
> TestInlineIpsec(TestCase):
>      """
>      This suite depend PyCryptodome,it provide authenticated 
> encryption
> modes(GCM)
> -    my environment:asn1crypto (0.22.0), pycryptodome (3.4.7), pycryptodomex
> (3.4.7),
> -    pycryptopp
> (0.6.0.1206569328141510525648634803928199668821045408958), scapy
> (2.3.3.dev623)
> +    my environment:cryptography (1.7.2), pycryptodome (3.4.7), 
> + pycryptodomex
> (3.4.7),
> +    pycryptopp
> + (0.6.0.1206569328141510525648634803928199668821045408958), scapy
> + (2.4.2)
>      """
> +
>      def set_up_all(self):
>          """
>          Run at the start of each test suite.
> @@ -86,7 +86,7 @@ class TestInlineIpsec(TestCase):
> 
>          self.path = "./examples/ipsec-secgw/build/ipsec-secgw"
>          # add print code in IPSEC app
> -        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\printf("[debug]receive %hhu packet in 
> rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
> +        sedcmd = r"""sed -i -e '/process_pkts(qconf, pkts, nb_rx,
> portid);/i\\t\t\t\tprintf("[debug]receive %hhu packet in 
> rxqueueid=%hhu\\n",nb_rx, queueid);' examples/ipsec-secgw/ipsec-secgw.c"""
>          self.dut.send_expect(sedcmd, "#", 60)
> 
>          # build sample app
> @@ -165,7 +165,8 @@ class TestInlineIpsec(TestCase):
>                  f.write(cfg)
>          self.dut.session.copy_file_to(filename, self.dut.base_dir)
> 
> -    def send_encryption_package(self, intf, paysize=64, do_encrypt=False,
> send_spi=5, count=1, inner_dst='192.168.105.10', sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> +    def send_encryption_package(self, intf, paysize=64, 
> + do_encrypt=False,
> send_spi=5, count=1,
> +                                inner_dst='192.168.105.10', 
> + sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
>          """
>          prepare a packet and send
>          """
> @@ -184,6 +185,8 @@ class TestInlineIpsec(TestCase):
> 
>          if do_encrypt == True:
>              print "send encrypt package"
> +            print("before encrypt, the package info is like below: ")
> +            p.show()
>              e = sa_gcm.encrypt(p)
>          else:
>              print "send normal package"
> @@ -196,14 +199,15 @@ class TestInlineIpsec(TestCase):
>              name='send_encryption_package')
>          sendp(eth_e, iface=intf, count=count)
>          self.tester.destroy_session(session_send)
> +        return payload, p.src, p.dst
> 
> -        return payload
> -
> -    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, paysize=32,
> jumboframe=1518, do_encrypt=False, verify=True, send_spi=5, 
> receive_spi=1005, count=1, inner_dst='192.168.105.10', 
> sa_src='172.16.1.5',
> sa_dst='172.16.2.5'):
> +    def Ipsec_Encryption(self, config, file_name, txItf, rxItf, 
> + paysize=32,
> jumboframe=1518, do_encrypt=False,
> +                         verify=True, send_spi=5, receive_spi=1005, 
> + count=1,
> inner_dst='192.168.105.10',
> +                         sa_src='172.16.1.5', sa_dst='172.16.2.5'):
>          """
>          verify Ipsec receive package
>          """
> -        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null' --log-level 8 --
> socket-mem 1024,1 -- -p 0xf -P -u 0x2 -j %s --config='%s' -f %s" % (
> +        cmd = self.path + " -l 20,21 -w %s -w %s --vdev 'crypto_null'
> + --log-level 8 --socket-mem 1024,1024 -- -p 0xf -P -u 0x2 -j %s 
> + --config='%s' -f %s" % (
>              self.portpci_0, self.portpci_1, jumboframe, config, file_name)
>          self.dut.send_expect(cmd, "IPSEC", 60)
> 
> @@ -213,25 +217,40 @@ class TestInlineIpsec(TestCase):
> 
>          session_receive.send_expect("scapy", ">>>", 10)
>          session_receive.send_expect(
> -            "pkts=sniff(iface='%s',count=1,timeout=10)" % rxItf, "", 30)
> -        send_package = self.send_encryption_package(
> -            txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
> +            "pkts=sniff(iface='%s',count=1,timeout=45)" % rxItf, "",
> + 10)
> 
> -        time.sleep(10)
> -        out = session_receive.send_expect("pkts", "", 30)
>          if do_encrypt:
> +            send_package = self.send_encryption_package(
> +                txItf, paysize, do_encrypt, send_spi, count, inner_dst, sa_src, sa_dst)
> +            time.sleep(45)
> +            session_receive.send_expect("pkts", "", 30)
>              out = session_receive.send_expect("pkts[0]['IP'] ", ">>>", 10)
>          else:
> +            session_receive2 =
> self.tester.create_session(name='receive_encryption_package2')
> +            session_receive2.send_expect("tcpdump -Xvvvi %s -c 1" % rxItf, "", 30)
> +            send_package = self.send_encryption_package(txItf, 
> + paysize,
> do_encrypt, send_spi, count, inner_dst, sa_src,
> +                                                        sa_dst)
> +            time.sleep(45)
> +            rev = session_receive2.get_session_before()
> +            print(rev)
> +            p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> +            res = p.search(rev)
> +            self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
> +            self.tester.destroy_session(session_receive2)
> +            session_receive.send_expect("pkts", "", 30)
>              session_receive.send_expect(sa_gcm, ">>>", 10)
> -            session_receive.send_expect(
> -                "results=sa_gcm.decrypt(pkts[0]['IP'])", ">>>", 10)
> +            time.sleep(2)
> +
> + session_receive.send_expect("results=sa_gcm.decrypt(pkts[0]['IP'])",
> + ">>>", 10)
>              out = session_receive.send_expect("results", ">>>", 10)
> 
>          if verify:
> -            self.verify(send_package in out,
> +            print('received packet content is %s' % out)
> +            print('send pkt src ip is %s, dst ip is %s, payload is %s' % (
> +                send_package[1], send_package[2], send_package[0]))
> +            self.verify(send_package[0] in out,
>                          "Unreceived package or get other package")
>          else:
> -            self.verify(send_package not in out,
> +            self.verify(send_package[0] not in out,
>                          "The function is not in effect")
>          session_receive.send_expect("quit()", "#", 10)
>          self.tester.destroy_session(session_receive)
> @@ -244,6 +263,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(1, ETHER_STANDARD_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
>                                self.txItf, self.rxItf, paysize)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Encryption_Jumboframe(self):
>          """
> @@ -253,6 +273,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/enc.cfg',
>                                self.txItf, self.rxItf, paysize, 
> ETHER_JUMBO_FRAME_MTU)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Encryption_Rss(self):
>          """
> @@ -264,6 +285,7 @@ class TestInlineIpsec(TestCase):
>          out = self.dut.get_session_output()
>          verifycode = "receive 1 packet in rxqueueid=1"
>          self.verify(verifycode in out, "rxqueueid error")
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_IPSec_Decryption(self):
>          """
> @@ -273,6 +295,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(1, ETHER_STANDARD_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
>                                self.txItf, paysize, do_encrypt=True, 
> count=2)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_IPSec_Decryption_Jumboframe(self):
>          """
> @@ -282,6 +305,7 @@ class TestInlineIpsec(TestCase):
>          paysize = random.randint(ETHER_STANDARD_MTU,
> ETHER_JUMBO_FRAME_MTU)
>          self.Ipsec_Encryption(config, '/root/dpdk/dec.cfg', self.rxItf,
>                                self.txItf, paysize, 
> ETHER_JUMBO_FRAME_MTU, do_encrypt=True, count=2)
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Decryption_Rss(self):
>          """
> @@ -293,6 +317,7 @@ class TestInlineIpsec(TestCase):
>          out = self.dut.get_session_output()
>          verifycode = "receive 1 packet in rxqueueid=1"
>          self.verify(verifycode in out, "rxqueueid error")
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Decryption_wrongkey(self):
>          """
> @@ -303,8 +328,10 @@ class TestInlineIpsec(TestCase):
>          self.Ipsec_Encryption(config, '/root/dpdk/dec_wrong_key.cfg', self.rxItf,
>                                self.txItf, paysize, do_encrypt=True, verify=False, count=2)
>          out = self.dut.get_session_output()
> -        verifycode = "IPSEC_ESP: failed crypto op"
> -        self.verify(verifycode in out, "Ipsec Decryption wrongkey failed")
> +        verifycode = "IPSEC_ESP: esp_inbound_post() failed crypto op"
> +        l = re.findall(verifycode, out)
> +        self.verify(len(l) == 2, "Ipsec Decryption wrongkey failed")
> +        self.dut.send_expect("^C", "#", 5)
> 
>      def test_Ipsec_Encryption_Decryption(self):
>          """
> @@ -350,10 +377,17 @@ class TestInlineIpsec(TestCase):
>          eth_e2 = Ether() / e2
>          eth_e2.src = self.rx_src
>          eth_e2.dst = self.tx_dst
> -
> +        session_receive3 =
> self.tester.create_session('check_forward_encryption_package')
> +        session_receive3.send_expect("tcpdump -Xvvvi %s -c 1" % self.rxItf, "", 30)
> +        time.sleep(2)
>          sendp(eth_e1, iface=self.rxItf, count=2)
>          sendp(eth_e2, iface=self.txItf, count=1)
>          time.sleep(30)
> +        rev = session_receive3.get_session_before()
> +        print(rev)
> +        p = re.compile(': ESP\(spi=0x\w+,seq=0x\w+\),')
> +        res = p.search(rev)
> +        self.verify(res, 'encrypt failed, tcpdump get %s' % rev)
>          session_receive.send_expect(
>              "results=sa_gcm.decrypt(pkts[2]['IP'])", ">>>", 60)
>          out = session_receive.send_expect("results", ">>>", 60)
> --
> 2.17.0



More information about the dts mailing list