From 2b5892ecf27ff2d5e731c6599d621a83a2fe6e92 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Mon, 17 Jul 2023 11:02:56 -0600 Subject: [PATCH 01/17] initial add of security module and tls tests --- modules/test/security/bin/start_test_module | 56 +++++++ modules/test/security/conf/module_config.json | 27 ++++ modules/test/security/python/requirements.txt | 1 + modules/test/security/python/src/run.py | 69 ++++++++ .../security/python/src/security_module.py | 147 ++++++++++++++++++ .../python/src/security_module_test.py | 29 ++++ modules/test/security/python/src/tls_util.py | 138 ++++++++++++++++ modules/test/security/requirements.txt | 1 + modules/test/security/security.Dockerfile | 31 ++++ testing/unit_test/run_tests.sh | 8 +- 10 files changed, 505 insertions(+), 2 deletions(-) create mode 100644 modules/test/security/bin/start_test_module create mode 100644 modules/test/security/conf/module_config.json create mode 100644 modules/test/security/python/requirements.txt create mode 100644 modules/test/security/python/src/run.py create mode 100644 modules/test/security/python/src/security_module.py create mode 100644 modules/test/security/python/src/security_module_test.py create mode 100644 modules/test/security/python/src/tls_util.py create mode 100644 modules/test/security/requirements.txt create mode 100644 modules/test/security/security.Dockerfile diff --git a/modules/test/security/bin/start_test_module b/modules/test/security/bin/start_test_module new file mode 100644 index 000000000..a529c2fcf --- /dev/null +++ b/modules/test/security/bin/start_test_module @@ -0,0 +1,56 @@ +#!/bin/bash + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# An example startup script that does the bare minimum to start +# a test module via a pyhon script. Each test module should include a +# start_test_module file that overwrites this one to boot all of its +# specific requirements to run. + +# Define where the python source files are located +PYTHON_SRC_DIR=/testrun/python/src + +# Fetch module name +MODULE_NAME=$1 + +# Default interface should be veth0 for all containers +DEFAULT_IFACE=veth0 + +# Allow a user to define an interface by passing it into this script +DEFINED_IFACE=$2 + +# Select which interace to use +if [[ -z $DEFINED_IFACE || "$DEFINED_IFACE" == "null" ]] +then + echo "No interface defined, defaulting to veth0" + INTF=$DEFAULT_IFACE +else + INTF=$DEFINED_IFACE +fi + +# Create and set permissions on the log files +LOG_FILE=/runtime/output/$MODULE_NAME.log +RESULT_FILE=/runtime/output/$MODULE_NAME-result.json +touch $LOG_FILE +touch $RESULT_FILE +chown $HOST_USER $LOG_FILE +chown $HOST_USER $RESULT_FILE + +# Run the python scrip that will execute the tests for this module +# -u flag allows python print statements +# to be logged by docker by running unbuffered +python3 -u $PYTHON_SRC_DIR/run.py "-m $MODULE_NAME" + +echo Module has finished \ No newline at end of file diff --git a/modules/test/security/conf/module_config.json b/modules/test/security/conf/module_config.json new file mode 100644 index 000000000..d4598cc88 --- /dev/null +++ b/modules/test/security/conf/module_config.json @@ -0,0 +1,27 @@ +{ + "config": { + "meta": { + "name": "security", + "display_name": "Security", + "description": "Security tests" + }, + "network": true, + "docker": { + "depends_on": "base", + "enable_container": true, + "timeout": 60 + }, + "tests":[ + { + "name": "security.tls.v1_2_server", + "description": "Check the device web server TLS 1.2 & certificate is valid", + "expected_behavior": "TLS 1.2 certificate is issued to the web browser client when accessed" + }, + { + "name": "security.tls.v1_2_client", + "description": "Device uses TLS with connection to an external service on port 443 (or any other port which could be running the webserver-HTTPS)", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.2 and support for ECDH and ECDSA ciphers" + } + ] + } +} \ No newline at end of file diff --git a/modules/test/security/python/requirements.txt b/modules/test/security/python/requirements.txt new file mode 100644 index 000000000..488c3aab3 --- /dev/null +++ b/modules/test/security/python/requirements.txt @@ -0,0 +1 @@ +cryptography \ No newline at end of file diff --git a/modules/test/security/python/src/run.py b/modules/test/security/python/src/run.py new file mode 100644 index 000000000..9caead1df --- /dev/null +++ b/modules/test/security/python/src/run.py @@ -0,0 +1,69 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Run Baseline module""" +import argparse +import signal +import sys +import logger + +from security_module import SecurityModule + +LOGGER = logger.get_logger('test_module') +RUNTIME = 1500 + + +class SecurityModuleRunner: + """An example runner class for test modules.""" + + def __init__(self, module): + + signal.signal(signal.SIGINT, self._handler) + signal.signal(signal.SIGTERM, self._handler) + signal.signal(signal.SIGABRT, self._handler) + signal.signal(signal.SIGQUIT, self._handler) + + LOGGER.info('Starting Baseline Module') + + self._test_module = SecurityModule(module) + self._test_module.run_tests() + + def _handler(self, signum): + LOGGER.debug('SigtermEnum: ' + str(signal.SIGTERM)) + LOGGER.debug('Exit signal received: ' + str(signum)) + if signum in (2, signal.SIGTERM): + LOGGER.info('Exit signal received. Stopping test module...') + LOGGER.info('Test module stopped') + sys.exit(1) + + +def run(): + parser = argparse.ArgumentParser( + description='Security Module Help', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument( + '-m', + '--module', + help='Define the module name to be used to create the log file') + + args = parser.parse_args() + + # For some reason passing in the args from bash adds an extra + # space before the argument so we'll just strip out extra space + SecurityModuleRunner(args.module.strip()) + + +if __name__ == '__main__': + run() diff --git a/modules/test/security/python/src/security_module.py b/modules/test/security/python/src/security_module.py new file mode 100644 index 000000000..2926d9f1c --- /dev/null +++ b/modules/test/security/python/src/security_module.py @@ -0,0 +1,147 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Baseline test module""" +from test_module import TestModule +# import ssl +# import socket +# from cryptography import x509 +# from cryptography.hazmat.backends import default_backend +# from datetime import datetime +from tls_util import TLSUtil + +LOG_NAME = 'test_security' +LOGGER = None + +class SecurityModule(TestModule): + """An example testing module.""" + + def __init__(self, module): + super().__init__(module_name=module, log_name=LOG_NAME) + global LOGGER + LOGGER = self._get_logger() + self._tls_util = TLSUtil(LOGGER) + + def _security_tls_v1_2_server(self): + LOGGER.info('Running security.tls.v1_2_server') + # If the ipv4 address wasn't resolved yet, try again + if self._device_ipv4_addr is None: + self._device_ipv4_addr = self._get_device_ipv4(self) + + return self._tls_util.validate_tls_server(self._ipv4_addr,tls_version='1.2') + + # # If the ipv4 address wasn't resolved yet, try again + # if self._device_ipv4_addr is None: + # self._device_ipv4_addr = self._get_device_ipv4(self) + + # try: + # public_cert = self._get_public_certificate(self._device_ipv4_addr) + # LOGGER.info('Public Certificate: ' + str(public_cert)) + # except ConnectionRefusedError as e: + # LOGGER.info('Could not connect to device, skipping') + # return None, 'Could not connect to device, skipping' + + # public_key = self._get_public_key(public_cert) + # LOGGER.info('Public Key: ' + str(public_key)) + + # self._verify_certificate_timerange(public_cert) + # self._verify_public_key(public_key) + # return None, 'Test not yet implemented' + + def _security_tls_v1_2_client(self): + LOGGER.info('Running security.tls.v1_2_client') + return None, 'Test not yet implemented' + +# def _verify_certificate_timerange(public_cert): +# # Extract the notBefore and notAfter dates from the certificate +# not_before = cert.not_valid_before +# not_after = cert.not_valid_after + +# LOGGER.info('Certificate valid from: ' + str(not_before) + '-' + str(not_after)) + +# # Get the current date +# current_date = datetime.utcnow() + +# # Check if today's date is within the certificate's validity range +# if not_before <= current_date <= not_after: +# return True, 'Certificate has a valid time range' +# elif current_date <= not_before: +# return False, 'Certificate is not yet valid' +# else: +# return False, 'Certificate has expired' + +# def _verify_public_key(self,public_key): +# # Serialize the public key to get its size/length +# public_key_pem = public_key.public_bytes( +# encoding=serialization.Encoding.PEM, +# format=serialization.PublicFormat.SubjectPublicKeyInfo +# ) + +# # Calculate the key length based on the serialized public key +# key_length = len(public_key_pem) * 8 + +# # Check if the public key is of RSA type +# if isinstance(public_key, rsa.RSAPublicKey): +# if key_length >= 2048: +# return True, 'RSA key length passed: ' + key_length + '>= 2048' +# else: +# return False, 'RSA key length too short: ' + str(key_length) + '< 2048' + +# # Check if the public key is of EC type +# elif isinstance(public_key, ec.EllipticCurvePublicKey): +# if key_length >= 224: +# return True, 'EC key length passed: ' + key_length + '>= 224' +# else: +# return False, 'EC key length too short: ' + str(key_length) + '< 224' +# else: +# return False, "Key is not RSA or EC type" + +# def _get_public_key(self,cert_pem): +# # Parse the PEM certificate using cryptography +# cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend()) + +# # Extract and return the public key from the certificate +# public_key = cert.public_key() +# return public_key + +# def _get_public_certificate(self,host, port=443): +# # Disable certificate verification +# context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) +# context.check_hostname = False +# context.verify_mode = ssl.CERT_NONE + +# # Create an SSL/TLS socket +# with socket.create_connection((host, port)) as sock: +# with context.wrap_socket(sock, server_hostname=host) as secure_sock: +# # Get the server's certificate in PEM format +# cert_pem = secure_sock.getpeercert(binary_form=False) + +# # Parse the PEM certificate using cryptography +# cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend()) + +# return cert + + +# try: +# sec_mod = SecurityModule() +# public_cert = sec_mod._get_public_certificate('google.com') +# LOGGER.info('Public Certificate: ' + str(public_cert)) +# except ConnectionRefusedError as e: +# LOGGER.info('Could not connect to device, skipping') + +# public_key = sec_mod._get_public_key(public_cert) +# LOGGER.info('Public Key: ' + str(public_key)) + +# sec_mod._verify_certificate_timerange(public_cert) +# sec_mod._verify_public_key(public_key) \ No newline at end of file diff --git a/modules/test/security/python/src/security_module_test.py b/modules/test/security/python/src/security_module_test.py new file mode 100644 index 000000000..6c6b02487 --- /dev/null +++ b/modules/test/security/python/src/security_module_test.py @@ -0,0 +1,29 @@ +from tls_util import TLSUtil +import unittest +import common.logger as logger + +MODULE_NAME='security_module_test' +TLS_UTIL = None + +class SecurityModuleTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + log = logger.get_logger(MODULE_NAME) + global TLS_UTIL + TLS_UTIL = TLSUtil(log) + + def security_tls_v1_2_server_test(self): + test_results = TLS_UTIL.validate_tls_server('google.com',tls_version='1.2') + self.assertTrue(test_results[0]) + + def security_tls_v1_3_server_test(self): + test_results = TLS_UTIL.validate_tls_server('google.com',tls_version='1.3') + self.assertTrue(test_results[0]) + +if __name__ == '__main__': + suite = unittest.TestSuite() + suite.addTest(SecurityModuleTest('security_tls_v1_2_server_test')) + suite.addTest(SecurityModuleTest('security_tls_v1_3_server_test')) + runner = unittest.TextTestRunner() + runner.run(suite) \ No newline at end of file diff --git a/modules/test/security/python/src/tls_util.py b/modules/test/security/python/src/tls_util.py new file mode 100644 index 000000000..7ea33ef23 --- /dev/null +++ b/modules/test/security/python/src/tls_util.py @@ -0,0 +1,138 @@ + +import ssl +import socket +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from datetime import datetime +from OpenSSL import crypto + +LOG_NAME = 'tls_util' +LOGGER = None + +class TLSUtil(): + + """Helper class for various tests concerning TLS communications""" + + def __init__(self,logger): + global LOGGER + LOGGER = logger + + def get_public_certificate(self,host, port=443, tls_version='1.2'): + try: + # Disable certificate verification + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + # Set the correct TLS version + context.options |= ssl.PROTOCOL_TLS + context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0 + context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1 + if tls_version == '1.3': + context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 + elif tls_version == '1.2': + context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 + + # Create an SSL/TLS socket + with socket.create_connection((host, port), timeout = 5) as sock: + with context.wrap_socket(sock, server_hostname=host) as secure_sock: + # Get the server's certificate in PEM format + cert_pem = ssl.DER_cert_to_PEM_cert(secure_sock.getpeercert(True)) + + if cert_pem: + cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + except ConnectionRefusedError: + LOGGER.info(f"Connection to {host}:{port} was refused.") + return None + except socket.gaierror: + LOGGER.info(f"Failed to resolve the hostname '{host}'.") + return None + except ssl.SSLError as e: + LOGGER.info(f"SSL error occurred: {e}") + return None + + return cert + + def get_public_key(self,public_cert): + # Extract and return the public key from the certificate + public_key = public_cert.get_pubkey() + return public_key + + def verify_certificate_timerange(self,public_cert): + # Extract the notBefore and notAfter dates from the certificate + not_before = datetime.strptime(public_cert.get_notBefore().decode(), "%Y%m%d%H%M%SZ") + not_after = datetime.strptime(public_cert.get_notAfter().decode(), "%Y%m%d%H%M%SZ") + + LOGGER.info('Certificate valid from: ' + str(not_before) + ' To ' + str(not_after)) + + # Get the current date + current_date = datetime.utcnow() + + # Check if today's date is within the certificate's validity range + if not_before <= current_date <= not_after: + return True, 'Certificate has a valid time range' + elif current_date <= not_before: + return False, 'Certificate is not yet valid' + else: + return False, 'Certificate has expired' + + def verify_public_key(self,public_key): + + # Get the key length based bits + key_length = public_key.bits() + LOGGER.info('Key Length: ' + str(key_length)) + + # Check the key type + key_type = 'Unknown' + if public_key.type() == crypto.TYPE_RSA: + key_type = "RSA" + elif public_key.type() == crypto.TYPE_EC: + key_type = "EC" + elif public_key.type() == crypto.TYPE_DSA: + key_type = "DSA" + elif public_key.type() == crypto.TYPE_DH: + key_type = "Diffie-Hellman" + LOGGER.info("Key Type: " + key_type) + + # Check if the public key is of RSA type + if key_type == 'RSA': + if key_length >= 2048: + return True, 'RSA key length passed: ' + str(key_length) + ' >= 2048' + else: + return False, 'RSA key length too short: ' + str(key_length) + ' < 2048' + + # Check if the public key is of EC type + elif key_type == 'EC': + if key_length >= 224: + return True, 'EC key length passed: ' + str(key_length) + ' >= 224' + else: + return False, 'EC key length too short: ' + str(key_length) + ' < 224' + else: + return False, "Key is not RSA or EC type" + + def validate_signature(self,public_cert): + print('Validating signature: TODO') + + def validate_tls_server(self,host,tls_version,port=443): + public_cert = self.get_public_certificate(host,tls_version='1.2') + if public_cert: + # Print the certificate information + cert_text = crypto.dump_certificate(crypto.FILETYPE_TEXT, public_cert).decode() + LOGGER.info(cert_text) + + # Validate the certificates time range + tr_valid = self.verify_certificate_timerange(public_cert) + + # Resolve the public key + public_key = self.get_public_key(public_cert) + if public_key: + key_valid= self.verify_public_key(public_key) + + # Check results + cert_valid = tr_valid[0] and key_valid[0] + test_details= tr_valid[1] + '\n'+ key_valid[1] + LOGGER.info("Certificate validated: " + str(cert_valid)) + LOGGER.info("Test Details:\n" + test_details) + return cert_valid, test_details + else: + LOGGER.info("Failed to resolve public certificate") \ No newline at end of file diff --git a/modules/test/security/requirements.txt b/modules/test/security/requirements.txt new file mode 100644 index 000000000..488c3aab3 --- /dev/null +++ b/modules/test/security/requirements.txt @@ -0,0 +1 @@ +cryptography \ No newline at end of file diff --git a/modules/test/security/security.Dockerfile b/modules/test/security/security.Dockerfile new file mode 100644 index 000000000..9a38f719d --- /dev/null +++ b/modules/test/security/security.Dockerfile @@ -0,0 +1,31 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Image name: test-run/security-test +FROM test-run/base-test:latest + +ARG MODULE_NAME=security +ARG MODULE_DIR=modules/test/$MODULE_NAME + +# Copy over all configuration files +COPY $MODULE_DIR/conf /testrun/conf + +# Copy over all binary files +COPY $MODULE_DIR/bin /testrun/bin + +# Copy over all python files +COPY $MODULE_DIR/python /testrun/python + +#Install all python requirements for the module +RUN pip3 install -r /testrun/python/requirements.txt \ No newline at end of file diff --git a/testing/unit_test/run_tests.sh b/testing/unit_test/run_tests.sh index 5b1ed6257..71e78391d 100644 --- a/testing/unit_test/run_tests.sh +++ b/testing/unit_test/run_tests.sh @@ -12,7 +12,11 @@ echo "Root Dir: $PWD" export PYTHONPATH="$PWD/framework/python/src" # Run the DHCP Unit tests -python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py -python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py +# python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py +# python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py + +# Run the sSecurity Module Unit Tests +python3 -u $PWD/modules/test/security/python/src/security_module_test.py + popd >/dev/null 2>&1 \ No newline at end of file From 2a5f7ab12d55ab9ca319845fd52777766b72e8f4 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Mon, 17 Jul 2023 11:43:59 -0600 Subject: [PATCH 02/17] Fix server test and implement 1.3 version --- modules/test/security/python/requirements.txt | 3 +- .../security/python/src/security_module.py | 121 +++--------------- modules/test/security/requirements.txt | 1 - 3 files changed, 20 insertions(+), 105 deletions(-) delete mode 100644 modules/test/security/requirements.txt diff --git a/modules/test/security/python/requirements.txt b/modules/test/security/python/requirements.txt index 488c3aab3..432116ff2 100644 --- a/modules/test/security/python/requirements.txt +++ b/modules/test/security/python/requirements.txt @@ -1 +1,2 @@ -cryptography \ No newline at end of file +cryptography +pyOpenSSL \ No newline at end of file diff --git a/modules/test/security/python/src/security_module.py b/modules/test/security/python/src/security_module.py index 2926d9f1c..0678def8f 100644 --- a/modules/test/security/python/src/security_module.py +++ b/modules/test/security/python/src/security_module.py @@ -35,113 +35,28 @@ def __init__(self, module): def _security_tls_v1_2_server(self): LOGGER.info('Running security.tls.v1_2_server') + self._resolve_device_ip() # If the ipv4 address wasn't resolved yet, try again - if self._device_ipv4_addr is None: - self._device_ipv4_addr = self._get_device_ipv4(self) - - return self._tls_util.validate_tls_server(self._ipv4_addr,tls_version='1.2') - - # # If the ipv4 address wasn't resolved yet, try again - # if self._device_ipv4_addr is None: - # self._device_ipv4_addr = self._get_device_ipv4(self) - - # try: - # public_cert = self._get_public_certificate(self._device_ipv4_addr) - # LOGGER.info('Public Certificate: ' + str(public_cert)) - # except ConnectionRefusedError as e: - # LOGGER.info('Could not connect to device, skipping') - # return None, 'Could not connect to device, skipping' - - # public_key = self._get_public_key(public_cert) - # LOGGER.info('Public Key: ' + str(public_key)) - - # self._verify_certificate_timerange(public_cert) - # self._verify_public_key(public_key) - # return None, 'Test not yet implemented' + if self._device_ipv4_addr is not None: + return self._tls_util.validate_tls_server(self._device_ipv4_addr,tls_version='1.2') + else: + LOGGER.error('Could not resolve device IP address. Skipping') + + def _security_tls_v1_3_server(self): + LOGGER.info('Running security.tls.v1_3_server') + self._resolve_device_ip() + # If the ipv4 address wasn't resolved yet, try again + if self._device_ipv4_addr is not None: + return self._tls_util.validate_tls_server(self._device_ipv4_addr,tls_version='1.3') + else: + LOGGER.error('Could not resolve device IP address. Skipping') def _security_tls_v1_2_client(self): LOGGER.info('Running security.tls.v1_2_client') return None, 'Test not yet implemented' -# def _verify_certificate_timerange(public_cert): -# # Extract the notBefore and notAfter dates from the certificate -# not_before = cert.not_valid_before -# not_after = cert.not_valid_after - -# LOGGER.info('Certificate valid from: ' + str(not_before) + '-' + str(not_after)) - -# # Get the current date -# current_date = datetime.utcnow() - -# # Check if today's date is within the certificate's validity range -# if not_before <= current_date <= not_after: -# return True, 'Certificate has a valid time range' -# elif current_date <= not_before: -# return False, 'Certificate is not yet valid' -# else: -# return False, 'Certificate has expired' - -# def _verify_public_key(self,public_key): -# # Serialize the public key to get its size/length -# public_key_pem = public_key.public_bytes( -# encoding=serialization.Encoding.PEM, -# format=serialization.PublicFormat.SubjectPublicKeyInfo -# ) - -# # Calculate the key length based on the serialized public key -# key_length = len(public_key_pem) * 8 - -# # Check if the public key is of RSA type -# if isinstance(public_key, rsa.RSAPublicKey): -# if key_length >= 2048: -# return True, 'RSA key length passed: ' + key_length + '>= 2048' -# else: -# return False, 'RSA key length too short: ' + str(key_length) + '< 2048' -# # Check if the public key is of EC type -# elif isinstance(public_key, ec.EllipticCurvePublicKey): -# if key_length >= 224: -# return True, 'EC key length passed: ' + key_length + '>= 224' -# else: -# return False, 'EC key length too short: ' + str(key_length) + '< 224' -# else: -# return False, "Key is not RSA or EC type" - -# def _get_public_key(self,cert_pem): -# # Parse the PEM certificate using cryptography -# cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend()) - -# # Extract and return the public key from the certificate -# public_key = cert.public_key() -# return public_key - -# def _get_public_certificate(self,host, port=443): -# # Disable certificate verification -# context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) -# context.check_hostname = False -# context.verify_mode = ssl.CERT_NONE - -# # Create an SSL/TLS socket -# with socket.create_connection((host, port)) as sock: -# with context.wrap_socket(sock, server_hostname=host) as secure_sock: -# # Get the server's certificate in PEM format -# cert_pem = secure_sock.getpeercert(binary_form=False) - -# # Parse the PEM certificate using cryptography -# cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend()) - -# return cert - - -# try: -# sec_mod = SecurityModule() -# public_cert = sec_mod._get_public_certificate('google.com') -# LOGGER.info('Public Certificate: ' + str(public_cert)) -# except ConnectionRefusedError as e: -# LOGGER.info('Could not connect to device, skipping') - -# public_key = sec_mod._get_public_key(public_cert) -# LOGGER.info('Public Key: ' + str(public_key)) - -# sec_mod._verify_certificate_timerange(public_cert) -# sec_mod._verify_public_key(public_key) \ No newline at end of file + def _resolve_device_ip(self): + # If the ipv4 address wasn't resolved yet, try again + if self._device_ipv4_addr is None: + self._device_ipv4_addr = self._get_device_ipv4(self) \ No newline at end of file diff --git a/modules/test/security/requirements.txt b/modules/test/security/requirements.txt deleted file mode 100644 index 488c3aab3..000000000 --- a/modules/test/security/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -cryptography \ No newline at end of file From 47d07a49926d8cd6a5035b45ae2534d317261317 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Mon, 17 Jul 2023 11:51:09 -0600 Subject: [PATCH 03/17] pylinting --- modules/test/security/python/src/run.py | 1 - .../security/python/src/security_module.py | 11 +- .../python/src/security_module_test.py | 28 +- modules/test/security/python/src/tls_util.py | 257 +++++++++--------- 4 files changed, 151 insertions(+), 146 deletions(-) diff --git a/modules/test/security/python/src/run.py b/modules/test/security/python/src/run.py index 9caead1df..74966e0cb 100644 --- a/modules/test/security/python/src/run.py +++ b/modules/test/security/python/src/run.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Run Baseline module""" import argparse import signal diff --git a/modules/test/security/python/src/security_module.py b/modules/test/security/python/src/security_module.py index 0678def8f..f0158b0f1 100644 --- a/modules/test/security/python/src/security_module.py +++ b/modules/test/security/python/src/security_module.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Baseline test module""" from test_module import TestModule # import ssl @@ -24,6 +23,7 @@ LOG_NAME = 'test_security' LOGGER = None + class SecurityModule(TestModule): """An example testing module.""" @@ -38,7 +38,8 @@ def _security_tls_v1_2_server(self): self._resolve_device_ip() # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is not None: - return self._tls_util.validate_tls_server(self._device_ipv4_addr,tls_version='1.2') + return self._tls_util.validate_tls_server(self._device_ipv4_addr, + tls_version='1.2') else: LOGGER.error('Could not resolve device IP address. Skipping') @@ -47,7 +48,8 @@ def _security_tls_v1_3_server(self): self._resolve_device_ip() # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is not None: - return self._tls_util.validate_tls_server(self._device_ipv4_addr,tls_version='1.3') + return self._tls_util.validate_tls_server(self._device_ipv4_addr, + tls_version='1.3') else: LOGGER.error('Could not resolve device IP address. Skipping') @@ -55,8 +57,7 @@ def _security_tls_v1_2_client(self): LOGGER.info('Running security.tls.v1_2_client') return None, 'Test not yet implemented' - def _resolve_device_ip(self): # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is None: - self._device_ipv4_addr = self._get_device_ipv4(self) \ No newline at end of file + self._device_ipv4_addr = self._get_device_ipv4(self) diff --git a/modules/test/security/python/src/security_module_test.py b/modules/test/security/python/src/security_module_test.py index 6c6b02487..68299728d 100644 --- a/modules/test/security/python/src/security_module_test.py +++ b/modules/test/security/python/src/security_module_test.py @@ -2,28 +2,30 @@ import unittest import common.logger as logger -MODULE_NAME='security_module_test' +MODULE_NAME = 'security_module_test' TLS_UTIL = None + class SecurityModuleTest(unittest.TestCase): - @classmethod - def setUpClass(cls): - log = logger.get_logger(MODULE_NAME) - global TLS_UTIL - TLS_UTIL = TLSUtil(log) + @classmethod + def setUpClass(cls): + log = logger.get_logger(MODULE_NAME) + global TLS_UTIL + TLS_UTIL = TLSUtil(log) + + def security_tls_v1_2_server_test(self): + test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') + self.assertTrue(test_results[0]) - def security_tls_v1_2_server_test(self): - test_results = TLS_UTIL.validate_tls_server('google.com',tls_version='1.2') - self.assertTrue(test_results[0]) + def security_tls_v1_3_server_test(self): + test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') + self.assertTrue(test_results[0]) - def security_tls_v1_3_server_test(self): - test_results = TLS_UTIL.validate_tls_server('google.com',tls_version='1.3') - self.assertTrue(test_results[0]) if __name__ == '__main__': suite = unittest.TestSuite() suite.addTest(SecurityModuleTest('security_tls_v1_2_server_test')) suite.addTest(SecurityModuleTest('security_tls_v1_3_server_test')) runner = unittest.TextTestRunner() - runner.run(suite) \ No newline at end of file + runner.run(suite) diff --git a/modules/test/security/python/src/tls_util.py b/modules/test/security/python/src/tls_util.py index 7ea33ef23..b725553fd 100644 --- a/modules/test/security/python/src/tls_util.py +++ b/modules/test/security/python/src/tls_util.py @@ -1,4 +1,3 @@ - import ssl import socket from cryptography import x509 @@ -9,130 +8,134 @@ LOG_NAME = 'tls_util' LOGGER = None -class TLSUtil(): - """Helper class for various tests concerning TLS communications""" - - def __init__(self,logger): - global LOGGER - LOGGER = logger - - def get_public_certificate(self,host, port=443, tls_version='1.2'): - try: - # Disable certificate verification - context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - - # Set the correct TLS version - context.options |= ssl.PROTOCOL_TLS - context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0 - context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1 - if tls_version == '1.3': - context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 - elif tls_version == '1.2': - context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 - - # Create an SSL/TLS socket - with socket.create_connection((host, port), timeout = 5) as sock: - with context.wrap_socket(sock, server_hostname=host) as secure_sock: - # Get the server's certificate in PEM format - cert_pem = ssl.DER_cert_to_PEM_cert(secure_sock.getpeercert(True)) - - if cert_pem: - cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) - except ConnectionRefusedError: - LOGGER.info(f"Connection to {host}:{port} was refused.") - return None - except socket.gaierror: - LOGGER.info(f"Failed to resolve the hostname '{host}'.") - return None - except ssl.SSLError as e: - LOGGER.info(f"SSL error occurred: {e}") - return None - - return cert - - def get_public_key(self,public_cert): - # Extract and return the public key from the certificate - public_key = public_cert.get_pubkey() - return public_key - - def verify_certificate_timerange(self,public_cert): - # Extract the notBefore and notAfter dates from the certificate - not_before = datetime.strptime(public_cert.get_notBefore().decode(), "%Y%m%d%H%M%SZ") - not_after = datetime.strptime(public_cert.get_notAfter().decode(), "%Y%m%d%H%M%SZ") - - LOGGER.info('Certificate valid from: ' + str(not_before) + ' To ' + str(not_after)) - - # Get the current date - current_date = datetime.utcnow() - - # Check if today's date is within the certificate's validity range - if not_before <= current_date <= not_after: - return True, 'Certificate has a valid time range' - elif current_date <= not_before: - return False, 'Certificate is not yet valid' - else: - return False, 'Certificate has expired' - - def verify_public_key(self,public_key): - - # Get the key length based bits - key_length = public_key.bits() - LOGGER.info('Key Length: ' + str(key_length)) - - # Check the key type - key_type = 'Unknown' - if public_key.type() == crypto.TYPE_RSA: - key_type = "RSA" - elif public_key.type() == crypto.TYPE_EC: - key_type = "EC" - elif public_key.type() == crypto.TYPE_DSA: - key_type = "DSA" - elif public_key.type() == crypto.TYPE_DH: - key_type = "Diffie-Hellman" - LOGGER.info("Key Type: " + key_type) - - # Check if the public key is of RSA type - if key_type == 'RSA': - if key_length >= 2048: - return True, 'RSA key length passed: ' + str(key_length) + ' >= 2048' - else: - return False, 'RSA key length too short: ' + str(key_length) + ' < 2048' - - # Check if the public key is of EC type - elif key_type == 'EC': - if key_length >= 224: - return True, 'EC key length passed: ' + str(key_length) + ' >= 224' - else: - return False, 'EC key length too short: ' + str(key_length) + ' < 224' - else: - return False, "Key is not RSA or EC type" - - def validate_signature(self,public_cert): - print('Validating signature: TODO') - - def validate_tls_server(self,host,tls_version,port=443): - public_cert = self.get_public_certificate(host,tls_version='1.2') - if public_cert: - # Print the certificate information - cert_text = crypto.dump_certificate(crypto.FILETYPE_TEXT, public_cert).decode() - LOGGER.info(cert_text) - - # Validate the certificates time range - tr_valid = self.verify_certificate_timerange(public_cert) - - # Resolve the public key - public_key = self.get_public_key(public_cert) - if public_key: - key_valid= self.verify_public_key(public_key) - - # Check results - cert_valid = tr_valid[0] and key_valid[0] - test_details= tr_valid[1] + '\n'+ key_valid[1] - LOGGER.info("Certificate validated: " + str(cert_valid)) - LOGGER.info("Test Details:\n" + test_details) - return cert_valid, test_details - else: - LOGGER.info("Failed to resolve public certificate") \ No newline at end of file +class TLSUtil(): + """Helper class for various tests concerning TLS communications""" + + def __init__(self, logger): + global LOGGER + LOGGER = logger + + def get_public_certificate(self, host, port=443, tls_version='1.2'): + try: + # Disable certificate verification + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + # Set the correct TLS version + context.options |= ssl.PROTOCOL_TLS + context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0 + context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1 + if tls_version == '1.3': + context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 + elif tls_version == '1.2': + context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 + + # Create an SSL/TLS socket + with socket.create_connection((host, port), timeout=5) as sock: + with context.wrap_socket(sock, server_hostname=host) as secure_sock: + # Get the server's certificate in PEM format + cert_pem = ssl.DER_cert_to_PEM_cert(secure_sock.getpeercert(True)) + + if cert_pem: + cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + except ConnectionRefusedError: + LOGGER.info(f'Connection to {host}:{port} was refused.') + return None + except socket.gaierror: + LOGGER.info(f'Failed to resolve the hostname '{host}'.') + return None + except ssl.SSLError as e: + LOGGER.info(f'SSL error occurred: {e}') + return None + + return cert + + def get_public_key(self, public_cert): + # Extract and return the public key from the certificate + public_key = public_cert.get_pubkey() + return public_key + + def verify_certificate_timerange(self, public_cert): + # Extract the notBefore and notAfter dates from the certificate + not_before = datetime.strptime(public_cert.get_notBefore().decode(), + '%Y%m%d%H%M%SZ') + not_after = datetime.strptime(public_cert.get_notAfter().decode(), + '%Y%m%d%H%M%SZ') + + LOGGER.info('Certificate valid from: ' + str(not_before) + ' To ' + + str(not_after)) + + # Get the current date + current_date = datetime.utcnow() + + # Check if today's date is within the certificate's validity range + if not_before <= current_date <= not_after: + return True, 'Certificate has a valid time range' + elif current_date <= not_before: + return False, 'Certificate is not yet valid' + else: + return False, 'Certificate has expired' + + def verify_public_key(self, public_key): + + # Get the key length based bits + key_length = public_key.bits() + LOGGER.info('Key Length: ' + str(key_length)) + + # Check the key type + key_type = 'Unknown' + if public_key.type() == crypto.TYPE_RSA: + key_type = 'RSA' + elif public_key.type() == crypto.TYPE_EC: + key_type = 'EC' + elif public_key.type() == crypto.TYPE_DSA: + key_type = 'DSA' + elif public_key.type() == crypto.TYPE_DH: + key_type = 'Diffie-Hellman' + LOGGER.info('Key Type: ' + key_type) + + # Check if the public key is of RSA type + if key_type == 'RSA': + if key_length >= 2048: + return True, 'RSA key length passed: ' + str(key_length) + ' >= 2048' + else: + return False, 'RSA key length too short: ' + str(key_length) + ' < 2048' + + # Check if the public key is of EC type + elif key_type == 'EC': + if key_length >= 224: + return True, 'EC key length passed: ' + str(key_length) + ' >= 224' + else: + return False, 'EC key length too short: ' + str(key_length) + ' < 224' + else: + return False, "Key is not RSA or EC type" + + def validate_signature(self, public_cert): + print('Validating signature: TODO') + + def validate_tls_server(self, host, tls_version, port=443): + public_cert = self.get_public_certificate(host, tls_version='1.2') + if public_cert: + # Print the certificate information + cert_text = crypto.dump_certificate(crypto.FILETYPE_TEXT, + public_cert).decode() + LOGGER.info(cert_text) + + # Validate the certificates time range + tr_valid = self.verify_certificate_timerange(public_cert) + + # Resolve the public key + public_key = self.get_public_key(public_cert) + if public_key: + key_valid = self.verify_public_key(public_key) + + # Check results + cert_valid = tr_valid[0] and key_valid[0] + test_details = tr_valid[1] + '\n' + key_valid[1] + LOGGER.info('Certificate validated: ' + str(cert_valid)) + LOGGER.info('Test Details:\n' + test_details) + return cert_valid, test_details + else: + LOGGER.info('Failed to resolve public certificate') From 49e36a77fd8ca6981978302c5f33b947119b0886 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Thu, 20 Jul 2023 16:52:56 -0600 Subject: [PATCH 04/17] More work on client tests --- .gitignore | 3 +- modules/test/security/bin/get_ciphers.sh | 10 ++ .../security/bin/get_client_hello_packets.sh | 19 ++++ .../security/bin/get_handshake_complete.sh | 19 ++++ .../security/python/src/security_module.py | 5 - .../python/src/security_module_test.py | 92 +++++++++++++++++- modules/test/security/python/src/tls_util.py | 94 ++++++++++++++++++- 7 files changed, 231 insertions(+), 11 deletions(-) create mode 100644 modules/test/security/bin/get_ciphers.sh create mode 100644 modules/test/security/bin/get_client_hello_packets.sh create mode 100644 modules/test/security/bin/get_handshake_complete.sh diff --git a/.gitignore b/.gitignore index e168ec07a..b25d58c35 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ venv/ error pylint.out __pycache__/ -build/ \ No newline at end of file +build/ +testing/unit_test/temp/ \ No newline at end of file diff --git a/modules/test/security/bin/get_ciphers.sh b/modules/test/security/bin/get_ciphers.sh new file mode 100644 index 000000000..e82bbc180 --- /dev/null +++ b/modules/test/security/bin/get_ciphers.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +CAPTURE_FILE=$1 +DST_IP=$2 +DST_PORT=$3 + +TSHARK_FILTER="ssl.handshake.ciphersuites and ip.dst==$DST_IP and tcp.dstport==$DST_PORT" +response=$(tshark -r $CAPTURE_FILE -Y "$TSHARK_FILTER" -Vx | grep 'Cipher Suite:' | awk '{$1=$1};1' | sed 's/Cipher Suite: //') + +echo "$response" diff --git a/modules/test/security/bin/get_client_hello_packets.sh b/modules/test/security/bin/get_client_hello_packets.sh new file mode 100644 index 000000000..13e42f791 --- /dev/null +++ b/modules/test/security/bin/get_client_hello_packets.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +CAPTURE_FILE=$1 +SRC_IP=$2 +TLS_VERSION=$3 + +TSHARK_OUTPUT="-T json -e ip.src -e tcp.dstport -e ip.dst" +TSHARK_FILTER="ssl.handshake.type==1 and ip.src==$SRC_IP" + +if [[ $TLS_VERSION == '1.2' || -z $TLS_VERSION ]];then + TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.version==0x0303" +elif [ $TLS_VERSION == '1.2' ];then + TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.version==0x0304" +fi + +response=$(tshark -r $CAPTURE_FILE $TSHARK_OUTPUT $TSHARK_FILTER) + +echo "$response" + \ No newline at end of file diff --git a/modules/test/security/bin/get_handshake_complete.sh b/modules/test/security/bin/get_handshake_complete.sh new file mode 100644 index 000000000..de1eb887d --- /dev/null +++ b/modules/test/security/bin/get_handshake_complete.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +CAPTURE_FILE=$1 +SRC_IP=$2 +DST_IP=$3 +TLS_VERSION=$4 + +TSHARK_FILTER="ip.src==$SRC_IP and ip.dst==$DST_IP " + +if [[ $TLS_VERSION == '1.2' || -z $TLS_VERSION ]];then + TSHARK_FILTER=$TSHARK_FILTER " and ssl.handshake.type==2 and tls.handshake.type==14 " +elif [ $TLS_VERSION == '1.2' ];then + TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.type==2 and tls.handshake.extensions.supported_version==0x0304" +fi + +response=$(tshark -r $CAPTURE_FILE $TSHARK_FILTER) + +echo "$response" + \ No newline at end of file diff --git a/modules/test/security/python/src/security_module.py b/modules/test/security/python/src/security_module.py index f0158b0f1..2aa4a4543 100644 --- a/modules/test/security/python/src/security_module.py +++ b/modules/test/security/python/src/security_module.py @@ -13,11 +13,6 @@ # limitations under the License. """Baseline test module""" from test_module import TestModule -# import ssl -# import socket -# from cryptography import x509 -# from cryptography.hazmat.backends import default_backend -# from datetime import datetime from tls_util import TLSUtil LOG_NAME = 'test_security' diff --git a/modules/test/security/python/src/security_module_test.py b/modules/test/security/python/src/security_module_test.py index 68299728d..fb35e4160 100644 --- a/modules/test/security/python/src/security_module_test.py +++ b/modules/test/security/python/src/security_module_test.py @@ -1,9 +1,19 @@ from tls_util import TLSUtil import unittest import common.logger as logger +from scapy.all import sniff, wrpcap +import os +import threading +import time +import requests +import netifaces +import ssl +import http.client +CAPTURE_DIR = 'testing/unit_test/temp' MODULE_NAME = 'security_module_test' TLS_UTIL = None +PACKET_CAPTURE = None class SecurityModuleTest(unittest.TestCase): @@ -12,7 +22,7 @@ class SecurityModuleTest(unittest.TestCase): def setUpClass(cls): log = logger.get_logger(MODULE_NAME) global TLS_UTIL - TLS_UTIL = TLSUtil(log) + TLS_UTIL = TLSUtil(log,bin_dir="modules/test/security/bin") def security_tls_v1_2_server_test(self): test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') @@ -22,10 +32,90 @@ def security_tls_v1_3_server_test(self): test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') self.assertTrue(test_results[0]) + def security_tls_v1_2_client_test(self): + test_results = self.test_client_tls('1.2') + self.assertTrue(test_results[0]) + + def security_tls_v1_3_client_test(self): + test_results = self.test_client_tls('1.3') + self.assertTrue(test_results[0]) + + def test_client_tls(self,tls_version): + # Make the capture file + os.makedirs(CAPTURE_DIR, exist_ok=True) + capture_file = CAPTURE_DIR + '/client_tls.pcap' + + # Resolve the client ip used + client_ip = self.get_interface_ip('eth0') + + # Genrate TLS outbound traffic + self.generate_tls_traffic(capture_file, tls_version) + + # Run the client test + return TLS_UTIL.validate_tls_client(client_ip=client_ip,tls_version=tls_version,capture_file=capture_file) + + + def generate_tls_traffic(self, capture_file, tls_version): + capture_thread = self.start_capture_thread(capture_file,10) + print('Capture Started') + + # Generate some TLS 1.2 outbound traffic + while(capture_thread.is_alive()): + self.make_tls_connection("www.google.com",443,tls_version) + time.sleep(1) + + # Save the captured packets to the file. + wrpcap(capture_file, PACKET_CAPTURE) + + def make_tls_connection(self, hostname, port, tls_version): + # Create the SSL context with the desired TLS version and options + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + context.options |= ssl.PROTOCOL_TLS + context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0 + context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1 + + if tls_version == '1.3': + context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 + elif tls_version == '1.2': + context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 + + # Create the HTTPS connection with the SSL context + connection = http.client.HTTPSConnection(hostname, port, context=context) + + # Perform the TLS handshake manually + connection.connect() + + # At this point, the TLS handshake is complete. + # You can do any further processing or just close the connection. + connection.close() + + def start_capture(self,timeout): + global PACKET_CAPTURE + PACKET_CAPTURE = sniff(iface='eth0', timeout=timeout) + + def start_capture_thread(self, capture_file,timeout): + # Start the packet capture in a separate thread to avoid blocking. + capture_thread = threading.Thread(target=self.start_capture, args=(timeout,)) + capture_thread.start() + + return capture_thread + + def get_interface_ip(self,interface_name): + try: + addresses = netifaces.ifaddresses(interface_name) + ipv4 = addresses[netifaces.AF_INET][0]['addr'] + return ipv4 + except (ValueError, KeyError) as e: + print(f"Error: {e}") + return None if __name__ == '__main__': suite = unittest.TestSuite() suite.addTest(SecurityModuleTest('security_tls_v1_2_server_test')) suite.addTest(SecurityModuleTest('security_tls_v1_3_server_test')) + suite.addTest(SecurityModuleTest('security_tls_v1_2_client_test')) + suite.addTest(SecurityModuleTest('security_tls_v1_3_client_test')) runner = unittest.TextTestRunner() runner.run(suite) diff --git a/modules/test/security/python/src/tls_util.py b/modules/test/security/python/src/tls_util.py index b725553fd..2dd70e500 100644 --- a/modules/test/security/python/src/tls_util.py +++ b/modules/test/security/python/src/tls_util.py @@ -1,20 +1,24 @@ import ssl import socket -from cryptography import x509 -from cryptography.hazmat.backends import default_backend from datetime import datetime from OpenSSL import crypto +from scapy.all import rdpcap, IP, TCP +import json LOG_NAME = 'tls_util' LOGGER = None +DEFAULT_BIN_DIR = '/testrun/bin' + +import common.util as util class TLSUtil(): """Helper class for various tests concerning TLS communications""" - def __init__(self, logger): + def __init__(self, logger, bin_dir=DEFAULT_BIN_DIR): global LOGGER LOGGER = logger + self._bin_dir = bin_dir def get_public_certificate(self, host, port=443, tls_version='1.2'): try: @@ -44,7 +48,7 @@ def get_public_certificate(self, host, port=443, tls_version='1.2'): LOGGER.info(f'Connection to {host}:{port} was refused.') return None except socket.gaierror: - LOGGER.info(f'Failed to resolve the hostname '{host}'.') + LOGGER.info(f'Failed to resolve the hostname {host}.') return None except ssl.SSLError as e: LOGGER.info(f'SSL error occurred: {e}') @@ -139,3 +143,85 @@ def validate_tls_server(self, host, tls_version, port=443): return cert_valid, test_details else: LOGGER.info('Failed to resolve public certificate') + + def get_ciphers(self, capture_file, dst_ip, dst_port): + bin_file = self._bin_dir + "/get_ciphers.sh" + args = (f'{capture_file} {dst_ip} {dst_port}') + command = f'{bin_file} {args}' + response = util.run_command(command) + ciphers = response[0].split("\n") + return ciphers + + def get_hello_packets(self, capture_file, src_ip, tls_version): + bin_file = self._bin_dir + "/get_client_hello_packets.sh" + args = (f'{capture_file} {src_ip} {tls_version}') + command = f'{bin_file} {args}' + response = util.run_command(command) + packets = response[0] + return self.parse_hello_packets(json.loads(packets), capture_file) + + def get_handshake_complete(self, capture_file, src_ip, dst_ip, tls_version): + bin_file = self._bin_dir + "/get_handshake_complete.sh" + args = (f'{capture_file} {src_ip} {dst_ip} {tls_version}') + command = f'{bin_file} {args}' + response = util.run_command(command) + return response + + def parse_hello_packets(self, packets, capture_file): + hello_packets = [] + for packet in packets: + # Extract all the basic IP information about the packet + dst_ip = packet['_source']['layers']['ip.dst'][0] + src_ip = packet['_source']['layers']['ip.src'][0] + dst_port = packet['_source']['layers']['tcp.dstport'][0] + + # Resolve the ciphers used in this packet and validate expected ones exist + ciphers = self.get_ciphers(capture_file, dst_ip, dst_port) + cipher_support = self.is_ecdh_and_ecdsa(ciphers) + + # Put result together + hello_packet = {} + hello_packet['dst_ip'] = packet['_source']['layers']['ip.dst'][0] + hello_packet['src_ip'] = packet['_source']['layers']['ip.src'][0] + hello_packet['dst_port'] = packet['_source']['layers']['tcp.dstport'][0] + hello_packet['cipher_support'] = cipher_support + + hello_packets.append(hello_packet) + return hello_packets + + def validate_tls_client(self, client_ip, tls_version, capture_file): + hello_packets = self.get_hello_packets(capture_file, client_ip, tls_version) + # Validate the ciphers only for tls 1.2 + if tls_version == '1.2': + for packet in hello_packets: + LOGGER.info("Hello Packet: " + str(packet)) + if not packet['cipher_support']['ecdh'] or not packet['cipher_support'][ + 'ecdsa']: + hello_packets.remove(packet) + + handshakes = {"complete":[],"incomplete":[]} + for packet in hello_packets: + # Filter out already tested IP's since only 1 handshake success is needed + if not packet['dst_ip'] in handshakes['complete'] and not packet['dst_ip'] in handshakes['incomplete']: + handshake_complete = self.get_handshake_complete( + capture_file, packet['src_ip'], packet['dst_ip'], tls_version) + + # One of the responses will be a complaint about running as root so + # we have to have at least 2 entries to consider a completed handshake + if len(handshake_complete) > 1: + LOGGER.info("Handshake completed from: " + packet['dst_ip']) + handshakes['complete'].append(packet['dst_ip']) + else: + handshakes['incomplete'].append(packet['dst_ip']) + + for handshake in handshakes['complete']: + LOGGER.info("Valid Client: " + str(handshake)) + return len(handshakes['complete']) > 0, 'Test not yet implemented' + + def is_ecdh_and_ecdsa(self, ciphers): + ecdh = False + ecdsa = False + for cipher in ciphers: + ecdh |= 'ECDH' in cipher + ecdsa |= 'ECDSA' in cipher + return {'ecdh': ecdh, 'ecdsa': ecdsa} From 7ca8cb30eb4c67091abc9df5368e4a09a8d5334a Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Fri, 21 Jul 2023 07:02:29 -0600 Subject: [PATCH 05/17] Add client tls tests Add unit tets Add common python code to base test module --- modules/test/base/base.Dockerfile | 4 + modules/test/base/python/requirements.txt | 3 +- modules/test/conn/python/requirements.txt | 2 +- .../security/python/src/security_module.py | 40 ++++++- .../python/src/security_module_test.py | 102 +++++++++++++----- modules/test/security/python/src/tls_util.py | 71 +++++++++--- modules/test/security/security.Dockerfile | 6 ++ 7 files changed, 187 insertions(+), 41 deletions(-) diff --git a/modules/test/base/base.Dockerfile b/modules/test/base/base.Dockerfile index 10344cbc7..08af7a259 100644 --- a/modules/test/base/base.Dockerfile +++ b/modules/test/base/base.Dockerfile @@ -17,10 +17,14 @@ FROM ubuntu:jammy ARG MODULE_NAME=base ARG MODULE_DIR=modules/test/$MODULE_NAME +ARG COMMON_DIR=framework/python/src/common # Install common software RUN apt-get update && apt-get install -y net-tools iputils-ping tcpdump iproute2 jq python3 python3-pip dos2unix nmap --fix-missing +# Install common python modules +COPY $COMMON_DIR/ /testrun/python/src/common + # Setup the base python requirements COPY $MODULE_DIR/python /testrun/python diff --git a/modules/test/base/python/requirements.txt b/modules/test/base/python/requirements.txt index 9c4e2b056..9d9473d74 100644 --- a/modules/test/base/python/requirements.txt +++ b/modules/test/base/python/requirements.txt @@ -1,2 +1,3 @@ grpcio -grpcio-tools \ No newline at end of file +grpcio-tools +netifaces \ No newline at end of file diff --git a/modules/test/conn/python/requirements.txt b/modules/test/conn/python/requirements.txt index 93b351f44..2b8d18750 100644 --- a/modules/test/conn/python/requirements.txt +++ b/modules/test/conn/python/requirements.txt @@ -1 +1 @@ -scapy \ No newline at end of file +pyOpenSSL \ No newline at end of file diff --git a/modules/test/security/python/src/security_module.py b/modules/test/security/python/src/security_module.py index 2aa4a4543..45951829a 100644 --- a/modules/test/security/python/src/security_module.py +++ b/modules/test/security/python/src/security_module.py @@ -17,6 +17,8 @@ LOG_NAME = 'test_security' LOGGER = None +STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap' +MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap' class SecurityModule(TestModule): @@ -37,6 +39,7 @@ def _security_tls_v1_2_server(self): tls_version='1.2') else: LOGGER.error('Could not resolve device IP address. Skipping') + return None, 'Could not resolve device IP address. Skipping' def _security_tls_v1_3_server(self): LOGGER.info('Running security.tls.v1_3_server') @@ -47,10 +50,45 @@ def _security_tls_v1_3_server(self): tls_version='1.3') else: LOGGER.error('Could not resolve device IP address. Skipping') + return None, 'Could not resolve device IP address. Skipping' def _security_tls_v1_2_client(self): LOGGER.info('Running security.tls.v1_2_client') - return None, 'Test not yet implemented' + self._resolve_device_ip() + # If the ipv4 address wasn't resolved yet, try again + if self._device_ipv4_addr is not None: + return self._validate_tls_client(self._device_ipv4_addr, '1.2') + else: + LOGGER.error('Could not resolve device IP address. Skipping') + return None, 'Could not resolve device IP address. Skipping' + + def _security_tls_v1_3_client(self): + LOGGER.info('Running security.tls.v1_3_client') + self._resolve_device_ip() + # If the ipv4 address wasn't resolved yet, try again + if self._device_ipv4_addr is not None: + return self._validate_tls_client(self._device_ipv4_addr, '1.3') + else: + LOGGER.error('Could not resolve device IP address. Skipping') + return None, 'Could not resolve device IP address. Skipping' + + def _validate_tls_client(self, client_ip, tls_version): + monitor_result = self._tls_util.validate_tls_client(client_ip=client_ip, + tls_version=tls_version, + capture_file=MONITOR_CAPTURE_FILE) + startup_result = self._tls_util.validate_tls_client(client_ip=client_ip, + tls_version=tls_version, + capture_file=STARTUP_CAPTURE_FILE) + if not monitor_result[0] or not startup_result[0]: + result = False, startup_result[1] + monitor_result[1] + elif monitor_result[0] and startup_result[0]: + result = True, startup_result[1] + monitor_result[1] + elif monitor_result[0]: + result = True, monitor_result[1] + elif startup_result[0]: + result = True, monitor_result[1] + else: + result = True, startup_result[1] + monitor_result[1] def _resolve_device_ip(self): # If the ipv4 address wasn't resolved yet, try again diff --git a/modules/test/security/python/src/security_module_test.py b/modules/test/security/python/src/security_module_test.py index fb35e4160..cea7b2591 100644 --- a/modules/test/security/python/src/security_module_test.py +++ b/modules/test/security/python/src/security_module_test.py @@ -22,7 +22,7 @@ class SecurityModuleTest(unittest.TestCase): def setUpClass(cls): log = logger.get_logger(MODULE_NAME) global TLS_UTIL - TLS_UTIL = TLSUtil(log,bin_dir="modules/test/security/bin") + TLS_UTIL = TLSUtil(log, bin_dir="modules/test/security/bin") def security_tls_v1_2_server_test(self): test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') @@ -34,13 +34,30 @@ def security_tls_v1_3_server_test(self): def security_tls_v1_2_client_test(self): test_results = self.test_client_tls('1.2') + print(str(test_results)) self.assertTrue(test_results[0]) + def security_tls_v1_2_client_cipher_fail_test(self): + test_results = self.test_client_tls('1.2', disable_valid_ciphers=True) + print(str(test_results)) + self.assertFalse(test_results[0]) + + def security_tls_client_skip_test(self): + # 1.1 will fail to connect and so no hello client will exist + # which should result in a skip result + test_results = self.test_client_tls('1.2', tls_generate='1.1') + print(str(test_results)) + self.assertIsNone(test_results[0]) + def security_tls_v1_3_client_test(self): test_results = self.test_client_tls('1.3') + print(str(test_results)) self.assertTrue(test_results[0]) - def test_client_tls(self,tls_version): + def test_client_tls(self, + tls_version, + tls_generate=None, + disable_valid_ciphers=False): # Make the capture file os.makedirs(CAPTURE_DIR, exist_ok=True) capture_file = CAPTURE_DIR + '/client_tls.pcap' @@ -49,67 +66,100 @@ def test_client_tls(self,tls_version): client_ip = self.get_interface_ip('eth0') # Genrate TLS outbound traffic - self.generate_tls_traffic(capture_file, tls_version) + if tls_generate is None: + tls_generate = tls_version + self.generate_tls_traffic(capture_file, tls_generate, disable_valid_ciphers) # Run the client test - return TLS_UTIL.validate_tls_client(client_ip=client_ip,tls_version=tls_version,capture_file=capture_file) - - - def generate_tls_traffic(self, capture_file, tls_version): - capture_thread = self.start_capture_thread(capture_file,10) + return TLS_UTIL.validate_tls_client(client_ip=client_ip, + tls_version=tls_version, + capture_file=capture_file) + + def generate_tls_traffic(self, + capture_file, + tls_version, + disable_valid_ciphers=False): + capture_thread = self.start_capture_thread(capture_file, 10) print('Capture Started') # Generate some TLS 1.2 outbound traffic - while(capture_thread.is_alive()): - self.make_tls_connection("www.google.com",443,tls_version) + while (capture_thread.is_alive()): + self.make_tls_connection("www.google.com", 443, tls_version, + disable_valid_ciphers) time.sleep(1) # Save the captured packets to the file. wrpcap(capture_file, PACKET_CAPTURE) - def make_tls_connection(self, hostname, port, tls_version): + def make_tls_connection(self, + hostname, + port, + tls_version, + disable_valid_ciphers=False): # Create the SSL context with the desired TLS version and options context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context.check_hostname = False context.verify_mode = ssl.CERT_NONE context.options |= ssl.PROTOCOL_TLS - context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0 - context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1 + + if disable_valid_ciphers: + # Create a list of ciphers that do not use ECDH or ECDSA + ciphers_str = [ + "TLS_AES_256_GCM_SHA384", "TLS_CHACHA20_POLY1305_SHA256", + "TLS_AES_128_GCM_SHA256", "AES256-GCM-SHA384", + "PSK-AES256-GCM-SHA384", "PSK-CHACHA20-POLY1305", + "RSA-PSK-AES128-GCM-SHA256", "DHE-PSK-AES128-GCM-SHA256", + "AES128-GCM-SHA256", "PSK-AES128-GCM-SHA256", "AES256-SHA256", + "AES128-SHA" + ] + context.set_ciphers(':'.join(ciphers_str)) + + if tls_version != '1.1': + context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0 + context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1 + else: + context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 + context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 if tls_version == '1.3': - context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 + context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 elif tls_version == '1.2': - context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 + context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 # Create the HTTPS connection with the SSL context connection = http.client.HTTPSConnection(hostname, port, context=context) # Perform the TLS handshake manually - connection.connect() + try: + connection.connect() + except ssl.SSLError as e: + print('Failed to make connection: ' + str(e)) # At this point, the TLS handshake is complete. # You can do any further processing or just close the connection. connection.close() - def start_capture(self,timeout): + def start_capture(self, timeout): global PACKET_CAPTURE PACKET_CAPTURE = sniff(iface='eth0', timeout=timeout) - def start_capture_thread(self, capture_file,timeout): + def start_capture_thread(self, capture_file, timeout): # Start the packet capture in a separate thread to avoid blocking. - capture_thread = threading.Thread(target=self.start_capture, args=(timeout,)) + capture_thread = threading.Thread(target=self.start_capture, + args=(timeout, )) capture_thread.start() return capture_thread - def get_interface_ip(self,interface_name): + def get_interface_ip(self, interface_name): try: - addresses = netifaces.ifaddresses(interface_name) - ipv4 = addresses[netifaces.AF_INET][0]['addr'] - return ipv4 + addresses = netifaces.ifaddresses(interface_name) + ipv4 = addresses[netifaces.AF_INET][0]['addr'] + return ipv4 except (ValueError, KeyError) as e: - print(f"Error: {e}") - return None + print(f"Error: {e}") + return None + if __name__ == '__main__': suite = unittest.TestSuite() @@ -117,5 +167,7 @@ def get_interface_ip(self,interface_name): suite.addTest(SecurityModuleTest('security_tls_v1_3_server_test')) suite.addTest(SecurityModuleTest('security_tls_v1_2_client_test')) suite.addTest(SecurityModuleTest('security_tls_v1_3_client_test')) + suite.addTest(SecurityModuleTest('security_tls_client_skip_test')) + suite.addTest(SecurityModuleTest('security_tls_v1_2_client_cipher_fail_test')) runner = unittest.TextTestRunner() runner.run(suite) diff --git a/modules/test/security/python/src/tls_util.py b/modules/test/security/python/src/tls_util.py index 2dd70e500..4b29c64e3 100644 --- a/modules/test/security/python/src/tls_util.py +++ b/modules/test/security/python/src/tls_util.py @@ -2,7 +2,6 @@ import socket from datetime import datetime from OpenSSL import crypto -from scapy.all import rdpcap, IP, TCP import json LOG_NAME = 'tls_util' @@ -11,7 +10,6 @@ import common.util as util - class TLSUtil(): """Helper class for various tests concerning TLS communications""" @@ -157,7 +155,7 @@ def get_hello_packets(self, capture_file, src_ip, tls_version): args = (f'{capture_file} {src_ip} {tls_version}') command = f'{bin_file} {args}' response = util.run_command(command) - packets = response[0] + packets = response[0].strip() return self.parse_hello_packets(json.loads(packets), capture_file) def get_handshake_complete(self, capture_file, src_ip, dst_ip, tls_version): @@ -190,33 +188,80 @@ def parse_hello_packets(self, packets, capture_file): return hello_packets def validate_tls_client(self, client_ip, tls_version, capture_file): + LOGGER.info("Validating client for TLS: " + tls_version) hello_packets = self.get_hello_packets(capture_file, client_ip, tls_version) + # Validate the ciphers only for tls 1.2 + client_hello_results = {"valid": [], "invalid": []} if tls_version == '1.2': for packet in hello_packets: - LOGGER.info("Hello Packet: " + str(packet)) - if not packet['cipher_support']['ecdh'] or not packet['cipher_support'][ - 'ecdsa']: - hello_packets.remove(packet) + if packet['dst_ip'] not in str(client_hello_results['valid']): + LOGGER.info('Checking client ciphers: ' + str(packet)) + if packet['cipher_support']['ecdh'] and packet['cipher_support'][ + 'ecdsa']: + LOGGER.info('Valid ciphers detected') + client_hello_results['valid'].append(packet) + # If a previous hello packet to the same destination failed, + # we can now remove it as it has passed on a different attempt + if packet['dst_ip'] in str(client_hello_results['invalid']): + client_hello_results['invalid'].remove(packet) + else: + LOGGER.info("Invalid ciphers detected") + if packet['dst_ip'] not in str(client_hello_results['invalid']): + client_hello_results['invalid'].append(packet) + else: + # No cipher check for TLS 1.3 + client_hello_results['valid'] = hello_packets - handshakes = {"complete":[],"incomplete":[]} - for packet in hello_packets: + handshakes = {'complete': [], 'incomplete': []} + for packet in client_hello_results['valid']: # Filter out already tested IP's since only 1 handshake success is needed - if not packet['dst_ip'] in handshakes['complete'] and not packet['dst_ip'] in handshakes['incomplete']: + if not packet['dst_ip'] in handshakes['complete'] and not packet[ + 'dst_ip'] in handshakes['incomplete']: handshake_complete = self.get_handshake_complete( capture_file, packet['src_ip'], packet['dst_ip'], tls_version) # One of the responses will be a complaint about running as root so # we have to have at least 2 entries to consider a completed handshake if len(handshake_complete) > 1: - LOGGER.info("Handshake completed from: " + packet['dst_ip']) + LOGGER.info('TLS handshake completed from: ' + packet['dst_ip']) handshakes['complete'].append(packet['dst_ip']) else: + LOGGER.warning('No TLS handshakes completed from: ' + + packet['dst_ip']) handshakes['incomplete'].append(packet['dst_ip']) for handshake in handshakes['complete']: - LOGGER.info("Valid Client: " + str(handshake)) - return len(handshakes['complete']) > 0, 'Test not yet implemented' + LOGGER.info('Valid TLS client connection to server: ' + str(handshake)) + + # Process and return the results + tls_client_details = '' + tls_client_valid = None + if len(hello_packets) > 0: + if len(client_hello_results['invalid']) > 0: + tls_client_valid = False + for result in client_hello_results['invalid']: + tls_client_details += 'Client hello packet to ' + result[ + 'dst_ip'] + ' did not have expected ciphers:' + if not result['cipher_support']['ecdh']: + tls_client_details += ' ecdh ' + if not result['cipher_support']['ecdsa']: + tls_client_details += 'ecdsa' + tls_client_details += '\n' + if len(handshakes['incomplete']) > 0: + for result in handshakes['incomplete']: + tls_client_details += 'Incomplete handshake detected from server: ' + result + '\n' + if len(handshakes['complete']) > 0: + # If we haven't already failed the test from previous checks + # allow a passing result + if not tls_client_valid: + tls_client_valid = True + for result in handshakes['complete']: + tls_client_details += 'Completed handshake detected from server: ' + result + '\n' + else: + LOGGER.info('No client hello packets detected. Skipping') + tls_client_details = 'No client hello packets detected. Skipping' + return tls_client_valid, tls_client_details def is_ecdh_and_ecdsa(self, ciphers): ecdh = False diff --git a/modules/test/security/security.Dockerfile b/modules/test/security/security.Dockerfile index 9a38f719d..847c1f76e 100644 --- a/modules/test/security/security.Dockerfile +++ b/modules/test/security/security.Dockerfile @@ -15,6 +15,12 @@ # Image name: test-run/security-test FROM test-run/base-test:latest +# Set DEBIAN_FRONTEND to noninteractive mode +ENV DEBIAN_FRONTEND=noninteractive + +# Install required software +RUN apt-get update && apt-get install -y tshark + ARG MODULE_NAME=security ARG MODULE_DIR=modules/test/$MODULE_NAME From 57e080d074f4f5fff9baea51c10ea72b6dd4124d Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Fri, 21 Jul 2023 07:03:38 -0600 Subject: [PATCH 06/17] re-enable dhcp unit tests disabled during dev --- testing/unit_test/run_tests.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testing/unit_test/run_tests.sh b/testing/unit_test/run_tests.sh index 71e78391d..ff816df57 100644 --- a/testing/unit_test/run_tests.sh +++ b/testing/unit_test/run_tests.sh @@ -12,10 +12,10 @@ echo "Root Dir: $PWD" export PYTHONPATH="$PWD/framework/python/src" # Run the DHCP Unit tests -# python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py -# python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py +python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py +python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py -# Run the sSecurity Module Unit Tests +# Run the Security Module Unit Tests python3 -u $PWD/modules/test/security/python/src/security_module_test.py From ec7f8f278f47259032ad0763cff1084deb9dc821 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Fri, 21 Jul 2023 07:16:23 -0600 Subject: [PATCH 07/17] rename module to tls --- modules/test/{security => tls}/bin/get_ciphers.sh | 0 .../{security => tls}/bin/get_client_hello_packets.sh | 0 .../test/{security => tls}/bin/get_handshake_complete.sh | 0 modules/test/{security => tls}/bin/start_test_module | 0 modules/test/{security => tls}/conf/module_config.json | 6 +++--- modules/test/{security => tls}/python/requirements.txt | 0 modules/test/{security => tls}/python/src/run.py | 8 ++++---- .../security_module.py => tls/python/src/tls_module.py} | 4 ++-- .../python/src/tls_module_test.py} | 5 ++--- modules/test/{security => tls}/python/src/tls_util.py | 0 .../{security/security.Dockerfile => tls/tls.Dockerfile} | 2 +- 11 files changed, 12 insertions(+), 13 deletions(-) rename modules/test/{security => tls}/bin/get_ciphers.sh (100%) rename modules/test/{security => tls}/bin/get_client_hello_packets.sh (100%) rename modules/test/{security => tls}/bin/get_handshake_complete.sh (100%) rename modules/test/{security => tls}/bin/start_test_module (100%) rename modules/test/{security => tls}/conf/module_config.json (86%) rename modules/test/{security => tls}/python/requirements.txt (100%) rename modules/test/{security => tls}/python/src/run.py (89%) rename modules/test/{security/python/src/security_module.py => tls/python/src/tls_module.py} (96%) rename modules/test/{security/python/src/security_module_test.py => tls/python/src/tls_module_test.py} (95%) rename modules/test/{security => tls}/python/src/tls_util.py (100%) rename modules/test/{security/security.Dockerfile => tls/tls.Dockerfile} (93%) diff --git a/modules/test/security/bin/get_ciphers.sh b/modules/test/tls/bin/get_ciphers.sh similarity index 100% rename from modules/test/security/bin/get_ciphers.sh rename to modules/test/tls/bin/get_ciphers.sh diff --git a/modules/test/security/bin/get_client_hello_packets.sh b/modules/test/tls/bin/get_client_hello_packets.sh similarity index 100% rename from modules/test/security/bin/get_client_hello_packets.sh rename to modules/test/tls/bin/get_client_hello_packets.sh diff --git a/modules/test/security/bin/get_handshake_complete.sh b/modules/test/tls/bin/get_handshake_complete.sh similarity index 100% rename from modules/test/security/bin/get_handshake_complete.sh rename to modules/test/tls/bin/get_handshake_complete.sh diff --git a/modules/test/security/bin/start_test_module b/modules/test/tls/bin/start_test_module similarity index 100% rename from modules/test/security/bin/start_test_module rename to modules/test/tls/bin/start_test_module diff --git a/modules/test/security/conf/module_config.json b/modules/test/tls/conf/module_config.json similarity index 86% rename from modules/test/security/conf/module_config.json rename to modules/test/tls/conf/module_config.json index d4598cc88..5515696fc 100644 --- a/modules/test/security/conf/module_config.json +++ b/modules/test/tls/conf/module_config.json @@ -1,9 +1,9 @@ { "config": { "meta": { - "name": "security", - "display_name": "Security", - "description": "Security tests" + "name": "tls", + "display_name": "TLS", + "description": "TLS tests" }, "network": true, "docker": { diff --git a/modules/test/security/python/requirements.txt b/modules/test/tls/python/requirements.txt similarity index 100% rename from modules/test/security/python/requirements.txt rename to modules/test/tls/python/requirements.txt diff --git a/modules/test/security/python/src/run.py b/modules/test/tls/python/src/run.py similarity index 89% rename from modules/test/security/python/src/run.py rename to modules/test/tls/python/src/run.py index 74966e0cb..04a295d72 100644 --- a/modules/test/security/python/src/run.py +++ b/modules/test/tls/python/src/run.py @@ -17,13 +17,13 @@ import sys import logger -from security_module import SecurityModule +from tls_module import TLSModule LOGGER = logger.get_logger('test_module') RUNTIME = 1500 -class SecurityModuleRunner: +class TLSModuleRunner: """An example runner class for test modules.""" def __init__(self, module): @@ -33,7 +33,7 @@ def __init__(self, module): signal.signal(signal.SIGABRT, self._handler) signal.signal(signal.SIGQUIT, self._handler) - LOGGER.info('Starting Baseline Module') + LOGGER.info('Starting TLS Module') self._test_module = SecurityModule(module) self._test_module.run_tests() @@ -61,7 +61,7 @@ def run(): # For some reason passing in the args from bash adds an extra # space before the argument so we'll just strip out extra space - SecurityModuleRunner(args.module.strip()) + TLSModuleRunner(args.module.strip()) if __name__ == '__main__': diff --git a/modules/test/security/python/src/security_module.py b/modules/test/tls/python/src/tls_module.py similarity index 96% rename from modules/test/security/python/src/security_module.py rename to modules/test/tls/python/src/tls_module.py index 45951829a..abb2c5650 100644 --- a/modules/test/security/python/src/security_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -15,13 +15,13 @@ from test_module import TestModule from tls_util import TLSUtil -LOG_NAME = 'test_security' +LOG_NAME = 'test_tls' LOGGER = None STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap' MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap' -class SecurityModule(TestModule): +class TLSModule(TestModule): """An example testing module.""" def __init__(self, module): diff --git a/modules/test/security/python/src/security_module_test.py b/modules/test/tls/python/src/tls_module_test.py similarity index 95% rename from modules/test/security/python/src/security_module_test.py rename to modules/test/tls/python/src/tls_module_test.py index cea7b2591..f8cb08313 100644 --- a/modules/test/security/python/src/security_module_test.py +++ b/modules/test/tls/python/src/tls_module_test.py @@ -11,12 +11,11 @@ import http.client CAPTURE_DIR = 'testing/unit_test/temp' -MODULE_NAME = 'security_module_test' +MODULE_NAME = 'tls_module_test' TLS_UTIL = None PACKET_CAPTURE = None - -class SecurityModuleTest(unittest.TestCase): +class TLSModuleTest(unittest.TestCase): @classmethod def setUpClass(cls): diff --git a/modules/test/security/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py similarity index 100% rename from modules/test/security/python/src/tls_util.py rename to modules/test/tls/python/src/tls_util.py diff --git a/modules/test/security/security.Dockerfile b/modules/test/tls/tls.Dockerfile similarity index 93% rename from modules/test/security/security.Dockerfile rename to modules/test/tls/tls.Dockerfile index 847c1f76e..685aba7f5 100644 --- a/modules/test/security/security.Dockerfile +++ b/modules/test/tls/tls.Dockerfile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Image name: test-run/security-test +# Image name: test-run/tls-test FROM test-run/base-test:latest # Set DEBIAN_FRONTEND to noninteractive mode From d3c6556bbe0fdef880ef21335d88f7db7d813263 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Fri, 21 Jul 2023 07:26:23 -0600 Subject: [PATCH 08/17] fix renaming --- modules/test/tls/python/src/run.py | 2 +- modules/test/tls/tls.Dockerfile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/test/tls/python/src/run.py b/modules/test/tls/python/src/run.py index 04a295d72..51bc82f8f 100644 --- a/modules/test/tls/python/src/run.py +++ b/modules/test/tls/python/src/run.py @@ -35,7 +35,7 @@ def __init__(self, module): LOGGER.info('Starting TLS Module') - self._test_module = SecurityModule(module) + self._test_module = TLSModule(module) self._test_module.run_tests() def _handler(self, signum): diff --git a/modules/test/tls/tls.Dockerfile b/modules/test/tls/tls.Dockerfile index 685aba7f5..8139f4584 100644 --- a/modules/test/tls/tls.Dockerfile +++ b/modules/test/tls/tls.Dockerfile @@ -21,7 +21,7 @@ ENV DEBIAN_FRONTEND=noninteractive # Install required software RUN apt-get update && apt-get install -y tshark -ARG MODULE_NAME=security +ARG MODULE_NAME=tls ARG MODULE_DIR=modules/test/$MODULE_NAME # Copy over all configuration files From b0b18b037af8c3bae4a7050902a274afef65579b Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Fri, 21 Jul 2023 07:33:54 -0600 Subject: [PATCH 09/17] Fix unit tests broken by module rename Add TLS 1.3 tests to config --- testing/unit_test/run_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/unit_test/run_tests.sh b/testing/unit_test/run_tests.sh index ff816df57..5fa1179b1 100644 --- a/testing/unit_test/run_tests.sh +++ b/testing/unit_test/run_tests.sh @@ -16,7 +16,7 @@ python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.p python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py # Run the Security Module Unit Tests -python3 -u $PWD/modules/test/security/python/src/security_module_test.py +python3 -u $PWD/modules/test/tls/python/src/tls_module_test.py popd >/dev/null 2>&1 \ No newline at end of file From 3566442516e0c38ce92c1f4a11b621b13ed53f10 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Fri, 21 Jul 2023 07:37:17 -0600 Subject: [PATCH 10/17] Add TLS 1.3 tests to config fix unit tests --- modules/test/tls/conf/module_config.json | 10 ++++++++++ modules/test/tls/python/src/tls_module_test.py | 14 +++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/modules/test/tls/conf/module_config.json b/modules/test/tls/conf/module_config.json index 5515696fc..d96bb86f4 100644 --- a/modules/test/tls/conf/module_config.json +++ b/modules/test/tls/conf/module_config.json @@ -17,10 +17,20 @@ "description": "Check the device web server TLS 1.2 & certificate is valid", "expected_behavior": "TLS 1.2 certificate is issued to the web browser client when accessed" }, + { + "name": "security.tls.v1_3_server", + "description": "Check the device web server TLS 1.3 & certificate is valid", + "expected_behavior": "TLS 1.3 certificate is issued to the web browser client when accessed" + }, { "name": "security.tls.v1_2_client", "description": "Device uses TLS with connection to an external service on port 443 (or any other port which could be running the webserver-HTTPS)", "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.2 and support for ECDH and ECDSA ciphers" + }, + { + "name": "security.tls.v1_3_client", + "description": "Device uses TLS with connection to an external service on port 443 (or any other port which could be running the webserver-HTTPS)", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.3" } ] } diff --git a/modules/test/tls/python/src/tls_module_test.py b/modules/test/tls/python/src/tls_module_test.py index f8cb08313..585bba415 100644 --- a/modules/test/tls/python/src/tls_module_test.py +++ b/modules/test/tls/python/src/tls_module_test.py @@ -21,7 +21,7 @@ class TLSModuleTest(unittest.TestCase): def setUpClass(cls): log = logger.get_logger(MODULE_NAME) global TLS_UTIL - TLS_UTIL = TLSUtil(log, bin_dir="modules/test/security/bin") + TLS_UTIL = TLSUtil(log, bin_dir="modules/test/tls/bin") def security_tls_v1_2_server_test(self): test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') @@ -162,11 +162,11 @@ def get_interface_ip(self, interface_name): if __name__ == '__main__': suite = unittest.TestSuite() - suite.addTest(SecurityModuleTest('security_tls_v1_2_server_test')) - suite.addTest(SecurityModuleTest('security_tls_v1_3_server_test')) - suite.addTest(SecurityModuleTest('security_tls_v1_2_client_test')) - suite.addTest(SecurityModuleTest('security_tls_v1_3_client_test')) - suite.addTest(SecurityModuleTest('security_tls_client_skip_test')) - suite.addTest(SecurityModuleTest('security_tls_v1_2_client_cipher_fail_test')) + suite.addTest(TLSModuleTest('security_tls_v1_2_server_test')) + suite.addTest(TLSModuleTest('security_tls_v1_3_server_test')) + suite.addTest(TLSModuleTest('security_tls_v1_2_client_test')) + suite.addTest(TLSModuleTest('security_tls_v1_3_client_test')) + suite.addTest(TLSModuleTest('security_tls_client_skip_test')) + suite.addTest(TLSModuleTest('security_tls_v1_2_client_cipher_fail_test')) runner = unittest.TextTestRunner() runner.run(suite) From d46a4f1d0e03a3a2c859328e95b1fe6749e236b3 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Tue, 25 Jul 2023 13:54:54 -0600 Subject: [PATCH 11/17] Add certificate signature checks --- local/.gitignore | 3 +- modules/test/tls/bin/check_cert_signature.sh | 11 ++ modules/test/tls/bin/start_test_module | 110 +++++++++--------- .../test/tls/python/src/tls_module_test.py | 3 +- modules/test/tls/python/src/tls_util.py | 81 ++++++++++--- modules/test/tls/tls.Dockerfile | 7 +- 6 files changed, 141 insertions(+), 74 deletions(-) create mode 100644 modules/test/tls/bin/check_cert_signature.sh diff --git a/local/.gitignore b/local/.gitignore index 4fb365c03..f13ce8d85 100644 --- a/local/.gitignore +++ b/local/.gitignore @@ -1,2 +1,3 @@ system.json -devices \ No newline at end of file +devices +root_certs \ No newline at end of file diff --git a/modules/test/tls/bin/check_cert_signature.sh b/modules/test/tls/bin/check_cert_signature.sh new file mode 100644 index 000000000..ebd4a7549 --- /dev/null +++ b/modules/test/tls/bin/check_cert_signature.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +ROOT_CERT=$1 +DEVICE_CERT=$2 + +echo "ROOT: $ROOT_CERT" +echo "DEVICE_CERT: $DEVICE_CERT" + +response=$(openssl verify -CAfile $ROOT_CERT $DEVICE_CERT) + +echo "$response" diff --git a/modules/test/tls/bin/start_test_module b/modules/test/tls/bin/start_test_module index a529c2fcf..d8cede486 100644 --- a/modules/test/tls/bin/start_test_module +++ b/modules/test/tls/bin/start_test_module @@ -1,56 +1,56 @@ -#!/bin/bash - -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# An example startup script that does the bare minimum to start -# a test module via a pyhon script. Each test module should include a -# start_test_module file that overwrites this one to boot all of its -# specific requirements to run. - -# Define where the python source files are located -PYTHON_SRC_DIR=/testrun/python/src - -# Fetch module name -MODULE_NAME=$1 - -# Default interface should be veth0 for all containers -DEFAULT_IFACE=veth0 - -# Allow a user to define an interface by passing it into this script -DEFINED_IFACE=$2 - -# Select which interace to use -if [[ -z $DEFINED_IFACE || "$DEFINED_IFACE" == "null" ]] -then - echo "No interface defined, defaulting to veth0" - INTF=$DEFAULT_IFACE -else - INTF=$DEFINED_IFACE -fi - -# Create and set permissions on the log files -LOG_FILE=/runtime/output/$MODULE_NAME.log -RESULT_FILE=/runtime/output/$MODULE_NAME-result.json -touch $LOG_FILE -touch $RESULT_FILE -chown $HOST_USER $LOG_FILE -chown $HOST_USER $RESULT_FILE - -# Run the python scrip that will execute the tests for this module -# -u flag allows python print statements -# to be logged by docker by running unbuffered -python3 -u $PYTHON_SRC_DIR/run.py "-m $MODULE_NAME" - +#!/bin/bash + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# An example startup script that does the bare minimum to start +# a test module via a pyhon script. Each test module should include a +# start_test_module file that overwrites this one to boot all of its +# specific requirements to run. + +# Define where the python source files are located +PYTHON_SRC_DIR=/testrun/python/src + +# Fetch module name +MODULE_NAME=$1 + +# Default interface should be veth0 for all containers +DEFAULT_IFACE=veth0 + +# Allow a user to define an interface by passing it into this script +DEFINED_IFACE=$2 + +# Select which interace to use +if [[ -z $DEFINED_IFACE || "$DEFINED_IFACE" == "null" ]] +then + echo "No interface defined, defaulting to veth0" + INTF=$DEFAULT_IFACE +else + INTF=$DEFINED_IFACE +fi + +# Create and set permissions on the log files +LOG_FILE=/runtime/output/$MODULE_NAME.log +RESULT_FILE=/runtime/output/$MODULE_NAME-result.json +touch $LOG_FILE +touch $RESULT_FILE +chown $HOST_USER $LOG_FILE +chown $HOST_USER $RESULT_FILE + +# Run the python scrip that will execute the tests for this module +# -u flag allows python print statements +# to be logged by docker by running unbuffered +python3 -u $PYTHON_SRC_DIR/run.py "-m $MODULE_NAME" + echo Module has finished \ No newline at end of file diff --git a/modules/test/tls/python/src/tls_module_test.py b/modules/test/tls/python/src/tls_module_test.py index 585bba415..148818ca3 100644 --- a/modules/test/tls/python/src/tls_module_test.py +++ b/modules/test/tls/python/src/tls_module_test.py @@ -21,7 +21,8 @@ class TLSModuleTest(unittest.TestCase): def setUpClass(cls): log = logger.get_logger(MODULE_NAME) global TLS_UTIL - TLS_UTIL = TLSUtil(log, bin_dir="modules/test/tls/bin") + TLS_UTIL = TLSUtil(log, bin_dir="modules/test/tls/bin", + cert_out_dir='testing/unit_test/temp', root_certs_dir='local/root_certs') def security_tls_v1_2_server_test(self): test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index 4b29c64e3..feac49c90 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -3,27 +3,36 @@ from datetime import datetime from OpenSSL import crypto import json +import os +import common.util as util LOG_NAME = 'tls_util' LOGGER = None DEFAULT_BIN_DIR = '/testrun/bin' - -import common.util as util +DEFAULT_CERTS_OUT_DIR = '/runtime/output/certs' +DEFAULT_ROOT_CERTS_DIR = '/testrun/root_certs' class TLSUtil(): """Helper class for various tests concerning TLS communications""" - def __init__(self, logger, bin_dir=DEFAULT_BIN_DIR): + def __init__(self, logger, bin_dir=DEFAULT_BIN_DIR, cert_out_dir=DEFAULT_CERTS_OUT_DIR, root_certs_dir=DEFAULT_ROOT_CERTS_DIR): global LOGGER LOGGER = logger self._bin_dir = bin_dir + self._dev_cert_file = cert_out_dir + '/device_cert.crt' + self._root_certs_dir = root_certs_dir - def get_public_certificate(self, host, port=443, tls_version='1.2'): + def get_public_certificate(self, host, port=443,validate_cert=False, tls_version='1.2'): try: - # Disable certificate verification context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context.check_hostname = False - context.verify_mode = ssl.CERT_NONE + if not validate_cert: + # Disable certificate verification + context.verify_mode = ssl.CERT_NONE + else: + # Use host CA certs for validation + context.load_default_certs() + context.verify_mode = ssl.CERT_REQUIRED # Set the correct TLS version context.options |= ssl.PROTOCOL_TLS @@ -40,8 +49,6 @@ def get_public_certificate(self, host, port=443, tls_version='1.2'): # Get the server's certificate in PEM format cert_pem = ssl.DER_cert_to_PEM_cert(secure_sock.getpeercert(True)) - if cert_pem: - cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) except ConnectionRefusedError: LOGGER.info(f'Connection to {host}:{port} was refused.') return None @@ -52,7 +59,7 @@ def get_public_certificate(self, host, port=443, tls_version='1.2'): LOGGER.info(f'SSL error occurred: {e}') return None - return cert + return cert_pem def get_public_key(self, public_cert): # Extract and return the public key from the certificate @@ -114,16 +121,52 @@ def verify_public_key(self, public_key): else: return False, "Key is not RSA or EC type" - def validate_signature(self, public_cert): - print('Validating signature: TODO') + def validate_signature(self,host): + # Reconnect to the device but with validate signature option + # set to true which will check for proper cert chains + # within the valid CA root certs stored on the server + LOGGER.info('Checking for valid signature from authorized Certificate Authorities') + public_cert = self.get_public_certificate(host,validate_cert=True, tls_version='1.2') + if public_cert: + LOGGER.info('Authorized Certificate Authority signature confirmed') + return True, 'Authorized Certificate Authority signature confirmed' + else: + LOGGER.info('Authorized Certificate Authority signature not present') + LOGGER.info('Resolving configured root certificates') + bin_file = self._bin_dir + "/check_cert_signature.sh" + # Get a list of all root certificates + root_certs = os.listdir(self._root_certs_dir) + for root_cert in root_certs: + try: + # Create the file path + root_cert_path = os.path.join(self._root_certs_dir, root_cert) + + args = (f'{root_cert_path} {self._dev_cert_file}') + command = f'{bin_file} {args}' + response = util.run_command(command) + if 'device_cert.crt: OK' in str(response): + LOGGER.info('Device signed by cert:' + root_cert) + return True, 'Device signed by cert:' + root_cert + else: + LOGGER.info('Device not signed by cert: ' + root_cert) + except Exception as e: + LOGGER.error('Failed to check cert:' + root_cert) + LOGGER.error(str(e)) + return False, 'Device certificate has not been signed' def validate_tls_server(self, host, tls_version, port=443): - public_cert = self.get_public_certificate(host, tls_version='1.2') - if public_cert: + cert_pem = self.get_public_certificate(host,validate_cert=False, tls_version='1.2') + if cert_pem: + + # Write pem encoding to a file + self.write_cert_to_file(cert_pem) + + # Load pem encoding into a certifiate so we can process the contents + public_cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + # Print the certificate information cert_text = crypto.dump_certificate(crypto.FILETYPE_TEXT, public_cert).decode() - LOGGER.info(cert_text) # Validate the certificates time range tr_valid = self.verify_certificate_timerange(public_cert) @@ -133,15 +176,21 @@ def validate_tls_server(self, host, tls_version, port=443): if public_key: key_valid = self.verify_public_key(public_key) + sig_valid = self.validate_signature(host) + # Check results - cert_valid = tr_valid[0] and key_valid[0] - test_details = tr_valid[1] + '\n' + key_valid[1] + cert_valid = tr_valid[0] and key_valid[0] and sig_valid[0] + test_details = tr_valid[1] + '\n' + key_valid[1] + '\n' + sig_valid[1] LOGGER.info('Certificate validated: ' + str(cert_valid)) LOGGER.info('Test Details:\n' + test_details) return cert_valid, test_details else: LOGGER.info('Failed to resolve public certificate') + def write_cert_to_file(self,pem_cert): + with open(self._dev_cert_file, 'w',encoding='UTF-8') as f: + f.write(pem_cert) + def get_ciphers(self, capture_file, dst_ip, dst_port): bin_file = self._bin_dir + "/get_ciphers.sh" args = (f'{capture_file} {dst_ip} {dst_port}') diff --git a/modules/test/tls/tls.Dockerfile b/modules/test/tls/tls.Dockerfile index 8139f4584..9a671192a 100644 --- a/modules/test/tls/tls.Dockerfile +++ b/modules/test/tls/tls.Dockerfile @@ -23,6 +23,7 @@ RUN apt-get update && apt-get install -y tshark ARG MODULE_NAME=tls ARG MODULE_DIR=modules/test/$MODULE_NAME +ARG CERTS_DIR=local/root_certs # Copy over all configuration files COPY $MODULE_DIR/conf /testrun/conf @@ -34,4 +35,8 @@ COPY $MODULE_DIR/bin /testrun/bin COPY $MODULE_DIR/python /testrun/python #Install all python requirements for the module -RUN pip3 install -r /testrun/python/requirements.txt \ No newline at end of file +RUN pip3 install -r /testrun/python/requirements.txt + +# Copy over all the local certificates for device signature +# checks if the folder exists +COPY $CERTS_DIR/ /testrun/root_certs || true \ No newline at end of file From f8a267755c185e2b0cbcce1f19c5f2fde781f969 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Tue, 25 Jul 2023 16:30:32 -0600 Subject: [PATCH 12/17] Add local cert mounting for signature validatoin Fix test results --- .../python/src/test_orc/test_orchestrator.py | 4 ++ local/.gitignore | 4 +- modules/test/tls/python/src/tls_module.py | 13 ++++-- modules/test/tls/python/src/tls_util.py | 8 ++-- modules/test/tls/tls.Dockerfile | 8 +++- testing/unit_test/run_tests.sh | 42 +++++++++---------- 6 files changed, 48 insertions(+), 31 deletions(-) diff --git a/framework/python/src/test_orc/test_orchestrator.py b/framework/python/src/test_orc/test_orchestrator.py index fef4e5bb5..248258534 100644 --- a/framework/python/src/test_orc/test_orchestrator.py +++ b/framework/python/src/test_orc/test_orchestrator.py @@ -27,6 +27,7 @@ RUNTIME_DIR = "runtime/test" TEST_MODULES_DIR = "modules/test" MODULE_CONFIG = "conf/module_config.json" +DEVICE_ROOT_CERTS = "local/root_certs" class TestOrchestrator: @@ -61,6 +62,9 @@ def start(self): os.makedirs(RUNTIME_DIR, exist_ok=True) util.run_command(f"chown -R {self._host_user} {RUNTIME_DIR}") + # Setup the root_certs folder + os.makedirs(DEVICE_ROOT_CERTS, exist_ok=True) + self._load_test_modules() self.build_test_modules() diff --git a/local/.gitignore b/local/.gitignore index f13ce8d85..a175714c6 100644 --- a/local/.gitignore +++ b/local/.gitignore @@ -1,3 +1,3 @@ -system.json -devices +system.json +devices root_certs \ No newline at end of file diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index abb2c5650..1833ee1f4 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -79,16 +79,21 @@ def _validate_tls_client(self, client_ip, tls_version): startup_result = self._tls_util.validate_tls_client(client_ip=client_ip, tls_version=tls_version, capture_file=STARTUP_CAPTURE_FILE) - if not monitor_result[0] or not startup_result[0]: + + LOGGER.info("Montor: " + str(monitor_result)) + LOGGER.info("Startup: " + str(startup_result)) + + if (not monitor_result[0] and monitor_result[0] is not None) or (not startup_result[0] and startup_result[0] is not None): result = False, startup_result[1] + monitor_result[1] elif monitor_result[0] and startup_result[0]: result = True, startup_result[1] + monitor_result[1] - elif monitor_result[0]: + elif monitor_result[0] and startup_result[0] is None: result = True, monitor_result[1] - elif startup_result[0]: + elif startup_result[0] and monitor_result[0] is None: result = True, monitor_result[1] else: - result = True, startup_result[1] + monitor_result[1] + result = None, startup_result[1] + monitor_result[1] + return result def _resolve_device_ip(self): # If the ipv4 address wasn't resolved yet, try again diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index feac49c90..3acc18744 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -9,7 +9,7 @@ LOG_NAME = 'tls_util' LOGGER = None DEFAULT_BIN_DIR = '/testrun/bin' -DEFAULT_CERTS_OUT_DIR = '/runtime/output/certs' +DEFAULT_CERTS_OUT_DIR = '/runtime/output' DEFAULT_ROOT_CERTS_DIR = '/testrun/root_certs' class TLSUtil(): @@ -24,7 +24,8 @@ def __init__(self, logger, bin_dir=DEFAULT_BIN_DIR, cert_out_dir=DEFAULT_CERTS_O def get_public_certificate(self, host, port=443,validate_cert=False, tls_version='1.2'): try: - context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + #context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False if not validate_cert: # Disable certificate verification @@ -136,11 +137,12 @@ def validate_signature(self,host): bin_file = self._bin_dir + "/check_cert_signature.sh" # Get a list of all root certificates root_certs = os.listdir(self._root_certs_dir) + LOGGER.info("Root Certs Found: " + str(len(root_certs))) for root_cert in root_certs: try: # Create the file path root_cert_path = os.path.join(self._root_certs_dir, root_cert) - + LOGGER.info("Checking root cert: " + str(root_cert_path)) args = (f'{root_cert_path} {self._dev_cert_file}') command = f'{bin_file} {args}' response = util.run_command(command) diff --git a/modules/test/tls/tls.Dockerfile b/modules/test/tls/tls.Dockerfile index 9a671192a..92fa6028c 100644 --- a/modules/test/tls/tls.Dockerfile +++ b/modules/test/tls/tls.Dockerfile @@ -37,6 +37,12 @@ COPY $MODULE_DIR/python /testrun/python #Install all python requirements for the module RUN pip3 install -r /testrun/python/requirements.txt +# Create a directory inside the container to store the root certificates +RUN mkdir -p /testrun/root_certs + # Copy over all the local certificates for device signature # checks if the folder exists -COPY $CERTS_DIR/ /testrun/root_certs || true \ No newline at end of file +COPY $CERTS_DIR /testrun/root_certs + + + diff --git a/testing/unit_test/run_tests.sh b/testing/unit_test/run_tests.sh index 5fa1179b1..525b65d46 100644 --- a/testing/unit_test/run_tests.sh +++ b/testing/unit_test/run_tests.sh @@ -1,22 +1,22 @@ -#!/bin/bash -e - -# This script should be run from within the unit_test directory. If -# it is run outside this directory, paths will not be resolved correctly. - -# Move into the root directory of test-run -pushd ../../ >/dev/null 2>&1 - -echo "Root Dir: $PWD" - -# Setup the python path -export PYTHONPATH="$PWD/framework/python/src" - -# Run the DHCP Unit tests -python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py -python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py - -# Run the Security Module Unit Tests -python3 -u $PWD/modules/test/tls/python/src/tls_module_test.py - - +#!/bin/bash -e + +# This script should be run from within the unit_test directory. If +# it is run outside this directory, paths will not be resolved correctly. + +# Move into the root directory of test-run +pushd ../../ >/dev/null 2>&1 + +echo "Root Dir: $PWD" + +# Setup the python path +export PYTHONPATH="$PWD/framework/python/src" + +# Run the DHCP Unit tests +python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py +python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py + +# Run the Security Module Unit Tests +python3 -u $PWD/modules/test/tls/python/src/tls_module_test.py + + popd >/dev/null 2>&1 \ No newline at end of file From 497a5af378bb3a2f1d4a8b48b6c2856388ef2d58 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Wed, 26 Jul 2023 09:13:12 -0600 Subject: [PATCH 13/17] Update tls 1.2 server to pass with tls 1.3 compliance Add unit tests around tls 1.2 server Misc updates and cleanup --- modules/test/tls/python/src/tls_module.py | 5 +- .../test/tls/python/src/tls_module_test.py | 56 ++++++++++++++++++- modules/test/tls/python/src/tls_util.py | 43 ++++++++++++-- testing/unit_test/run_tests.sh | 42 +++++++------- 4 files changed, 117 insertions(+), 29 deletions(-) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 1833ee1f4..0bede4f31 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -35,8 +35,11 @@ def _security_tls_v1_2_server(self): self._resolve_device_ip() # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is not None: - return self._tls_util.validate_tls_server(self._device_ipv4_addr, + tls_1_2_results = self._tls_util.validate_tls_server(self._device_ipv4_addr, tls_version='1.2') + tls_1_3_results = self._tls_util.validate_tls_server(self._device_ipv4_addr, + tls_version='1.3') + return self._tls_util.process_tls_server_results(tls_1_2_results,tls_1_3_results) else: LOGGER.error('Could not resolve device IP address. Skipping') return None, 'Could not resolve device IP address. Skipping' diff --git a/modules/test/tls/python/src/tls_module_test.py b/modules/test/tls/python/src/tls_module_test.py index 148818ca3..13887fd5a 100644 --- a/modules/test/tls/python/src/tls_module_test.py +++ b/modules/test/tls/python/src/tls_module_test.py @@ -24,10 +24,55 @@ def setUpClass(cls): TLS_UTIL = TLSUtil(log, bin_dir="modules/test/tls/bin", cert_out_dir='testing/unit_test/temp', root_certs_dir='local/root_certs') + # Test 1.2 server when only 1.2 connection is established def security_tls_v1_2_server_test(self): - test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') + tls_1_2_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') + tls_1_3_results = None, 'No TLS 1.3' + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) self.assertTrue(test_results[0]) + # Test 1.2 server when 1.3 connection is established + def security_tls_v1_2_for_1_3_server_test(self): + tls_1_2_results = None, 'No TLS 1.2' + tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + self.assertTrue(test_results[0]) + + # Test 1.2 server when 1.2 and 1.3 connection is established + def security_tls_v1_2_for_1_2_and_1_3_server_test(self): + tls_1_2_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') + tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + self.assertTrue(test_results[0]) + + # Test 1.2 server when 1.2 and failed 1.3 connection is established + def security_tls_v1_2_for_1_2_and_1_3_fail_server_test(self): + tls_1_2_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') + tls_1_3_results = False, 'Signature faild' + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + self.assertTrue(test_results[0]) + + # Test 1.2 server when 1.3 and failed 1.2 connection is established + def security_tls_v1_2_for_1_3_and_1_2_fail_server_test(self): + tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') + tls_1_2_results = False, 'Signature faild' + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + self.assertTrue(test_results[0]) + + # Test 1.2 server when 1.3 and 1.2 failed connection is established + def security_tls_v1_2_fail_server_test(self): + tls_1_2_results = False, 'Signature faild' + tls_1_3_results = False, 'Signature faild' + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + self.assertFalse(test_results[0]) + + # Test 1.2 server when 1.3 and 1.2 failed connection is established + def security_tls_v1_2_none_server_test(self): + tls_1_2_results = None, 'No cert' + tls_1_3_results = None, 'No cert' + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + self.assertIsNone(test_results[0]) + def security_tls_v1_3_server_test(self): test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') self.assertTrue(test_results[0]) @@ -163,8 +208,17 @@ def get_interface_ip(self, interface_name): if __name__ == '__main__': suite = unittest.TestSuite() + # TLS 1.2 server tests suite.addTest(TLSModuleTest('security_tls_v1_2_server_test')) + suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_3_server_test')) + suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_2_and_1_3_server_test')) + suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_2_and_1_3_fail_server_test')) + suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_3_and_1_2_fail_server_test')) + suite.addTest(TLSModuleTest('security_tls_v1_2_fail_server_test')) + suite.addTest(TLSModuleTest('security_tls_v1_2_none_server_test')) + # # TLS 1.3 server tests suite.addTest(TLSModuleTest('security_tls_v1_3_server_test')) + # TLS client tests suite.addTest(TLSModuleTest('security_tls_v1_2_client_test')) suite.addTest(TLSModuleTest('security_tls_v1_3_client_test')) suite.addTest(TLSModuleTest('security_tls_client_skip_test')) diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index 3acc18744..dc7e7fc28 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -156,6 +156,35 @@ def validate_signature(self,host): LOGGER.error(str(e)) return False, 'Device certificate has not been signed' + def process_tls_server_results(self,tls_1_2_results, tls_1_3_results): + results = '' + if tls_1_2_results[0] is None and tls_1_3_results[0]: + results = True, 'TLS 1.3 validated:\n' + tls_1_3_results[1] + elif tls_1_3_results[0] is None and tls_1_2_results[0]: + results = True, 'TLS 1.2 validated:\n' + tls_1_2_results[1] + elif tls_1_2_results[0] and tls_1_3_results[0]: + description = 'TLS 1.2 validated:\n' + tls_1_2_results[1] + description += '\nTLS 1.3 validated:\n' + tls_1_3_results[1] + results = True, description + elif tls_1_2_results[0] and not tls_1_3_results[0]: + description = 'TLS 1.2 validated:\n' + tls_1_2_results[1] + description += '\nTLS 1.3 not validated:\n' + tls_1_3_results[1] + results = True, description + elif tls_1_3_results[0] and not tls_1_2_results[0]: + description = 'TLS 1.2 not validated:\n' + tls_1_2_results[1] + description += '\nTLS 1.3 validated:\n' + tls_1_3_results[1] + results = True, description + elif not tls_1_3_results[0] and not tls_1_2_results[0] and tls_1_2_results[0] is not None and tls_1_3_results is not None: + description = 'TLS 1.2 not validated:\n' + tls_1_2_results[1] + description += '\nTLS 1.3 not validated:\n' + tls_1_3_results[1] + results = False, description + else: + description = 'TLS 1.2 not validated:\n' + tls_1_2_results[1] + description += '\nTLS 1.3 not validated:\n' + tls_1_3_results[1] + results = None, description + LOGGER.info("TLS 1.2 server test results: " + str(results)) + return results + def validate_tls_server(self, host, tls_version, port=443): cert_pem = self.get_public_certificate(host,validate_cert=False, tls_version='1.2') if cert_pem: @@ -188,6 +217,7 @@ def validate_tls_server(self, host, tls_version, port=443): return cert_valid, test_details else: LOGGER.info('Failed to resolve public certificate') + return None, 'Failed to resolve public certificate' def write_cert_to_file(self,pem_cert): with open(self._dev_cert_file, 'w',encoding='UTF-8') as f: @@ -220,9 +250,10 @@ def parse_hello_packets(self, packets, capture_file): hello_packets = [] for packet in packets: # Extract all the basic IP information about the packet - dst_ip = packet['_source']['layers']['ip.dst'][0] - src_ip = packet['_source']['layers']['ip.src'][0] - dst_port = packet['_source']['layers']['tcp.dstport'][0] + packet_layers = packet['_source']['layers'] + dst_ip = packet_layers['ip.dst'][0] if 'ip.dst' in packet_layers else '' + src_ip = packet_layers['ip.src'][0] if 'ip.src' in packet_layers else '' + dst_port = packet_layers['tcp.dstport'][0] if 'tcp.dstport' in packet_layers else '' # Resolve the ciphers used in this packet and validate expected ones exist ciphers = self.get_ciphers(capture_file, dst_ip, dst_port) @@ -230,9 +261,9 @@ def parse_hello_packets(self, packets, capture_file): # Put result together hello_packet = {} - hello_packet['dst_ip'] = packet['_source']['layers']['ip.dst'][0] - hello_packet['src_ip'] = packet['_source']['layers']['ip.src'][0] - hello_packet['dst_port'] = packet['_source']['layers']['tcp.dstport'][0] + hello_packet['dst_ip'] = dst_ip + hello_packet['src_ip'] = src_ip + hello_packet['dst_port'] = dst_port hello_packet['cipher_support'] = cipher_support hello_packets.append(hello_packet) diff --git a/testing/unit_test/run_tests.sh b/testing/unit_test/run_tests.sh index 525b65d46..5fa1179b1 100644 --- a/testing/unit_test/run_tests.sh +++ b/testing/unit_test/run_tests.sh @@ -1,22 +1,22 @@ -#!/bin/bash -e - -# This script should be run from within the unit_test directory. If -# it is run outside this directory, paths will not be resolved correctly. - -# Move into the root directory of test-run -pushd ../../ >/dev/null 2>&1 - -echo "Root Dir: $PWD" - -# Setup the python path -export PYTHONPATH="$PWD/framework/python/src" - -# Run the DHCP Unit tests -python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py -python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py - -# Run the Security Module Unit Tests -python3 -u $PWD/modules/test/tls/python/src/tls_module_test.py - - +#!/bin/bash -e + +# This script should be run from within the unit_test directory. If +# it is run outside this directory, paths will not be resolved correctly. + +# Move into the root directory of test-run +pushd ../../ >/dev/null 2>&1 + +echo "Root Dir: $PWD" + +# Setup the python path +export PYTHONPATH="$PWD/framework/python/src" + +# Run the DHCP Unit tests +python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py +python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py + +# Run the Security Module Unit Tests +python3 -u $PWD/modules/test/tls/python/src/tls_module_test.py + + popd >/dev/null 2>&1 \ No newline at end of file From e715c455159e9947b6769a92c3ef2c2825cda15d Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Wed, 26 Jul 2023 09:51:33 -0600 Subject: [PATCH 14/17] pylinting --- modules/test/tls/python/src/tls_module.py | 34 +++--- .../test/tls/python/src/tls_module_test.py | 93 ++++++++++------ modules/test/tls/python/src/tls_util.py | 101 ++++++++++++------ 3 files changed, 148 insertions(+), 80 deletions(-) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 0bede4f31..245d60081 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -35,11 +35,12 @@ def _security_tls_v1_2_server(self): self._resolve_device_ip() # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is not None: - tls_1_2_results = self._tls_util.validate_tls_server(self._device_ipv4_addr, - tls_version='1.2') - tls_1_3_results = self._tls_util.validate_tls_server(self._device_ipv4_addr, - tls_version='1.3') - return self._tls_util.process_tls_server_results(tls_1_2_results,tls_1_3_results) + tls_1_2_results = self._tls_util.validate_tls_server( + self._device_ipv4_addr, tls_version='1.2') + tls_1_3_results = self._tls_util.validate_tls_server( + self._device_ipv4_addr, tls_version='1.3') + return self._tls_util.process_tls_server_results(tls_1_2_results, + tls_1_3_results) else: LOGGER.error('Could not resolve device IP address. Skipping') return None, 'Could not resolve device IP address. Skipping' @@ -76,17 +77,20 @@ def _security_tls_v1_3_client(self): return None, 'Could not resolve device IP address. Skipping' def _validate_tls_client(self, client_ip, tls_version): - monitor_result = self._tls_util.validate_tls_client(client_ip=client_ip, - tls_version=tls_version, - capture_file=MONITOR_CAPTURE_FILE) - startup_result = self._tls_util.validate_tls_client(client_ip=client_ip, - tls_version=tls_version, - capture_file=STARTUP_CAPTURE_FILE) + monitor_result = self._tls_util.validate_tls_client( + client_ip=client_ip, + tls_version=tls_version, + capture_file=MONITOR_CAPTURE_FILE) + startup_result = self._tls_util.validate_tls_client( + client_ip=client_ip, + tls_version=tls_version, + capture_file=STARTUP_CAPTURE_FILE) - LOGGER.info("Montor: " + str(monitor_result)) - LOGGER.info("Startup: " + str(startup_result)) + LOGGER.info('Montor: ' + str(monitor_result)) + LOGGER.info('Startup: ' + str(startup_result)) - if (not monitor_result[0] and monitor_result[0] is not None) or (not startup_result[0] and startup_result[0] is not None): + if (not monitor_result[0] and monitor_result[0] is not None) or ( + not startup_result[0] and startup_result[0] is not None): result = False, startup_result[1] + monitor_result[1] elif monitor_result[0] and startup_result[0]: result = True, startup_result[1] + monitor_result[1] @@ -101,4 +105,4 @@ def _validate_tls_client(self, client_ip, tls_version): def _resolve_device_ip(self): # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is None: - self._device_ipv4_addr = self._get_device_ipv4(self) + self._device_ipv4_addr = self._get_device_ipv4() diff --git a/modules/test/tls/python/src/tls_module_test.py b/modules/test/tls/python/src/tls_module_test.py index 13887fd5a..889444ddf 100644 --- a/modules/test/tls/python/src/tls_module_test.py +++ b/modules/test/tls/python/src/tls_module_test.py @@ -1,11 +1,24 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module run all the TLS related unit tests""" from tls_util import TLSUtil import unittest -import common.logger as logger +from common import logger from scapy.all import sniff, wrpcap import os import threading import time -import requests import netifaces import ssl import http.client @@ -15,62 +28,78 @@ TLS_UTIL = None PACKET_CAPTURE = None -class TLSModuleTest(unittest.TestCase): +class TLSModuleTest(unittest.TestCase): + """Contains and runs all the unit tests concerning TLS behaviors""" @classmethod def setUpClass(cls): log = logger.get_logger(MODULE_NAME) global TLS_UTIL - TLS_UTIL = TLSUtil(log, bin_dir="modules/test/tls/bin", - cert_out_dir='testing/unit_test/temp', root_certs_dir='local/root_certs') + TLS_UTIL = TLSUtil(log, + bin_dir='modules/test/tls/bin', + cert_out_dir='testing/unit_test/temp', + root_certs_dir='local/root_certs') # Test 1.2 server when only 1.2 connection is established def security_tls_v1_2_server_test(self): - tls_1_2_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') + tls_1_2_results = TLS_UTIL.validate_tls_server('google.com', + tls_version='1.2') tls_1_3_results = None, 'No TLS 1.3' - test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, + tls_1_3_results) self.assertTrue(test_results[0]) # Test 1.2 server when 1.3 connection is established def security_tls_v1_2_for_1_3_server_test(self): tls_1_2_results = None, 'No TLS 1.2' - tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') - test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', + tls_version='1.3') + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, + tls_1_3_results) self.assertTrue(test_results[0]) # Test 1.2 server when 1.2 and 1.3 connection is established def security_tls_v1_2_for_1_2_and_1_3_server_test(self): - tls_1_2_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') - tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') - test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + tls_1_2_results = TLS_UTIL.validate_tls_server('google.com', + tls_version='1.2') + tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', + tls_version='1.3') + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, + tls_1_3_results) self.assertTrue(test_results[0]) # Test 1.2 server when 1.2 and failed 1.3 connection is established def security_tls_v1_2_for_1_2_and_1_3_fail_server_test(self): - tls_1_2_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.2') + tls_1_2_results = TLS_UTIL.validate_tls_server('google.com', + tls_version='1.2') tls_1_3_results = False, 'Signature faild' - test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, + tls_1_3_results) self.assertTrue(test_results[0]) # Test 1.2 server when 1.3 and failed 1.2 connection is established def security_tls_v1_2_for_1_3_and_1_2_fail_server_test(self): - tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') + tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', + tls_version='1.3') tls_1_2_results = False, 'Signature faild' - test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, + tls_1_3_results) self.assertTrue(test_results[0]) # Test 1.2 server when 1.3 and 1.2 failed connection is established def security_tls_v1_2_fail_server_test(self): tls_1_2_results = False, 'Signature faild' tls_1_3_results = False, 'Signature faild' - test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, + tls_1_3_results) self.assertFalse(test_results[0]) # Test 1.2 server when 1.3 and 1.2 failed connection is established def security_tls_v1_2_none_server_test(self): tls_1_2_results = None, 'No cert' tls_1_3_results = None, 'No cert' - test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,tls_1_3_results) + test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, + tls_1_3_results) self.assertIsNone(test_results[0]) def security_tls_v1_3_server_test(self): @@ -124,12 +153,12 @@ def generate_tls_traffic(self, capture_file, tls_version, disable_valid_ciphers=False): - capture_thread = self.start_capture_thread(capture_file, 10) + capture_thread = self.start_capture_thread(10) print('Capture Started') # Generate some TLS 1.2 outbound traffic - while (capture_thread.is_alive()): - self.make_tls_connection("www.google.com", 443, tls_version, + while capture_thread.is_alive(): + self.make_tls_connection('www.google.com', 443, tls_version, disable_valid_ciphers) time.sleep(1) @@ -150,12 +179,12 @@ def make_tls_connection(self, if disable_valid_ciphers: # Create a list of ciphers that do not use ECDH or ECDSA ciphers_str = [ - "TLS_AES_256_GCM_SHA384", "TLS_CHACHA20_POLY1305_SHA256", - "TLS_AES_128_GCM_SHA256", "AES256-GCM-SHA384", - "PSK-AES256-GCM-SHA384", "PSK-CHACHA20-POLY1305", - "RSA-PSK-AES128-GCM-SHA256", "DHE-PSK-AES128-GCM-SHA256", - "AES128-GCM-SHA256", "PSK-AES128-GCM-SHA256", "AES256-SHA256", - "AES128-SHA" + 'TLS_AES_256_GCM_SHA384', 'TLS_CHACHA20_POLY1305_SHA256', + 'TLS_AES_128_GCM_SHA256', 'AES256-GCM-SHA384', + 'PSK-AES256-GCM-SHA384', 'PSK-CHACHA20-POLY1305', + 'RSA-PSK-AES128-GCM-SHA256', 'DHE-PSK-AES128-GCM-SHA256', + 'AES128-GCM-SHA256', 'PSK-AES128-GCM-SHA256', 'AES256-SHA256', + 'AES128-SHA' ] context.set_ciphers(':'.join(ciphers_str)) @@ -188,7 +217,7 @@ def start_capture(self, timeout): global PACKET_CAPTURE PACKET_CAPTURE = sniff(iface='eth0', timeout=timeout) - def start_capture_thread(self, capture_file, timeout): + def start_capture_thread(self, timeout): # Start the packet capture in a separate thread to avoid blocking. capture_thread = threading.Thread(target=self.start_capture, args=(timeout, )) @@ -202,7 +231,7 @@ def get_interface_ip(self, interface_name): ipv4 = addresses[netifaces.AF_INET][0]['addr'] return ipv4 except (ValueError, KeyError) as e: - print(f"Error: {e}") + print(f'Error: {e}') return None @@ -212,8 +241,10 @@ def get_interface_ip(self, interface_name): suite.addTest(TLSModuleTest('security_tls_v1_2_server_test')) suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_3_server_test')) suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_2_and_1_3_server_test')) - suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_2_and_1_3_fail_server_test')) - suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_3_and_1_2_fail_server_test')) + suite.addTest( + TLSModuleTest('security_tls_v1_2_for_1_2_and_1_3_fail_server_test')) + suite.addTest( + TLSModuleTest('security_tls_v1_2_for_1_3_and_1_2_fail_server_test')) suite.addTest(TLSModuleTest('security_tls_v1_2_fail_server_test')) suite.addTest(TLSModuleTest('security_tls_v1_2_none_server_test')) # # TLS 1.3 server tests diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index dc7e7fc28..5a1477897 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -1,10 +1,24 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module that contains various metehods for validating TLS communications""" import ssl import socket from datetime import datetime from OpenSSL import crypto import json import os -import common.util as util +from common import util LOG_NAME = 'tls_util' LOGGER = None @@ -12,17 +26,26 @@ DEFAULT_CERTS_OUT_DIR = '/runtime/output' DEFAULT_ROOT_CERTS_DIR = '/testrun/root_certs' + class TLSUtil(): """Helper class for various tests concerning TLS communications""" - def __init__(self, logger, bin_dir=DEFAULT_BIN_DIR, cert_out_dir=DEFAULT_CERTS_OUT_DIR, root_certs_dir=DEFAULT_ROOT_CERTS_DIR): + def __init__(self, + logger, + bin_dir=DEFAULT_BIN_DIR, + cert_out_dir=DEFAULT_CERTS_OUT_DIR, + root_certs_dir=DEFAULT_ROOT_CERTS_DIR): global LOGGER LOGGER = logger self._bin_dir = bin_dir self._dev_cert_file = cert_out_dir + '/device_cert.crt' self._root_certs_dir = root_certs_dir - def get_public_certificate(self, host, port=443,validate_cert=False, tls_version='1.2'): + def get_public_certificate(self, + host, + port=443, + validate_cert=False, + tls_version='1.2'): try: #context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) @@ -120,30 +143,33 @@ def verify_public_key(self, public_key): else: return False, 'EC key length too short: ' + str(key_length) + ' < 224' else: - return False, "Key is not RSA or EC type" + return False, 'Key is not RSA or EC type' - def validate_signature(self,host): + def validate_signature(self, host): # Reconnect to the device but with validate signature option # set to true which will check for proper cert chains # within the valid CA root certs stored on the server - LOGGER.info('Checking for valid signature from authorized Certificate Authorities') - public_cert = self.get_public_certificate(host,validate_cert=True, tls_version='1.2') + LOGGER.info( + 'Checking for valid signature from authorized Certificate Authorities') + public_cert = self.get_public_certificate(host, + validate_cert=True, + tls_version='1.2') if public_cert: LOGGER.info('Authorized Certificate Authority signature confirmed') return True, 'Authorized Certificate Authority signature confirmed' else: LOGGER.info('Authorized Certificate Authority signature not present') LOGGER.info('Resolving configured root certificates') - bin_file = self._bin_dir + "/check_cert_signature.sh" + bin_file = self._bin_dir + '/check_cert_signature.sh' # Get a list of all root certificates root_certs = os.listdir(self._root_certs_dir) - LOGGER.info("Root Certs Found: " + str(len(root_certs))) + LOGGER.info('Root Certs Found: ' + str(len(root_certs))) for root_cert in root_certs: try: # Create the file path root_cert_path = os.path.join(self._root_certs_dir, root_cert) - LOGGER.info("Checking root cert: " + str(root_cert_path)) - args = (f'{root_cert_path} {self._dev_cert_file}') + LOGGER.info('Checking root cert: ' + str(root_cert_path)) + args = f'{root_cert_path} {self._dev_cert_file}' command = f'{bin_file} {args}' response = util.run_command(command) if 'device_cert.crt: OK' in str(response): @@ -151,12 +177,12 @@ def validate_signature(self,host): return True, 'Device signed by cert:' + root_cert else: LOGGER.info('Device not signed by cert: ' + root_cert) - except Exception as e: + except Exception as e: # pylint: disable=W0718 LOGGER.error('Failed to check cert:' + root_cert) LOGGER.error(str(e)) return False, 'Device certificate has not been signed' - def process_tls_server_results(self,tls_1_2_results, tls_1_3_results): + def process_tls_server_results(self, tls_1_2_results, tls_1_3_results): results = '' if tls_1_2_results[0] is None and tls_1_3_results[0]: results = True, 'TLS 1.3 validated:\n' + tls_1_3_results[1] @@ -174,7 +200,8 @@ def process_tls_server_results(self,tls_1_2_results, tls_1_3_results): description = 'TLS 1.2 not validated:\n' + tls_1_2_results[1] description += '\nTLS 1.3 validated:\n' + tls_1_3_results[1] results = True, description - elif not tls_1_3_results[0] and not tls_1_2_results[0] and tls_1_2_results[0] is not None and tls_1_3_results is not None: + elif not tls_1_3_results[0] and not tls_1_2_results[0] and tls_1_2_results[ + 0] is not None and tls_1_3_results is not None: description = 'TLS 1.2 not validated:\n' + tls_1_2_results[1] description += '\nTLS 1.3 not validated:\n' + tls_1_3_results[1] results = False, description @@ -182,11 +209,13 @@ def process_tls_server_results(self,tls_1_2_results, tls_1_3_results): description = 'TLS 1.2 not validated:\n' + tls_1_2_results[1] description += '\nTLS 1.3 not validated:\n' + tls_1_3_results[1] results = None, description - LOGGER.info("TLS 1.2 server test results: " + str(results)) + LOGGER.info('TLS 1.2 server test results: ' + str(results)) return results - def validate_tls_server(self, host, tls_version, port=443): - cert_pem = self.get_public_certificate(host,validate_cert=False, tls_version='1.2') + def validate_tls_server(self, host, tls_version): + cert_pem = self.get_public_certificate(host, + validate_cert=False, + tls_version=tls_version) if cert_pem: # Write pem encoding to a file @@ -198,6 +227,7 @@ def validate_tls_server(self, host, tls_version, port=443): # Print the certificate information cert_text = crypto.dump_certificate(crypto.FILETYPE_TEXT, public_cert).decode() + LOGGER.info('Device Certificate:\n' + cert_text) # Validate the certificates time range tr_valid = self.verify_certificate_timerange(public_cert) @@ -219,29 +249,29 @@ def validate_tls_server(self, host, tls_version, port=443): LOGGER.info('Failed to resolve public certificate') return None, 'Failed to resolve public certificate' - def write_cert_to_file(self,pem_cert): - with open(self._dev_cert_file, 'w',encoding='UTF-8') as f: + def write_cert_to_file(self, pem_cert): + with open(self._dev_cert_file, 'w', encoding='UTF-8') as f: f.write(pem_cert) def get_ciphers(self, capture_file, dst_ip, dst_port): - bin_file = self._bin_dir + "/get_ciphers.sh" - args = (f'{capture_file} {dst_ip} {dst_port}') + bin_file = self._bin_dir + '/get_ciphers.sh' + args = f'{capture_file} {dst_ip} {dst_port}' command = f'{bin_file} {args}' response = util.run_command(command) - ciphers = response[0].split("\n") + ciphers = response[0].split('\n') return ciphers def get_hello_packets(self, capture_file, src_ip, tls_version): - bin_file = self._bin_dir + "/get_client_hello_packets.sh" - args = (f'{capture_file} {src_ip} {tls_version}') + bin_file = self._bin_dir + '/get_client_hello_packets.sh' + args = f'{capture_file} {src_ip} {tls_version}' command = f'{bin_file} {args}' response = util.run_command(command) packets = response[0].strip() return self.parse_hello_packets(json.loads(packets), capture_file) def get_handshake_complete(self, capture_file, src_ip, dst_ip, tls_version): - bin_file = self._bin_dir + "/get_handshake_complete.sh" - args = (f'{capture_file} {src_ip} {dst_ip} {tls_version}') + bin_file = self._bin_dir + '/get_handshake_complete.sh' + args = f'{capture_file} {src_ip} {dst_ip} {tls_version}' command = f'{bin_file} {args}' response = util.run_command(command) return response @@ -253,7 +283,8 @@ def parse_hello_packets(self, packets, capture_file): packet_layers = packet['_source']['layers'] dst_ip = packet_layers['ip.dst'][0] if 'ip.dst' in packet_layers else '' src_ip = packet_layers['ip.src'][0] if 'ip.src' in packet_layers else '' - dst_port = packet_layers['tcp.dstport'][0] if 'tcp.dstport' in packet_layers else '' + dst_port = packet_layers['tcp.dstport'][ + 0] if 'tcp.dstport' in packet_layers else '' # Resolve the ciphers used in this packet and validate expected ones exist ciphers = self.get_ciphers(capture_file, dst_ip, dst_port) @@ -270,11 +301,11 @@ def parse_hello_packets(self, packets, capture_file): return hello_packets def validate_tls_client(self, client_ip, tls_version, capture_file): - LOGGER.info("Validating client for TLS: " + tls_version) + LOGGER.info('Validating client for TLS: ' + tls_version) hello_packets = self.get_hello_packets(capture_file, client_ip, tls_version) # Validate the ciphers only for tls 1.2 - client_hello_results = {"valid": [], "invalid": []} + client_hello_results = {'valid': [], 'invalid': []} if tls_version == '1.2': for packet in hello_packets: if packet['dst_ip'] not in str(client_hello_results['valid']): @@ -288,9 +319,9 @@ def validate_tls_client(self, client_ip, tls_version, capture_file): if packet['dst_ip'] in str(client_hello_results['invalid']): client_hello_results['invalid'].remove(packet) else: - LOGGER.info("Invalid ciphers detected") - if packet['dst_ip'] not in str(client_hello_results['invalid']): - client_hello_results['invalid'].append(packet) + LOGGER.info('Invalid ciphers detected') + if packet['dst_ip'] not in str(client_hello_results['invalid']): + client_hello_results['invalid'].append(packet) else: # No cipher check for TLS 1.3 client_hello_results['valid'] = hello_packets @@ -332,14 +363,16 @@ def validate_tls_client(self, client_ip, tls_version, capture_file): tls_client_details += '\n' if len(handshakes['incomplete']) > 0: for result in handshakes['incomplete']: - tls_client_details += 'Incomplete handshake detected from server: ' + result + '\n' + tls_client_details += 'Incomplete handshake detected from server: ' + tls_client_details += result + '\n' if len(handshakes['complete']) > 0: # If we haven't already failed the test from previous checks # allow a passing result if not tls_client_valid: tls_client_valid = True for result in handshakes['complete']: - tls_client_details += 'Completed handshake detected from server: ' + result + '\n' + tls_client_details += 'Completed handshake detected from server: ' + tls_client_details += result + '\n' else: LOGGER.info('No client hello packets detected. Skipping') tls_client_details = 'No client hello packets detected. Skipping' From 8100431dc4a03978d353143c20f12a7d005214f6 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Wed, 26 Jul 2023 11:41:29 -0600 Subject: [PATCH 15/17] Update cipher checks and add test --- modules/test/tls/conf/module_config.json | 2 +- modules/test/tls/python/src/tls_module_test.py | 10 ++++++++++ modules/test/tls/python/src/tls_util.py | 18 ++++++++++++------ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/modules/test/tls/conf/module_config.json b/modules/test/tls/conf/module_config.json index d96bb86f4..59e5a839d 100644 --- a/modules/test/tls/conf/module_config.json +++ b/modules/test/tls/conf/module_config.json @@ -9,7 +9,7 @@ "docker": { "depends_on": "base", "enable_container": true, - "timeout": 60 + "timeout": 300 }, "tests":[ { diff --git a/modules/test/tls/python/src/tls_module_test.py b/modules/test/tls/python/src/tls_module_test.py index 889444ddf..84a1c70eb 100644 --- a/modules/test/tls/python/src/tls_module_test.py +++ b/modules/test/tls/python/src/tls_module_test.py @@ -128,6 +128,15 @@ def security_tls_v1_3_client_test(self): print(str(test_results)) self.assertTrue(test_results[0]) + def client_hello_packets_test(self): + packet_fail = {'dst_ip': '10.10.10.1', 'src_ip': '10.10.10.14', 'dst_port': '443', 'cipher_support': {'ecdh': False, 'ecdsa': True}} + packet_success = {'dst_ip': '10.10.10.1', 'src_ip': '10.10.10.14', 'dst_port': '443', 'cipher_support': {'ecdh': True, 'ecdsa': True}} + hello_packets = [packet_fail,packet_success] + hello_results = TLS_UTIL.process_hello_packets(hello_packets,'1.2') + print("Hello packets test results: " + str(hello_results)) + expected = {'valid':[packet_success],'invalid':[]} + self.assertEqual(hello_results,expected) + def test_client_tls(self, tls_version, tls_generate=None, @@ -237,6 +246,7 @@ def get_interface_ip(self, interface_name): if __name__ == '__main__': suite = unittest.TestSuite() + suite.addTest(TLSModuleTest('client_hello_packets_test')) # TLS 1.2 server tests suite.addTest(TLSModuleTest('security_tls_v1_2_server_test')) suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_3_server_test')) diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index 5a1477897..c83c131af 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -300,10 +300,7 @@ def parse_hello_packets(self, packets, capture_file): hello_packets.append(hello_packet) return hello_packets - def validate_tls_client(self, client_ip, tls_version, capture_file): - LOGGER.info('Validating client for TLS: ' + tls_version) - hello_packets = self.get_hello_packets(capture_file, client_ip, tls_version) - + def process_hello_packets(self,hello_packets, tls_version = '1.2'): # Validate the ciphers only for tls 1.2 client_hello_results = {'valid': [], 'invalid': []} if tls_version == '1.2': @@ -317,7 +314,10 @@ def validate_tls_client(self, client_ip, tls_version, capture_file): # If a previous hello packet to the same destination failed, # we can now remove it as it has passed on a different attempt if packet['dst_ip'] in str(client_hello_results['invalid']): - client_hello_results['invalid'].remove(packet) + LOGGER.info(str(client_hello_results['invalid'])) + for invalid_packet in client_hello_results['invalid']: + if packet['dst_ip'] in str(invalid_packet): + client_hello_results['invalid'].remove(invalid_packet) else: LOGGER.info('Invalid ciphers detected') if packet['dst_ip'] not in str(client_hello_results['invalid']): @@ -325,6 +325,12 @@ def validate_tls_client(self, client_ip, tls_version, capture_file): else: # No cipher check for TLS 1.3 client_hello_results['valid'] = hello_packets + return client_hello_results + + def validate_tls_client(self, client_ip, tls_version, capture_file): + LOGGER.info('Validating client for TLS: ' + tls_version) + hello_packets = self.get_hello_packets(capture_file, client_ip, tls_version) + client_hello_results = self.process_hello_packets(hello_packets,tls_version) handshakes = {'complete': [], 'incomplete': []} for packet in client_hello_results['valid']: @@ -368,7 +374,7 @@ def validate_tls_client(self, client_ip, tls_version, capture_file): if len(handshakes['complete']) > 0: # If we haven't already failed the test from previous checks # allow a passing result - if not tls_client_valid: + if tls_client_valid is None: tls_client_valid = True for result in handshakes['complete']: tls_client_details += 'Completed handshake detected from server: ' From 08db163b346a8136d652beb41538ed8b59f70cab Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Wed, 26 Jul 2023 14:12:05 -0600 Subject: [PATCH 16/17] Fix test results when None is returned with details --- modules/test/base/python/src/test_module.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/test/base/python/src/test_module.py b/modules/test/base/python/src/test_module.py index 6ff4f815b..dfa40e253 100644 --- a/modules/test/base/python/src/test_module.py +++ b/modules/test/base/python/src/test_module.py @@ -96,7 +96,10 @@ def run_tests(self): if isinstance(result, bool): test['result'] = 'compliant' if result else 'non-compliant' else: - test['result'] = 'compliant' if result[0] else 'non-compliant' + if result[0] is None: + test['result'] = 'skipped' + else: + test['result'] = 'compliant' if result[0] else 'non-compliant' test['result_details'] = result[1] else: test['result'] = 'skipped' From ae35491e575a5e599112d05947a7b4be4d21ebd0 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Tue, 1 Aug 2023 13:27:35 -0600 Subject: [PATCH 17/17] Fix duplicate results --- modules/test/tls/python/src/tls_module.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 245d60081..d58163266 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -99,7 +99,7 @@ def _validate_tls_client(self, client_ip, tls_version): elif startup_result[0] and monitor_result[0] is None: result = True, monitor_result[1] else: - result = None, startup_result[1] + monitor_result[1] + result = None, startup_result[1] return result def _resolve_device_ip(self):