From 822d55c7367f8739a4998279ad5c5e91f2fb4c9c Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Mon, 15 Apr 2024 14:36:33 +0100 Subject: [PATCH 1/8] Add feature not present test result --- framework/python/src/common/testreport.py | 8 ++ modules/test/base/python/src/test_module.py | 8 +- modules/test/tls/conf/module_config.json | 4 +- modules/test/tls/python/src/tls_module.py | 43 ++++++++-- modules/test/tls/python/src/tls_util.py | 87 ++++++++++++++++----- 5 files changed, 119 insertions(+), 31 deletions(-) diff --git a/framework/python/src/common/testreport.py b/framework/python/src/common/testreport.py index 9b1eff6ef..2cd82b267 100644 --- a/framework/python/src/common/testreport.py +++ b/framework/python/src/common/testreport.py @@ -358,6 +358,8 @@ def generate_result(self, result): result_class = 'result-test-result-compliant' elif result['result'] == 'Error': result_class = 'result-test-result-error' + elif result['result'] == 'Feature Not Present': + result_class = 'result-test-result-feature-not-present' else: result_class = 'result-test-result-skipped' @@ -865,6 +867,12 @@ def generate_css(self): left: 7.3in; } + .result-test-result-feature-not-present { + background-color: #ffe9c8; + color: #8d5c00; + left: 6.92in; + } + .result-test-result-non-compliant { background-color: #FCE8E6; color: #C5221F; diff --git a/modules/test/base/python/src/test_module.py b/modules/test/base/python/src/test_module.py index a81b97caf..8114704f3 100644 --- a/modules/test/base/python/src/test_module.py +++ b/modules/test/base/python/src/test_module.py @@ -114,6 +114,7 @@ def run_tests(self): test['result'] = 'Compliant' if result else 'Non-Compliant' test['description'] = 'No description was provided for this test' else: + # TODO: This is assuming that result is an array but haven't checked # Skipped result if result[0] is None: test['result'] = 'Skipped' @@ -121,12 +122,15 @@ def run_tests(self): test['description'] = result[1] else: test['description'] = 'An error occured whilst running this test' + # Compliant / Non-Compliant result elif isinstance(result[0], bool): test['result'] = 'Compliant' if result[0] else 'Non-Compliant' - # Result may be a string, e.g error - elif result[0] == 'Error': + # Result may be a string, e.g Error, Feature Not Present + elif isinstance(result[0], str): test['result'] = result[0] + else: + test['result'] = 'Error' # Check that description is a string if isinstance(result[1], str): diff --git a/modules/test/tls/conf/module_config.json b/modules/test/tls/conf/module_config.json index 8505476b4..cd77f8299 100644 --- a/modules/test/tls/conf/module_config.json +++ b/modules/test/tls/conf/module_config.json @@ -16,7 +16,7 @@ "name": "security.tls.v1_2_server", "test_description": "Check the device web server TLS 1.2 & certificate is valid", "expected_behavior": "TLS 1.2 certificate is issued to the web browser client when accessed", - "required_result": "Required", + "required_result": "Required if Applicable", "recommendations": [ "Enable TLS 1.2 support in the web server configuration", "Disable TLS 1.0 and 1.1", @@ -27,7 +27,7 @@ "name": "security.tls.v1_2_client", "test_description": "Device uses TLS with connection to an external service on port 443 (or any other port which could be running the webserver-HTTPS)", "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.2 and support for ECDH and ECDSA ciphers", - "required_result": "Required", + "required_result": "Required if Applicable", "recommendations": [ "Disable connections to unsecure services", "Ensure any URLs connected to are secure (https)" diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 472d403b2..7c0a821eb 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -230,24 +230,49 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): def _security_tls_v1_2_server(self): LOGGER.info('Running security.tls.v1_2_server') - self._resolve_device_ip() + # If the ipv4 address wasn't resolved yet, try again + self._resolve_device_ip() + + # IPv4 address has been resolved if self._device_ipv4_addr is not None: + + # Check if the device has a web server + web_server_open = self._tls_util.web_server_open( + host=self._device_ipv4_addr + ) + if not web_server_open: + return 'Feature Not Present', 'No web server available' + + # Fetch TLSv1.2 results tls_1_2_results = self._tls_util.validate_tls_server( self._device_ipv4_addr, tls_version='1.2') + + # Fetch TLSv1.3 results tls_1_3_results = self._tls_util.validate_tls_server( self._device_ipv4_addr, tls_version='1.3') + return self._tls_util.process_tls_server_results(tls_1_2_results, tls_1_3_results) else: - LOGGER.error('Could not resolve device IP address. Skipping') + LOGGER.error('Could not resolve device IP address') return 'Error', 'Could not resolve device IP address' def _security_tls_v1_3_server(self): LOGGER.info('Running security.tls.v1_3_server') - self._resolve_device_ip() + # If the ipv4 address wasn't resolved yet, try again + self._resolve_device_ip() + if self._device_ipv4_addr is not None: + + # Check if the device has a web server + web_server_open = self._tls_util.web_server_open( + host=self._device_ipv4_addr + ) + if not web_server_open: + return 'Feature Not Present', 'No web server available' + return self._tls_util.validate_tls_server(self._device_ipv4_addr, tls_version='1.3') else: @@ -256,12 +281,14 @@ def _security_tls_v1_3_server(self): def _security_tls_v1_2_client(self): LOGGER.info('Running security.tls.v1_2_client') - self._resolve_device_ip() + # If the ipv4 address wasn't resolved yet, try again + self._resolve_device_ip() + if self._device_ipv4_addr is not None: return self._validate_tls_client(self._device_ipv4_addr, '1.2') else: - LOGGER.error('Could not resolve device IP address. Skipping') + LOGGER.error('Could not resolve device IP address.') return 'Error', 'Could not resolve device IP address' def _security_tls_v1_3_client(self): @@ -271,7 +298,7 @@ def _security_tls_v1_3_client(self): if self._device_ipv4_addr is not None: return self._validate_tls_client(self._device_ipv4_addr, '1.3') else: - LOGGER.error('Could not resolve device IP address. Skipping') + LOGGER.error('Could not resolve device IP address.') return 'Error', 'Could not resolve device IP address' def _validate_tls_client(self, client_ip, tls_version): @@ -283,8 +310,8 @@ def _validate_tls_client(self, client_ip, tls_version): ]) # Generate results based on the state - result_message = 'No outbound connections were found.' - result_state = None + result_message = 'No outbound connections were found' + result_state = 'Feature Not Present' # If any of the packetes detect failed client comms, fail the test if not client_results[0] and client_results[0] is not None: diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index ef8e74e64..36c10f22d 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -54,13 +54,31 @@ def __init__(self, self._dev_cert_file = 'device_cert.crt' self._root_certs_dir = root_certs_dir + def web_server_open(self, host, port=443): + server_open = False + try: + + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + # Create an SSL/TLS socket + with socket.create_connection((host, port), timeout=5) as sock: + with context.wrap_socket(sock, server_hostname=host): + server_open = True + + except Exception as e: + LOGGER.info('Failed to open TLS connection to device web server') + LOGGER.debug(e) + + return server_open + def get_public_certificate(self, host, port=443, validate_cert=False, tls_version='1.2'): try: - #context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False if not validate_cert: @@ -318,32 +336,48 @@ def get_certificate(self, uri, timeout=10): return certificate def process_tls_server_results(self, tls_1_2_results, tls_1_3_results): + results = '' + + # Only TLS v1.3 is supported if tls_1_2_results[0] is None and tls_1_3_results[0] is not None: - # Validate only TLS 1.3 results + description = 'TLS 1.3' + (' not' if not tls_1_3_results[0] else - '') + ' validated: ' + tls_1_3_results[1] + '') + ' compliant: ' + tls_1_3_results[1] results = tls_1_3_results[0], description + + # Only TLS v1.2 is supported elif tls_1_3_results[0] is None and tls_1_2_results[0] is not None: - # Vaidate only TLS 1.2 results description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else - '') + ' validated: ' + tls_1_2_results[1] + '') + ' compliant: ' + tls_1_2_results[1] results = tls_1_2_results[0], description + + # Both v1.2 and v1.3 are supported elif tls_1_3_results[0] is not None and tls_1_2_results[0] is not None: - # Validate both results - description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else - '') + ' validated: ' + tls_1_2_results[1] - description += '\nTLS 1.3' + (' not' if not tls_1_3_results[0] else - '') + ' validated: ' + tls_1_3_results[1] + + # Check if both v1.2 and v1.3 are not compliant + # If so, we only need details on 1.2 + if not tls_1_2_results[0] and not tls_1_3_results[0]: + description = 'TLS 1.2 not compliant: ' + tls_1_2_results[1] + else: + description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else + '') + ' compliant: ' + tls_1_2_results[1] + description += '\nTLS 1.3' + (' not' if not tls_1_3_results[0] else + '') + ' compliant: ' + tls_1_3_results[1] + results = tls_1_2_results[0] or tls_1_3_results[0], description + + # Neither TLS v1.2 or v1.3 are supported else: + # Just provide info about TLS v1.2 description = f'TLS 1.2 not validated: {tls_1_2_results[1]}' - description += f'\nTLS 1.3 not validated: {tls_1_3_results[1]}' results = None, description LOGGER.info('TLS server test results: ' + str(results)) return results def validate_tls_server(self, host, tls_version): + + # Obtain the web server certificate cert_pem = self.get_public_certificate(host, validate_cert=False, tls_version=tls_version) @@ -372,7 +406,21 @@ def validate_tls_server(self, host, tls_version): # Check results cert_valid = tr_valid[0] and key_valid[0] and sig_valid[0] - test_details = tr_valid[1] + '\n' + key_valid[1] + '\n' + sig_valid[1] + test_details = '' + + # Only include reasons for non-compliance + if not tr_valid[0]: + test_details = tr_valid[1] + '. ' + + if not key_valid[0]: + test_details += key_valid[1] + '. ' + + if not sig_valid[0]: + test_details += sig_valid[1] + '. ' + + if cert_valid: + test_details = 'Certificate meets all requirements' + LOGGER.info('Certificate validated: ' + str(cert_valid)) LOGGER.info('Test details:\n' + test_details) return cert_valid, test_details @@ -550,13 +598,14 @@ def get_non_tls_client_connection_ips(self, client_ip, capture_files): non_tls_dst_ips = set() # Store unique destination IPs for packet in packets: # Check if packet contains TCP layer - if 'tcp' in packet['_source']['layers']: - tcp_flags = packet['_source']['layers']['tcp.flags'] - if 'A' not in tcp_flags and 'S' not in tcp_flags: - # Packet is not ACK or SYN - dst_ip = ipaddress.ip_address(packet['_source']['layers']['ip.dst'][0]) - if not dst_ip in subnet_with_mask: - non_tls_dst_ips.add(str(dst_ip)) + if 'tcp' in packet['_source']['layers']: + tcp_flags = packet['_source']['layers']['tcp.flags'] + if 'A' not in tcp_flags and 'S' not in tcp_flags: + # Packet is not ACK or SYN + dst_ip = ipaddress.ip_address( + packet['_source']['layers']['ip.dst'][0]) + if not dst_ip in subnet_with_mask: + non_tls_dst_ips.add(str(dst_ip)) return non_tls_dst_ips # Check if the device has made any outbound connections that don't From aae5467255333d040d46ab4c507bfebfb2868ed6 Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Tue, 16 Apr 2024 09:44:34 +0100 Subject: [PATCH 2/8] Remove changes to tls module --- modules/test/tls/conf/module_config.json | 4 +- modules/test/tls/python/src/run.py | 2 +- modules/test/tls/python/src/tls_module.py | 45 +++--------- modules/test/tls/python/src/tls_util.py | 89 +++++------------------ 4 files changed, 32 insertions(+), 108 deletions(-) diff --git a/modules/test/tls/conf/module_config.json b/modules/test/tls/conf/module_config.json index cd77f8299..8505476b4 100644 --- a/modules/test/tls/conf/module_config.json +++ b/modules/test/tls/conf/module_config.json @@ -16,7 +16,7 @@ "name": "security.tls.v1_2_server", "test_description": "Check the device web server TLS 1.2 & certificate is valid", "expected_behavior": "TLS 1.2 certificate is issued to the web browser client when accessed", - "required_result": "Required if Applicable", + "required_result": "Required", "recommendations": [ "Enable TLS 1.2 support in the web server configuration", "Disable TLS 1.0 and 1.1", @@ -27,7 +27,7 @@ "name": "security.tls.v1_2_client", "test_description": "Device uses TLS with connection to an external service on port 443 (or any other port which could be running the webserver-HTTPS)", "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.2 and support for ECDH and ECDSA ciphers", - "required_result": "Required if Applicable", + "required_result": "Required", "recommendations": [ "Disable connections to unsecure services", "Ensure any URLs connected to are secure (https)" diff --git a/modules/test/tls/python/src/run.py b/modules/test/tls/python/src/run.py index 89de9f65e..78296f0c1 100644 --- a/modules/test/tls/python/src/run.py +++ b/modules/test/tls/python/src/run.py @@ -66,4 +66,4 @@ def run(): if __name__ == '__main__': - run() + run() \ No newline at end of file diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 7c0a821eb..f2afd4ee0 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -230,49 +230,24 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): def _security_tls_v1_2_server(self): LOGGER.info('Running security.tls.v1_2_server') - - # If the ipv4 address wasn't resolved yet, try again self._resolve_device_ip() - - # IPv4 address has been resolved + # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is not None: - - # Check if the device has a web server - web_server_open = self._tls_util.web_server_open( - host=self._device_ipv4_addr - ) - if not web_server_open: - return 'Feature Not Present', 'No web server available' - - # Fetch TLSv1.2 results tls_1_2_results = self._tls_util.validate_tls_server( self._device_ipv4_addr, tls_version='1.2') - - # Fetch TLSv1.3 results tls_1_3_results = self._tls_util.validate_tls_server( self._device_ipv4_addr, tls_version='1.3') - return self._tls_util.process_tls_server_results(tls_1_2_results, tls_1_3_results) else: - LOGGER.error('Could not resolve device IP address') + LOGGER.error('Could not resolve device IP address. Skipping') return 'Error', 'Could not resolve device IP address' def _security_tls_v1_3_server(self): LOGGER.info('Running security.tls.v1_3_server') - - # If the ipv4 address wasn't resolved yet, try again self._resolve_device_ip() - + # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is not None: - - # Check if the device has a web server - web_server_open = self._tls_util.web_server_open( - host=self._device_ipv4_addr - ) - if not web_server_open: - return 'Feature Not Present', 'No web server available' - return self._tls_util.validate_tls_server(self._device_ipv4_addr, tls_version='1.3') else: @@ -281,14 +256,12 @@ def _security_tls_v1_3_server(self): def _security_tls_v1_2_client(self): LOGGER.info('Running security.tls.v1_2_client') - - # If the ipv4 address wasn't resolved yet, try again self._resolve_device_ip() - + # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is not None: return self._validate_tls_client(self._device_ipv4_addr, '1.2') else: - LOGGER.error('Could not resolve device IP address.') + LOGGER.error('Could not resolve device IP address. Skipping') return 'Error', 'Could not resolve device IP address' def _security_tls_v1_3_client(self): @@ -298,7 +271,7 @@ def _security_tls_v1_3_client(self): if self._device_ipv4_addr is not None: return self._validate_tls_client(self._device_ipv4_addr, '1.3') else: - LOGGER.error('Could not resolve device IP address.') + LOGGER.error('Could not resolve device IP address. Skipping') return 'Error', 'Could not resolve device IP address' def _validate_tls_client(self, client_ip, tls_version): @@ -310,8 +283,8 @@ def _validate_tls_client(self, client_ip, tls_version): ]) # Generate results based on the state - result_message = 'No outbound connections were found' - result_state = 'Feature Not Present' + result_message = 'No outbound connections were found.' + result_state = None # If any of the packetes detect failed client comms, fail the test if not client_results[0] and client_results[0] is not None: @@ -326,4 +299,4 @@ def _validate_tls_client(self, client_ip, tls_version): def _resolve_device_ip(self): # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is None: - self._device_ipv4_addr = self._get_device_ipv4() + self._device_ipv4_addr = self._get_device_ipv4() \ No newline at end of file diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index 36c10f22d..cf30d4f87 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -54,31 +54,13 @@ def __init__(self, self._dev_cert_file = 'device_cert.crt' self._root_certs_dir = root_certs_dir - def web_server_open(self, host, port=443): - server_open = False - try: - - context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - - # Create an SSL/TLS socket - with socket.create_connection((host, port), timeout=5) as sock: - with context.wrap_socket(sock, server_hostname=host): - server_open = True - - except Exception as e: - LOGGER.info('Failed to open TLS connection to device web server') - LOGGER.debug(e) - - return server_open - def get_public_certificate(self, host, port=443, validate_cert=False, tls_version='1.2'): try: + #context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False if not validate_cert: @@ -336,48 +318,32 @@ def get_certificate(self, uri, timeout=10): return certificate def process_tls_server_results(self, tls_1_2_results, tls_1_3_results): - results = '' - - # Only TLS v1.3 is supported if tls_1_2_results[0] is None and tls_1_3_results[0] is not None: - + # Validate only TLS 1.3 results description = 'TLS 1.3' + (' not' if not tls_1_3_results[0] else - '') + ' compliant: ' + tls_1_3_results[1] + '') + ' validated: ' + tls_1_3_results[1] results = tls_1_3_results[0], description - - # Only TLS v1.2 is supported elif tls_1_3_results[0] is None and tls_1_2_results[0] is not None: + # Vaidate only TLS 1.2 results description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else - '') + ' compliant: ' + tls_1_2_results[1] + '') + ' validated: ' + tls_1_2_results[1] results = tls_1_2_results[0], description - - # Both v1.2 and v1.3 are supported elif tls_1_3_results[0] is not None and tls_1_2_results[0] is not None: - - # Check if both v1.2 and v1.3 are not compliant - # If so, we only need details on 1.2 - if not tls_1_2_results[0] and not tls_1_3_results[0]: - description = 'TLS 1.2 not compliant: ' + tls_1_2_results[1] - else: - description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else - '') + ' compliant: ' + tls_1_2_results[1] - description += '\nTLS 1.3' + (' not' if not tls_1_3_results[0] else - '') + ' compliant: ' + tls_1_3_results[1] - + # Validate both results + description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else + '') + ' validated: ' + tls_1_2_results[1] + description += '\nTLS 1.3' + (' not' if not tls_1_3_results[0] else + '') + ' validated: ' + tls_1_3_results[1] results = tls_1_2_results[0] or tls_1_3_results[0], description - - # Neither TLS v1.2 or v1.3 are supported else: - # Just provide info about TLS v1.2 description = f'TLS 1.2 not validated: {tls_1_2_results[1]}' + description += f'\nTLS 1.3 not validated: {tls_1_3_results[1]}' results = None, description LOGGER.info('TLS server test results: ' + str(results)) return results def validate_tls_server(self, host, tls_version): - - # Obtain the web server certificate cert_pem = self.get_public_certificate(host, validate_cert=False, tls_version=tls_version) @@ -406,21 +372,7 @@ def validate_tls_server(self, host, tls_version): # Check results cert_valid = tr_valid[0] and key_valid[0] and sig_valid[0] - test_details = '' - - # Only include reasons for non-compliance - if not tr_valid[0]: - test_details = tr_valid[1] + '. ' - - if not key_valid[0]: - test_details += key_valid[1] + '. ' - - if not sig_valid[0]: - test_details += sig_valid[1] + '. ' - - if cert_valid: - test_details = 'Certificate meets all requirements' - + test_details = tr_valid[1] + '\n' + key_valid[1] + '\n' + sig_valid[1] LOGGER.info('Certificate validated: ' + str(cert_valid)) LOGGER.info('Test details:\n' + test_details) return cert_valid, test_details @@ -598,14 +550,13 @@ def get_non_tls_client_connection_ips(self, client_ip, capture_files): non_tls_dst_ips = set() # Store unique destination IPs for packet in packets: # Check if packet contains TCP layer - if 'tcp' in packet['_source']['layers']: - tcp_flags = packet['_source']['layers']['tcp.flags'] - if 'A' not in tcp_flags and 'S' not in tcp_flags: - # Packet is not ACK or SYN - dst_ip = ipaddress.ip_address( - packet['_source']['layers']['ip.dst'][0]) - if not dst_ip in subnet_with_mask: - non_tls_dst_ips.add(str(dst_ip)) + if 'tcp' in packet['_source']['layers']: + tcp_flags = packet['_source']['layers']['tcp.flags'] + if 'A' not in tcp_flags and 'S' not in tcp_flags: + # Packet is not ACK or SYN + dst_ip = ipaddress.ip_address(packet['_source']['layers']['ip.dst'][0]) + if not dst_ip in subnet_with_mask: + non_tls_dst_ips.add(str(dst_ip)) return non_tls_dst_ips # Check if the device has made any outbound connections that don't @@ -765,4 +716,4 @@ def is_ecdh_and_ecdsa(self, ciphers): for cipher in ciphers: ecdh |= 'ECDH' in cipher ecdsa |= 'ECDSA' in cipher - return {'ecdh': ecdh, 'ecdsa': ecdsa} + return {'ecdh': ecdh, 'ecdsa': ecdsa} \ No newline at end of file From b276a33dd6b7788e7817c354aea84250076bef7f Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Wed, 17 Apr 2024 10:44:11 +0100 Subject: [PATCH 3/8] Further pylint fixes --- .../devices/faux-dev/python/src/dns_check.py | 12 +-- modules/test/base/python/src/util.py | 26 ++--- modules/test/dns/python/src/dns_module.py | 12 +-- modules/test/tls/python/src/tls_module.py | 3 +- testing/api/test_api.py | 96 ++++++++++--------- testing/tests/test_tests.py | 6 +- 6 files changed, 81 insertions(+), 74 deletions(-) diff --git a/modules/devices/faux-dev/python/src/dns_check.py b/modules/devices/faux-dev/python/src/dns_check.py index be9c58d43..98dfd42e8 100644 --- a/modules/devices/faux-dev/python/src/dns_check.py +++ b/modules/devices/faux-dev/python/src/dns_check.py @@ -102,16 +102,16 @@ def _exec_tcpdump(self, tcpdump_filter): LOGGER.debug('tcpdump command: ' + command) - process = subprocess.Popen(command, + with subprocess.Popen(command, universal_newlines=True, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - text = str(process.stdout.read()).rstrip() + stderr=subprocess.PIPE) as process: + text = str(process.stdout.read()).rstrip() - LOGGER.debug('tcpdump response: ' + text) + LOGGER.debug('tcpdump response: ' + text) - if text: - return text.split('\n') + if text: + return text.split('\n') return [] diff --git a/modules/test/base/python/src/util.py b/modules/test/base/python/src/util.py index 0f54c4298..61007f2f8 100644 --- a/modules/test/base/python/src/util.py +++ b/modules/test/base/python/src/util.py @@ -27,17 +27,17 @@ # by any return code from the process other than zero. def run_command(cmd, output=True): success = False - process = subprocess.Popen(shlex.split(cmd), + with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - if process.returncode != 0 and output: - err_msg = f'{stderr.strip()}. Code: {process.returncode}' - LOGGER.error('Command Failed: ' + cmd) - LOGGER.error('Error: ' + err_msg) - else: - success = True - if output: - return stdout.strip().decode('utf-8'), stderr - else: - return success + stderr=subprocess.PIPE) as process: + stdout, stderr = process.communicate() + if process.returncode != 0 and output: + err_msg = f'{stderr.strip()}. Code: {process.returncode}' + LOGGER.error('Command Failed: ' + cmd) + LOGGER.error('Error: ' + err_msg) + else: + success = True + if output: + return stdout.strip().decode('utf-8'), stderr + else: + return success diff --git a/modules/test/dns/python/src/dns_module.py b/modules/test/dns/python/src/dns_module.py index 02d89eb0a..0cd27aa96 100644 --- a/modules/test/dns/python/src/dns_module.py +++ b/modules/test/dns/python/src/dns_module.py @@ -267,16 +267,16 @@ def _exec_tcpdump(self, tcpdump_filter, capture_file): LOGGER.debug('tcpdump command: ' + command) - process = subprocess.Popen(command, + with subprocess.Popen(command, universal_newlines=True, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - text = str(process.stdout.read()).rstrip() + stderr=subprocess.PIPE) as process: + text = str(process.stdout.read()).rstrip() - LOGGER.debug('tcpdump response: ' + text) + LOGGER.debug('tcpdump response: ' + text) - if text: - return text.split('\n') + if text: + return text.split('\n') return [] diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index f2afd4ee0..a1fad197f 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -122,7 +122,8 @@ def __init__(self, # # cert_table = (f'| Property | Value |\n' # # f'|---|---|\n' # # f"| {'Version':<17} | {version_value:^25} |\n" - # # f"| {'Signature Alg.':<17} | {signature_alg_value:^25} |\n" + # # f"| {'Signature Alg.':<17} | + # {signature_alg_value:^25} |\n" # # f"| {'Validity from':<17} | {not_before:^25} |\n" # # f"| {'Valid to':<17} | {not_after:^25} |") diff --git a/testing/api/test_api.py b/testing/api/test_api.py index f0e9ac51c..3759bb103 100644 --- a/testing/api/test_api.py +++ b/testing/api/test_api.py @@ -17,14 +17,13 @@ # TODO refactor fixtures to not trigger error # pylint: disable=redefined-outer-name -from collections.abc import Awaitable, Callable +from collections.abc import Callable import copy import json import os from pathlib import Path import re import shutil -import shutil import signal import subprocess import time @@ -52,14 +51,14 @@ def pretty_print(dictionary: dict): def query_system_status() -> str: """Query system status from API and returns this""" - r = requests.get(f"{API}/system/status") + r = requests.get(f"{API}/system/status", timeout=5) response = json.loads(r.text) return response["status"] def query_test_count() -> int: """Queries status and returns number of test results""" - r = requests.get(f"{API}/system/status") + r = requests.get(f"{API}/system/status", timeout=5) response = json.loads(r.text) return len(response["tests"]["results"]) @@ -118,9 +117,8 @@ def testing_devices(): @pytest.fixture -def testrun(request): +def testrun(request): # pylint: disable=W0613 """ Start intstance of testrun """ - test_name = request.node.originalname proc = subprocess.Popen( "bin/testrun", stdout=subprocess.PIPE, @@ -131,13 +129,13 @@ def testrun(request): while True: try: - outs, errs = proc.communicate(timeout=1) + outs = proc.communicate(timeout=1)[0] except subprocess.TimeoutExpired as e: if e.output is not None: output = e.output.decode("utf-8") if re.search("API waiting for requests", output): break - except Exception as e: + except Exception: pytest.fail("testrun terminated") time.sleep(2) @@ -146,7 +144,7 @@ def testrun(request): os.killpg(os.getpgid(proc.pid), signal.SIGTERM) try: - outs, errs = proc.communicate(timeout=60) + outs = proc.communicate(timeout=60)[0] except Exception as e: print(e.output) os.killpg(os.getpgid(proc.pid), signal.SIGKILL) @@ -224,9 +222,9 @@ def local_get_devices(): ) -def test_get_system_interfaces(testrun): +def test_get_system_interfaces(testrun): # pylint: disable=W0613 """Tests API system interfaces against actual local interfaces""" - r = requests.get(f"{API}/system/interfaces") + r = requests.get(f"{API}/system/interfaces", timeout=5) response = json.loads(r.text) local_interfaces = get_network_interfaces() assert set(response.keys()) == set(local_interfaces) @@ -235,18 +233,18 @@ def test_get_system_interfaces(testrun): assert all([isinstance(x, str) for x in response]) -def test_modify_device(testing_devices, testrun): +def test_modify_device(testing_devices, testrun): # pylint: disable=W0613 with open( os.path.join( DEVICES_DIRECTORY, testing_devices[1] - ) + ), encoding="utf-8" ) as f: local_device = json.load(f) mac_addr = local_device["mac_addr"] new_model = "Alphabet" - r = requests.get(f"{API}/devices") + r = requests.get(f"{API}/devices", timeout=5) all_devices = json.loads(r.text) api_device = next(x for x in all_devices if x["mac_addr"] == mac_addr) @@ -271,11 +269,11 @@ def test_modify_device(testing_devices, testrun): # update device r = requests.post(f"{API}/device/edit", - data=json.dumps(updated_device_payload)) + data=json.dumps(updated_device_payload), timeout=5) assert r.status_code == 200 - r = requests.get(f"{API}/devices") + r = requests.get(f"{API}/devices", timeout=5) all_devices = json.loads(r.text) updated_device_api = next(x for x in all_devices if x["mac_addr"] == mac_addr) @@ -283,7 +281,7 @@ def test_modify_device(testing_devices, testrun): assert updated_device_api["test_modules"] == new_test_modules -def test_create_get_devices(empty_devices_dir, testrun): +def test_create_get_devices(empty_devices_dir, testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -297,9 +295,8 @@ def test_create_get_devices(empty_devices_dir, testrun): }, } - r = requests.post(f"{API}/device", data=json.dumps(device_1)) + r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) print(r.text) - device1_response = r.text assert r.status_code == 201 assert len(local_get_devices()) == 1 @@ -315,18 +312,19 @@ def test_create_get_devices(empty_devices_dir, testrun): "nmap": {"enabled": True}, }, } - r = requests.post(f"{API}/device", data=json.dumps(device_2)) - device2_response = json.loads(r.text) + r = requests.post(f"{API}/device", + data=json.dumps(device_2), timeout=5) assert r.status_code == 201 assert len(local_get_devices()) == 2 # Test that returned devices API endpoint matches expected structure - r = requests.get(f"{API}/devices") + r = requests.get(f"{API}/devices", timeout=5) all_devices = json.loads(r.text) pretty_print(all_devices) with open( - os.path.join(os.path.dirname(__file__), "mockito/get_devices.json") + os.path.join(os.path.dirname(__file__), "mockito/get_devices.json"), + encoding="utf-8" ) as f: mockito = json.load(f) @@ -345,10 +343,10 @@ def test_create_get_devices(empty_devices_dir, testrun): ) -def test_get_system_config(testrun): - r = requests.get(f"{API}/system/config") +def test_get_system_config(testrun): # pylint: disable=W0613 + r = requests.get(f"{API}/system/config", timeout=5) - with open(SYSTEM_CONFIG_PATH) as f: + with open(SYSTEM_CONFIG_PATH, encoding="utf-8") as f: local_config = json.load(f) api_config = json.loads(r.text) @@ -370,12 +368,13 @@ def test_get_system_config(testrun): # TODO change to invalid jsdon request @pytest.mark.skip() -def test_invalid_path_get(testrun): - r = requests.get(f"{API}/blah/blah") +def test_invalid_path_get(testrun): # pylint: disable=W0613 + r = requests.get(f"{API}/blah/blah", timeout=5) response = json.loads(r.text) assert r.status_code == 404 with open( - os.path.join(os.path.dirname(__file__), "mockito/invalid_request.json") + os.path.join(os.path.dirname(__file__), "mockito/invalid_request.json"), + encoding="utf-8" ) as f: mockito = json.load(f) @@ -383,9 +382,10 @@ def test_invalid_path_get(testrun): assert set(dict_paths(mockito)) == set(dict_paths(response)) -def test_trigger_run(testing_devices, testrun): +def test_trigger_run(testing_devices, testrun): # pylint: disable=W0613 payload = {"device": {"mac_addr": BASELINE_MAC_ADDR, "firmware": "asd"}} - r = requests.post(f"{API}/system/start", data=json.dumps(payload)) + r = requests.post(f"{API}/system/start", data=json.dumps(payload), + timeout=5) assert r.status_code == 200 until_true( @@ -405,7 +405,7 @@ def test_trigger_run(testing_devices, testrun): stop_test_device("x123") # Validate response - r = requests.get(f"{API}/system/status") + r = requests.get(f"{API}/system/status", timeout=5) response = json.loads(r.text) pretty_print(response) @@ -419,7 +419,7 @@ def test_trigger_run(testing_devices, testrun): with open( os.path.join( os.path.dirname(__file__), "mockito/running_system_status.json" - ) + ), encoding="utf-8" ) as f: mockito = json.load(f) @@ -434,9 +434,10 @@ def test_trigger_run(testing_devices, testrun): # Validate a result assert results["baseline.compliant"]["result"] == "Compliant" -def test_stop_running_test(testing_devices, testrun): +def test_stop_running_test(testing_devices, testrun): # pylint: disable=W0613 payload = {"device": {"mac_addr": ALL_MAC_ADDR, "firmware": "asd"}} - r = requests.post(f"{API}/system/start", data=json.dumps(payload)) + r = requests.post(f"{API}/system/start", data=json.dumps(payload), + timeout=5) assert r.status_code == 200 until_true( @@ -456,13 +457,13 @@ def test_stop_running_test(testing_devices, testrun): stop_test_device("x12345") # Validate response - r = requests.post(f"{API}/system/stop") + r = requests.post(f"{API}/system/stop", timeout=5) response = json.loads(r.text) pretty_print(response) assert response == {"success": "Testrun stopped"} time.sleep(1) # Validate response - r = requests.get(f"{API}/system/status") + r = requests.get(f"{API}/system/status", timeout=5) response = json.loads(r.text) pretty_print(response) @@ -474,17 +475,18 @@ def test_stop_running_test(testing_devices, testrun): @pytest.mark.skip() -def test_stop_running_not_running(testrun): +def test_stop_running_not_running(testrun): # pylint: disable=W0613 # Validate response - r = requests.post(f"{API}/system/stop") + r = requests.post(f"{API}/system/stop", timeout=5) response = json.loads(r.text) pretty_print(response) assert False -def test_multiple_runs(testing_devices, testrun): +def test_multiple_runs(testing_devices, testrun): # pylint: disable=W0613 payload = {"device": {"mac_addr": BASELINE_MAC_ADDR, "firmware": "asd"}} - r = requests.post(f"{API}/system/start", data=json.dumps(payload)) + r = requests.post(f"{API}/system/start", data=json.dumps(payload), + timeout=10) assert r.status_code == 200 print(r.text) @@ -505,7 +507,7 @@ def test_multiple_runs(testing_devices, testrun): stop_test_device("x123") # Validate response - r = requests.get(f"{API}/system/status") + r = requests.get(f"{API}/system/status", timeout=5) response = json.loads(r.text) pretty_print(response) @@ -516,7 +518,8 @@ def test_multiple_runs(testing_devices, testrun): assert len(results) == 3 payload = {"device": {"mac_addr": BASELINE_MAC_ADDR, "firmware": "asd"}} - r = requests.post(f"{API}/system/start", data=json.dumps(payload)) + r = requests.post(f"{API}/system/start", data=json.dumps(payload), + timeout=5) # assert r.status_code == 200 # returns 409 print(r.text) @@ -539,9 +542,9 @@ def test_multiple_runs(testing_devices, testrun): #TODO uncomment when functionality is implemented @pytest.mark.skip() -def test_create_invalid_chars(empty_devices_dir, testrun): +def test_create_invalid_chars(empty_devices_dir, testrun): # pylint: disable=W0613 # local_delete_devices(ALL_DEVICES) - # We must start test run with no devices in local/devices for this test + # We must start test run with no devices in local/devices for this test # to function as expected! assert len(local_get_devices()) == 0 @@ -559,6 +562,7 @@ def test_create_invalid_chars(empty_devices_dir, testrun): }, } - r = requests.post(f"{API}/device", data=json.dumps(device_1)) + r = requests.post(f"{API}/device", data=json.dumps(device_1), + timeout=5) print(r.text) print(r.status_code) diff --git a/testing/tests/test_tests.py b/testing/tests/test_tests.py index a14afb2cb..f744d3b69 100644 --- a/testing/tests/test_tests.py +++ b/testing/tests/test_tests.py @@ -101,12 +101,14 @@ def test_list_tests(capsys, results, test_matrix): for tester in test_matrix.keys(): print(f'\n{tester}:') print(' expected results:') - for test in collect_expected_results(test_matrix[tester]['expected_results']): + for test in collect_expected_results( + test_matrix[tester]['expected_results']): print(f' {test.name}: {test.result}') print(' actual results:') for test in collect_actual_results(results[tester]): if test.name in test_matrix[tester]['expected_results']: - print(f' {test.name}: {test.result} (exp: {test_matrix[tester]["expected_results"][test.name]})') + print(f''' {test.name}: {test.result} + (exp: {test_matrix[tester]["expected_results"][test.name]})''') else: print(f' {test.name}: {test.result}') From 78ba4d33e675dc9e4dc47b73384a09bbed27acdd Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Mon, 22 Apr 2024 16:30:13 +0100 Subject: [PATCH 4/8] Add Feature Not Present to relevant tests --- modules/test/dns/python/src/dns_module.py | 3 ++- modules/test/ntp/python/src/ntp_module.py | 4 ++-- modules/test/protocol/python/src/protocol_bacnet.py | 3 ++- modules/test/protocol/python/src/protocol_modbus.py | 2 +- modules/test/protocol/python/src/protocol_module.py | 3 ++- modules/test/tls/python/src/tls_module.py | 4 ++-- 6 files changed, 11 insertions(+), 8 deletions(-) diff --git a/modules/test/dns/python/src/dns_module.py b/modules/test/dns/python/src/dns_module.py index ab711f5c4..c5f07895d 100644 --- a/modules/test/dns/python/src/dns_module.py +++ b/modules/test/dns/python/src/dns_module.py @@ -213,7 +213,8 @@ def _dns_network_from_dhcp(self): if dns_packets_local or dns_packets_not_local: if dns_packets_not_local: - result = False, 'DNS traffic detected to non-DHCP provided server' + result = ('Feature Not Present', + 'DNS traffic detected to non-DHCP provided server') else: LOGGER.info('DNS traffic detected only to configured DHCP DNS server') result = True, 'DNS traffic detected only to DHCP provided server' diff --git a/modules/test/ntp/python/src/ntp_module.py b/modules/test/ntp/python/src/ntp_module.py index 952ae236f..59fde2270 100644 --- a/modules/test/ntp/python/src/ntp_module.py +++ b/modules/test/ntp/python/src/ntp_module.py @@ -272,11 +272,11 @@ def _ntp_network_ntp_dhcp(self): result = False, ('Device sent NTP request to DHCP provided ' + 'server and non-DHCP provided server') elif ntp_to_remote: - result = False, 'Device sent NTP request to non-DHCP provided server' + result = 'Feature Not Present', 'Device sent NTP request to non-DHCP provided server' elif ntp_to_local: result = True, 'Device sent NTP request to DHCP provided server' else: - result = False, 'Device has not sent any NTP requests' + result = 'Feature Not Present', 'Device has not sent any NTP requests' LOGGER.info(result[1]) return result diff --git a/modules/test/protocol/python/src/protocol_bacnet.py b/modules/test/protocol/python/src/protocol_bacnet.py index 8560ba00f..2a5a22fe4 100644 --- a/modules/test/protocol/python/src/protocol_bacnet.py +++ b/modules/test/protocol/python/src/protocol_bacnet.py @@ -70,7 +70,8 @@ def validate_device(self, local_ip, device_ip): result = True, 'Device IP matches discovered device' break else: - result = None, 'BACnet discovery could not resolve any devices' + result = ('Feature Not Present', + 'BACnet discovery could not resolve any devices') if result is not None: LOGGER.info(result[1]) return result diff --git a/modules/test/protocol/python/src/protocol_modbus.py b/modules/test/protocol/python/src/protocol_modbus.py index d9a74c834..48041b1b8 100644 --- a/modules/test/protocol/python/src/protocol_modbus.py +++ b/modules/test/protocol/python/src/protocol_modbus.py @@ -267,7 +267,7 @@ def validate_device(self): compliant = (holding_reg is not None or input_reg is not None or coils is not None or discrete_inputs is not None) else: - compliant = False + compliant = 'Feature Not Present' details = 'Failed to establish Modbus connection to device' result = compliant, details return result diff --git a/modules/test/protocol/python/src/protocol_module.py b/modules/test/protocol/python/src/protocol_module.py index 0c9936524..dc7c7d286 100644 --- a/modules/test/protocol/python/src/protocol_module.py +++ b/modules/test/protocol/python/src/protocol_module.py @@ -67,11 +67,12 @@ def _protocol_bacnet_version(self): LOGGER.info(f'Checking BACnet version for device: {device}') if self._device_ipv4_addr in device[2]: result_status, result_description = \ - self._bacnet.validate_protocol_version( device[2], device[3]) + self._bacnet.validate_protocol_version(device[2], device[3]) break else: LOGGER.info('Device does not match expected IP address, skipping') else: + result_status = 'Feature Not Present' result_description = 'No BACnet devices discovered.' LOGGER.info(result_description) return result_status, result_description diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index e6cb23ff1..89b124691 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -280,7 +280,7 @@ def _validate_tls_client(self, client_ip, tls_version): # Generate results based on the state result_message = 'No outbound connections were found.' - result_state = None + result_state = 'Feature Not Present' # If any of the packetes detect failed client comms, fail the test if not client_results[0] and client_results[0] is not None: @@ -295,4 +295,4 @@ def _validate_tls_client(self, client_ip, tls_version): def _resolve_device_ip(self): # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is None: - self._device_ipv4_addr = self._get_device_ipv4() \ No newline at end of file + self._device_ipv4_addr = self._get_device_ipv4() From 7e15f284ba954b735cab6f4b6e3906bb9ab3581f Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Mon, 22 Apr 2024 16:31:01 +0100 Subject: [PATCH 5/8] Reduce pylint limit --- testing/pylint/test_pylint | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/pylint/test_pylint b/testing/pylint/test_pylint index 9e9074aa7..006fca20c 100755 --- a/testing/pylint/test_pylint +++ b/testing/pylint/test_pylint @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -ERROR_LIMIT=78 +ERROR_LIMIT=25 sudo cmd/install From dfab10a0d5904ef642c82461781870be3b03b0f0 Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Sat, 11 May 2024 14:52:34 +0100 Subject: [PATCH 6/8] Change to feature not detected --- framework/python/src/common/testreport.py | 8 ++--- modules/test/base/python/src/test_module.py | 2 +- modules/test/base/python/src/util.py | 30 ++++++++--------- modules/test/dns/python/src/dns_module.py | 22 ++++++------- modules/test/ntp/python/src/ntp_module.py | 7 ++-- .../protocol/python/src/protocol_bacnet.py | 22 ++++++------- .../protocol/python/src/protocol_modbus.py | 2 +- .../protocol/python/src/protocol_module.py | 32 +++++++++++-------- modules/test/tls/python/src/tls_module.py | 6 ++-- 9 files changed, 67 insertions(+), 64 deletions(-) diff --git a/framework/python/src/common/testreport.py b/framework/python/src/common/testreport.py index 6b430fe18..683ead17a 100644 --- a/framework/python/src/common/testreport.py +++ b/framework/python/src/common/testreport.py @@ -358,8 +358,8 @@ def generate_result(self, result): result_class = 'result-test-result-compliant' elif result['result'] == 'Error': result_class = 'result-test-result-error' - elif result['result'] == 'Feature Not Present': - result_class = 'result-test-result-feature-not-present' + elif result['result'] == 'Feature Not Detected': + result_class = 'result-test-result-feature-not-detected' else: result_class = 'result-test-result-skipped' @@ -869,8 +869,8 @@ def generate_css(self): left: 7.3in; } - .result-test-result-feature-not-present { - background-color: #ffe9c8; + .result-test-result-feature-not-detected { + background-color: #dadce0; color: #8d5c00; left: 6.92in; } diff --git a/modules/test/base/python/src/test_module.py b/modules/test/base/python/src/test_module.py index d8d2cce44..dc924cfeb 100644 --- a/modules/test/base/python/src/test_module.py +++ b/modules/test/base/python/src/test_module.py @@ -136,7 +136,7 @@ def run_tests(self): # Compliant / Non-Compliant result elif isinstance(result[0], bool): test['result'] = 'Compliant' if result[0] else 'Non-Compliant' - # Result may be a string, e.g Error, Feature Not Present + # Result may be a string, e.g Error, Feature Not Detected elif isinstance(result[0], str): test['result'] = result[0] else: diff --git a/modules/test/base/python/src/util.py b/modules/test/base/python/src/util.py index 431b361b0..006648037 100644 --- a/modules/test/base/python/src/util.py +++ b/modules/test/base/python/src/util.py @@ -30,18 +30,18 @@ def run_command(cmd, output=True): success = False with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - if process.returncode != 0 and output: - err_msg = f'{stderr.strip()}. Code: {process.returncode}' - LOGGER.error('Command Failed: ' + cmd) - LOGGER.error('Error: ' + err_msg) - else: - success = True - LOGGER.debug('Command succeeded: ' + cmd) - if output: - out = stdout.strip().decode('utf-8') - LOGGER.debug('Command output: ' + out) - return out, stderr - else: - return success + stderr=subprocess.PIPE) as process: + stdout, stderr = process.communicate() + if process.returncode != 0 and output: + err_msg = f'{stderr.strip()}. Code: {process.returncode}' + LOGGER.error('Command Failed: ' + cmd) + LOGGER.error('Error: ' + err_msg) + else: + success = True + LOGGER.debug('Command succeeded: ' + cmd) + if output: + out = stdout.strip().decode('utf-8') + LOGGER.debug('Command output: ' + out) + return out, stderr + else: + return success diff --git a/modules/test/dns/python/src/dns_module.py b/modules/test/dns/python/src/dns_module.py index c5f07895d..ceccd0871 100644 --- a/modules/test/dns/python/src/dns_module.py +++ b/modules/test/dns/python/src/dns_module.py @@ -196,7 +196,6 @@ def _has_dns_traffic(self, tcpdump_filter): def _dns_network_from_dhcp(self): LOGGER.info('Running dns.network.from_dhcp') - result = None LOGGER.info('Checking DNS traffic for configured DHCP DNS server: ' + self._dns_server) @@ -213,26 +212,27 @@ def _dns_network_from_dhcp(self): if dns_packets_local or dns_packets_not_local: if dns_packets_not_local: - result = ('Feature Not Present', + # Feature Not Detected + result = ('Feature Not Detected', 'DNS traffic detected to non-DHCP provided server') else: LOGGER.info('DNS traffic detected only to configured DHCP DNS server') result = True, 'DNS traffic detected only to DHCP provided server' else: + # Feature Not Detected LOGGER.info('No DNS traffic detected from the device') - result = None, 'No DNS traffic detected from the device' + result = 'Feature Not Detected', 'No DNS traffic detected from the device' return result def _dns_network_hostname_resolution(self): LOGGER.info('Running dns.network.hostname_resolution') - result = None LOGGER.info('Checking DNS traffic from device: ' + self._device_mac) # Check if the device DNS traffic tcpdump_filter = f'dst port 53 and ether src {self._device_mac}' - dns_packetes = self._has_dns_traffic(tcpdump_filter=tcpdump_filter) + dns_packets = self._has_dns_traffic(tcpdump_filter=tcpdump_filter) - if dns_packetes: + if dns_packets: LOGGER.info('DNS traffic detected from device') result = True, 'DNS traffic detected from device' else: @@ -240,20 +240,18 @@ def _dns_network_hostname_resolution(self): result = False, 'No DNS traffic detected from the device' return result - ## TODO: This test should always return 'Informational' result def _dns_mdns(self): LOGGER.info('Running dns.mdns') - result = None # Check if the device sends any MDNS traffic tcpdump_filter = f'udp port 5353 and ether src {self._device_mac}' - dns_packetes = self._has_dns_traffic(tcpdump_filter=tcpdump_filter) + dns_packets = self._has_dns_traffic(tcpdump_filter=tcpdump_filter) - if dns_packetes: + if dns_packets: LOGGER.info('MDNS traffic detected from device') - result = True, 'MDNS traffic detected from device' + result = 'Informational', 'MDNS traffic detected from device' else: LOGGER.info('No MDNS traffic detected from the device') - result = True, 'No MDNS traffic detected from the device' + result = 'Informational', 'No MDNS traffic detected from the device' return result def _exec_tcpdump(self, tcpdump_filter, capture_file): diff --git a/modules/test/ntp/python/src/ntp_module.py b/modules/test/ntp/python/src/ntp_module.py index 59fde2270..019a63128 100644 --- a/modules/test/ntp/python/src/ntp_module.py +++ b/modules/test/ntp/python/src/ntp_module.py @@ -207,7 +207,6 @@ def extract_ntp_data(self): def _ntp_network_ntp_support(self): LOGGER.info('Running ntp.network.ntp_support') - result = None packet_capture = (rdpcap(STARTUP_CAPTURE_FILE) + rdpcap(MONITOR_CAPTURE_FILE) + rdpcap(NTP_SERVER_CAPTURE_FILE)) @@ -244,7 +243,6 @@ def _ntp_network_ntp_support(self): def _ntp_network_ntp_dhcp(self): LOGGER.info('Running ntp.network.ntp_dhcp') - result = None packet_capture = (rdpcap(STARTUP_CAPTURE_FILE) + rdpcap(MONITOR_CAPTURE_FILE) + rdpcap(NTP_SERVER_CAPTURE_FILE)) @@ -272,11 +270,12 @@ def _ntp_network_ntp_dhcp(self): result = False, ('Device sent NTP request to DHCP provided ' + 'server and non-DHCP provided server') elif ntp_to_remote: - result = 'Feature Not Present', 'Device sent NTP request to non-DHCP provided server' + result = ('Feature Not Detected', + 'Device sent NTP request to non-DHCP provided server') elif ntp_to_local: result = True, 'Device sent NTP request to DHCP provided server' else: - result = 'Feature Not Present', 'Device has not sent any NTP requests' + result = 'Feature Not Detected', 'Device has not sent any NTP requests' LOGGER.info(result[1]) return result diff --git a/modules/test/protocol/python/src/protocol_bacnet.py b/modules/test/protocol/python/src/protocol_bacnet.py index 2a5a22fe4..dd10dacc1 100644 --- a/modules/test/protocol/python/src/protocol_bacnet.py +++ b/modules/test/protocol/python/src/protocol_bacnet.py @@ -15,7 +15,10 @@ import BAC0 import logging -from BAC0.core.io.IOExceptions import UnknownPropertyError, ReadPropertyException, NoResponseFromController, DeviceNotConnected +from BAC0.core.io.IOExceptions import (UnknownPropertyError, + ReadPropertyException, + NoResponseFromController, + DeviceNotConnected) LOGGER = None BAC0_LOG = '/root/.BAC0/BAC0.log' @@ -54,24 +57,21 @@ def discover(self, local_ip=None): # Check if the device being tested is in the discovered devices list def validate_device(self, local_ip, device_ip): - result = None LOGGER.info('Validating BACnet device: ' + device_ip) self.discover(local_ip + '/24') - LOGGER.info('BACnet Devices Found: ' + str(len(self.devices))) + LOGGER.info('BACnet devices found: ' + str(len(self.devices))) if len(self.devices) > 0: - # Load a fail result initially and pass only - # if we can validate it's the right device responding - result = False, ('Could not confirm discovered BACnet device is the ' + - 'same as device being tested') + result = ('Feature Not Detected', + 'Device did not respond to BACnet discovery') for device in self.devices: address = device[2] LOGGER.info('Checking device: ' + str(device)) if device_ip in address: - result = True, 'Device IP matches discovered device' + result = True, 'BACnet device discovered' break else: - result = ('Feature Not Present', - 'BACnet discovery could not resolve any devices') + result = ('Feature Not Detected', + 'BACnet device could not be discovered') if result is not None: LOGGER.info(result[1]) return result @@ -86,7 +86,7 @@ def validate_protocol_version(self, device_ip, device_id): protocol_version = f'{version}.{revision}' result = True result_description = ( - f'BACnet protocol version detected: {protocol_version}') + f'Device uses BACnet version {protocol_version}') except (UnknownPropertyError, ReadPropertyException, NoResponseFromController, DeviceNotConnected) as e: result = False diff --git a/modules/test/protocol/python/src/protocol_modbus.py b/modules/test/protocol/python/src/protocol_modbus.py index 48041b1b8..925e9517a 100644 --- a/modules/test/protocol/python/src/protocol_modbus.py +++ b/modules/test/protocol/python/src/protocol_modbus.py @@ -267,7 +267,7 @@ def validate_device(self): compliant = (holding_reg is not None or input_reg is not None or coils is not None or discrete_inputs is not None) else: - compliant = 'Feature Not Present' + compliant = None details = 'Failed to establish Modbus connection to device' result = compliant, details return result diff --git a/modules/test/protocol/python/src/protocol_module.py b/modules/test/protocol/python/src/protocol_module.py index a3a2b5e7b..bfb248cd5 100644 --- a/modules/test/protocol/python/src/protocol_module.py +++ b/modules/test/protocol/python/src/protocol_module.py @@ -25,6 +25,7 @@ class ProtocolModule(TestModule): """Protocol Test module""" def __init__(self, module): + self._supports_bacnet = False super().__init__(module_name=module, log_name=LOG_NAME) global LOGGER LOGGER = self._get_logger() @@ -48,8 +49,10 @@ def _protocol_valid_bacnet(self): if local_address: result = self._bacnet.validate_device(local_address, self._device_ipv4_addr) + if result[0]: + self._supports_bacnet = True else: - result = None, 'Could not resolve test container IP for BACnet discovery' + result = 'Error', 'Failed to perform BACnet discovery' return result def _protocol_bacnet_version(self): @@ -59,21 +62,23 @@ def _protocol_bacnet_version(self): this test can pass. """ LOGGER.info('Running protocol.bacnet.version') - result_status = None - result_description = '' + result_status = 'Feature Not Detected' + result_description = 'Device did not respond to BACnet discovery' + + # Do not run test if device does not support BACnet + if not self._supports_bacnet: + return result_status, result_description if len(self._bacnet.devices) > 0: for device in self._bacnet.devices: - LOGGER.info(f'Checking BACnet version for device: {device}') if self._device_ipv4_addr in device[2]: + LOGGER.debug(f'Checking BACnet version for device: {device}') result_status, result_description = \ self._bacnet.validate_protocol_version(device[2], device[3]) break else: - LOGGER.info('Device does not match expected IP address, skipping') - else: - result_status = 'Feature Not Present' - result_description = 'No BACnet devices discovered.' + LOGGER.debug('Device does not match expected IP address, skipping') + LOGGER.info(result_description) return result_status, result_description @@ -82,15 +87,16 @@ def _protocol_valid_modbus(self, config): # Extract basic device connection information modbus = Modbus(log=LOGGER, device_ip=self._device_ipv4_addr, config=config) results = modbus.validate_device() + # Determine results and return proper messaging and details - description = '' if results[0] is None: - description = 'No modbus connection could be made' + result = ('Feature Not Detected', + 'Device did not respond to Modbus connection') elif results[0]: - description = 'Valid modbus communication detected' + result = True, 'Valid modbus communication detected' else: - description = 'Failed to confirm valid modbus communication' - return results[0], description, results[1] + result = False, 'Failed to confirm valid modbus communication' + return result, results[1] def get_local_ip(self, interface_name): try: diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 772a798df..637aa627c 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -305,7 +305,7 @@ def _security_tls_v1_3_client(self): description = 'TLS 1.3 client connections invalid' return results[0], description, results[1] else: - LOGGER.error('Could not resolve device IP address. Skipping') + LOGGER.error('Could not resolve device IP address') return 'Error', 'Could not resolve device IP address' def _validate_tls_client(self, client_ip, tls_version): @@ -318,9 +318,9 @@ def _validate_tls_client(self, client_ip, tls_version): # Generate results based on the state result_message = 'No outbound connections were found.' - result_state = 'Feature Not Present' + result_state = 'Feature Not Detected' - # If any of the packetes detect failed client comms, fail the test + # If any of the packets detect failed client comms, fail the test if not client_results[0] and client_results[0] is not None: result_state = False result_message = client_results[1] From 7b19ae69d807542da1bfb73a81bfe9469664a623 Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Wed, 15 May 2024 09:04:45 +0100 Subject: [PATCH 7/8] Modify bacnet result --- modules/test/protocol/python/src/protocol_bacnet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/test/protocol/python/src/protocol_bacnet.py b/modules/test/protocol/python/src/protocol_bacnet.py index dd10dacc1..a987a9f4c 100644 --- a/modules/test/protocol/python/src/protocol_bacnet.py +++ b/modules/test/protocol/python/src/protocol_bacnet.py @@ -61,8 +61,8 @@ def validate_device(self, local_ip, device_ip): self.discover(local_ip + '/24') LOGGER.info('BACnet devices found: ' + str(len(self.devices))) if len(self.devices) > 0: - result = ('Feature Not Detected', - 'Device did not respond to BACnet discovery') + result = (False, + 'BACnet device was found but was not device under test') for device in self.devices: address = device[2] LOGGER.info('Checking device: ' + str(device)) From c7e11efe1f449c384810706412e8c9a1e6b1cafd Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Wed, 5 Jun 2024 10:05:48 +0100 Subject: [PATCH 8/8] Update DNS test --- modules/test/dns/conf/module_config.json | 2 +- modules/test/dns/python/src/dns_module.py | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/modules/test/dns/conf/module_config.json b/modules/test/dns/conf/module_config.json index 4a0dc4930..13c9b3236 100644 --- a/modules/test/dns/conf/module_config.json +++ b/modules/test/dns/conf/module_config.json @@ -27,7 +27,7 @@ "name": "dns.network.from_dhcp", "test_description": "Verify the device allows for a DNS server to be entered automatically", "expected_behavior": "The device sends DNS requests to the DNS server provided by the DHCP server", - "required_result": "Roadmap", + "required_result": "Informational", "recommendations": [ "Install a DNS client that supports fetching DNS servers from DHCP options" ] diff --git a/modules/test/dns/python/src/dns_module.py b/modules/test/dns/python/src/dns_module.py index ceccd0871..4d1c315a3 100644 --- a/modules/test/dns/python/src/dns_module.py +++ b/modules/test/dns/python/src/dns_module.py @@ -212,17 +212,14 @@ def _dns_network_from_dhcp(self): if dns_packets_local or dns_packets_not_local: if dns_packets_not_local: - # Feature Not Detected - result = ('Feature Not Detected', - 'DNS traffic detected to non-DHCP provided server') + description = 'DNS traffic detected to non-DHCP provided server' else: LOGGER.info('DNS traffic detected only to configured DHCP DNS server') - result = True, 'DNS traffic detected only to DHCP provided server' + description = 'DNS traffic detected only to DHCP provided server' else: - # Feature Not Detected LOGGER.info('No DNS traffic detected from the device') - result = 'Feature Not Detected', 'No DNS traffic detected from the device' - return result + description = 'No DNS traffic detected from the device' + return 'Informational', description def _dns_network_hostname_resolution(self): LOGGER.info('Running dns.network.hostname_resolution')