Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions modules/test/protocol/bin/get_bacnet_packets.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/bash

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

CAPTURE_FILE="$1"
OBJECT_ID="$2"

TSHARK_OUTPUT="-T json -e ip.src -e ip.dst -e eth.src -e eth.dst -e bacapp.instance_number"
TSHARK_FILTER="bacapp.instance_number == $OBJECT_ID"

response=$(tshark -r "$CAPTURE_FILE" $TSHARK_OUTPUT $TSHARK_FILTER)

echo "$response"

9 changes: 9 additions & 0 deletions modules/test/protocol/protocol.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
# Image name: test-run/protocol-test
FROM test-run/base-test:latest

# Set DEBIAN_FRONTEND to noninteractive mode
ENV DEBIAN_FRONTEND=noninteractive

# Install required software
RUN apt-get update && apt-get install -y tshark

ARG MODULE_NAME=protocol
ARG MODULE_DIR=modules/test/$MODULE_NAME

Expand All @@ -30,5 +36,8 @@ COPY $MODULE_DIR/conf /testrun/conf
# Copy over all binary files
COPY $MODULE_DIR/bin /testrun/bin

# Copy over all binary files
COPY $MODULE_DIR/bin /testrun/bin

# Copy over all python files
COPY $MODULE_DIR/python /testrun/python
98 changes: 75 additions & 23 deletions modules/test/protocol/python/src/protocol_bacnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,30 @@

import BAC0
import logging
import json
from common import util
import os
from BAC0.core.io.IOExceptions import (UnknownPropertyError,
ReadPropertyException,
NoResponseFromController,
DeviceNotConnected)

LOGGER = None
BAC0_LOG = '/root/.BAC0/BAC0.log'
DEFAULT_CAPTURES_DIR = '/runtime/output'
DEFAULT_CAPTURE_FILE = 'protocol.pcap'
DEFAULT_BIN_DIR = '/testrun/bin'


class BACnet():
"""BACnet Test module"""

def __init__(self, log):
def __init__(self,
log,
captures_dir=DEFAULT_CAPTURES_DIR,
capture_file=DEFAULT_CAPTURE_FILE,
bin_dir=DEFAULT_BIN_DIR,
device_hw_addr=None):
# Set the log
global LOGGER
LOGGER = log
Expand All @@ -37,8 +48,13 @@ def __init__(self, log):
stdout=logging.INFO,
stderr=logging.CRITICAL)

self._captures_dir = captures_dir
self._capture_file = capture_file
self._bin_dir = bin_dir
self.device_hw_addr = device_hw_addr
self.devices = []
self.bacnet = None
self._bin_dir = bin_dir

def discover(self, local_ip=None):
LOGGER.info('Performing BACnet discovery...')
Expand All @@ -52,29 +68,32 @@ def discover(self, local_ip=None):
with open(BAC0_LOG, 'r', encoding='utf-8') as f:
bac0_log = f.read()
LOGGER.info('BAC0 Log:\n' + bac0_log)

self.devices = self.bacnet.devices
LOGGER.info('BACnet devices found: ' + str(len(self.devices)))

# Check if the device being tested is in the discovered devices list
def validate_device(self, local_ip, device_ip):
LOGGER.info('Validating BACnet device: ' + device_ip)
self.discover(local_ip + '/24')
LOGGER.info('BACnet devices found: ' + str(len(self.devices)))
if len(self.devices) > 0:
result = (False,
'BACnet device was found but was not device under test')
for device in self.devices:
address = device[2]
LOGGER.info('Checking device: ' + str(device))
if device_ip in address:
result = True, 'BACnet device discovered'
break
else:
result = ('Feature Not Detected',
'BACnet device could not be discovered')
if result is not None:
LOGGER.info(result[1])
return result
# discover needs to be called before this method is invoked
def validate_device(self):
result = None
description = ''
try:
if len(self.devices) > 0:
result = True
for device in self.devices:
object_id = str(device[3]) # BACnet Object ID
LOGGER.info('Checking device: ' + str(device))
result &= self.validate_bacnet_source(
object_id=object_id, device_hw_addr=self.device_hw_addr)
description = ('BACnet device discovered' if result else
'BACnet device was found but was not device under test')
else:
result = 'Feature Not Detected'
description = 'BACnet device could not be discovered'
LOGGER.info(description)
except Exception: # pylint: disable=W0718
LOGGER.error('Error occured when validating device', exc_info=True)
return result, description


def validate_protocol_version(self, device_ip, device_id):
LOGGER.info(f'Resolving protocol version for BACnet device: {device_id}')
Expand All @@ -85,11 +104,44 @@ def validate_protocol_version(self, device_ip, device_id):
f'{device_ip} device {device_id} protocolRevision')
protocol_version = f'{version}.{revision}'
result = True
result_description = (
f'Device uses BACnet version {protocol_version}')
result_description = f'Device uses BACnet version {protocol_version}'
except (UnknownPropertyError, ReadPropertyException,
NoResponseFromController, DeviceNotConnected) as e:
result = False
result_description = f'Failed to resolve protocol version {e}'
LOGGER.error(result_description)
return result, result_description

# Validate that all traffic to/from BACnet device from
# discovered object id matches the MAC address of the device
def validate_bacnet_source(self, object_id, device_hw_addr):
try:
LOGGER.info(f'Checking BACnet traffic for object id {object_id}')
capture_file = os.path.join(self._captures_dir, self._capture_file)
packets = self.get_bacnet_packets(capture_file, object_id)
valid = None
for packet in packets:
if object_id in packet['_source']['layers']['bacapp.instance_number']:
if device_hw_addr.lower() in packet['_source']['layers']['eth.src']:
LOGGER.debug('BACnet detected from device')
valid = True if valid is None else valid and True
elif device_hw_addr.lower() in packet['_source']['layers']['eth.dst']:
LOGGER.debug('BACnet detected to device')
valid = valid = True if valid is None else valid and True
else:
LOGGER.debug('BACnet detected for wrong MAC address')
src = packet['_source']['layers']['eth.src'][0]
dst = packet['_source']['layers']['eth.dst'][0]
LOGGER.debug(f'From: {src} To: {dst} Expected: {device_hw_addr}')
valid = False
return valid
except Exception: # pylint: disable=W0718
LOGGER.error('Error occured when validating source', exc_info=True)
return False

def get_bacnet_packets(self, capture_file, object_id):
bin_file = self._bin_dir + '/get_bacnet_packets.sh'
args = f'"{capture_file}" {object_id}'
command = f'{bin_file} {args}'
response = util.run_command(command)
return json.loads(response[0].strip())
7 changes: 3 additions & 4 deletions modules/test/protocol/python/src/protocol_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ def __init__(self, module):
super().__init__(module_name=module, log_name=LOG_NAME)
global LOGGER
LOGGER = self._get_logger()
self._bacnet = BACnet(LOGGER)
self._bacnet = BACnet(log=LOGGER,device_hw_addr=self._device_mac)

def _protocol_valid_bacnet(self):
LOGGER.info('Running protocol.valid_bacnet')
result = None
interface_name = 'veth0'

# If the ipv4 address wasn't resolved yet, try again
if self._device_ipv4_addr is None:
self._device_ipv4_addr = self._get_device_ipv4()
Expand All @@ -47,8 +46,8 @@ def _protocol_valid_bacnet(self):
# Resolve the appropriate IP for BACnet comms
local_address = self.get_local_ip(interface_name)
if local_address:
result = self._bacnet.validate_device(local_address,
self._device_ipv4_addr)
self._bacnet.discover(local_address + '/24')
result = self._bacnet.validate_device()
if result[0]:
self._supports_bacnet = True
else:
Expand Down
Binary file added testing/unit/protocol/captures/bacnet.pcap
Binary file not shown.
106 changes: 106 additions & 0 deletions testing/unit/protocol/protocol_module_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module run all the DNS related unit tests"""
from protocol_bacnet import BACnet
import unittest
import os
from common import logger
import inspect

MODULE = 'protocol'

# Define the directories
TEST_FILES_DIR = 'testing/unit/' + MODULE
OUTPUT_DIR = os.path.join(TEST_FILES_DIR, 'output/')
REPORTS_DIR = os.path.join(TEST_FILES_DIR, 'reports/')
CAPTURES_DIR = os.path.join(TEST_FILES_DIR, 'captures/')

# Define the capture files to be used for the test
PROTOCOL_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'bacnet.pcap')

HW_ADDR = 'AA:BB:CC:DD:EE:FF'
HW_ADDR_BAD = 'AA:BB:CC:DD:EE:FE'
BACNET = None
LOGGER = None


class ProtocolModuleTest(unittest.TestCase):
"""Contains and runs all the unit tests concerning DNS behaviors"""

@classmethod
def setUpClass(cls):
global LOGGER
LOGGER = logger.get_logger('unit_test_' + MODULE)
global BACNET
BACNET = BACnet(log=LOGGER,
captures_dir=CAPTURES_DIR,
capture_file='bacnet.pcap',
bin_dir='modules/test/protocol/bin',
device_hw_addr=HW_ADDR)

# Test the BACNet traffic for a matching Object ID and HW address
def bacnet_protocol_traffic_test(self):
LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}')
result = BACNET.validate_bacnet_source(object_id='1761001',
device_hw_addr=HW_ADDR)
LOGGER.info(f'Test Result: {result}')
self.assertEqual(result, True)

# Test the BACNet test when Object ID and HW address
# do not match
def bacnet_protocol_traffic_fail_test(self):
LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}')
result = BACNET.validate_bacnet_source(object_id='1761001',
device_hw_addr=HW_ADDR_BAD)
LOGGER.info(f'Test Result: {result}')
self.assertEqual(result, False)

# Test a BACnet device with valid traffic to/from an
# expected HW address and Object ID
def bacnet_protocol_validate_device_test(self):
LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}')
# Load bacnet devices to simulate a discovery
bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001)
BACNET.devices = [bac_dev]
result = BACNET.validate_device()
LOGGER.info(f'Test Result: {result}')
self.assertEqual(result, (True, 'BACnet device discovered'))

# Test a BACnet device with valid traffic to/from an
# expected HW address and Object ID
def bacnet_protocol_validate_device_fail_test(self):
LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}')
# Load bacnet devices to simulate a discovery
bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001)
BACNET.devices = [bac_dev]
# Change the MAC address to a different device than expected
BACNET.device_hw_addr = HW_ADDR_BAD
result = BACNET.validate_device()
LOGGER.info(f'Test Result: {result}')
self.assertEqual(
result,
(False, 'BACnet device was found but was not device under test'))


if __name__ == '__main__':
suite = unittest.TestSuite()

suite.addTest(ProtocolModuleTest('bacnet_protocol_traffic_test'))
suite.addTest(ProtocolModuleTest('bacnet_protocol_traffic_fail_test'))

suite.addTest(ProtocolModuleTest('bacnet_protocol_validate_device_test'))
suite.addTest(ProtocolModuleTest('bacnet_protocol_validate_device_fail_test'))

runner = unittest.TextTestRunner()
runner.run(suite)
Loading