From 0e15713d7c2b0667ff8507bf0fb63c324510b824 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Mon, 1 Jul 2024 13:37:09 -0600 Subject: [PATCH 1/4] Change BACnet device detection validation Add unit tests --- .../test/protocol/bin/get_bacnet_packets.sh | 26 ++++ .../protocol/python/src/protocol_bacnet.py | 79 ++++++++--- .../protocol/python/src/protocol_module.py | 4 +- testing/unit/protocol/captures/bacnet.pcap | Bin 0 -> 1004 bytes testing/unit/protocol/protocol_module_test.py | 106 ++++++++++++++ testing/unit/run_tests.sh | 131 +++++++++--------- 6 files changed, 264 insertions(+), 82 deletions(-) create mode 100644 modules/test/protocol/bin/get_bacnet_packets.sh create mode 100644 testing/unit/protocol/captures/bacnet.pcap create mode 100644 testing/unit/protocol/protocol_module_test.py diff --git a/modules/test/protocol/bin/get_bacnet_packets.sh b/modules/test/protocol/bin/get_bacnet_packets.sh new file mode 100644 index 000000000..217e56b9c --- /dev/null +++ b/modules/test/protocol/bin/get_bacnet_packets.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +CAPTURE_FILE="$1" +OBJECT_ID="$2" + +TSHARK_OUTPUT="-T json -e ip.src -e ip.dst -e eth.src -e eth.dst -e bacapp.instance_number" +TSHARK_FILTER="bacapp.instance_number == $OBJECT_ID" + +response=$(tshark -r "$CAPTURE_FILE" $TSHARK_OUTPUT $TSHARK_FILTER) + +echo "$response" + \ No newline at end of file diff --git a/modules/test/protocol/python/src/protocol_bacnet.py b/modules/test/protocol/python/src/protocol_bacnet.py index a987a9f4c..cc0ff77e5 100644 --- a/modules/test/protocol/python/src/protocol_bacnet.py +++ b/modules/test/protocol/python/src/protocol_bacnet.py @@ -15,6 +15,9 @@ import BAC0 import logging +import json +from common import util +import os from BAC0.core.io.IOExceptions import (UnknownPropertyError, ReadPropertyException, NoResponseFromController, @@ -22,12 +25,20 @@ LOGGER = None BAC0_LOG = '/root/.BAC0/BAC0.log' +DEFAULT_CAPTURES_DIR = '/runtime/output' +DEFAULT_CAPTURE_FILE = 'protocol.pcap' +DEFAULT_BIN_DIR = '/testrun/bin' class BACnet(): """BACnet Test module""" - def __init__(self, log): + def __init__(self, + log, + captures_dir=DEFAULT_CAPTURES_DIR, + capture_file=DEFAULT_CAPTURE_FILE, + bin_dir=DEFAULT_BIN_DIR, + device_hw_addr=None): # Set the log global LOGGER LOGGER = log @@ -37,8 +48,13 @@ def __init__(self, log): stdout=logging.INFO, stderr=logging.CRITICAL) + self._captures_dir = captures_dir + self._capture_file = capture_file + self._bin_dir = bin_dir + self.device_hw_addr = device_hw_addr self.devices = [] self.bacnet = None + self._bin_dir = bin_dir def discover(self, local_ip=None): LOGGER.info('Performing BACnet discovery...') @@ -56,25 +72,25 @@ def discover(self, local_ip=None): self.devices = self.bacnet.devices # Check if the device being tested is in the discovered devices list - def validate_device(self, local_ip, device_ip): - LOGGER.info('Validating BACnet device: ' + device_ip) - self.discover(local_ip + '/24') + # discover needs to be called before this method is invoked + def validate_device(self): LOGGER.info('BACnet devices found: ' + str(len(self.devices))) + result = None + description = '' if len(self.devices) > 0: - result = (False, - 'BACnet device was found but was not device under test') + result = True for device in self.devices: - address = device[2] + object_id = device[3] # BACnet Object ID LOGGER.info('Checking device: ' + str(device)) - if device_ip in address: - result = True, 'BACnet device discovered' - break + result &= self.validate_bacnet_source( + object_id=object_id, device_hw_addr=self.device_hw_addr) + description = ('BACnet device discovered' if result else + 'BACnet device was found but was not device under test') else: - result = ('Feature Not Detected', - 'BACnet device could not be discovered') - if result is not None: - LOGGER.info(result[1]) - return result + result = 'Feature Not Detected' + description = 'BACnet device could not be discovered' + LOGGER.info(description) + return result, description def validate_protocol_version(self, device_ip, device_id): LOGGER.info(f'Resolving protocol version for BACnet device: {device_id}') @@ -85,11 +101,40 @@ def validate_protocol_version(self, device_ip, device_id): f'{device_ip} device {device_id} protocolRevision') protocol_version = f'{version}.{revision}' result = True - result_description = ( - f'Device uses BACnet version {protocol_version}') + result_description = (f'Device uses BACnet version {protocol_version}') except (UnknownPropertyError, ReadPropertyException, NoResponseFromController, DeviceNotConnected) as e: result = False result_description = f'Failed to resolve protocol version {e}' LOGGER.error(result_description) return result, result_description + + # Validate that all traffic to/from BACnet device from + # discovered object id matches the MAC address of the device + def validate_bacnet_source(self, object_id, device_hw_addr): + capture_file = os.path.join(self._captures_dir, self._capture_file) + packets = self.get_bacnet_packets(capture_file, object_id) + valid = None + for packet in packets: + if object_id in packet['_source']['layers']['bacapp.instance_number']: + LOGGER.debug(f'BACnet traffic for instance number: {object_id}') + if device_hw_addr.lower() in packet['_source']['layers']['eth.src']: + LOGGER.debug('BACnet detected from device') + valid = True if valid is None else valid and True + elif device_hw_addr.lower() in packet['_source']['layers']['eth.dst']: + LOGGER.debug('BACnet detected to device') + valid = valid = True if valid is None else valid and True + else: + LOGGER.debug('BACnet detected for wrong MAC address') + src = packet['_source']['layers']['eth.src'][0] + dst = packet['_source']['layers']['eth.dst'][0] + LOGGER.debug(f'From: {src} To: {dst} Expected: {device_hw_addr}') + valid = False + return valid + + def get_bacnet_packets(self, capture_file, object_id): + bin_file = self._bin_dir + '/get_bacnet_packets.sh' + args = f'"{capture_file}" {object_id}' + command = f'{bin_file} {args}' + response = util.run_command(command) + return json.loads(response[0].strip()) diff --git a/modules/test/protocol/python/src/protocol_module.py b/modules/test/protocol/python/src/protocol_module.py index bfb248cd5..05f1b803d 100644 --- a/modules/test/protocol/python/src/protocol_module.py +++ b/modules/test/protocol/python/src/protocol_module.py @@ -47,8 +47,8 @@ def _protocol_valid_bacnet(self): # Resolve the appropriate IP for BACnet comms local_address = self.get_local_ip(interface_name) if local_address: - result = self._bacnet.validate_device(local_address, - self._device_ipv4_addr) + self._bacnet.discover(local_address + '/24') + result = self._bacnet.validate_device() if result[0]: self._supports_bacnet = True else: diff --git a/testing/unit/protocol/captures/bacnet.pcap b/testing/unit/protocol/captures/bacnet.pcap new file mode 100644 index 0000000000000000000000000000000000000000..4db710b478788d0c5a19cf438f737fbe5263788b GIT binary patch literal 1004 zcmca|c+)~A1{MYw`2U}Qff2~LTi~nIre7mBgCwH@(47AQ3`dys7RxEIFid1nV)gp#s+3)pe87|3N*{c3ZG-jwt>u=^#i+O7(fO@ z#vmL6@ePt=R9A+949EzCItCKvf_J6@eZ$GvzzB{xh-@ zcL$kuIUMR3K8VLOd>|fUV*r_8gyb5`pegVLnXm}v8cy6na}#J*mJh;b7_QN?e-1M1 z(pnVPNNI!tT_eP(0QA^@0S-FDRj;*~$eB zB}GOJa0+JwYLvS7@*gOP0%;}IBNLewH-L=3a)+706DSM97|yhIU;#U`8O53E8p%Ls z$}_4kFhb4!&s4(5#{)81j?/dev/null 2>&1 - -echo "Root dir: $PWD" - -# Add the framework sources -PYTHONPATH="$PWD/framework/python/src:$PWD/framework/python/src/common" - -# Add the test module sources -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/base/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/conn/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/tls/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/dns/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/services/python/src" -PYTHONPATH="$PYTHONPATH:$PWD/modules/test/ntp/python/src" - -# Set the python path with all sources -export PYTHONPATH - -# Run the DHCP Unit tests -python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py -python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py - -# Run the Conn Module Unit Tests -python3 -u $PWD/testing/unit/conn/conn_module_test.py - -# Run the TLS Module Unit Tests -python3 -u $PWD/testing/unit/tls/tls_module_test.py - -# Run the DNS Module Unit Tests -python3 -u $PWD/testing/unit/dns/dns_module_test.py - -# Run the NMAP Module Unit Tests -python3 -u $PWD/testing/unit/services/services_module_test.py - -# Run the NTP Module Unit Tests -python3 -u $PWD/testing/unit/ntp/ntp_module_test.py - -# Run the Report Unit Tests -python3 -u $PWD/testing/unit/report/report_test.py - -# Run the RiskProfile Unit Tests -python3 -u $PWD/testing/unit/risk_profile/risk_profile_test.py - +#!/bin/bash -e + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script should be run from within the unit_test directory. If +# it is run outside this directory, paths will not be resolved correctly. + +# Move into the root directory of test-run +pushd ../../ >/dev/null 2>&1 + +echo "Root dir: $PWD" + +# Add the framework sources +PYTHONPATH="$PWD/framework/python/src:$PWD/framework/python/src/common" + +# Add the test module sources +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/base/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/conn/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/tls/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/dns/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/services/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/ntp/python/src" +PYTHONPATH="$PYTHONPATH:$PWD/modules/test/protocol/python/src" + + +# Set the python path with all sources +export PYTHONPATH + +# Run the DHCP Unit tests +python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py +python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py + +# Run the Conn Module Unit Tests +python3 -u $PWD/testing/unit/conn/conn_module_test.py + +# Run the TLS Module Unit Tests +python3 -u $PWD/testing/unit/tls/tls_module_test.py + +# Run the DNS Module Unit Tests +python3 -u $PWD/testing/unit/dns/dns_module_test.py + +# Run the NMAP Module Unit Tests +python3 -u $PWD/testing/unit/services/services_module_test.py + +# Run the NTP Module Unit Tests +python3 -u $PWD/testing/unit/ntp/ntp_module_test.py + +# Run the Report Unit Tests +python3 -u $PWD/testing/unit/report/report_test.py + +# Run the RiskProfile Unit Tests +python3 -u $PWD/testing/unit/risk_profile/risk_profile_test.py + +# Run the RiskProfile Unit Tests +python3 -u $PWD/testing/unit/protocol/protocol_module_test.py + popd >/dev/null 2>&1 \ No newline at end of file From adb1fd5eda25bfc0145ad47d8a43dc15f0a6413f Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Tue, 2 Jul 2024 14:31:52 -0600 Subject: [PATCH 2/4] Fix runtime --- modules/test/protocol/protocol.Dockerfile | 9 +++ .../protocol/python/src/protocol_bacnet.py | 78 +++++++++++-------- .../protocol/python/src/protocol_module.py | 36 +++++---- 3 files changed, 73 insertions(+), 50 deletions(-) diff --git a/modules/test/protocol/protocol.Dockerfile b/modules/test/protocol/protocol.Dockerfile index abfbc16b0..6f55520e1 100644 --- a/modules/test/protocol/protocol.Dockerfile +++ b/modules/test/protocol/protocol.Dockerfile @@ -15,6 +15,12 @@ # Image name: test-run/protocol-test FROM test-run/base-test:latest +# Set DEBIAN_FRONTEND to noninteractive mode +ENV DEBIAN_FRONTEND=noninteractive + +# Install required software +RUN apt-get update && apt-get install -y tshark + ARG MODULE_NAME=protocol ARG MODULE_DIR=modules/test/$MODULE_NAME @@ -30,5 +36,8 @@ COPY $MODULE_DIR/conf /testrun/conf # Copy over all binary files COPY $MODULE_DIR/bin /testrun/bin +# Copy over all binary files +COPY $MODULE_DIR/bin /testrun/bin + # Copy over all python files COPY $MODULE_DIR/python /testrun/python \ No newline at end of file diff --git a/modules/test/protocol/python/src/protocol_bacnet.py b/modules/test/protocol/python/src/protocol_bacnet.py index cc0ff77e5..0a8ab2a07 100644 --- a/modules/test/protocol/python/src/protocol_bacnet.py +++ b/modules/test/protocol/python/src/protocol_bacnet.py @@ -68,30 +68,34 @@ def discover(self, local_ip=None): with open(BAC0_LOG, 'r', encoding='utf-8') as f: bac0_log = f.read() LOGGER.info('BAC0 Log:\n' + bac0_log) - self.devices = self.bacnet.devices + LOGGER.info('BACnet devices found: ' + str(len(self.devices))) # Check if the device being tested is in the discovered devices list # discover needs to be called before this method is invoked def validate_device(self): - LOGGER.info('BACnet devices found: ' + str(len(self.devices))) result = None description = '' - if len(self.devices) > 0: - result = True - for device in self.devices: - object_id = device[3] # BACnet Object ID - LOGGER.info('Checking device: ' + str(device)) - result &= self.validate_bacnet_source( - object_id=object_id, device_hw_addr=self.device_hw_addr) - description = ('BACnet device discovered' if result else - 'BACnet device was found but was not device under test') - else: - result = 'Feature Not Detected' - description = 'BACnet device could not be discovered' - LOGGER.info(description) + try: + if len(self.devices) > 0: + result = True + for device in self.devices: + object_id = str(device[3]) # BACnet Object ID + LOGGER.info('Checking device: ' + str(device)) + result &= self.validate_bacnet_source( + object_id=object_id, device_hw_addr=self.device_hw_addr) + description = ('BACnet device discovered' if result else + 'BACnet device was found but was not device under test') + else: + result = 'Feature Not Detected' + description = 'BACnet device could not be discovered' + LOGGER.info(description) + except Exception as e: + LOGGER.error('Error occured when validaing device', e) + LOGGER.error('Error occured when validting device', exc_info=True) return result, description + def validate_protocol_version(self, device_ip, device_id): LOGGER.info(f'Resolving protocol version for BACnet device: {device_id}') try: @@ -112,25 +116,31 @@ def validate_protocol_version(self, device_ip, device_id): # Validate that all traffic to/from BACnet device from # discovered object id matches the MAC address of the device def validate_bacnet_source(self, object_id, device_hw_addr): - capture_file = os.path.join(self._captures_dir, self._capture_file) - packets = self.get_bacnet_packets(capture_file, object_id) - valid = None - for packet in packets: - if object_id in packet['_source']['layers']['bacapp.instance_number']: - LOGGER.debug(f'BACnet traffic for instance number: {object_id}') - if device_hw_addr.lower() in packet['_source']['layers']['eth.src']: - LOGGER.debug('BACnet detected from device') - valid = True if valid is None else valid and True - elif device_hw_addr.lower() in packet['_source']['layers']['eth.dst']: - LOGGER.debug('BACnet detected to device') - valid = valid = True if valid is None else valid and True - else: - LOGGER.debug('BACnet detected for wrong MAC address') - src = packet['_source']['layers']['eth.src'][0] - dst = packet['_source']['layers']['eth.dst'][0] - LOGGER.debug(f'From: {src} To: {dst} Expected: {device_hw_addr}') - valid = False - return valid + try: + LOGGER.info(f'Checking BACnet traffic for object id {object_id}') + capture_file = os.path.join(self._captures_dir, self._capture_file) + packets = self.get_bacnet_packets(capture_file, object_id) + valid = None + for packet in packets: + packet_obj_id = packet['_source']['layers']['bacapp.instance_number'] + if object_id in packet['_source']['layers']['bacapp.instance_number']: + if device_hw_addr.lower() in packet['_source']['layers']['eth.src']: + LOGGER.debug('BACnet detected from device') + valid = True if valid is None else valid and True + elif device_hw_addr.lower() in packet['_source']['layers']['eth.dst']: + LOGGER.debug('BACnet detected to device') + valid = valid = True if valid is None else valid and True + else: + LOGGER.debug('BACnet detected for wrong MAC address') + src = packet['_source']['layers']['eth.src'][0] + dst = packet['_source']['layers']['eth.dst'][0] + LOGGER.debug(f'From: {src} To: {dst} Expected: {device_hw_addr}') + valid = False + return valid + except Exception as e: + LOGGER.error(e) + LOGGER.error('Error occured when validating source', exc_info=True) + return False def get_bacnet_packets(self, capture_file, object_id): bin_file = self._bin_dir + '/get_bacnet_packets.sh' diff --git a/modules/test/protocol/python/src/protocol_module.py b/modules/test/protocol/python/src/protocol_module.py index 05f1b803d..40efddd83 100644 --- a/modules/test/protocol/python/src/protocol_module.py +++ b/modules/test/protocol/python/src/protocol_module.py @@ -29,30 +29,34 @@ def __init__(self, module): super().__init__(module_name=module, log_name=LOG_NAME) global LOGGER LOGGER = self._get_logger() - self._bacnet = BACnet(LOGGER) + self._bacnet = BACnet(log=LOGGER,device_hw_addr=self._device_mac) def _protocol_valid_bacnet(self): LOGGER.info('Running protocol.valid_bacnet') result = None interface_name = 'veth0' + try: - # If the ipv4 address wasn't resolved yet, try again - if self._device_ipv4_addr is None: - self._device_ipv4_addr = self._get_device_ipv4() + # If the ipv4 address wasn't resolved yet, try again + if self._device_ipv4_addr is None: + self._device_ipv4_addr = self._get_device_ipv4() - if self._device_ipv4_addr is None: - LOGGER.error('No device IP could be resolved') - return 'Error', 'Could not resolve device IP address' + if self._device_ipv4_addr is None: + LOGGER.error('No device IP could be resolved') + return 'Error', 'Could not resolve device IP address' - # Resolve the appropriate IP for BACnet comms - local_address = self.get_local_ip(interface_name) - if local_address: - self._bacnet.discover(local_address + '/24') - result = self._bacnet.validate_device() - if result[0]: - self._supports_bacnet = True - else: - result = 'Error', 'Failed to perform BACnet discovery' + # Resolve the appropriate IP for BACnet comms + local_address = self.get_local_ip(interface_name) + if local_address: + self._bacnet.discover(local_address + '/24') + result = self._bacnet.validate_device() + if result[0]: + self._supports_bacnet = True + else: + result = 'Error', 'Failed to perform BACnet discovery' + except Exception as e: + LOGGER.error('Error occured when validaing bacnet', e) + LOGGER.error('Error occured when validting bacnet', exc_info=True) return result def _protocol_bacnet_version(self): From ea8f296af24d09702dcd6be0c9df449a913807a9 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Tue, 2 Jul 2024 14:39:11 -0600 Subject: [PATCH 3/4] cleanup --- .../protocol/python/src/protocol_bacnet.py | 11 +++--- .../protocol/python/src/protocol_module.py | 35 ++++++++----------- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/modules/test/protocol/python/src/protocol_bacnet.py b/modules/test/protocol/python/src/protocol_bacnet.py index 0a8ab2a07..a17c9cdd3 100644 --- a/modules/test/protocol/python/src/protocol_bacnet.py +++ b/modules/test/protocol/python/src/protocol_bacnet.py @@ -90,9 +90,8 @@ def validate_device(self): result = 'Feature Not Detected' description = 'BACnet device could not be discovered' LOGGER.info(description) - except Exception as e: - LOGGER.error('Error occured when validaing device', e) - LOGGER.error('Error occured when validting device', exc_info=True) + except Exception: # pylint: disable=W0718 + LOGGER.error('Error occured when validating device', exc_info=True) return result, description @@ -105,7 +104,7 @@ def validate_protocol_version(self, device_ip, device_id): f'{device_ip} device {device_id} protocolRevision') protocol_version = f'{version}.{revision}' result = True - result_description = (f'Device uses BACnet version {protocol_version}') + result_description = f'Device uses BACnet version {protocol_version}' except (UnknownPropertyError, ReadPropertyException, NoResponseFromController, DeviceNotConnected) as e: result = False @@ -122,7 +121,6 @@ def validate_bacnet_source(self, object_id, device_hw_addr): packets = self.get_bacnet_packets(capture_file, object_id) valid = None for packet in packets: - packet_obj_id = packet['_source']['layers']['bacapp.instance_number'] if object_id in packet['_source']['layers']['bacapp.instance_number']: if device_hw_addr.lower() in packet['_source']['layers']['eth.src']: LOGGER.debug('BACnet detected from device') @@ -137,8 +135,7 @@ def validate_bacnet_source(self, object_id, device_hw_addr): LOGGER.debug(f'From: {src} To: {dst} Expected: {device_hw_addr}') valid = False return valid - except Exception as e: - LOGGER.error(e) + except Exception: # pylint: disable=W0718 LOGGER.error('Error occured when validating source', exc_info=True) return False diff --git a/modules/test/protocol/python/src/protocol_module.py b/modules/test/protocol/python/src/protocol_module.py index 40efddd83..407a46777 100644 --- a/modules/test/protocol/python/src/protocol_module.py +++ b/modules/test/protocol/python/src/protocol_module.py @@ -35,28 +35,23 @@ def _protocol_valid_bacnet(self): LOGGER.info('Running protocol.valid_bacnet') result = None interface_name = 'veth0' - try: - - # If the ipv4 address wasn't resolved yet, try again - if self._device_ipv4_addr is None: - self._device_ipv4_addr = self._get_device_ipv4() + # If the ipv4 address wasn't resolved yet, try again + if self._device_ipv4_addr is None: + self._device_ipv4_addr = self._get_device_ipv4() - if self._device_ipv4_addr is None: - LOGGER.error('No device IP could be resolved') - return 'Error', 'Could not resolve device IP address' + if self._device_ipv4_addr is None: + LOGGER.error('No device IP could be resolved') + return 'Error', 'Could not resolve device IP address' - # Resolve the appropriate IP for BACnet comms - local_address = self.get_local_ip(interface_name) - if local_address: - self._bacnet.discover(local_address + '/24') - result = self._bacnet.validate_device() - if result[0]: - self._supports_bacnet = True - else: - result = 'Error', 'Failed to perform BACnet discovery' - except Exception as e: - LOGGER.error('Error occured when validaing bacnet', e) - LOGGER.error('Error occured when validting bacnet', exc_info=True) + # Resolve the appropriate IP for BACnet comms + local_address = self.get_local_ip(interface_name) + if local_address: + self._bacnet.discover(local_address + '/24') + result = self._bacnet.validate_device() + if result[0]: + self._supports_bacnet = True + else: + result = 'Error', 'Failed to perform BACnet discovery' return result def _protocol_bacnet_version(self): From 89640b5ecd5d6a141c9fbe5fced5f10c2d2b2298 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Tue, 2 Jul 2024 14:42:25 -0600 Subject: [PATCH 4/4] Update unit tests to match runtime types --- testing/unit/protocol/protocol_module_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testing/unit/protocol/protocol_module_test.py b/testing/unit/protocol/protocol_module_test.py index afec002f4..32a0021cd 100644 --- a/testing/unit/protocol/protocol_module_test.py +++ b/testing/unit/protocol/protocol_module_test.py @@ -71,7 +71,7 @@ def bacnet_protocol_traffic_fail_test(self): def bacnet_protocol_validate_device_test(self): LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') # Load bacnet devices to simulate a discovery - bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', '1761001') + bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001) BACNET.devices = [bac_dev] result = BACNET.validate_device() LOGGER.info(f'Test Result: {result}') @@ -82,7 +82,7 @@ def bacnet_protocol_validate_device_test(self): def bacnet_protocol_validate_device_fail_test(self): LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') # Load bacnet devices to simulate a discovery - bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', '1761001') + bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001) BACNET.devices = [bac_dev] # Change the MAC address to a different device than expected BACNET.device_hw_addr = HW_ADDR_BAD