diff --git a/framework/python/src/common/testreport.py b/framework/python/src/common/testreport.py index 8c24fe240..c9bde77f7 100644 --- a/framework/python/src/common/testreport.py +++ b/framework/python/src/common/testreport.py @@ -239,6 +239,8 @@ def generate_module_pages(self, json_data, module_reports): content_size += 40 + header_padding elif '' in line: content_size += 39 elif '
  • ' in line: @@ -310,6 +312,7 @@ def generate_module_reports(self, json_data): content = content.replace('', '
    ') content = content.replace('

    ', '

    ') content = content.replace('

    ', '

    ') + content = content.replace('

    ', '

    ') content = self.generate_module_pages(json_data=json_data, module_reports=content) @@ -797,6 +800,16 @@ def generate_css(self): font-weight: bold; } + .markdown-header-h3{ + margin-left:20px; + margin-top:20px; + margin-bottom:24px; + margin-right:0px; + + font-size: 1.17em; + font-weight: bold; + } + .module-page-content{ /*Page height minus header(93px), footer(30px), and a 20px bottom padding.*/ diff --git a/modules/test/tls/python/requirements.txt b/modules/test/tls/python/requirements.txt index 432116ff2..719299488 100644 --- a/modules/test/tls/python/requirements.txt +++ b/modules/test/tls/python/requirements.txt @@ -1,2 +1,4 @@ cryptography -pyOpenSSL \ No newline at end of file +pyOpenSSL +pyshark +cryptography \ No newline at end of file diff --git a/modules/test/tls/python/src/run.py b/modules/test/tls/python/src/run.py index 51bc82f8f..2b7ea7e0f 100644 --- a/modules/test/tls/python/src/run.py +++ b/modules/test/tls/python/src/run.py @@ -37,6 +37,7 @@ def __init__(self, module): self._test_module = TLSModule(module) self._test_module.run_tests() + self._test_module.generate_module_report() def _handler(self, signum): LOGGER.debug('SigtermEnum: ' + str(signal.SIGTERM)) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index b00cccc8d..72a6b1c92 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -14,23 +14,194 @@ """Baseline test module""" from test_module import TestModule from tls_util import TLSUtil +import os +import pyshark +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec LOG_NAME = 'test_tls' -LOGGER = None +MODULE_REPORT_FILE_NAME = 'tls_report.md' STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap' MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap' -GATEWAY_CAPTURE_FILE = '/runtime/network/gateway.pcap' +TLS_CAPTURE_FILE = '/runtime/output/tls.pcap' + +LOGGER = None class TLSModule(TestModule): """An example testing module.""" - def __init__(self, module): - super().__init__(module_name=module, log_name=LOG_NAME) + def __init__(self, + module, + log_dir=None, + conf_file=None, + results_dir=None, + startup_capture_file=STARTUP_CAPTURE_FILE, + monitor_capture_file=MONITOR_CAPTURE_FILE, + tls_capture_file=TLS_CAPTURE_FILE): + super().__init__(module_name=module, + log_name=LOG_NAME, + log_dir=log_dir, + conf_file=conf_file, + results_dir=results_dir) + self.startup_capture_file = startup_capture_file + self.monitor_capture_file = monitor_capture_file + self.tls_capture_file = tls_capture_file global LOGGER LOGGER = self._get_logger() self._tls_util = TLSUtil(LOGGER) + def generate_module_report(self): + summary = '## Summary' + + summary_header = (f'''| {'#': ^5} ''' + f'''| {'Expiry': ^{25}} ''' + f'''| {'Length': ^{8}} ''' + f'''| {'Type': ^{6}} ''' + f'''| {'Port No.': ^{10}} ''' + f'''| {'Signed by': ^{11}} | ''') + summary_header_line = (f'''|{'-' * 7}''' + f'''|{'-' * 27}''' + f'''|{'-' * 10}''' + f'''|{'-' * 8}''' + f'''|{'-' * 12}''' + f'''|{'-' * 13}|''') + summary_table = f'{summary_header}\n{summary_header_line}' + + # List of capture files to scan + pcap_files = [ + self.startup_capture_file, self.monitor_capture_file, + self.tls_capture_file + ] + certificates = self.extract_certificates_from_pcap(pcap_files, + self._device_mac) + if len(certificates)>0: + cert_tables = [] + for cert_num, ((ip_address, port), cert) in enumerate(certificates.items()): + # Extract certificate data + not_valid_before = cert.not_valid_before + not_valid_after = cert.not_valid_after + version_value = f'{cert.version.value + 1} ({hex(cert.version.value)})' + signature_alg_value = cert.signature_algorithm_oid._name # pylint: disable=W0212 + not_before = str(not_valid_before) + not_after = str(not_valid_after) + public_key = cert.public_key() + signed_by = 'None' + if isinstance(public_key, rsa.RSAPublicKey): + public_key_type = "RSA" + elif isinstance(public_key, dsa.DSAPublicKey): + public_key_type = "DSA" + elif isinstance(public_key, ec.EllipticCurvePublicKey): + public_key_type = "EC" + else: + public_key_type = "Unknown" + # Calculate certificate length + cert_length = len(cert.public_bytes(encoding=serialization.Encoding.DER)) + # Generate the Certificate table + cert_table = (f'| Property | Value |\n' + f'|---|---|\n' + f"| {'Version':<17} | {version_value:^25} |\n" + f"| {'Signature Alg.':<17} | {signature_alg_value:^25} |\n" + f"| {'Validity from':<17} | {not_before:^25} |\n" + f"| {'Valid to':<17} | {not_after:^25} |") + + # Generate the Subject table + subj_table = ('| Distinguished Name | Value |\n' + '|---|---|') + for val in cert.subject.rdns: + dn = val.rfc4514_string().split('=') + subj_table += f'\n| {dn[0]} | {dn[1]}' + + # Generate the Issuer table + iss_table = ('| Distinguished Name | Value |\n' + '|---|---|') + for val in cert.issuer.rdns: + dn = val.rfc4514_string().split('=') + iss_table += f'\n| {dn[0]} | {dn[1]}' + if 'CN' in dn[0]: + signed_by = dn[1] + + ext_table = None + if cert.extensions: + ext_table = ('| Extension | Value |\n' + '|---|---|') + for extension in cert.extensions: + for extension_value in extension.value: + ext_table += f'\n| {extension.oid._name} | {extension_value.value}' # pylint: disable=W0212 + cert_table = f'### Certificate\n{cert_table}' + cert_table += f'\n\n### Subject\n{subj_table}' + cert_table += f'\n\n### Issuer\n{iss_table}' + if ext_table is not None: + cert_table += f'\n\n### Extensions\n{ext_table}' + cert_tables.append(cert_table) + summary_table_row = (f'''| {cert_num+1: ^5} ''' + f'''| {not_after: ^25} ''' + f'''| {cert_length: ^8} ''' + f'''| {public_key_type: ^6} ''' + f'''| {port: ^10} ''' + f'''| {signed_by: ^11} |''') + summary_table+=f'\n{summary_table_row}' + + markdown_template = '# TLS Module\n' + '\n'.join( + '\n' + tables for tables in cert_tables) + + # summary = f'## Summary\n\n{summary_table}' + # markdown_template += f'\n\n{summary}' + else: + markdown_template = (f'''# TLS Module\n''' + f'''\n- No device certificates detected\n''') + + summary = f'## Summary\n\n{summary_table}' + markdown_template += f'\n\n{summary}' + LOGGER.debug('Markdown Report:\n' + markdown_template) + + # Use os.path.join to create the complete file path + report_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME) + + # Write the content to a file + with open(report_path, 'w', encoding='utf-8') as file: + file.write(markdown_template) + + LOGGER.info('Module report generated at: ' + str(report_path)) + return report_path + + def extract_certificates_from_pcap(self, pcap_files, mac_address): + # Initialize a list to store packets + all_packets = [] + # Iterate over each file + for pcap_file in pcap_files: + # Open the capture file + packets = pyshark.FileCapture(pcap_file) + try: + # Iterate over each packet in the file and add it to the list + for packet in packets: + all_packets.append(packet) + finally: + # Close the capture file + packets.close() + + certificates = {} + # Loop through each item (packet) + for packet in all_packets: + if 'TLS' in packet: + # Check if the packet's source matches the target MAC address + if 'eth' in packet and (packet.eth.src == mac_address): + # Look for attribute of x509 + if hasattr(packet['TLS'], 'x509sat_utf8string'): + certificate_bytes = bytes.fromhex( + packet['TLS'].handshake_certificate.replace(':', '')) + # Parse the certificate bytes + certificate = x509.load_der_x509_certificate( + certificate_bytes, default_backend()) + # Extract IP address and port from packet + ip_address = packet.ip.src + port = packet.tcp.srcport if 'tcp' in packet else packet.udp.srcport + # Store certificate in dictionary with IP address and port as key + certificates[(ip_address, port)] = certificate + return certificates + def _security_tls_v1_2_server(self): LOGGER.info('Running security.tls.v1_2_server') self._resolve_device_ip() @@ -81,8 +252,9 @@ def _validate_tls_client(self, client_ip, tls_version): client_results = self._tls_util.validate_tls_client( client_ip=client_ip, tls_version=tls_version, - capture_files=[MONITOR_CAPTURE_FILE,STARTUP_CAPTURE_FILE, - GATEWAY_CAPTURE_FILE]) + capture_files=[ + MONITOR_CAPTURE_FILE, STARTUP_CAPTURE_FILE, GATEWAY_CAPTURE_FILE + ]) # Generate results based on the state result_message = 'No outbound connections were found.' diff --git a/testing/unit/report/report_test.py b/testing/unit/report/report_test.py index 4a46b81f7..b8bdcfba8 100644 --- a/testing/unit/report/report_test.py +++ b/testing/unit/report/report_test.py @@ -44,6 +44,7 @@ def report_test(self): # Load all module markdown reports reports_md = [] + reports_md.append(self.get_module_md_report('tls')) reports_md.append(self.get_module_md_report('dns')) reports_md.append(self.get_module_md_report('nmap')) reports_md.append(self.get_module_md_report('ntp')) diff --git a/testing/unit/tls/monitor.pcap b/testing/unit/tls/captures/monitor.pcap similarity index 100% rename from testing/unit/tls/monitor.pcap rename to testing/unit/tls/captures/monitor.pcap diff --git a/testing/unit/tls/no_tls.pcap b/testing/unit/tls/captures/no_tls.pcap similarity index 100% rename from testing/unit/tls/no_tls.pcap rename to testing/unit/tls/captures/no_tls.pcap diff --git a/testing/unit/tls/captures/tls.pcap b/testing/unit/tls/captures/tls.pcap new file mode 100644 index 000000000..3d49a4551 Binary files /dev/null and b/testing/unit/tls/captures/tls.pcap differ diff --git a/testing/unit/tls/captures/tls_ext.pcap b/testing/unit/tls/captures/tls_ext.pcap new file mode 100644 index 000000000..09d75837a Binary files /dev/null and b/testing/unit/tls/captures/tls_ext.pcap differ diff --git a/testing/unit/tls/unsupported_tls.pcap b/testing/unit/tls/captures/unsupported_tls.pcap similarity index 100% rename from testing/unit/tls/unsupported_tls.pcap rename to testing/unit/tls/captures/unsupported_tls.pcap diff --git a/testing/unit/tls/reports/tls_report_ext_local.md b/testing/unit/tls/reports/tls_report_ext_local.md new file mode 100644 index 000000000..878fa0743 --- /dev/null +++ b/testing/unit/tls/reports/tls_report_ext_local.md @@ -0,0 +1,33 @@ +# TLS Module + +### Certificate +| Property | Value | +|---|---| +| Version | 3 (0x2) | +| Signature Alg. | sha256WithRSAEncryption | +| Validity from | 2022-07-26 15:33:09 | +| Valid to | 2027-07-25 15:33:09 | + +### Subject +| Distinguished Name | Value | +|---|---| +| C | US +| CN | apc27D605.nam.gad.schneider-electric.com + +### Issuer +| Distinguished Name | Value | +|---|---| +| C | US +| O | IT Division +| CN | Sub CA + +### Extensions +| Extension | Value | +|---|---| +| subjectAltName | ap9643_qa1941270129.nam.gad.schneider-electric.com + +## Summary + +| # | Expiry | Length | Type | Port No. | Signed by | +|-------|---------------------------|----------|--------|------------|-------------| +| 1 | 2027-07-25 15:33:09 | 888 | EC | 443 | Sub CA | \ No newline at end of file diff --git a/testing/unit/tls/reports/tls_report_local.md b/testing/unit/tls/reports/tls_report_local.md new file mode 100644 index 000000000..dc3866dc6 --- /dev/null +++ b/testing/unit/tls/reports/tls_report_local.md @@ -0,0 +1,35 @@ +# TLS Module + +### Certificate +| Property | Value | +|---|---| +| Version | 1 (0x0) | +| Signature Alg. | sha256WithRSAEncryption | +| Validity from | 2022-09-21 19:57:57 | +| Valid to | 2027-09-21 19:57:57 | + +### Subject +| Distinguished Name | Value | +|---|---| +| C | US +| ST | California +| L | Concord +| O | BuildingsIoT +| OU | Software +| CN | EasyIO_FS-32 + +### Issuer +| Distinguished Name | Value | +|---|---| +| C | US +| ST | California +| L | Concord +| O | BuildingsIoT +| OU | Software +| CN | BuildingsIoT RSA Signing CA + +## Summary + +| # | Expiry | Length | Type | Port No. | Signed by | +|-------|---------------------------|----------|--------|------------|-------------| +| 1 | 2027-09-21 19:57:57 | 901 | RSA | 443 | BuildingsIoT RSA Signing CA | \ No newline at end of file diff --git a/testing/unit/tls/reports/tls_report_no_cert_local.md b/testing/unit/tls/reports/tls_report_no_cert_local.md new file mode 100644 index 000000000..6de5bb88a --- /dev/null +++ b/testing/unit/tls/reports/tls_report_no_cert_local.md @@ -0,0 +1,9 @@ +# TLS Module + +- No device certificates detected + + +## Summary + +| # | Expiry | Length | Type | Port No. | Signed by | +|-------|---------------------------|----------|--------|------------|-------------| \ No newline at end of file diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py index baed9c395..86870e46a 100644 --- a/testing/unit/tls/tls_module_test.py +++ b/testing/unit/tls/tls_module_test.py @@ -13,10 +13,11 @@ # limitations under the License. """Module run all the TLS related unit tests""" from tls_util import TLSUtil +from tls_module import TLSModule +import os import unittest from common import logger from scapy.all import sniff, wrpcap -import os import threading import time import netifaces @@ -26,18 +27,24 @@ MODULE = 'tls' # Define the file paths TEST_FILES_DIR = 'testing/unit/' + MODULE -OUTPUT_DIR = TEST_FILES_DIR + '/output' +OUTPUT_DIR = os.path.join(TEST_FILES_DIR,'output/') +REPORTS_DIR = os.path.join(TEST_FILES_DIR,'reports/') +CAPTURES_DIR = os.path.join(TEST_FILES_DIR,'captures/') + +LOCAL_REPORT = os.path.join(REPORTS_DIR,'tls_report_local.md') +LOCAL_REPORT_EXT = os.path.join(REPORTS_DIR,'tls_report_ext_local.md') +LOCAL_REPORT_NO_CERT = os.path.join(REPORTS_DIR,'tls_report_no_cert_local.md') +CONF_FILE = 'modules/test/' + MODULE + '/conf/module_config.json' TLS_UTIL = None PACKET_CAPTURE = None - class TLSModuleTest(unittest.TestCase): """Contains and runs all the unit tests concerning TLS behaviors""" @classmethod def setUpClass(cls): - log = logger.get_logger('test_' + MODULE) + log = logger.get_logger('unit_test_' + MODULE) global TLS_UTIL TLS_UTIL = TLSUtil(log, bin_dir='modules/test/tls/bin', @@ -205,7 +212,7 @@ def security_tls_v1_2_client_cipher_fail_test(self): # generate a skip result def security_tls_client_skip_test(self): print('security_tls_client_skip_test') - capture_file = os.path.join(TEST_FILES_DIR, 'no_tls.pcap') + capture_file = os.path.join(CAPTURES_DIR, 'no_tls.pcap') # Run the client test test_results = TLS_UTIL.validate_tls_client(client_ip='172.27.253.167', @@ -267,7 +274,7 @@ def test_client_tls(self, def test_client_tls_with_non_tls_client(self): print('\ntest_client_tls_with_non_tls_client') - capture_file = os.path.join(TEST_FILES_DIR, 'monitor.pcap') + capture_file = os.path.join(CAPTURES_DIR, 'monitor.pcap') # Run the client test test_results = TLS_UTIL.validate_tls_client(client_ip='10.10.10.14', @@ -280,7 +287,7 @@ def test_client_tls_with_non_tls_client(self): # generate a fail result def security_tls_client_unsupported_tls_client(self): print('\nsecurity_tls_client_unsupported_tls_client') - capture_file = os.path.join(TEST_FILES_DIR, 'unsupported_tls.pcap') + capture_file = os.path.join(CAPTURES_DIR, 'unsupported_tls.pcap') # Run the client test test_results = TLS_UTIL.validate_tls_client(client_ip='172.27.253.167', @@ -289,6 +296,64 @@ def security_tls_client_unsupported_tls_client(self): print(str(test_results)) self.assertFalse(test_results[0]) + def tls_module_report_test(self): + print('\ntls_module_report_test') + os.environ['DEVICE_MAC'] = '38:d1:35:01:17:fe' + pcap_file = os.path.join(CAPTURES_DIR,'tls.pcap') + tls = TLSModule(module=MODULE, + log_dir=OUTPUT_DIR, + conf_file=CONF_FILE, + results_dir=OUTPUT_DIR, + startup_capture_file=pcap_file, + monitor_capture_file=pcap_file, + tls_capture_file=pcap_file) + report_out_path = tls.generate_module_report() + + with open(report_out_path, 'r', encoding='utf-8') as file: + report_out = file.read() + + # Read the local good report + with open(LOCAL_REPORT, 'r', encoding='utf-8') as file: + report_local = file.read() + + self.assertEqual(report_out, report_local) + + def tls_module_report_ext_test(self): + print('\ntls_module_report_ext_test') + os.environ['DEVICE_MAC'] = '28:29:86:27:d6:05' + pcap_file = os.path.join(CAPTURES_DIR,'tls_ext.pcap') + tls = TLSModule(module=MODULE, + log_dir=OUTPUT_DIR, + conf_file=CONF_FILE, + results_dir=OUTPUT_DIR, + startup_capture_file=pcap_file, + monitor_capture_file=pcap_file, + tls_capture_file=pcap_file) + report_out_path = tls.generate_module_report() + + def tls_module_report_no_cert_test(self): + print('\ntls_module_report_no_cert_test') + os.environ['DEVICE_MAC'] = '' + pcap_file = os.path.join(CAPTURES_DIR,'tls_ext.pcap') + tls = TLSModule(module=MODULE, + log_dir=OUTPUT_DIR, + conf_file=CONF_FILE, + results_dir=OUTPUT_DIR, + startup_capture_file=pcap_file, + monitor_capture_file=pcap_file, + tls_capture_file=pcap_file) + report_out_path = tls.generate_module_report() + + # Read the generated report + with open(report_out_path, 'r', encoding='utf-8') as file: + report_out = file.read() + + # Read the local good report + with open(LOCAL_REPORT_NO_CERT, 'r', encoding='utf-8') as file: + report_local = file.read() + + self.assertEqual(report_out, report_local) + def generate_tls_traffic(self, capture_file, tls_version, @@ -398,7 +463,13 @@ def get_interface_ip(self, interface_name): suite.addTest(TLSModuleTest('test_client_tls_with_non_tls_client')) suite.addTest(TLSModuleTest('security_tls_client_unsupported_tls_client')) + # Test the results options for tls server tests suite.addTest(TLSModuleTest('security_tls_server_results_test')) + # Test various report module outputs + suite.addTest(TLSModuleTest('tls_module_report_test')) + suite.addTest(TLSModuleTest('tls_module_report_ext_test')) + suite.addTest(TLSModuleTest('tls_module_report_no_cert_test')) + runner = unittest.TextTestRunner() runner.run(suite)