diff --git a/modules/network/dhcp-1/python/src/grpc_server/dhcp_config.py b/modules/network/dhcp-1/python/src/grpc_server/dhcp_config.py
index 877d49610..d38d95785 100644
--- a/modules/network/dhcp-1/python/src/grpc_server/dhcp_config.py
+++ b/modules/network/dhcp-1/python/src/grpc_server/dhcp_config.py
@@ -56,6 +56,12 @@ def enable_failover(self):
for subnet in self._subnets:
subnet.enable_peer()
+ def get_peer(self):
+ return self._peer
+
+ def get_subnets(self):
+ return self._subnets
+
def get_reserved_host(self, hw_addr):
for host in self._reserved_hosts:
if hw_addr == host.hw_addr:
diff --git a/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py b/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py
index 4bc1bd52d..493266c2c 100644
--- a/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py
+++ b/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py
@@ -61,14 +61,14 @@ def test_resolve_config(self):
def test_disable_failover(self):
DHCP_CONFIG.disable_failover()
print('Test Disable Config:\n' + str(DHCP_CONFIG))
- config_lines = str(DHCP_CONFIG._peer).split('\n')
+ config_lines = str(DHCP_CONFIG.get_peer()).split('\n')
for line in config_lines:
self.assertTrue(line.startswith('#'))
def test_enable_failover(self):
DHCP_CONFIG.enable_failover()
print('Test Enable Config:\n' + str(DHCP_CONFIG))
- config_lines = str(DHCP_CONFIG._peer).split('\n')
+ config_lines = str(DHCP_CONFIG.get_peer()).split('\n')
for line in config_lines:
self.assertFalse(line.startswith('#'))
diff --git a/modules/network/dhcp-1/python/src/grpc_server/network_service.py b/modules/network/dhcp-1/python/src/grpc_server/network_service.py
index 92726025d..5124a07ad 100644
--- a/modules/network/dhcp-1/python/src/grpc_server/network_service.py
+++ b/modules/network/dhcp-1/python/src/grpc_server/network_service.py
@@ -142,7 +142,7 @@ def GetDHCPRange(self, request, context): # pylint: disable=W0613
"""
LOGGER.info('Get DHCP range called')
try:
- pool = self._get_dhcp_config()._subnets[0].pools[0]
+ pool = self._get_dhcp_config().get_subnets()[0].pools[0]
return pb2.DHCPRange(code=200, start=pool.range_start, end=pool.range_end)
except Exception as e: # pylint: disable=W0718
fail_message = 'Failed to get DHCP range: ' + str(e)
diff --git a/modules/network/dhcp-2/python/src/grpc_server/dhcp_config.py b/modules/network/dhcp-2/python/src/grpc_server/dhcp_config.py
index 5357ba7ed..c49523a64 100644
--- a/modules/network/dhcp-2/python/src/grpc_server/dhcp_config.py
+++ b/modules/network/dhcp-2/python/src/grpc_server/dhcp_config.py
@@ -58,6 +58,12 @@ def enable_failover(self):
for subnet in self._subnets:
subnet.enable_peer()
+ def get_peer(self):
+ return self._peer
+
+ def get_subnets(self):
+ return self._subnets
+
def get_reserved_host(self, hw_addr):
for host in self._reserved_hosts:
if hw_addr == host.hw_addr:
diff --git a/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py b/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py
index 0a156db68..7d368265a 100644
--- a/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py
+++ b/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py
@@ -58,14 +58,14 @@ def test_resolve_config(self):
def test_disable_failover(self):
DHCP_CONFIG.disable_failover()
print('Test Disable Config:\n' + str(DHCP_CONFIG))
- config_lines = str(DHCP_CONFIG._peer).split('\n')
+ config_lines = str(DHCP_CONFIG.get_peer()).split('\n')
for line in config_lines:
self.assertTrue(line.startswith('#'))
def test_enable_failover(self):
DHCP_CONFIG.enable_failover()
print('Test Enable Config:\n' + str(DHCP_CONFIG))
- config_lines = str(DHCP_CONFIG._peer).split('\n')
+ config_lines = str(DHCP_CONFIG.get_peer()).split('\n')
for line in config_lines:
self.assertFalse(line.startswith('#'))
diff --git a/modules/network/dhcp-2/python/src/grpc_server/network_service.py b/modules/network/dhcp-2/python/src/grpc_server/network_service.py
index f9deba965..7c5c61d4f 100644
--- a/modules/network/dhcp-2/python/src/grpc_server/network_service.py
+++ b/modules/network/dhcp-2/python/src/grpc_server/network_service.py
@@ -142,7 +142,7 @@ def GetDHCPRange(self, request, context): # pylint: disable=W0613
"""
LOGGER.info('Get DHCP range called')
try:
- pool = self._get_dhcp_config()._subnets[0].pools[0]
+ pool = self._get_dhcp_config().get_subnets()[0].pools[0]
return pb2.DHCPRange(code=200, start=pool.range_start, end=pool.range_end)
except Exception as e: # pylint: disable=W0718
fail_message = 'Failed to get DHCP range: ' + str(e)
diff --git a/modules/network/ntp/python/src/chronyd.py b/modules/network/ntp/python/src/chronyd.py
index b8ce7db56..23dd834df 100644
--- a/modules/network/ntp/python/src/chronyd.py
+++ b/modules/network/ntp/python/src/chronyd.py
@@ -46,4 +46,5 @@ def is_running(self):
LOGGER.info('Checking chronyd server')
running = os.path.exists(PID_FILE)
LOGGER.info('chronyd server status: ' + str(running))
- return running
\ No newline at end of file
+ return running
+
\ No newline at end of file
diff --git a/modules/network/ntp/python/src/ntp_server.py b/modules/network/ntp/python/src/ntp_server.py
index 14a3d9bac..42fe21e77 100644
--- a/modules/network/ntp/python/src/ntp_server.py
+++ b/modules/network/ntp/python/src/ntp_server.py
@@ -16,7 +16,7 @@
from common import logger
from chronyd import ChronydServer
import time
-
+LOGGER = None
LOG_NAME = 'ntp_server'
class NTPServer:
diff --git a/modules/test/base/python/src/test_module.py b/modules/test/base/python/src/test_module.py
index f7707ba3a..63eb7e8a0 100644
--- a/modules/test/base/python/src/test_module.py
+++ b/modules/test/base/python/src/test_module.py
@@ -26,21 +26,31 @@
class TestModule:
"""An example test module."""
- def __init__(self, module_name, log_name, log_dir=None,conf_file=CONF_FILE,results_dir=RESULTS_DIR):
+ def __init__(self,
+ module_name,
+ log_name,
+ log_dir=None,
+ conf_file=CONF_FILE,
+ results_dir=RESULTS_DIR):
self._module_name = module_name
- self._results_dir=results_dir if results_dir is not None else RESULTS_DIR
- self._device_mac = os.environ.get('DEVICE_MAC','')
- self._ipv4_addr = os.environ.get('IPV4_ADDR','')
- self._ipv4_subnet = os.environ.get('IPV4_SUBNET','')
- self._ipv6_subnet = os.environ.get('IPV6_SUBNET','')
- self._add_logger(log_name=log_name, module_name=module_name, log_dir=log_dir)
- self._config = self._read_config(conf_file=conf_file if conf_file is not None else CONF_FILE)
+ self._results_dir = results_dir if results_dir is not None else RESULTS_DIR
+ self._device_mac = os.environ.get('DEVICE_MAC', '')
+ self._ipv4_addr = os.environ.get('IPV4_ADDR', '')
+ self._ipv4_subnet = os.environ.get('IPV4_SUBNET', '')
+ self._ipv6_subnet = os.environ.get('IPV6_SUBNET', '')
+ self._add_logger(log_name=log_name,
+ module_name=module_name,
+ log_dir=log_dir)
+ self._config = self._read_config(
+ conf_file=conf_file if conf_file is not None else CONF_FILE)
self._device_ipv4_addr = None
self._device_ipv6_addr = None
def _add_logger(self, log_name, module_name, log_dir=None):
global LOGGER
- LOGGER = logger.get_logger(log_name, module_name, log_dir=log_dir)
+ LOGGER = logger.get_logger(name=log_name,
+ log_file=module_name,
+ log_dir=log_dir)
def generate_module_report(self):
pass
@@ -100,7 +110,7 @@ def run_tests(self):
result = getattr(self, test_method_name)(config=test['config'])
else:
result = getattr(self, test_method_name)()
- except Exception as e:
+ except Exception as e: # pylint: disable=W0718
LOGGER.error(f'An error occurred whilst running {test["name"]}')
LOGGER.error(e)
else:
@@ -141,8 +151,7 @@ def run_tests(self):
test['description'] = 'An error occured whilst running this test'
# Remove the steps to resolve if compliant already
- if (test['result'] == 'Compliant' and
- 'recommendations' in test):
+ if (test['result'] == 'Compliant' and 'recommendations' in test):
test.pop('recommendations')
test['end'] = datetime.now().isoformat()
@@ -153,7 +162,7 @@ def run_tests(self):
json_results = json.dumps({'results': tests}, indent=2)
self._write_results(json_results)
- def _read_config(self,conf_file=CONF_FILE):
+ def _read_config(self, conf_file=CONF_FILE):
with open(conf_file, encoding='utf-8') as f:
config = json.load(f)
return config
diff --git a/modules/test/conn/python/src/connection_module.py b/modules/test/conn/python/src/connection_module.py
index cf8852ef0..34e129103 100644
--- a/modules/test/conn/python/src/connection_module.py
+++ b/modules/test/conn/python/src/connection_module.py
@@ -104,9 +104,8 @@ def _connection_switch_arp_inspection(self):
no_arp = False
# Check MAC address matches IP address
- if (arp_packet.hwsrc == self._device_mac and
- (arp_packet.psrc != self._device_ipv4_addr
- and arp_packet.psrc != '0.0.0.0')):
+ if (arp_packet.hwsrc == self._device_mac
+ and (arp_packet.psrc not in (self._device_ipv4_addr, '0.0.0.0'))):
LOGGER.info(f'Bad ARP packet detected for MAC: {self._device_mac}')
LOGGER.info(f'''ARP packet from IP {arp_packet.psrc}
does not match {self._device_ipv4_addr}''')
@@ -205,13 +204,11 @@ def _connection_single_ip(self):
LOGGER.info('Inspecting: ' + str(len(packets)) + ' packets')
for packet in packets:
if DHCP in packet:
- for option in packet[DHCP].options:
- # message-type, option 3 = DHCPREQUEST
- if self._get_dhcp_type(packet) == 3:
- mac_address = packet[Ether].src
- LOGGER.info('DHCPREQUEST detected MAC address: ' + mac_address)
- if not mac_address.startswith(TR_CONTAINER_MAC_PREFIX):
- mac_addresses.add(mac_address.upper())
+ if self._get_dhcp_type(packet) == 3:
+ mac_address = packet[Ether].src
+ LOGGER.info('DHCPREQUEST detected MAC address: ' + mac_address)
+ if not mac_address.startswith(TR_CONTAINER_MAC_PREFIX):
+ mac_addresses.add(mac_address.upper())
# Check if the device mac address is in the list of DHCPREQUESTs
result = self._device_mac.upper() in mac_addresses
@@ -637,4 +634,4 @@ def test_subnets(self, subnets):
LOGGER.error(traceback.format_exc())
result = {'result': False, 'details': 'Subnet test failed: ' + str(e)}
results.append(result)
- return results
\ No newline at end of file
+ return results
diff --git a/modules/test/dns/python/src/dns_module.py b/modules/test/dns/python/src/dns_module.py
index 02d89eb0a..e9550663d 100644
--- a/modules/test/dns/python/src/dns_module.py
+++ b/modules/test/dns/python/src/dns_module.py
@@ -33,17 +33,17 @@ def __init__(self,
log_dir=None,
conf_file=None,
results_dir=None,
- DNS_SERVER_CAPTURE_FILE=DNS_SERVER_CAPTURE_FILE,
- STARTUP_CAPTURE_FILE=STARTUP_CAPTURE_FILE,
- MONITOR_CAPTURE_FILE=MONITOR_CAPTURE_FILE):
+ dns_server_capture_file=DNS_SERVER_CAPTURE_FILE,
+ startup_capture_file=STARTUP_CAPTURE_FILE,
+ monitor_capture_file=MONITOR_CAPTURE_FILE):
super().__init__(module_name=module,
log_name=LOG_NAME,
log_dir=log_dir,
conf_file=conf_file,
results_dir=results_dir)
- self.dns_server_capture_file=DNS_SERVER_CAPTURE_FILE
- self.startup_capture_file=STARTUP_CAPTURE_FILE
- self.monitor_capture_file=MONITOR_CAPTURE_FILE
+ self.dns_server_capture_file=dns_server_capture_file
+ self.startup_capture_file=startup_capture_file
+ self.monitor_capture_file=monitor_capture_file
self._dns_server = '10.10.10.4'
global LOGGER
LOGGER = self._get_logger()
diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py
index 472d403b2..6a959bb2e 100644
--- a/modules/test/tls/python/src/tls_module.py
+++ b/modules/test/tls/python/src/tls_module.py
@@ -14,12 +14,9 @@
"""Baseline test module"""
from test_module import TestModule
from tls_util import TLSUtil
-import os
import pyshark
from cryptography import x509
from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives import serialization
-from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec
LOG_NAME = 'test_tls'
MODULE_REPORT_FILE_NAME = 'tls_report.html'
@@ -27,7 +24,7 @@
MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap'
TLS_CAPTURE_FILE = '/runtime/output/tls.pcap'
GATEWAY_CAPTURE_FILE = '/runtime/network/gateway.pcap'
-
+LOGGER = None
class TLSModule(TestModule):
"""An example testing module."""
@@ -54,144 +51,144 @@ def __init__(self,
# def generate_module_report(self):
- # html_content = '
TLS Module
'
-
- # # List of capture files to scan
- # pcap_files = [
- # self.startup_capture_file, self.monitor_capture_file,
- # self.tls_capture_file
- # ]
- # certificates = self.extract_certificates_from_pcap(pcap_files,
- # self._device_mac)
- # if len(certificates) > 0:
-
- # # Add summary table
- # summary_table = '''
- #
- #
- #
- # | Expiry |
- # Length |
- # Type |
- # Port number |
- # Signed by |
- #
- #
- #
- # '''
-
- # # table_content = '''
- # #
- # #
- # #
- # # | Expiry |
- # # Length |
- # # Type |
- # # Port number |
- # # Signed by |
- # #
- # #
- # # '''
-
- # cert_tables = []
- # for cert_num, ((ip_address, port), cert) in enumerate(
- # certificates.items()):
-
- # # Extract certificate data
- # not_valid_before = cert.not_valid_before
- # not_valid_after = cert.not_valid_after
- # version_value = f'{cert.version.value + 1} ({hex(cert.version.value)})'
- # signature_alg_value = cert.signature_algorithm_oid._name # pylint: disable=W0212
- # not_before = str(not_valid_before)
- # not_after = str(not_valid_after)
- # public_key = cert.public_key()
- # signed_by = 'None'
- # if isinstance(public_key, rsa.RSAPublicKey):
- # public_key_type = 'RSA'
- # elif isinstance(public_key, dsa.DSAPublicKey):
- # public_key_type = 'DSA'
- # elif isinstance(public_key, ec.EllipticCurvePublicKey):
- # public_key_type = 'EC'
- # else:
- # public_key_type = 'Unknown'
- # # Calculate certificate length
- # cert_length = len(cert.public_bytes(
- # encoding=serialization.Encoding.DER))
-
- # # Generate the Certificate table
- # # cert_table = (f'| Property | Value |\n'
- # # f'|---|---|\n'
- # # f"| {'Version':<17} | {version_value:^25} |\n"
- # # f"| {'Signature Alg.':<17} | {signature_alg_value:^25} |\n"
- # # f"| {'Validity from':<17} | {not_before:^25} |\n"
- # # f"| {'Valid to':<17} | {not_after:^25} |")
-
- # # Generate the Subject table
- # subj_table = ('| Distinguished Name | Value |\n'
- # '|---|---|')
- # for val in cert.subject.rdns:
- # dn = val.rfc4514_string().split('=')
- # subj_table += f'\n| {dn[0]} | {dn[1]}'
-
- # # Generate the Issuer table
- # iss_table = ('| Distinguished Name | Value |\n'
- # '|---|---|')
- # for val in cert.issuer.rdns:
- # dn = val.rfc4514_string().split('=')
- # iss_table += f'\n| {dn[0]} | {dn[1]}'
- # if 'CN' in dn[0]:
- # signed_by = dn[1]
-
- # ext_table = None
- # # if cert.extensions:
- # # ext_table = ('| Extension | Value |\n'
- # # '|---|---|')
- # # for extension in cert.extensions:
- # # for extension_value in extension.value:
- # # ext_table += f'''\n| {extension.oid._name} |
- # # {extension_value.value}''' # pylint: disable=W0212
- # # cert_table = f'### Certificate\n{cert_table}'
- # # cert_table += f'\n\n### Subject\n{subj_table}'
- # # cert_table += f'\n\n### Issuer\n{iss_table}'
- # # if ext_table is not None:
- # # cert_table += f'\n\n### Extensions\n{ext_table}'
- # # cert_tables.append(cert_table)
-
- # summary_table += f'''
- #
- # | {not_after} |
- # {cert_length} |
- # {public_key_type} |
- # {port} |
- # {signed_by} |
- #
- # '''
-
- # summary_table += '''
- #
- #
- # '''
-
- # html_content += summary_table
-
- # else:
- # html_content += ('''
- #
- #
- # No TLS certificates found on the device
- #
''')
-
- # LOGGER.debug('Module report:\n' + html_content)
-
- # # Use os.path.join to create the complete file path
- # report_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME)
-
- # # Write the content to a file
- # with open(report_path, 'w', encoding='utf-8') as file:
- # file.write(html_content)
-
- # LOGGER.info('Module report generated at: ' + str(report_path))
- # return report_path
+ # html_content = 'TLS Module
'
+
+ # # List of capture files to scan
+ # pcap_files = [
+ # self.startup_capture_file, self.monitor_capture_file,
+ # self.tls_capture_file
+ # ]
+ # certificates = self.extract_certificates_from_pcap(pcap_files,
+ # self._device_mac)
+ # if len(certificates) > 0:
+
+ # # Add summary table
+ # summary_table = '''
+ #
+ #
+ #
+ # | Expiry |
+ # Length |
+ # Type |
+ # Port number |
+ # Signed by |
+ #
+ #
+ #
+ # '''
+
+ # # table_content = '''
+ # #
+ # #
+ # #
+ # # | Expiry |
+ # # Length |
+ # # Type |
+ # # Port number |
+ # # Signed by |
+ # #
+ # #
+ # # '''
+
+ # cert_tables = []
+ # for cert_num, ((ip_address, port), cert) in enumerate(
+ # certificates.items()):
+
+ # # Extract certificate data
+ # not_valid_before = cert.not_valid_before
+ # not_valid_after = cert.not_valid_after
+ # version_value = f'{cert.version.value + 1} ({hex(cert.version.value)})'
+ # signature_alg_value = cert.signature_algorithm_oid._name # pylint: disable=W0212
+ # not_before = str(not_valid_before)
+ # not_after = str(not_valid_after)
+ # public_key = cert.public_key()
+ # signed_by = 'None'
+ # if isinstance(public_key, rsa.RSAPublicKey):
+ # public_key_type = 'RSA'
+ # elif isinstance(public_key, dsa.DSAPublicKey):
+ # public_key_type = 'DSA'
+ # elif isinstance(public_key, ec.EllipticCurvePublicKey):
+ # public_key_type = 'EC'
+ # else:
+ # public_key_type = 'Unknown'
+ # # Calculate certificate length
+ # cert_length = len(cert.public_bytes(
+ # encoding=serialization.Encoding.DER))
+
+ # # Generate the Certificate table
+ # # cert_table = (f'| Property | Value |\n'
+ # # f'|---|---|\n'
+ # # f"| {'Version':<17} | {version_value:^25} |\n"
+ # # f"| {'Signature Alg.':<17} | {signature_alg_value:^25} |\n"
+ # # f"| {'Validity from':<17} | {not_before:^25} |\n"
+ # # f"| {'Valid to':<17} | {not_after:^25} |")
+
+ # # Generate the Subject table
+ # subj_table = ('| Distinguished Name | Value |\n'
+ # '|---|---|')
+ # for val in cert.subject.rdns:
+ # dn = val.rfc4514_string().split('=')
+ # subj_table += f'\n| {dn[0]} | {dn[1]}'
+
+ # # Generate the Issuer table
+ # iss_table = ('| Distinguished Name | Value |\n'
+ # '|---|---|')
+ # for val in cert.issuer.rdns:
+ # dn = val.rfc4514_string().split('=')
+ # iss_table += f'\n| {dn[0]} | {dn[1]}'
+ # if 'CN' in dn[0]:
+ # signed_by = dn[1]
+
+ # ext_table = None
+ # # if cert.extensions:
+ # # ext_table = ('| Extension | Value |\n'
+ # # '|---|---|')
+ # # for extension in cert.extensions:
+ # # for extension_value in extension.value:
+ # # ext_table += f'''\n| {extension.oid._name} |
+ # # {extension_value.value}''' # pylint: disable=W0212
+ # # cert_table = f'### Certificate\n{cert_table}'
+ # # cert_table += f'\n\n### Subject\n{subj_table}'
+ # # cert_table += f'\n\n### Issuer\n{iss_table}'
+ # # if ext_table is not None:
+ # # cert_table += f'\n\n### Extensions\n{ext_table}'
+ # # cert_tables.append(cert_table)
+
+ # summary_table += f'''
+ #
+ # | {not_after} |
+ # {cert_length} |
+ # {public_key_type} |
+ # {port} |
+ # {signed_by} |
+ #
+ # '''
+
+ # summary_table += '''
+ #
+ #
+ # '''
+
+ # html_content += summary_table
+
+ # else:
+ # html_content += ('''
+ #
+ #
+ # No TLS certificates found on the device
+ #
''')
+
+ # LOGGER.debug('Module report:\n' + html_content)
+
+ # # Use os.path.join to create the complete file path
+ # report_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME)
+
+ # # Write the content to a file
+ # with open(report_path, 'w', encoding='utf-8') as file:
+ # file.write(html_content)
+
+ # LOGGER.info('Module report generated at: ' + str(report_path))
+ # return report_path
def extract_certificates_from_pcap(self, pcap_files, mac_address):
# Initialize a list to store packets
diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py
index ef8e74e64..14d6d15e8 100644
--- a/modules/test/tls/python/src/tls_util.py
+++ b/modules/test/tls/python/src/tls_util.py
@@ -550,13 +550,14 @@ def get_non_tls_client_connection_ips(self, client_ip, capture_files):
non_tls_dst_ips = set() # Store unique destination IPs
for packet in packets:
# Check if packet contains TCP layer
- if 'tcp' in packet['_source']['layers']:
- tcp_flags = packet['_source']['layers']['tcp.flags']
- if 'A' not in tcp_flags and 'S' not in tcp_flags:
- # Packet is not ACK or SYN
- dst_ip = ipaddress.ip_address(packet['_source']['layers']['ip.dst'][0])
- if not dst_ip in subnet_with_mask:
- non_tls_dst_ips.add(str(dst_ip))
+ if 'tcp' in packet['_source']['layers']:
+ tcp_flags = packet['_source']['layers']['tcp.flags']
+ if 'A' not in tcp_flags and 'S' not in tcp_flags:
+ # Packet is not ACK or SYN
+ dst_ip = ipaddress.ip_address(
+ packet['_source']['layers']['ip.dst'][0])
+ if not dst_ip in subnet_with_mask:
+ non_tls_dst_ips.add(str(dst_ip))
return non_tls_dst_ips
# Check if the device has made any outbound connections that don't
diff --git a/testing/pylint/test_pylint b/testing/pylint/test_pylint
index a330d54c3..9e9074aa7 100755
--- a/testing/pylint/test_pylint
+++ b/testing/pylint/test_pylint
@@ -14,14 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-ERROR_LIMIT=0
+ERROR_LIMIT=78
sudo cmd/install
source venv/bin/activate
sudo pip3 install pylint==3.0.3
-files=$(find ./framework -path ./venv -prune -o -name '*.py' -print)
+files=$(find ./ -path ./venv -prune -o -name '*.py' -print)
OUT=pylint.out
diff --git a/testing/tests/test_tests.py b/testing/tests/test_tests.py
index a14afb2cb..895b63ec0 100644
--- a/testing/tests/test_tests.py
+++ b/testing/tests/test_tests.py
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
""" Test assertions for CI testing of tests """
# Temporarily disabled because using Pytest fixtures
# TODO refactor fixtures to not trigger error
@@ -29,6 +28,7 @@
TEST_MATRIX = 'test_tests.json'
RESULTS_PATH = '/tmp/results/*.json'
+
#TODO add reason
@dataclass(frozen=True)
class TestResult:
@@ -73,19 +73,23 @@ def test_tests(results, test_matrix):
actual = set(collect_actual_results(results[tester]))
assert expected & actual == expected
-def test_list_tests(capsys, results, test_matrix):
- all_tests = set(itertools.chain.from_iterable(
- [collect_actual_results(results[x]) for x in results.keys()]))
- ci_pass = set([test
- for testers in test_matrix.values()
- for test, result in testers['expected_results'].items()
- if result == 'Compliant'])
-
- ci_fail = set([test
- for testers in test_matrix.values()
- for test, result in testers['expected_results'].items()
- if result == 'Non-Compliant'])
+def test_list_tests(capsys, results, test_matrix):
+ all_tests = set(
+ itertools.chain.from_iterable(
+ [collect_actual_results(results[x]) for x in results.keys()]))
+
+ ci_pass = set([
+ test for testers in test_matrix.values()
+ for test, result in testers['expected_results'].items()
+ if result == 'Compliant'
+ ])
+
+ ci_fail = set([
+ test for testers in test_matrix.values()
+ for test, result in testers['expected_results'].items()
+ if result == 'Non-Compliant'
+ ])
with capsys.disabled():
#TODO print matching the JSON schema for easy copy/paste
@@ -101,12 +105,15 @@ def test_list_tests(capsys, results, test_matrix):
for tester in test_matrix.keys():
print(f'\n{tester}:')
print(' expected results:')
- for test in collect_expected_results(test_matrix[tester]['expected_results']):
+ for test in collect_expected_results(
+ test_matrix[tester]['expected_results']):
print(f' {test.name}: {test.result}')
print(' actual results:')
for test in collect_actual_results(results[tester]):
if test.name in test_matrix[tester]['expected_results']:
- print(f' {test.name}: {test.result} (exp: {test_matrix[tester]["expected_results"][test.name]})')
+ print(
+ f' {test.name}: {test.result} (exp: {test_matrix[tester]["expected_results"][test.name]})'
+ )
else:
print(f' {test.name}: {test.result}')
diff --git a/testing/unit/dns/dns_module_test.py b/testing/unit/dns/dns_module_test.py
index 52ec80b4d..6c3dec74d 100644
--- a/testing/unit/dns/dns_module_test.py
+++ b/testing/unit/dns/dns_module_test.py
@@ -22,18 +22,19 @@
# Define the directories
TEST_FILES_DIR = 'testing/unit/' + MODULE
-OUTPUT_DIR = os.path.join(TEST_FILES_DIR,'output/')
-REPORTS_DIR = os.path.join(TEST_FILES_DIR,'reports/')
-CAPTURES_DIR = os.path.join(TEST_FILES_DIR,'captures/')
+OUTPUT_DIR = os.path.join(TEST_FILES_DIR, 'output/')
+REPORTS_DIR = os.path.join(TEST_FILES_DIR, 'reports/')
+CAPTURES_DIR = os.path.join(TEST_FILES_DIR, 'captures/')
-LOCAL_REPORT = os.path.join(REPORTS_DIR,'dns_report_local.html')
-LOCAL_REPORT_NO_DNS = os.path.join(REPORTS_DIR,'dns_report_local_no_dns.html')
+LOCAL_REPORT = os.path.join(REPORTS_DIR, 'dns_report_local.html')
+LOCAL_REPORT_NO_DNS = os.path.join(REPORTS_DIR, 'dns_report_local_no_dns.html')
CONF_FILE = 'modules/test/' + MODULE + '/conf/module_config.json'
# Define the capture files to be used for the test
-DNS_SERVER_CAPTURE_FILE = os.path.join(CAPTURES_DIR,'dns.pcap')
-STARTUP_CAPTURE_FILE = os.path.join(CAPTURES_DIR,'startup.pcap')
-MONITOR_CAPTURE_FILE = os.path.join(CAPTURES_DIR,'monitor.pcap')
+DNS_SERVER_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'dns.pcap')
+STARTUP_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'startup.pcap')
+MONITOR_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'monitor.pcap')
+
class TLSModuleTest(unittest.TestCase):
"""Contains and runs all the unit tests concerning DNS behaviors"""
@@ -49,9 +50,9 @@ def dns_module_report_test(self):
log_dir=OUTPUT_DIR,
conf_file=CONF_FILE,
results_dir=OUTPUT_DIR,
- DNS_SERVER_CAPTURE_FILE=DNS_SERVER_CAPTURE_FILE,
- STARTUP_CAPTURE_FILE=STARTUP_CAPTURE_FILE,
- MONITOR_CAPTURE_FILE=MONITOR_CAPTURE_FILE)
+ dns_server_capture_file=DNS_SERVER_CAPTURE_FILE,
+ startup_capture_file=STARTUP_CAPTURE_FILE,
+ monitor_capture_file=MONITOR_CAPTURE_FILE)
report_out_path = dns_module.generate_module_report()
@@ -61,7 +62,7 @@ def dns_module_report_test(self):
formatted_report = self.add_formatting(report_out)
# Write back the new formatted_report value
- out_report_path = os.path.join(OUTPUT_DIR,'dns_report_with_dns.html')
+ out_report_path = os.path.join(OUTPUT_DIR, 'dns_report_with_dns.html')
with open(out_report_path, 'w', encoding='utf-8') as file:
file.write(formatted_report)
@@ -105,9 +106,9 @@ def dns_module_report_no_dns_test(self):
log_dir=OUTPUT_DIR,
conf_file=CONF_FILE,
results_dir=OUTPUT_DIR,
- DNS_SERVER_CAPTURE_FILE=dns_server_cap_file,
- STARTUP_CAPTURE_FILE=startup_cap_file,
- MONITOR_CAPTURE_FILE=monitor_cap_file)
+ dns_server_capture_file=dns_server_cap_file,
+ startup_capture_file=startup_cap_file,
+ monitor_capture_file=monitor_cap_file)
report_out_path = dns_module.generate_module_report()
@@ -117,7 +118,7 @@ def dns_module_report_no_dns_test(self):
formatted_report = self.add_formatting(report_out)
# Write back the new formatted_report value
- out_report_path = os.path.join(OUTPUT_DIR,'dns_report_no_dns.html')
+ out_report_path = os.path.join(OUTPUT_DIR, 'dns_report_no_dns.html')
with open(out_report_path, 'w', encoding='utf-8') as file:
file.write(formatted_report)
@@ -127,8 +128,7 @@ def dns_module_report_no_dns_test(self):
self.assertEqual(report_out, report_local)
-
- def add_formatting(self,body):
+ def add_formatting(self, body):
return f'''
@@ -138,6 +138,7 @@ def add_formatting(self,body):