Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions modules/test/tls/python/src/tls_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from ipaddress import IPv4Address

LOG_NAME = 'tls_util'
LOGGER = None
DEFAULT_BIN_DIR = '/testrun/bin'
DEFAULT_CERTS_OUT_DIR = '/runtime/output'
DEFAULT_ROOT_CERTS_DIR = '/testrun/root_certs'
# Define private IP subnets
PRIVATE_SUBNETS = [
ipaddress.ip_network('10.0.0.0/8'),
ipaddress.ip_network('172.16.0.0/12'),
ipaddress.ip_network('192.168.0.0/16')
]


class TLSUtil():
Expand Down Expand Up @@ -244,8 +251,7 @@ def validate_trusted_ca_signature_chain(self, device_cert_path,

combined_cert_name = 'device_cert_full.crt'
combined_cert = dev_cert + inter_cert
combined_cert_path = os.path.join(self._cert_out_dir,
combined_cert_name)
combined_cert_path = os.path.join(self._cert_out_dir, combined_cert_name)
with open(combined_cert_path, 'w', encoding='utf-8') as f:
f.write(combined_cert)

Expand All @@ -254,7 +260,7 @@ def validate_trusted_ca_signature_chain(self, device_cert_path,
args = f'{intermediate_cert_path} {combined_cert_path}'
command = f'{bin_file} {args}'
response = util.run_command(command)
return combined_cert_name +': OK' in str(response)
return combined_cert_name + ': OK' in str(response)

def get_ca_issuer(self, certificate):
cert_file_path = None
Expand Down Expand Up @@ -600,6 +606,13 @@ def get_tls_client_connection_ips(self, client_ip, capture_files):
tls_dst_ips.add(str(dst_ip))
return tls_dst_ips

def is_private_ip(self, ip):
# Check if an IP is within any private IP subnet
for subnet in PRIVATE_SUBNETS:
if ip in subnet:
return True
return False

def validate_tls_client(self, client_ip, tls_version, capture_files):
LOGGER.info('Validating client for TLS: ' + tls_version)
hello_packets = self.get_hello_packets(capture_files, client_ip,
Expand Down Expand Up @@ -672,7 +685,13 @@ def validate_tls_client(self, client_ip, tls_version, capture_files):
# need to report true unencrypted oubound connections
if len(non_tls_client_ips) > 0:
for ip in non_tls_client_ips:
if ip not in tls_client_ips:
if self.is_private_ip(IPv4Address(ip)):
# Allow private IP unencrypted traffic but report in results
LOGGER.info(
f'Non-TLS client traffic detected on private subnet to {ip}')
tls_client_details += (
f'\nAllowing non-TLS traffic to private subnet {ip}')
elif ip not in tls_client_ips:
tls_client_valid = False
tls_client_details += f'''\nNon-TLS connection detected to {ip}'''
else:
Expand Down