[dts] [PATCH v2] Add auto test cases generating function. That covers different combination of keys, iv, packet length, cipher algorithms and hash algorithms
Liu, Yong
yong.liu at intel.com
Mon Jan 16 06:03:56 CET 2017
Thanks, Zhaoyan. Some comments below.
> -----Original Message-----
> From: dts [mailto:dts-bounces at dpdk.org] On Behalf Of Chen, Zhaoyan
> Sent: Tuesday, January 10, 2017 4:23 PM
> To: dts at dpdk.org
> Cc: Chen, Zhaoyan <zhaoyan.chen at intel.com>
> Subject: [dts] [PATCH v2] Add auto test cases generating function. That covers
> different combination of keys, iv, packet length, cipher algorithms and hash
> algorithms
>
> From: Zhaoyan Chen<zhaoyan.chen at intel.com>
>
> ---
> tests/TestSuite_l2fwd_crypto.py | 2007
> ++++++++++++++++++++++++++++++++-------
> 1 file changed, 1687 insertions(+), 320 deletions(-)
>
> diff --git a/tests/TestSuite_l2fwd_crypto.py b/tests/TestSuite_l2fwd_crypto.py
> index 9c78a6f..74728ea 100644
> --- a/tests/TestSuite_l2fwd_crypto.py
> +++ b/tests/TestSuite_l2fwd_crypto.py
> @@ -32,12 +32,12 @@
> import hmac
> import hashlib
> import binascii
> -
> -import utils
> import time
> -
> +import os
> +import sys
> +import dts
Please remove the dts import; test suites should no longer depend on this module.
> from test_case import TestCase
> -
> +from CryptoMobile.CM import *
Please import the required functions explicitly rather than using a wildcard import.
>
> class TestL2fwdCrypto(TestCase):
>
> @@ -55,10 +55,10 @@ class TestL2fwdCrypto(TestCase):
> self.logger.info("dut ports = " + str(self.dut_ports))
> self.logger.info("ports_socket = " + str(self.ports_socket))
>
> - self.core_mask = utils.create_mask(self.dut.get_core_list(
> + self.core_mask = dts.create_mask(self.dut.get_core_list(
> self.core_config,
Currently we use utils.create_mask to generate the core mask.
> self.tester.scapy_append('sendp([Ether(src="52:00:00:00:00:00")/IP(src="192.16
> 8.1.1",dst="192.168.1.2")/Raw(load=\"%s\")], iface="%s", count=%s)' %
> (payload, self.tx_interface, PACKET_COUNT))
I suggest using the packet module to transmit and capture packets.
You can refer to the userspace_ethtool suite for an example.
> + self.logger.info("Test qat_h_AES_XCBC_MAC_01")
> + if not self.__execute_l2fwd_crypto_test(
> + test_vectors, "qat_h_AES_XCBC_MAC_01"):
> + result = False
>
> - self.tester.scapy_execute()
> + self.verify(result, True)
>
> - time.sleep(5)
> + def test_null_NULL(self):
>
> - self.tester.send_expect("killall tcpdump", "#")
> - self.tester.send_expect("^C", "#")
> + result = True
>
> - # Wait 5 secs for tcpdump exit
> - time.sleep(5)
> + self.logger.info("Test null_c_NULL_01")
> + if not self.__execute_l2fwd_crypto_test(
> + test_vectors, "null_c_NULL_01"):
> + result = False
>
> - self.tester.send_expect("scapy", ">>>")
> - self.tester.send_expect("p=rdpcap('%s.pcap', count=%s)" %
> (self.rx_interface, PACKET_COUNT), ">>>")
> + self.verify(result, True)
> +
> + def test_calculatr_case_number(self):
> +
Typo here: "calculatr" should be "calculate".
> + self.__calculate_totall_cases_numb()
> +
> + def __calculate_totall_cases_numb(self):
I'm not sure why this case was added; does it only dump the total number of cases?
> +
> + def __execute_l2fwd_crypto_test(self, test_vectors, test_vector_name):
>
> - hex_list = []
> - for i in range(PACKET_COUNT):
> - cmd = "linehexdump(p[%s],onlyhex=1)" % i
> - hex_list.append(self.tester.send_expect(cmd, ">>>"))
> + if test_vector_name not in test_vectors:
> + self.logger.warn("SKIP : " + test_vector_name)
> + return True
>
> - # Exit the scapy
> - self.tester.send_expect("exit()", "#", 60)
> + test_vector = test_vectors[test_vector_name]
>
> - for hex_str in hex_list:
> - packet_hex = hex_str.split(" ")
> - # self.logger.info(hex_str)
> - # self.logger.info(packet_hex)
> + test_vector_list = self.__test_vector_to_vector_list(test_vector,
> + core_mask=self.core_mask,
> + port_mask=self.port_mask)
>
> - cipher_offset = 34
> - cipher_length = len(test_vector["output_cipher"])/2
> - if cipher_length == 0:
> - cipher_length = len(test_vector["input"])/2
> - cipher_text =
> "".join(packet_hex[cipher_offset:cipher_offset+cipher_length])
> - # self.logger.info("Cipher text in packet = " + cipher_text)
> - # self.logger.info("Ref Cipher text = " + test_vector["output_cipher"])
> - if str.lower(cipher_text) == str.lower(test_vector["output_cipher"]):
> - self.logger.info("Cipher Matched.")
> - else:
> - if test_vector["output_cipher"] != "":
> - result = False
> - self.logger.info("Cipher NOT Matched.")
> - self.logger.info("Cipher text in packet = " + cipher_text)
> - self.logger.info("Ref Cipher text = " +
> test_vector["output_cipher"])
> + result = True
> + self.logger.info("Total Generated {0} Tests".format(len(test_vector_list)))
> + for test_vector in test_vector_list:
> + self.logger.debug(test_vector)
> + cmd_str = self.__test_vector_to_cmd(test_vector,
> + core_mask=self.core_mask,
> + port_mask=self.port_mask)
> + self.dut.send_expect(cmd_str, "==", 30)
> +
> + self.tester.send_expect("rm -rf %s.pcap" % (self.rx_interface), "#")
> + self.tester.send_expect("tcpdump -P in -w %s.pcap -i %s &" %
> (self.rx_interface, self.rx_interface), "#")
> + # Wait 5 sec for tcpdump stable
> + time.sleep(5)
> +
> + payload = self.__format_hex_to_param(test_vector["input"], "\\x",
> "\\x")
> +
> + PACKET_COUNT = 65
> +
> + self.tester.scapy_foreground()
> +
> self.tester.scapy_append('sendp([Ether(src="52:00:00:00:00:00")/IP(src="192.16
> 8.1.1",dst="192.168.1.2")/Raw(load=\"%s\")], iface="%s", count=%s)' %
> (payload, self.tx_interface, PACKET_COUNT))
> +
> + self.tester.scapy_execute()
> +
> + time.sleep(5)
> +
> + self.tester.send_expect("killall tcpdump", "#")
> + self.tester.send_expect("^C", "#")
> +
> + # Wait 5 secs for tcpdump exit
> + time.sleep(5)
> +
> + self.tester.send_expect("scapy", ">>>")
> + self.tester.send_expect("p=rdpcap('%s.pcap', count=%s)" %
> (self.rx_interface, PACKET_COUNT), ">>>")
> +
> + hex_list = []
> + for i in range(PACKET_COUNT):
> + cmd = "linehexdump(p[%s],onlyhex=1)" % i
> + hex_list.append(self.tester.send_expect(cmd, ">>>"))
> +
Please try strip_pktload in the packet module; we want suites to use the unified packet module.
More information about the dts
mailing list