[dts] [PATCH V1] bind scapy required modules to packet module
Dong, JunX
junx.dong at intel.com
Fri Oct 29 09:33:32 CEST 2021
From: "Dong,JunX" <junx.dong at intel.com>
The modules required by scapy consist of modules shipped with scapy itself
and modules shipped with DTS. The DTS-shipped modules, which live in the dep
directory, must be copied from the execution platform to the tester, so the
import of these modules is delayed until after the tester has copied them.
Signed-off-by: Dong,JunX <junx.dong at intel.com>
---
framework/packet.py | 32 +++++++++++++++-----------------
framework/tester.py | 12 +++++++++++-
2 files changed, 26 insertions(+), 18 deletions(-)
diff --git a/framework/packet.py b/framework/packet.py
index e6fd74a..eb9d21d 100644
--- a/framework/packet.py
+++ b/framework/packet.py
@@ -35,9 +35,7 @@ Base on scapy(python program for packet manipulation)
from importlib import import_module
from socket import AF_INET6
-
from scapy.all import *
-
from .utils import convert_int2ip, convert_ip2int
# load extension layers
@@ -50,20 +48,24 @@ if not os.path.exists(TMP_PATH):
scapy_modules_required = {'scapy.contrib.gtp': ['GTP_U_Header', 'GTPPDUSessionContainer'],
'scapy.contrib.lldp': ['LLDPDU', 'LLDPDUManagementAddress'],
- 'dep.scapy_modules.Dot1BR': ['Dot1BR'],
+ 'Dot1BR': ['Dot1BR'],
'scapy.contrib.pfcp': ['PFCP'],
'scapy.contrib.nsh': ['NSH'],
'scapy.contrib.igmp': ['IGMP'],
'scapy.contrib.mpls': ['MPLS'],
}
-for m in scapy_modules_required:
- try:
- module = import_module(m)
- for clazz in scapy_modules_required[m]:
- locals().update({clazz: getattr(module, clazz)})
- except Exception as e:
- print(e)
+
+def bind_scapy_modules(_module_path):
+ sys.path.append(_module_path)
+ for _module in scapy_modules_required:
+ try:
+ module = import_module(_module)
+ for clazz in scapy_modules_required[_module]:
+ setattr(sys.modules[__name__], clazz, getattr(module, clazz))
+ except Exception as e:
+ print(e)
+
def get_scapy_module_impcmd():
cmd_li = list()
@@ -71,11 +73,10 @@ def get_scapy_module_impcmd():
cmd_li.append(f'from {m} import {",".join(scapy_modules_required[m])}')
return ';'.join(cmd_li)
-SCAPY_IMP_CMD = get_scapy_module_impcmd()
+SCAPY_IMP_CMD = get_scapy_module_impcmd()
# packet generator type should be configured later
PACKETGEN = "scapy"
-
LayersTypes = {
"L2": ['ether', 'vlan', 'etag', '1588', 'arp', 'lldp', 'mpls', 'nsh'],
# ipv4_ext_unknown, ipv6_ext_unknown
@@ -96,17 +97,15 @@ LayersTypes = {
"INNER L4": ['inner_tcp', 'inner_udp', 'inner_frag', 'inner_sctp', 'inner_icmp', 'inner_nofrag'],
"PAYLOAD": ['raw']
}
-
# Saved background sniff process id
SNIFF_PIDS = {}
-
# Saved packet generator process id
# used in pktgen or tgen
PKTGEN_PIDS = {}
-
# default filter for LLDP packet
LLDP_FILTER = {'layer': 'ether', 'config': {'type': 'not lldp'}}
+
def write_raw_pkt(pkt_str, file_name):
tmp = eval(pkt_str)
tmp = bytearray(bytes(tmp))
@@ -114,6 +113,7 @@ def write_raw_pkt(pkt_str, file_name):
w.write(tmp)
w.close()
+
class scapy(object):
SCAPY_LAYERS = {
'ether': Ether(dst="ff:ff:ff:ff:ff:ff"),
@@ -1117,8 +1117,6 @@ def strip_pktload(pkt=None, layer="L2", p_index=0):
return load
-###############################################################################
-###############################################################################
if __name__ == "__main__":
pkt = Packet('Ether(type=0x894f)/NSH(Len=0x6,NextProto=0x0,NSP=0x000002,NSI=0xff)')
sendp(pkt, iface='lo')
diff --git a/framework/tester.py b/framework/tester.py
index 9dba4e4..3270084 100644
--- a/framework/tester.py
+++ b/framework/tester.py
@@ -52,6 +52,7 @@ from .packet import (
start_tcpdump,
stop_and_load_tcpdump_packets,
strip_pktload,
+ bind_scapy_modules
)
from .pktgen import getPacketGenerator
from .settings import (
@@ -106,7 +107,11 @@ class Tester(Crb):
session_name = 'tester_scapy' if not self.scapy_sessions_li else f'tester_scapy_{random.random()}'
session = self.create_session(session_name)
self.scapy_sessions_li.append(session)
+
+ # launch scapy
session.send_expect('scapy', '>>> ')
+
+ # copy scapy dep modules in dts to tester
file_dir = os.path.dirname(__file__).split(os.path.sep)
lib_path = os.path.sep.join(file_dir[:-1]) + '/dep/scapy_modules/'
exists_flag = self.alt_session.session.send_expect(f'ls {self.tmp_scapy_module_dir}', '# ', verify=True)
@@ -115,11 +120,16 @@ class Tester(Crb):
scapy_modules_path = [lib_path+i for i in os.listdir(lib_path) if i.endswith('.py')]
path = ' '.join(scapy_modules_path)
session.copy_file_to(src=path, dst=self.tmp_scapy_module_dir)
- session.session.send_expect(f"sys.path.append('{self.tmp_scapy_module_dir}')", ">>> ")
+ # bind scapy modules to packet module
+ bind_scapy_modules(self.tmp_scapy_module_dir)
+
+ # import scapy required modules to scapy env
+ session.session.send_expect(f"sys.path.append('{self.tmp_scapy_module_dir}')", ">>> ")
out = session.session.send_expect(SCAPY_IMP_CMD, '>>> ')
if 'ImportError' in out:
session.logger.warning(f'entering import error: {out}')
+
return session
def check_scapy_version(self):
--
1.8.3.1
More information about the dts
mailing list