From 34e9d1e3ef761e040dbeb42012004987624684bb Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Tue, 16 Apr 2024 15:30:16 -0600 Subject: [PATCH] Expand pylint to whole project and fix simple pylint issues --- .../python/src/grpc_server/dhcp_config.py | 6 + .../src/grpc_server/dhcp_config_test.py | 4 +- .../python/src/grpc_server/network_service.py | 2 +- .../python/src/grpc_server/dhcp_config.py | 6 + .../src/grpc_server/dhcp_config_test.py | 4 +- .../python/src/grpc_server/network_service.py | 2 +- modules/network/ntp/python/src/chronyd.py | 3 +- modules/network/ntp/python/src/ntp_server.py | 2 +- modules/test/base/python/src/test_module.py | 35 ++- .../test/conn/python/src/connection_module.py | 19 +- modules/test/dns/python/src/dns_module.py | 12 +- modules/test/tls/python/src/tls_module.py | 281 +++++++++--------- modules/test/tls/python/src/tls_util.py | 15 +- testing/pylint/test_pylint | 4 +- testing/tests/test_tests.py | 37 ++- testing/unit/dns/dns_module_test.py | 37 +-- testing/unit/nmap/nmap_module_test.py | 27 +- testing/unit/tls/tls_module_test.py | 38 +-- 18 files changed, 282 insertions(+), 252 deletions(-) diff --git a/modules/network/dhcp-1/python/src/grpc_server/dhcp_config.py b/modules/network/dhcp-1/python/src/grpc_server/dhcp_config.py index 877d49610..d38d95785 100644 --- a/modules/network/dhcp-1/python/src/grpc_server/dhcp_config.py +++ b/modules/network/dhcp-1/python/src/grpc_server/dhcp_config.py @@ -56,6 +56,12 @@ def enable_failover(self): for subnet in self._subnets: subnet.enable_peer() + def get_peer(self): + return self._peer + + def get_subnets(self): + return self._subnets + def get_reserved_host(self, hw_addr): for host in self._reserved_hosts: if hw_addr == host.hw_addr: diff --git a/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py b/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py index 4bc1bd52d..493266c2c 100644 --- a/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py 
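Review bot: `get_peer()` and `get_subnets()` are added without docstrings and return the internal `_peer` object and `_subnets` list themselves, so any caller can still mutate the DHCP configuration through the returned reference. If read-only access is the intent, `return list(self._subnets)` plus a one-line docstring on each getter (for example `"""Return the configured subnets."""`) would make that explicit. The same two getters are added in the dhcp-2 copy of this file. The class body lies outside the hunk.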
+++ b/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py @@ -61,14 +61,14 @@ def test_resolve_config(self): def test_disable_failover(self): DHCP_CONFIG.disable_failover() print('Test Disable Config:\n' + str(DHCP_CONFIG)) - config_lines = str(DHCP_CONFIG._peer).split('\n') + config_lines = str(DHCP_CONFIG.get_peer()).split('\n') for line in config_lines: self.assertTrue(line.startswith('#')) def test_enable_failover(self): DHCP_CONFIG.enable_failover() print('Test Enable Config:\n' + str(DHCP_CONFIG)) - config_lines = str(DHCP_CONFIG._peer).split('\n') + config_lines = str(DHCP_CONFIG.get_peer()).split('\n') for line in config_lines: self.assertFalse(line.startswith('#')) diff --git a/modules/network/dhcp-1/python/src/grpc_server/network_service.py b/modules/network/dhcp-1/python/src/grpc_server/network_service.py index 92726025d..5124a07ad 100644 --- a/modules/network/dhcp-1/python/src/grpc_server/network_service.py +++ b/modules/network/dhcp-1/python/src/grpc_server/network_service.py @@ -142,7 +142,7 @@ def GetDHCPRange(self, request, context): # pylint: disable=W0613 """ LOGGER.info('Get DHCP range called') try: - pool = self._get_dhcp_config()._subnets[0].pools[0] + pool = self._get_dhcp_config().get_subnets()[0].pools[0] return pb2.DHCPRange(code=200, start=pool.range_start, end=pool.range_end) except Exception as e: # pylint: disable=W0718 fail_message = 'Failed to get DHCP range: ' + str(e) diff --git a/modules/network/dhcp-2/python/src/grpc_server/dhcp_config.py b/modules/network/dhcp-2/python/src/grpc_server/dhcp_config.py index 5357ba7ed..c49523a64 100644 --- a/modules/network/dhcp-2/python/src/grpc_server/dhcp_config.py +++ b/modules/network/dhcp-2/python/src/grpc_server/dhcp_config.py 
@@ -58,6 +58,12 @@ def enable_failover(self): for subnet in self._subnets: subnet.enable_peer() + def get_peer(self): + return self._peer + + def get_subnets(self): + return self._subnets + def get_reserved_host(self, hw_addr): for host in self._reserved_hosts: if hw_addr == host.hw_addr: diff --git a/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py b/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py index 0a156db68..7d368265a 100644 --- a/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py +++ b/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py @@ -58,14 +58,14 @@ def test_resolve_config(self): def test_disable_failover(self): DHCP_CONFIG.disable_failover() print('Test Disable Config:\n' + str(DHCP_CONFIG)) - config_lines = str(DHCP_CONFIG._peer).split('\n') + config_lines = str(DHCP_CONFIG.get_peer()).split('\n') for line in config_lines: self.assertTrue(line.startswith('#')) def test_enable_failover(self): DHCP_CONFIG.enable_failover() print('Test Enable Config:\n' + str(DHCP_CONFIG)) - config_lines = str(DHCP_CONFIG._peer).split('\n') + config_lines = str(DHCP_CONFIG.get_peer()).split('\n') for line in config_lines: self.assertFalse(line.startswith('#')) diff --git a/modules/network/dhcp-2/python/src/grpc_server/network_service.py b/modules/network/dhcp-2/python/src/grpc_server/network_service.py index f9deba965..7c5c61d4f 100644 --- a/modules/network/dhcp-2/python/src/grpc_server/network_service.py +++ b/modules/network/dhcp-2/python/src/grpc_server/network_service.py @@ -142,7 +142,7 @@ def GetDHCPRange(self, request, context): # pylint: disable=W0613 """ LOGGER.info('Get DHCP range called') try: - pool = self._get_dhcp_config()._subnets[0].pools[0] + pool = self._get_dhcp_config().get_subnets()[0].pools[0] return pb2.DHCPRange(code=200, start=pool.range_start, end=pool.range_end) except Exception as e: # pylint: disable=W0718 fail_message = 'Failed to get DHCP range: ' + str(e) 
diff --git a/modules/network/ntp/python/src/chronyd.py b/modules/network/ntp/python/src/chronyd.py index b8ce7db56..23dd834df 100644 --- a/modules/network/ntp/python/src/chronyd.py +++ b/modules/network/ntp/python/src/chronyd.py @@ -46,4 +46,5 @@ def is_running(self): LOGGER.info('Checking chronyd server') running = os.path.exists(PID_FILE) LOGGER.info('chronyd server status: ' + str(running)) - return running \ No newline at end of file + return running + \ No newline at end of file diff --git a/modules/network/ntp/python/src/ntp_server.py b/modules/network/ntp/python/src/ntp_server.py index 14a3d9bac..42fe21e77 100644 --- a/modules/network/ntp/python/src/ntp_server.py +++ b/modules/network/ntp/python/src/ntp_server.py @@ -16,7 +16,7 @@ from common import logger from chronyd import ChronydServer import time - +LOGGER = None LOG_NAME = 'ntp_server' class NTPServer: diff --git a/modules/test/base/python/src/test_module.py b/modules/test/base/python/src/test_module.py index f7707ba3a..63eb7e8a0 100644 --- a/modules/test/base/python/src/test_module.py +++ b/modules/test/base/python/src/test_module.py 
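Review bot: The replacement for `return running` still leaves the file without a final newline. The hunk adds a second `+` line that is followed by the `\ No newline at end of file` marker, which suggests the file now ends in a whitespace-only line. pylint reports that as trailing whitespace and a missing final newline, so the warning this edit was meant to clear probably remains. Ending the file with `return running` followed by a single newline would resolve it.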
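Review bot: `LOGGER = None` is inserted directly under `import time` with no comment or separating blank line, and nothing in the hunk shows where it is later assigned. If any code path logs before the class sets the global, the call fails with `AttributeError` on `None`. A comment such as `# Assigned once logging is configured; None until then` (to be confirmed against the rest of the file) would document the contract. The same placeholder is added in modules/test/tls/python/src/tls_module.py.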
@@ -26,21 +26,31 @@ class TestModule: """An example test module.""" - def __init__(self, module_name, log_name, log_dir=None,conf_file=CONF_FILE,results_dir=RESULTS_DIR): + def __init__(self, + module_name, + log_name, + log_dir=None, + conf_file=CONF_FILE, + results_dir=RESULTS_DIR): self._module_name = module_name - self._results_dir=results_dir if results_dir is not None else RESULTS_DIR - self._device_mac = os.environ.get('DEVICE_MAC','') - self._ipv4_addr = os.environ.get('IPV4_ADDR','') - self._ipv4_subnet = os.environ.get('IPV4_SUBNET','') - self._ipv6_subnet = os.environ.get('IPV6_SUBNET','') - self._add_logger(log_name=log_name, module_name=module_name, log_dir=log_dir) - self._config = self._read_config(conf_file=conf_file if conf_file is not None else CONF_FILE) + self._results_dir = results_dir if results_dir is not None else RESULTS_DIR + self._device_mac = os.environ.get('DEVICE_MAC', '') + self._ipv4_addr = os.environ.get('IPV4_ADDR', '') + self._ipv4_subnet = os.environ.get('IPV4_SUBNET', '') + self._ipv6_subnet = os.environ.get('IPV6_SUBNET', '') + self._add_logger(log_name=log_name, + module_name=module_name, + log_dir=log_dir) + self._config = self._read_config( + conf_file=conf_file if conf_file is not None else CONF_FILE) self._device_ipv4_addr = None self._device_ipv6_addr = None def _add_logger(self, log_name, module_name, log_dir=None): global LOGGER - LOGGER = logger.get_logger(log_name, module_name, log_dir=log_dir) + LOGGER = logger.get_logger(name=log_name, + log_file=module_name, + log_dir=log_dir) def generate_module_report(self): pass @@ -100,7 +110,7 @@ def run_tests(self): result = getattr(self, test_method_name)(config=test['config']) else: result = getattr(self, test_method_name)() - except Exception as e: + except Exception as e: # pylint: disable=W0718 LOGGER.error(f'An error occurred whilst running {test["name"]}') LOGGER.error(e) else: 
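Review bot: The `_add_logger` change is not purely cosmetic. `logger.get_logger(log_name, module_name, log_dir=log_dir)` becomes `logger.get_logger(name=log_name, log_file=module_name, log_dir=log_dir)`, which only works if the helper's first two parameters are actually named `name` and `log_file`. That signature is not visible in this patch and should be confirmed, since a mismatch raises `TypeError` in every test module's constructor. Separately, `__init__` still has no docstring; something like `"""Read device details from the environment, set up logging and load the module config."""` would describe what the visible body does.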
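Review bot: The `# pylint: disable=W0718` on `except Exception as e:` in `run_tests` silences the warning while the handler still logs only the exception message, so the stack trace of a failing test method is lost. `LOGGER.exception(f'An error occurred whilst running {test["name"]}')` would replace both `LOGGER.error` calls and keep the traceback. Because these are compliance tests, it is also worth confirming in the part of `run_tests` outside this hunk that a test which raised is reported as an error and never as Compliant.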
@@ -141,8 +151,7 @@ def run_tests(self): test['description'] = 'An error occured whilst running this test' # Remove the steps to resolve if compliant already - if (test['result'] == 'Compliant' and - 'recommendations' in test): + if (test['result'] == 'Compliant' and 'recommendations' in test): test.pop('recommendations') test['end'] = datetime.now().isoformat() @@ -153,7 +162,7 @@ def run_tests(self): json_results = json.dumps({'results': tests}, indent=2) self._write_results(json_results) - def _read_config(self,conf_file=CONF_FILE): + def _read_config(self, conf_file=CONF_FILE): with open(conf_file, encoding='utf-8') as f: config = json.load(f) return config diff --git a/modules/test/conn/python/src/connection_module.py b/modules/test/conn/python/src/connection_module.py index cf8852ef0..34e129103 100644 --- a/modules/test/conn/python/src/connection_module.py +++ b/modules/test/conn/python/src/connection_module.py @@ -104,9 +104,8 @@ def _connection_switch_arp_inspection(self): no_arp = False # Check MAC address matches IP address - if (arp_packet.hwsrc == self._device_mac and - (arp_packet.psrc != self._device_ipv4_addr - and arp_packet.psrc != '0.0.0.0')): + if (arp_packet.hwsrc == self._device_mac + and (arp_packet.psrc not in (self._device_ipv4_addr, '0.0.0.0'))): LOGGER.info(f'Bad ARP packet detected for MAC: {self._device_mac}') LOGGER.info(f'''ARP packet from IP {arp_packet.psrc} does not match {self._device_ipv4_addr}''') 
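Review bot: The rewritten ARP check compares `arp_packet.hwsrc == self._device_mac` without normalising case, while the DHCP hunk later in this file compares `self._device_mac.upper()` against upper-cased addresses. If `DEVICE_MAC` is supplied in upper case and the capture library reports lower-case addresses (not shown here), the condition never matches and a mismatched ARP source would go unreported. Consider `arp_packet.hwsrc.upper() == self._device_mac.upper()`. The `mac_address.startswith(TR_CONTAINER_MAC_PREFIX)` test in the `_connection_single_ip` hunk has the same case sensitivity, since it runs before `.upper()` is applied. Both functions extend beyond their hunks.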
@@ -205,13 +204,11 @@ def _connection_single_ip(self): LOGGER.info('Inspecting: ' + str(len(packets)) + ' packets') for packet in packets: if DHCP in packet: - for option in packet[DHCP].options: - # message-type, option 3 = DHCPREQUEST - if self._get_dhcp_type(packet) == 3: - mac_address = packet[Ether].src - LOGGER.info('DHCPREQUEST detected MAC address: ' + mac_address) - if not mac_address.startswith(TR_CONTAINER_MAC_PREFIX): - mac_addresses.add(mac_address.upper()) + if self._get_dhcp_type(packet) == 3: + mac_address = packet[Ether].src + LOGGER.info('DHCPREQUEST detected MAC address: ' + mac_address) + if not mac_address.startswith(TR_CONTAINER_MAC_PREFIX): + mac_addresses.add(mac_address.upper()) # Check if the device mac address is in the list of DHCPREQUESTs result = self._device_mac.upper() in mac_addresses @@ -637,4 +634,4 @@ def test_subnets(self, subnets): LOGGER.error(traceback.format_exc()) result = {'result': False, 'details': 'Subnet test failed: ' + str(e)} results.append(result) - return results \ No newline at end of file + return results diff --git a/modules/test/dns/python/src/dns_module.py b/modules/test/dns/python/src/dns_module.py index 02d89eb0a..e9550663d 100644 --- a/modules/test/dns/python/src/dns_module.py +++ b/modules/test/dns/python/src/dns_module.py 
@@ -33,17 +33,17 @@ def __init__(self, log_dir=None, conf_file=None, results_dir=None, - DNS_SERVER_CAPTURE_FILE=DNS_SERVER_CAPTURE_FILE, - STARTUP_CAPTURE_FILE=STARTUP_CAPTURE_FILE, - MONITOR_CAPTURE_FILE=MONITOR_CAPTURE_FILE): + dns_server_capture_file=DNS_SERVER_CAPTURE_FILE, + startup_capture_file=STARTUP_CAPTURE_FILE, + monitor_capture_file=MONITOR_CAPTURE_FILE): super().__init__(module_name=module, log_name=LOG_NAME, log_dir=log_dir, conf_file=conf_file, results_dir=results_dir) - self.dns_server_capture_file=DNS_SERVER_CAPTURE_FILE - self.startup_capture_file=STARTUP_CAPTURE_FILE - self.monitor_capture_file=MONITOR_CAPTURE_FILE + self.dns_server_capture_file=dns_server_capture_file + self.startup_capture_file=startup_capture_file + self.monitor_capture_file=monitor_capture_file self._dns_server = '10.10.10.4' global LOGGER LOGGER = self._get_logger() diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 472d403b2..6a959bb2e 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -14,12 +14,9 @@ """Baseline test module""" from test_module import TestModule from tls_util import TLSUtil -import os import pyshark from cryptography import x509 from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec LOG_NAME = 'test_tls' MODULE_REPORT_FILE_NAME = 'tls_report.html' @@ -27,7 +24,7 @@ MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap' TLS_CAPTURE_FILE = '/runtime/output/tls.pcap' GATEWAY_CAPTURE_FILE = '/runtime/network/gateway.pcap' - +LOGGER = None class TLSModule(TestModule): """An example testing module.""" @@ -54,144 +51,144 @@ def __init__(self, # def generate_module_report(self): - # html_content = '
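Review bot: Renaming the keyword parameters to `dns_server_capture_file`, `startup_capture_file` and `monitor_capture_file` changes the constructor's public interface, so any caller still passing the upper-case names will fail with `TypeError`. Only the unit test is updated in this patch. The three assignments also keep the unspaced form, e.g. `self.dns_server_capture_file=dns_server_capture_file`, where the rest of the commit normalises to `self.dns_server_capture_file = dns_server_capture_file`. The constructor's signature starts above the hunk.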

TLS Module

' - - # # List of capture files to scan - # pcap_files = [ - # self.startup_capture_file, self.monitor_capture_file, - # self.tls_capture_file - # ] - # certificates = self.extract_certificates_from_pcap(pcap_files, - # self._device_mac) - # if len(certificates) > 0: - - # # Add summary table - # summary_table = ''' - # - # - # - # - # - # - # - # - # - # - # - # ''' - - # # table_content = ''' - # #
ExpiryLengthTypePort numberSigned by
- # # - # # - # # - # # - # # - # # - # # - # # - # # - # # ''' - - # cert_tables = [] - # for cert_num, ((ip_address, port), cert) in enumerate( - # certificates.items()): - - # # Extract certificate data - # not_valid_before = cert.not_valid_before - # not_valid_after = cert.not_valid_after - # version_value = f'{cert.version.value + 1} ({hex(cert.version.value)})' - # signature_alg_value = cert.signature_algorithm_oid._name # pylint: disable=W0212 - # not_before = str(not_valid_before) - # not_after = str(not_valid_after) - # public_key = cert.public_key() - # signed_by = 'None' - # if isinstance(public_key, rsa.RSAPublicKey): - # public_key_type = 'RSA' - # elif isinstance(public_key, dsa.DSAPublicKey): - # public_key_type = 'DSA' - # elif isinstance(public_key, ec.EllipticCurvePublicKey): - # public_key_type = 'EC' - # else: - # public_key_type = 'Unknown' - # # Calculate certificate length - # cert_length = len(cert.public_bytes( - # encoding=serialization.Encoding.DER)) - - # # Generate the Certificate table - # # cert_table = (f'| Property | Value |\n' - # # f'|---|---|\n' - # # f"| {'Version':<17} | {version_value:^25} |\n" - # # f"| {'Signature Alg.':<17} | {signature_alg_value:^25} |\n" - # # f"| {'Validity from':<17} | {not_before:^25} |\n" - # # f"| {'Valid to':<17} | {not_after:^25} |") - - # # Generate the Subject table - # subj_table = ('| Distinguished Name | Value |\n' - # '|---|---|') - # for val in cert.subject.rdns: - # dn = val.rfc4514_string().split('=') - # subj_table += f'\n| {dn[0]} | {dn[1]}' - - # # Generate the Issuer table - # iss_table = ('| Distinguished Name | Value |\n' - # '|---|---|') - # for val in cert.issuer.rdns: - # dn = val.rfc4514_string().split('=') - # iss_table += f'\n| {dn[0]} | {dn[1]}' - # if 'CN' in dn[0]: - # signed_by = dn[1] - - # ext_table = None - # # if cert.extensions: - # # ext_table = ('| Extension | Value |\n' - # # '|---|---|') - # # for extension in cert.extensions: - # # for extension_value in 
extension.value: - # # ext_table += f'''\n| {extension.oid._name} | - # # {extension_value.value}''' # pylint: disable=W0212 - # # cert_table = f'### Certificate\n{cert_table}' - # # cert_table += f'\n\n### Subject\n{subj_table}' - # # cert_table += f'\n\n### Issuer\n{iss_table}' - # # if ext_table is not None: - # # cert_table += f'\n\n### Extensions\n{ext_table}' - # # cert_tables.append(cert_table) - - # summary_table += f''' - # - # - # - # - # - # - # - # ''' - - # summary_table += ''' - # - #
ExpiryLengthTypePort numberSigned by
{not_after}{cert_length}{public_key_type}{port}{signed_by}
- # ''' - - # html_content += summary_table - - # else: - # html_content += (''' - #
- #
- # No TLS certificates found on the device - #
''') - - # LOGGER.debug('Module report:\n' + html_content) - - # # Use os.path.join to create the complete file path - # report_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME) - - # # Write the content to a file - # with open(report_path, 'w', encoding='utf-8') as file: - # file.write(html_content) - - # LOGGER.info('Module report generated at: ' + str(report_path)) - # return report_path + # html_content = '

TLS Module

' + + # # List of capture files to scan + # pcap_files = [ + # self.startup_capture_file, self.monitor_capture_file, + # self.tls_capture_file + # ] + # certificates = self.extract_certificates_from_pcap(pcap_files, + # self._device_mac) + # if len(certificates) > 0: + + # # Add summary table + # summary_table = ''' + # + # + # + # + # + # + # + # + # + # + # + # ''' + + # # table_content = ''' + # #
ExpiryLengthTypePort numberSigned by
+ # # + # # + # # + # # + # # + # # + # # + # # + # # + # # ''' + + # cert_tables = [] + # for cert_num, ((ip_address, port), cert) in enumerate( + # certificates.items()): + + # # Extract certificate data + # not_valid_before = cert.not_valid_before + # not_valid_after = cert.not_valid_after + # version_value = f'{cert.version.value + 1} ({hex(cert.version.value)})' + # signature_alg_value = cert.signature_algorithm_oid._name # pylint: disable=W0212 + # not_before = str(not_valid_before) + # not_after = str(not_valid_after) + # public_key = cert.public_key() + # signed_by = 'None' + # if isinstance(public_key, rsa.RSAPublicKey): + # public_key_type = 'RSA' + # elif isinstance(public_key, dsa.DSAPublicKey): + # public_key_type = 'DSA' + # elif isinstance(public_key, ec.EllipticCurvePublicKey): + # public_key_type = 'EC' + # else: + # public_key_type = 'Unknown' + # # Calculate certificate length + # cert_length = len(cert.public_bytes( + # encoding=serialization.Encoding.DER)) + + # # Generate the Certificate table + # # cert_table = (f'| Property | Value |\n' + # # f'|---|---|\n' + # # f"| {'Version':<17} | {version_value:^25} |\n" + # # f"| {'Signature Alg.':<17} | {signature_alg_value:^25} |\n" + # # f"| {'Validity from':<17} | {not_before:^25} |\n" + # # f"| {'Valid to':<17} | {not_after:^25} |") + + # # Generate the Subject table + # subj_table = ('| Distinguished Name | Value |\n' + # '|---|---|') + # for val in cert.subject.rdns: + # dn = val.rfc4514_string().split('=') + # subj_table += f'\n| {dn[0]} | {dn[1]}' + + # # Generate the Issuer table + # iss_table = ('| Distinguished Name | Value |\n' + # '|---|---|') + # for val in cert.issuer.rdns: + # dn = val.rfc4514_string().split('=') + # iss_table += f'\n| {dn[0]} | {dn[1]}' + # if 'CN' in dn[0]: + # signed_by = dn[1] + + # ext_table = None + # # if cert.extensions: + # # ext_table = ('| Extension | Value |\n' + # # '|---|---|') + # # for extension in cert.extensions: + # # for extension_value in 
extension.value: + # # ext_table += f'''\n| {extension.oid._name} | + # # {extension_value.value}''' # pylint: disable=W0212 + # # cert_table = f'### Certificate\n{cert_table}' + # # cert_table += f'\n\n### Subject\n{subj_table}' + # # cert_table += f'\n\n### Issuer\n{iss_table}' + # # if ext_table is not None: + # # cert_table += f'\n\n### Extensions\n{ext_table}' + # # cert_tables.append(cert_table) + + # summary_table += f''' + # + # + # + # + # + # + # + # ''' + + # summary_table += ''' + # + #
ExpiryLengthTypePort numberSigned by
{not_after}{cert_length}{public_key_type}{port}{signed_by}
+ # ''' + + # html_content += summary_table + + # else: + # html_content += (''' + #
+ #
+ # No TLS certificates found on the device + #
''') + + # LOGGER.debug('Module report:\n' + html_content) + + # # Use os.path.join to create the complete file path + # report_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME) + + # # Write the content to a file + # with open(report_path, 'w', encoding='utf-8') as file: + # file.write(html_content) + + # LOGGER.info('Module report generated at: ' + str(report_path)) + # return report_path def extract_certificates_from_pcap(self, pcap_files, mac_address): # Initialize a list to store packets diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index ef8e74e64..14d6d15e8 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -550,13 +550,14 @@ def get_non_tls_client_connection_ips(self, client_ip, capture_files): non_tls_dst_ips = set() # Store unique destination IPs for packet in packets: # Check if packet contains TCP layer - if 'tcp' in packet['_source']['layers']: - tcp_flags = packet['_source']['layers']['tcp.flags'] - if 'A' not in tcp_flags and 'S' not in tcp_flags: - # Packet is not ACK or SYN - dst_ip = ipaddress.ip_address(packet['_source']['layers']['ip.dst'][0]) - if not dst_ip in subnet_with_mask: - non_tls_dst_ips.add(str(dst_ip)) + if 'tcp' in packet['_source']['layers']: + tcp_flags = packet['_source']['layers']['tcp.flags'] + if 'A' not in tcp_flags and 'S' not in tcp_flags: + # Packet is not ACK or SYN + dst_ip = ipaddress.ip_address( + packet['_source']['layers']['ip.dst'][0]) + if not dst_ip in subnet_with_mask: + non_tls_dst_ips.add(str(dst_ip)) return non_tls_dst_ips # Check if the device has made any outbound connections that don't diff --git a/testing/pylint/test_pylint b/testing/pylint/test_pylint index a330d54c3..9e9074aa7 100755 --- a/testing/pylint/test_pylint +++ b/testing/pylint/test_pylint 
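Review bot: This hunk only re-indents roughly 140 lines of commented-out `generate_module_report` code, while the first hunk of the file removes the `os`, `serialization` and `rsa, dsa, ec` imports that code depends on, so it can no longer be re-enabled by uncommenting. Deleting the block and recovering it from history when needed would be cleaner than keeping it formatted. If it is restored, the certificate values it interpolates into the HTML report (`signed_by` from the issuer CN, for instance) originate from the device under test and should pass through `html.escape()` first.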
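Review bot: In the re-indented block of `get_non_tls_client_connection_ips`, `packet['_source']['layers']['ip.dst'][0]` treats a field as a list while `'A' not in tcp_flags` uses `tcp.flags` unindexed. If `tcp.flags` is a list like `ip.dst` (the capture format is not visible here), the `in` tests check list membership rather than flag characters, and the filter would not behave as the "not ACK or SYN" comment says; this is worth confirming with a unit test on a known capture. The guard also checks for the key `'tcp'` but reads `'tcp.flags'` and `'ip.dst'`, so a packet lacking either raises `KeyError`. `if not dst_ip in subnet_with_mask:` reads better as `if dst_ip not in subnet_with_mask:`. The function begins above the hunk.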
@@ -14,14 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -ERROR_LIMIT=0 +ERROR_LIMIT=78 sudo cmd/install source venv/bin/activate sudo pip3 install pylint==3.0.3 -files=$(find ./framework -path ./venv -prune -o -name '*.py' -print) +files=$(find ./ -path ./venv -prune -o -name '*.py' -print) OUT=pylint.out diff --git a/testing/tests/test_tests.py b/testing/tests/test_tests.py index a14afb2cb..895b63ec0 100644 --- a/testing/tests/test_tests.py +++ b/testing/tests/test_tests.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Test assertions for CI testing of tests """ # Temporarily disabled because using Pytest fixtures # TODO refactor fixtures to not trigger error @@ -29,6 +28,7 @@ TEST_MATRIX = 'test_tests.json' RESULTS_PATH = '/tmp/results/*.json' + #TODO add reason @dataclass(frozen=True) class TestResult: 
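Review bot: Raising `ERROR_LIMIT` from 0 to 78 while widening the scan to `./` turns the lint gate into a budget: new findings pass CI as long as the total stays within the limit, so a regression can hide behind an unrelated fix. A comment such as `# Baseline after widening the scan to the whole repo; lower as issues are fixed, target 0` would record the intent. `find ./` now also walks every non-venv directory in the checkout. If `$files` is expanded unquoted later in the script (not visible in this hunk), paths containing spaces would be split.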
@@ -73,19 +73,23 @@ def test_tests(results, test_matrix): actual = set(collect_actual_results(results[tester])) assert expected & actual == expected -def test_list_tests(capsys, results, test_matrix): - all_tests = set(itertools.chain.from_iterable( - [collect_actual_results(results[x]) for x in results.keys()])) - ci_pass = set([test - for testers in test_matrix.values() - for test, result in testers['expected_results'].items() - if result == 'Compliant']) - - ci_fail = set([test - for testers in test_matrix.values() - for test, result in testers['expected_results'].items() - if result == 'Non-Compliant']) +def test_list_tests(capsys, results, test_matrix): + all_tests = set( + itertools.chain.from_iterable( + [collect_actual_results(results[x]) for x in results.keys()])) + + ci_pass = set([ + test for testers in test_matrix.values() + for test, result in testers['expected_results'].items() + if result == 'Compliant' + ]) + + ci_fail = set([ + test for testers in test_matrix.values() + for test, result in testers['expected_results'].items() + if result == 'Non-Compliant' + ]) with capsys.disabled(): #TODO print matching the JSON schema for easy copy/paste @@ -101,12 +105,15 @@ def test_list_tests(capsys, results, test_matrix): for tester in test_matrix.keys(): print(f'\n{tester}:') print(' expected results:') - for test in collect_expected_results(test_matrix[tester]['expected_results']): + for test in collect_expected_results( + test_matrix[tester]['expected_results']): print(f' {test.name}: {test.result}') print(' actual results:') for test in collect_actual_results(results[tester]): if test.name in test_matrix[tester]['expected_results']: - print(f' {test.name}: {test.result} (exp: {test_matrix[tester]["expected_results"][test.name]})') + print( + f' {test.name}: {test.result} (exp: {test_matrix[tester]["expected_results"][test.name]})' + ) else: print(f' {test.name}: {test.result}') 
diff --git a/testing/unit/dns/dns_module_test.py b/testing/unit/dns/dns_module_test.py index 52ec80b4d..6c3dec74d 100644 --- a/testing/unit/dns/dns_module_test.py +++ b/testing/unit/dns/dns_module_test.py @@ -22,18 +22,19 @@ # Define the directories TEST_FILES_DIR = 'testing/unit/' + MODULE -OUTPUT_DIR = os.path.join(TEST_FILES_DIR,'output/') -REPORTS_DIR = os.path.join(TEST_FILES_DIR,'reports/') -CAPTURES_DIR = os.path.join(TEST_FILES_DIR,'captures/') +OUTPUT_DIR = os.path.join(TEST_FILES_DIR, 'output/') +REPORTS_DIR = os.path.join(TEST_FILES_DIR, 'reports/') +CAPTURES_DIR = os.path.join(TEST_FILES_DIR, 'captures/') -LOCAL_REPORT = os.path.join(REPORTS_DIR,'dns_report_local.html') -LOCAL_REPORT_NO_DNS = os.path.join(REPORTS_DIR,'dns_report_local_no_dns.html') +LOCAL_REPORT = os.path.join(REPORTS_DIR, 'dns_report_local.html') +LOCAL_REPORT_NO_DNS = os.path.join(REPORTS_DIR, 'dns_report_local_no_dns.html') CONF_FILE = 'modules/test/' + MODULE + '/conf/module_config.json' # Define the capture files to be used for the test -DNS_SERVER_CAPTURE_FILE = os.path.join(CAPTURES_DIR,'dns.pcap') -STARTUP_CAPTURE_FILE = os.path.join(CAPTURES_DIR,'startup.pcap') -MONITOR_CAPTURE_FILE = os.path.join(CAPTURES_DIR,'monitor.pcap') +DNS_SERVER_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'dns.pcap') +STARTUP_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'startup.pcap') +MONITOR_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'monitor.pcap') + class TLSModuleTest(unittest.TestCase): """Contains and runs all the unit tests concerning DNS behaviors""" @@ -49,9 +50,9 @@ def dns_module_report_test(self): log_dir=OUTPUT_DIR, conf_file=CONF_FILE, results_dir=OUTPUT_DIR, - DNS_SERVER_CAPTURE_FILE=DNS_SERVER_CAPTURE_FILE, - STARTUP_CAPTURE_FILE=STARTUP_CAPTURE_FILE, - MONITOR_CAPTURE_FILE=MONITOR_CAPTURE_FILE) + dns_server_capture_file=DNS_SERVER_CAPTURE_FILE, + startup_capture_file=STARTUP_CAPTURE_FILE, + monitor_capture_file=MONITOR_CAPTURE_FILE) report_out_path = dns_module.generate_module_report() 
@@ -61,7 +62,7 @@ def dns_module_report_test(self): formatted_report = self.add_formatting(report_out) # Write back the new formatted_report value - out_report_path = os.path.join(OUTPUT_DIR,'dns_report_with_dns.html') + out_report_path = os.path.join(OUTPUT_DIR, 'dns_report_with_dns.html') with open(out_report_path, 'w', encoding='utf-8') as file: file.write(formatted_report) @@ -105,9 +106,9 @@ def dns_module_report_no_dns_test(self): log_dir=OUTPUT_DIR, conf_file=CONF_FILE, results_dir=OUTPUT_DIR, - DNS_SERVER_CAPTURE_FILE=dns_server_cap_file, - STARTUP_CAPTURE_FILE=startup_cap_file, - MONITOR_CAPTURE_FILE=monitor_cap_file) + dns_server_capture_file=dns_server_cap_file, + startup_capture_file=startup_cap_file, + monitor_capture_file=monitor_cap_file) report_out_path = dns_module.generate_module_report() @@ -117,7 +118,7 @@ def dns_module_report_no_dns_test(self): formatted_report = self.add_formatting(report_out) # Write back the new formatted_report value - out_report_path = os.path.join(OUTPUT_DIR,'dns_report_no_dns.html') + out_report_path = os.path.join(OUTPUT_DIR, 'dns_report_no_dns.html') with open(out_report_path, 'w', encoding='utf-8') as file: file.write(formatted_report) @@ -127,8 +128,7 @@ def dns_module_report_no_dns_test(self): self.assertEqual(report_out, report_local) - - def add_formatting(self,body): + def add_formatting(self, body): return f''' @@ -138,6 +138,7 @@ def add_formatting(self,body): diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py index fb06b0eb1..0f8cada3e 100644 --- a/testing/unit/tls/tls_module_test.py +++ b/testing/unit/tls/tls_module_test.py 
@@ -27,20 +27,21 @@ MODULE = 'tls' # Define the file paths TEST_FILES_DIR = 'testing/unit/' + MODULE -OUTPUT_DIR = os.path.join(TEST_FILES_DIR,'output/') -REPORTS_DIR = os.path.join(TEST_FILES_DIR,'reports/') -CAPTURES_DIR = os.path.join(TEST_FILES_DIR,'captures/') -CERT_DIR = os.path.join(TEST_FILES_DIR,'certs/') -ROOT_CERTS_DIR = os.path.join(TEST_FILES_DIR,'root_certs') - -LOCAL_REPORT = os.path.join(REPORTS_DIR,'tls_report_local.md') -LOCAL_REPORT_EXT = os.path.join(REPORTS_DIR,'tls_report_ext_local.md') -LOCAL_REPORT_NO_CERT = os.path.join(REPORTS_DIR,'tls_report_no_cert_local.md') +OUTPUT_DIR = os.path.join(TEST_FILES_DIR, 'output/') +REPORTS_DIR = os.path.join(TEST_FILES_DIR, 'reports/') +CAPTURES_DIR = os.path.join(TEST_FILES_DIR, 'captures/') +CERT_DIR = os.path.join(TEST_FILES_DIR, 'certs/') +ROOT_CERTS_DIR = os.path.join(TEST_FILES_DIR, 'root_certs') + +LOCAL_REPORT = os.path.join(REPORTS_DIR, 'tls_report_local.md') +LOCAL_REPORT_EXT = os.path.join(REPORTS_DIR, 'tls_report_ext_local.md') +LOCAL_REPORT_NO_CERT = os.path.join(REPORTS_DIR, 'tls_report_no_cert_local.md') CONF_FILE = 'modules/test/' + MODULE + '/conf/module_config.json' TLS_UTIL = None PACKET_CAPTURE = None + class TLSModuleTest(unittest.TestCase): """Contains and runs all the unit tests concerning TLS behaviors""" @@ -148,7 +149,7 @@ def security_tls_server_results_test(self, ): tls_1_2_results = True, success_message tls_1_3_results = True, success_message expected = True, (f'TLS 1.2 validated: {success_message}\n' - f'TLS 1.3 validated: {success_message}') + f'TLS 1.3 validated: {success_message}') result = TLS_UTIL.process_tls_server_results(tls_1_2_results, tls_1_3_results) self.assertEqual(result, expected) 
@@ -171,11 +172,10 @@ def security_tls_server_results_test(self, ): tls_1_3_results) self.assertEqual(result, expected) - # TLS 1.2 Fail and TLS 1.2 Fail tls_1_3_results = False, fail_message expected = False, (f'TLS 1.2 not validated: {fail_message}\n' - f'TLS 1.3 not validated: {fail_message}') + f'TLS 1.3 not validated: {fail_message}') result = TLS_UTIL.process_tls_server_results(tls_1_2_results, tls_1_3_results) self.assertEqual(result, expected) @@ -301,7 +301,7 @@ def security_tls_client_unsupported_tls_client(self): def tls_module_report_test(self): print('\ntls_module_report_test') os.environ['DEVICE_MAC'] = '38:d1:35:01:17:fe' - pcap_file = os.path.join(CAPTURES_DIR,'tls.pcap') + pcap_file = os.path.join(CAPTURES_DIR, 'tls.pcap') tls = TLSModule(module=MODULE, log_dir=OUTPUT_DIR, conf_file=CONF_FILE, @@ -323,7 +323,7 @@ def tls_module_report_test(self): def tls_module_report_ext_test(self): print('\ntls_module_report_ext_test') os.environ['DEVICE_MAC'] = '28:29:86:27:d6:05' - pcap_file = os.path.join(CAPTURES_DIR,'tls_ext.pcap') + pcap_file = os.path.join(CAPTURES_DIR, 'tls_ext.pcap') tls = TLSModule(module=MODULE, log_dir=OUTPUT_DIR, conf_file=CONF_FILE, @@ -336,7 +336,7 @@ def tls_module_report_ext_test(self): def tls_module_report_no_cert_test(self): print('\ntls_module_report_no_cert_test') os.environ['DEVICE_MAC'] = '' - pcap_file = os.path.join(CAPTURES_DIR,'tls_ext.pcap') + pcap_file = os.path.join(CAPTURES_DIR, 'tls_ext.pcap') tls = TLSModule(module=MODULE, log_dir=OUTPUT_DIR, conf_file=CONF_FILE, 
@@ -443,16 +443,18 @@ def get_interface_ip(self, interface_name): def tls_module_trusted_ca_cert_chain_test(self): print('\ntls_module_trusted_ca_cert_chain_test') - cert_path = os.path.join(CERT_DIR,'_.google.com.crt') + cert_path = os.path.join(CERT_DIR, '_.google.com.crt') cert_valid = TLS_UTIL.validate_cert_chain(device_cert_path=cert_path) self.assertEqual(cert_valid, True) def tls_module_local_ca_cert_test(self): print('\ntls_module_trusted_ca_cert_chain_test') - cert_path = os.path.join(CERT_DIR,'device_cert_local.crt') - cert_valid = TLS_UTIL.validate_local_ca_signature(device_cert_path=cert_path) + cert_path = os.path.join(CERT_DIR, 'device_cert_local.crt') + cert_valid = TLS_UTIL.validate_local_ca_signature( + device_cert_path=cert_path) self.assertEqual(cert_valid[0], True) + if __name__ == '__main__': suite = unittest.TestSuite() suite.addTest(TLSModuleTest('client_hello_packets_test'))