[PATCH v6 3/7] dts: add optional packet filtering to scapy sniffer

Juraj Linkeš juraj.linkes at pantheon.tech
Mon Jan 8 13:01:46 CET 2024


On Wed, Jan 3, 2024 at 11:33 PM <jspewock at iol.unh.edu> wrote:
>
> From: Jeremy Spewock <jspewock at iol.unh.edu>
>
> Added the options to filter out LLDP and ARP packets when
> sniffing for packets with scapy. This was done using BPF filters to
> ensure that the noise these packets provide does not interfere with test
> cases.
>
> Signed-off-by: Jeremy Spewock <jspewock at iol.unh.edu>
> ---
>  dts/framework/test_suite.py                   | 15 +++++++++--
>  dts/framework/testbed_model/tg_node.py        | 14 ++++++++--
>  .../traffic_generator/__init__.py             |  7 ++++-
>  .../capturing_traffic_generator.py            | 22 ++++++++++++++-
>  .../testbed_model/traffic_generator/scapy.py  | 27 +++++++++++++++++++
>  5 files changed, 79 insertions(+), 6 deletions(-)
>

<snip>

> diff --git a/dts/framework/testbed_model/traffic_generator/capturing_traffic_generator.py b/dts/framework/testbed_model/traffic_generator/capturing_traffic_generator.py
> index 0246590333..c1c9facedd 100644
> --- a/dts/framework/testbed_model/traffic_generator/capturing_traffic_generator.py
> +++ b/dts/framework/testbed_model/traffic_generator/capturing_traffic_generator.py
<snip>
> @@ -26,6 +27,19 @@ def _get_default_capture_name() -> str:
>      return str(uuid.uuid4())
>
>
> +@dataclass(slots=True)

This should also be frozen (frozen=True). If we need a different filter,
I think it's better to create a new object.

> +class PacketFilteringConfig:
> +    """The supported filtering options for :class:`CapturingTrafficGenerator`.
> +
> +    Attributes:
> +        no_lldp: If :data:`True`, LLDP packets will be filtered out when capturing.
> +        no_arp: If :data:`True`, ARP packets will be filtered out when capturing.
> +    """
> +
> +    no_lldp: bool = True
> +    no_arp: bool = True
> +
> +
>  class CapturingTrafficGenerator(TrafficGenerator):
>      """Capture packets after sending traffic.
>
<snip>
> diff --git a/dts/framework/testbed_model/traffic_generator/scapy.py b/dts/framework/testbed_model/traffic_generator/scapy.py
> index 5b60f66237..505de0be94 100644
> --- a/dts/framework/testbed_model/traffic_generator/scapy.py
> +++ b/dts/framework/testbed_model/traffic_generator/scapy.py
<snip>
> @@ -260,11 +263,34 @@ def _send_packets(self, packets: list[Packet], port: Port) -> None:
>          packets = [packet.build() for packet in packets]
>          self.rpc_server_proxy.scapy_send_packets(packets, port.logical_name)
>
> +    def _create_packet_filter(self, filter_config: PacketFilteringConfig) -> str:
> +        """Combines filter settings from `filter_config` into a BPF that scapy can use.
> +
> +        Scapy allows for the use of Berkeley Packet Filters (BPFs) to filter what packets are
> +        collected based on various attributes of the packet.
> +
> +        Args:
> +            filter_config: Config class that specifies which filters should be applied.
> +
> +        Returns:
> +            A string representing the combination of BPF filters to be passed to scapy. For
> +            example:
> +
> +            "ether[12:2] != 0x88cc && ether[12:2] != 0x0806"
> +        """
> +        bpf_filter: list[str] = []

The type hint here is not needed, so let's make this consistent with
the rest of the code: we don't specify type hints for local variables
unless they're necessary.

> +        if filter_config.no_arp:
> +            bpf_filter.append("ether[12:2] != 0x0806")
> +        if filter_config.no_lldp:
> +            bpf_filter.append("ether[12:2] != 0x88cc")
> +        return " && ".join(bpf_filter)
> +
>      def _send_packets_and_capture(
>          self,
>          packets: list[Packet],
>          send_port: Port,
>          receive_port: Port,
> +        filter_config: PacketFilteringConfig,
>          duration: float,
>          capture_name: str = _get_default_capture_name(),
>      ) -> list[Packet]:


More information about the dev mailing list