diff --git a/modules/test/protocol/bin/get_bacnet_packets.sh b/modules/test/protocol/bin/get_bacnet_packets.sh new file mode 100644 index 000000000..217e56b9c --- /dev/null +++ b/modules/test/protocol/bin/get_bacnet_packets.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +CAPTURE_FILE="$1" +OBJECT_ID="$2" + +TSHARK_OUTPUT="-T json -e ip.src -e ip.dst -e eth.src -e eth.dst -e bacapp.instance_number" +TSHARK_FILTER="bacapp.instance_number == $OBJECT_ID" + +response=$(tshark -r "$CAPTURE_FILE" $TSHARK_OUTPUT $TSHARK_FILTER) + +echo "$response" + \ No newline at end of file diff --git a/modules/test/protocol/protocol.Dockerfile b/modules/test/protocol/protocol.Dockerfile index abfbc16b0..6f55520e1 100644 --- a/modules/test/protocol/protocol.Dockerfile +++ b/modules/test/protocol/protocol.Dockerfile @@ -15,6 +15,12 @@ # Image name: test-run/protocol-test FROM test-run/base-test:latest +# Set DEBIAN_FRONTEND to noninteractive mode +ENV DEBIAN_FRONTEND=noninteractive + +# Install required software +RUN apt-get update && apt-get install -y tshark + ARG MODULE_NAME=protocol ARG MODULE_DIR=modules/test/$MODULE_NAME @@ -30,5 +36,8 @@ COPY $MODULE_DIR/conf /testrun/conf # Copy over all binary files COPY $MODULE_DIR/bin /testrun/bin +# Copy over all binary files +COPY $MODULE_DIR/bin /testrun/bin + # Copy over all python files COPY $MODULE_DIR/python /testrun/python \ No newline at end of file diff --git a/modules/test/protocol/python/src/protocol_bacnet.py b/modules/test/protocol/python/src/protocol_bacnet.py index a987a9f4c..a17c9cdd3 100644 --- a/modules/test/protocol/python/src/protocol_bacnet.py +++ b/modules/test/protocol/python/src/protocol_bacnet.py @@ -15,6 +15,9 @@ import BAC0 import logging +import json +from common import util +import os from BAC0.core.io.IOExceptions import (UnknownPropertyError, ReadPropertyException, NoResponseFromController, @@ -22,12 +25,20 @@ LOGGER = None BAC0_LOG = '/root/.BAC0/BAC0.log' +DEFAULT_CAPTURES_DIR = '/runtime/output' +DEFAULT_CAPTURE_FILE = 'protocol.pcap' +DEFAULT_BIN_DIR = '/testrun/bin' class BACnet(): """BACnet Test module""" - def __init__(self, log): + def __init__(self, + log, + captures_dir=DEFAULT_CAPTURES_DIR, + capture_file=DEFAULT_CAPTURE_FILE, + bin_dir=DEFAULT_BIN_DIR, + device_hw_addr=None): # Set the log global LOGGER LOGGER = log @@ -37,8 +48,13 @@ def __init__(self, log): stdout=logging.INFO, stderr=logging.CRITICAL) + self._captures_dir = captures_dir + self._capture_file = capture_file + self._bin_dir = bin_dir + self.device_hw_addr = device_hw_addr self.devices = [] self.bacnet = None + self._bin_dir = bin_dir def discover(self, local_ip=None): LOGGER.info('Performing BACnet discovery...') @@ -52,29 +68,32 @@ def discover(self, local_ip=None): with open(BAC0_LOG, 'r', encoding='utf-8') as f: bac0_log = f.read() LOGGER.info('BAC0 Log:\n' + bac0_log) - self.devices = self.bacnet.devices + LOGGER.info('BACnet devices found: ' + str(len(self.devices))) # Check if the device being tested is in the discovered devices list - def validate_device(self, local_ip, device_ip): - LOGGER.info('Validating BACnet device: ' + device_ip) - self.discover(local_ip + '/24') - LOGGER.info('BACnet devices found: ' + str(len(self.devices))) - if len(self.devices) > 0: - result = (False, - 'BACnet device was found but was not device under test') - for device in self.devices: - address = device[2] - LOGGER.info('Checking device: ' + str(device)) - if device_ip in address: - result = True, 'BACnet device discovered' - break - else: - result = ('Feature Not Detected', - 'BACnet device could not be discovered') - if result is not None: - LOGGER.info(result[1]) - return result + # discover needs to be called before this method is invoked + def validate_device(self): + result = None + description = '' + try: + if len(self.devices) > 0: + result = True + for device in self.devices: + object_id = str(device[3]) # BACnet Object ID + LOGGER.info('Checking device: ' + str(device)) + result &= self.validate_bacnet_source( + object_id=object_id, device_hw_addr=self.device_hw_addr) + description = ('BACnet device discovered' if result else + 'BACnet device was found but was not device under test') + else: + result = 'Feature Not Detected' + description = 'BACnet device could not be discovered' + LOGGER.info(description) + except Exception: # pylint: disable=W0718 + LOGGER.error('Error occured when validating device', exc_info=True) + return result, description + def validate_protocol_version(self, device_ip, device_id): LOGGER.info(f'Resolving protocol version for BACnet device: {device_id}') @@ -85,11 +104,44 @@ def validate_protocol_version(self, device_ip, device_id): f'{device_ip} device {device_id} protocolRevision') protocol_version = f'{version}.{revision}' result = True - result_description = ( - f'Device uses BACnet version {protocol_version}') + result_description = f'Device uses BACnet version {protocol_version}' except (UnknownPropertyError, ReadPropertyException, NoResponseFromController, DeviceNotConnected) as e: result = False result_description = f'Failed to resolve protocol version {e}' LOGGER.error(result_description) return result, result_description + + # Validate that all traffic to/from BACnet device from + # discovered object id matches the MAC address of the device + def validate_bacnet_source(self, object_id, device_hw_addr): + try: + LOGGER.info(f'Checking BACnet traffic for object id {object_id}') + capture_file = os.path.join(self._captures_dir, self._capture_file) + packets = self.get_bacnet_packets(capture_file, object_id) + valid = None + for packet in packets: + if object_id in packet['_source']['layers']['bacapp.instance_number']: + if device_hw_addr.lower() in packet['_source']['layers']['eth.src']: + LOGGER.debug('BACnet detected from device') + valid = True if valid is None else valid and True + elif device_hw_addr.lower() in packet['_source']['layers']['eth.dst']: + LOGGER.debug('BACnet detected to device') + valid = valid = True if valid is None else valid and True + else: + LOGGER.debug('BACnet detected for wrong MAC address') + src = packet['_source']['layers']['eth.src'][0] + dst = packet['_source']['layers']['eth.dst'][0] + LOGGER.debug(f'From: {src} To: {dst} Expected: {device_hw_addr}') + valid = False + return valid + except Exception: # pylint: disable=W0718 + LOGGER.error('Error occured when validating source', exc_info=True) + return False + + def get_bacnet_packets(self, capture_file, object_id): + bin_file = self._bin_dir + '/get_bacnet_packets.sh' + args = f'"{capture_file}" {object_id}' + command = f'{bin_file} {args}' + response = util.run_command(command) + return json.loads(response[0].strip()) diff --git a/modules/test/protocol/python/src/protocol_module.py b/modules/test/protocol/python/src/protocol_module.py index bfb248cd5..407a46777 100644 --- a/modules/test/protocol/python/src/protocol_module.py +++ b/modules/test/protocol/python/src/protocol_module.py @@ -29,13 +29,12 @@ def __init__(self, module): super().__init__(module_name=module, log_name=LOG_NAME) global LOGGER LOGGER = self._get_logger() - self._bacnet = BACnet(LOGGER) + self._bacnet = BACnet(log=LOGGER,device_hw_addr=self._device_mac) def _protocol_valid_bacnet(self): LOGGER.info('Running protocol.valid_bacnet') result = None interface_name = 'veth0' - # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is None: self._device_ipv4_addr = self._get_device_ipv4() @@ -47,8 +46,8 @@ def _protocol_valid_bacnet(self): # Resolve the appropriate IP for BACnet comms local_address = self.get_local_ip(interface_name) if local_address: - result = self._bacnet.validate_device(local_address, - self._device_ipv4_addr) + self._bacnet.discover(local_address + '/24') + result = self._bacnet.validate_device() if result[0]: self._supports_bacnet = True else: diff --git a/testing/unit/protocol/captures/bacnet.pcap b/testing/unit/protocol/captures/bacnet.pcap new file mode 100644 index 000000000..4db710b47 Binary files /dev/null and b/testing/unit/protocol/captures/bacnet.pcap differ diff --git a/testing/unit/protocol/protocol_module_test.py b/testing/unit/protocol/protocol_module_test.py new file mode 100644 index 000000000..32a0021cd --- /dev/null +++ b/testing/unit/protocol/protocol_module_test.py @@ -0,0 +1,106 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module run all the DNS related unit tests""" +from protocol_bacnet import BACnet +import unittest +import os +from common import logger +import inspect + +MODULE = 'protocol' + +# Define the directories +TEST_FILES_DIR = 'testing/unit/' + MODULE +OUTPUT_DIR = os.path.join(TEST_FILES_DIR, 'output/') +REPORTS_DIR = os.path.join(TEST_FILES_DIR, 'reports/') +CAPTURES_DIR = os.path.join(TEST_FILES_DIR, 'captures/') + +# Define the capture files to be used for the test +PROTOCOL_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'bacnet.pcap') + +HW_ADDR = 'AA:BB:CC:DD:EE:FF' +HW_ADDR_BAD = 'AA:BB:CC:DD:EE:FE' +BACNET = None +LOGGER = None + + +class ProtocolModuleTest(unittest.TestCase): + """Contains and runs all the unit tests concerning DNS behaviors""" + + @classmethod + def setUpClass(cls): + global LOGGER + LOGGER = logger.get_logger('unit_test_' + MODULE) + global BACNET + BACNET = BACnet(log=LOGGER, + captures_dir=CAPTURES_DIR, + capture_file='bacnet.pcap', + bin_dir='modules/test/protocol/bin', + device_hw_addr=HW_ADDR) + + # Test the BACNet traffic for a matching Object ID and HW address + def bacnet_protocol_traffic_test(self): + LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') + result = BACNET.validate_bacnet_source(object_id='1761001', + device_hw_addr=HW_ADDR) + LOGGER.info(f'Test Result: {result}') + self.assertEqual(result, True) + + # Test the BACNet test when Object ID and HW address + # do not match + def bacnet_protocol_traffic_fail_test(self): + LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') + result = BACNET.validate_bacnet_source(object_id='1761001', + device_hw_addr=HW_ADDR_BAD) + LOGGER.info(f'Test Result: {result}') + self.assertEqual(result, False) + + # Test a BACnet device with valid traffic to/from an + # expected HW address and Object ID + def bacnet_protocol_validate_device_test(self): + LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') + # Load bacnet devices to simulate a discovery + bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001) + BACNET.devices = [bac_dev] + result = BACNET.validate_device() + LOGGER.info(f'Test Result: {result}') + self.assertEqual(result, (True, 'BACnet device discovered')) + + # Test a BACnet device with valid traffic to/from an + # expected HW address and Object ID + def bacnet_protocol_validate_device_fail_test(self): + LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') + # Load bacnet devices to simulate a discovery + bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001) + BACNET.devices = [bac_dev] + # Change the MAC address to a different device than expected + BACNET.device_hw_addr = HW_ADDR_BAD + result = BACNET.validate_device() + LOGGER.info(f'Test Result: {result}') + self.assertEqual( + result, + (False, 'BACnet device was found but was not device under test')) + + +if __name__ == '__main__': + suite = unittest.TestSuite() + + suite.addTest(ProtocolModuleTest('bacnet_protocol_traffic_test')) + suite.addTest(ProtocolModuleTest('bacnet_protocol_traffic_fail_test')) + + suite.addTest(ProtocolModuleTest('bacnet_protocol_validate_device_test')) + suite.addTest(ProtocolModuleTest('bacnet_protocol_validate_device_fail_test')) + + runner = unittest.TextTestRunner() + runner.run(suite) diff --git a/testing/unit/run_tests.sh b/testing/unit/run_tests.sh index 975627fc3..cec5452fd 100644 --- a/testing/unit/run_tests.sh +++ b/testing/unit/run_tests.sh @@ -1,64 +1,69 @@ -#!/bin/bash -e - -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This script should be run from within the unit_test directory. If -# it is run outside this directory, paths will not be resolved correctly. - -# Move into the root directory of test-run -pushd ../../ >/dev/null 2>&1 - -echo "Root dir: $PWD" - -# Add the framework sources -PYTHONPATH="$PWD/framework/python/src:$PWD/framework/python/src/common" - -# Add the test module sources -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/base/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/conn/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/tls/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/dns/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/services/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/ntp/python/src" - -# Set the python path with all sources -export PYTHONPATH - -# Run the DHCP Unit tests -python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py -python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py - -# Run the Conn Module Unit Tests -python3 -u $PWD/testing/unit/conn/conn_module_test.py - -# Run the TLS Module Unit Tests -python3 -u $PWD/testing/unit/tls/tls_module_test.py - -# Run the DNS Module Unit Tests -python3 -u $PWD/testing/unit/dns/dns_module_test.py - -# Run the NMAP Module Unit Tests -python3 -u $PWD/testing/unit/services/services_module_test.py - -# Run the NTP Module Unit Tests -python3 -u $PWD/testing/unit/ntp/ntp_module_test.py - -# Run the Report Unit Tests -python3 -u $PWD/testing/unit/report/report_test.py - -# Run the RiskProfile Unit Tests -python3 -u $PWD/testing/unit/risk_profile/risk_profile_test.py - +#!/bin/bash -e + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script should be run from within the unit_test directory. If +# it is run outside this directory, paths will not be resolved correctly. + +# Move into the root directory of test-run +pushd ../../ >/dev/null 2>&1 + +echo "Root dir: $PWD" + +# Add the framework sources +PYTHONPATH="$PWD/framework/python/src:$PWD/framework/python/src/common" + +# Add the test module sources +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/base/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/conn/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/tls/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/dns/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/services/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/ntp/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/protocol/python/src" + + +# Set the python path with all sources +export PYTHONPATH + +# Run the DHCP Unit tests +python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py +python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py + +# Run the Conn Module Unit Tests +python3 -u $PWD/testing/unit/conn/conn_module_test.py + +# Run the TLS Module Unit Tests +python3 -u $PWD/testing/unit/tls/tls_module_test.py + +# Run the DNS Module Unit Tests +python3 -u $PWD/testing/unit/dns/dns_module_test.py + +# Run the NMAP Module Unit Tests +python3 -u $PWD/testing/unit/services/services_module_test.py + +# Run the NTP Module Unit Tests +python3 -u $PWD/testing/unit/ntp/ntp_module_test.py + +# Run the Report Unit Tests +python3 -u $PWD/testing/unit/report/report_test.py + +# Run the RiskProfile Unit Tests +python3 -u $PWD/testing/unit/risk_profile/risk_profile_test.py + +# Run the RiskProfile Unit Tests +python3 -u $PWD/testing/unit/protocol/protocol_module_test.py + popd >/dev/null 2>&1 \ No newline at end of file