From ba73627392bfbe0f37aa684f1ed015f5cd115179 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Thu, 10 Oct 2024 16:27:14 -0600 Subject: [PATCH 1/3] Add methods to generate table of all outbound connection ips --- modules/test/tls/python/src/tls_module.py | 21 +++++++++++++ modules/test/tls/python/src/tls_util.py | 38 +++++++++++++++++++++++ testing/unit/tls/tls_module_test.py | 26 ++++++++++++++++ 3 files changed, 85 insertions(+) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 186766b17..d6b013bca 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -191,6 +191,27 @@ def __init__(self, # LOGGER.info('Module report generated at: ' + str(report_path)) # return report_path + def generate_outbound_connection_table(self, ip_list): + """Generate just an HTML table from a list of IPs""" + html_content = """ + + + + + + """ + + rows = [f'\t' for ip in ip_list] + html_content += '\n'.join(rows) + + # Close the table + html_content += """ + + \r
Destination IP
{ip}
+ """ + + return html_content + def extract_certificates_from_pcap(self, pcap_files, mac_address): # Initialize a list to store packets all_packets = [] diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index 9f00b96ef..8a1605b78 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -25,6 +25,7 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from ipaddress import IPv4Address +from scapy.all import rdpcap, IP, Ether LOG_NAME = 'tls_util' LOGGER = None @@ -37,6 +38,7 @@ ipaddress.ip_network('172.16.0.0/12'), ipaddress.ip_network('192.168.0.0/16') ] +TR_CONTAINER_MAC_PREFIX = '9a:02:57:1e:8f:' #Define the allowed protocols as tshark filters DEFAULT_ALLOWED_PROTOCOLS = ['quic'] @@ -59,6 +61,42 @@ def __init__(self, if allowed_protocols is None: self._allowed_protocols = DEFAULT_ALLOWED_PROTOCOLS + def get_all_outbound_connections(self, device_mac, capture_files): + """Process multiple pcap files and combine unique IP destinations.""" + + ip_destinations = set() + for capture in capture_files: + ips = self.get_outbound_connections(device_mac=device_mac, + capture_file=capture) + ip_destinations.update(ips) + return list(ip_destinations) + + def get_outbound_connections(self, device_mac, capture_file): + """Extract unique IP destinations from a single pcap file based on the + known MAC address.""" + packets = rdpcap(capture_file) + ip_destinations = set() + for packet in packets: + if Ether in packet and IP in packet: + if packet[Ether].src == device_mac: + ip_dst = packet[IP].dst + if self.is_external_ip(ip_dst): + ip_destinations.add(ip_dst) + return ip_destinations + + def is_external_ip(self, ip): + """Check if the IP is an external (non-private) IP address.""" + try: + # Convert the IP string into an IPv4Address object + ip_addr = ipaddress.ip_address(ip) + + # Return True only if the IP is not in a private or reserved range + return not (ip_addr.is_private or ip_addr.is_loopback + or ip_addr.is_link_local) + except ValueError: + # Return False if the IP is invalid or not IPv4 + return False + def get_public_certificate(self, host, port=443, diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py index fc37ade40..dfad40319 100644 --- a/testing/unit/tls/tls_module_test.py +++ b/testing/unit/tls/tls_module_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Module run all the TLS related unit tests""" +from tls_module import TLSModule from tls_util import TLSUtil import os import unittest @@ -319,6 +320,28 @@ def security_tls_client_allowed_protocols_test(self): print(str(test_results)) self.assertTrue(test_results[0]) + def outbound_connections_test(self): + """ Test generation of the outbound connection ips""" + print('\noutbound_connections_test') + capture_file = os.path.join(CAPTURES_DIR, 'monitor.pcap') + ip_dst = TLS_UTIL.get_all_outbound_connections( + device_mac='70:b3:d5:96:c0:00', capture_files=[capture_file]) + print(str(ip_dst)) + # Compare as sets since returned order is not guaranteed + self.assertEqual( + set(ip_dst), + set(['8.8.8.8', '224.0.0.22', '18.140.82.197', '216.239.35.0'])) + + def outbound_connections_report_test(self): + """ Test generation of the outbound connection ips""" + print('\noutbound_connections_report_test') + capture_file = os.path.join(CAPTURES_DIR, 'monitor.pcap') + ip_dst = TLS_UTIL.get_all_outbound_connections( + device_mac='70:b3:d5:96:c0:00', capture_files=[capture_file]) + tls = TLSModule(module=MODULE) + gen_html = tls.generate_outbound_connection_table(ip_dst) + print(gen_html) + # Commented out whilst TLS report is recreated # def tls_module_report_test(self): # print('\ntls_module_report_test') @@ -576,6 +599,9 @@ def download_public_cert(self, hostname, port=443): suite.addTest(TLSModuleTest('security_tls_client_allowed_protocols_test')) + suite.addTest(TLSModuleTest('outbound_connections_test')) + suite.addTest(TLSModuleTest('outbound_connections_report_test')) + runner = unittest.TextTestRunner() test_result = runner.run(suite) From bc33d539b9351849b1afd4f7eceaa951360f0c66 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Thu, 10 Oct 2024 16:39:11 -0600 Subject: [PATCH 2/3] pylint --- testing/unit/tls/tls_module_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py index 7ad070488..4d15e97a9 100644 --- a/testing/unit/tls/tls_module_test.py +++ b/testing/unit/tls/tls_module_test.py @@ -14,7 +14,6 @@ """Module run all the TLS related unit tests""" from tls_module import TLSModule from tls_util import TLSUtil -from tls_module import TLSModule import os import unittest from common import logger From a06584325d1dd739bfcb37a28d3c5086ab1b2f73 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Fri, 11 Oct 2024 16:05:40 -0600 Subject: [PATCH 3/3] Add outbound connections to report Add port to table --- modules/test/tls/python/src/tls_module.py | 38 ++++++++++++++++------- modules/test/tls/python/src/tls_util.py | 27 ++++++++++------ 2 files changed, 44 insertions(+), 21 deletions(-) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index fae9cbd98..bda5238c0 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -86,8 +86,10 @@ def generate_module_report(self): ''' cert_tables = [] - for cert_num, ((ip_address, port), # pylint: disable=W0612 - cert) in enumerate(certificates.items()): + # pylint: disable=W0612 + for cert_num, ( + (ip_address, port), + cert) in enumerate(certificates.items()): # Extract certificate data not_valid_before = cert.not_valid_before @@ -251,8 +253,13 @@ def generate_module_report(self): ''' + outbound_conns = self._tls_util.get_all_outbound_connections( + device_mac=self._device_mac, capture_files=pcap_files) + conn_table = self.generate_outbound_connection_table(outbound_conns) + html_content += summary_table + '\n'.join('\n' + tables for tables in cert_tables) + html_content += conn_table else: html_content += (''' @@ -322,17 +329,24 @@ def format_extension_value(self, value): f'crl_sign={value.crl_sign}') return str(value) # Fallback to string conversion - def generate_outbound_connection_table(self, ip_list): + def generate_outbound_connection_table(self, outbound_conns): """Generate just an HTML table from a list of IPs""" - html_content = """ - - - - - - """ - - rows = [f'\t' for ip in ip_list] + html_content = ''' +

Outbound Connections

+
Destination IP
{ip}
+ + + + + + + + ''' + + rows = [ + f'\t' + for ip, port in outbound_conns + ] html_content += '\n'.join(rows) # Close the table diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index 8a1605b78..1554eba8d 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -25,7 +25,7 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from ipaddress import IPv4Address -from scapy.all import rdpcap, IP, Ether +from scapy.all import rdpcap, IP, Ether, TCP, UDP LOG_NAME = 'tls_util' LOGGER = None @@ -64,25 +64,34 @@ def __init__(self, def get_all_outbound_connections(self, device_mac, capture_files): """Process multiple pcap files and combine unique IP destinations.""" - ip_destinations = set() + outbound_conns = set() for capture in capture_files: ips = self.get_outbound_connections(device_mac=device_mac, capture_file=capture) - ip_destinations.update(ips) - return list(ip_destinations) + outbound_conns.update(ips) + return list(outbound_conns) def get_outbound_connections(self, device_mac, capture_file): - """Extract unique IP destinations from a single pcap file based on the - known MAC address.""" + """Extract unique IP and port destinations from a single pcap file + based on the known MAC address.""" packets = rdpcap(capture_file) - ip_destinations = set() + outbound_conns = set() for packet in packets: if Ether in packet and IP in packet: if packet[Ether].src == device_mac: ip_dst = packet[IP].dst + port_dst = 'Unknown' + + # Check if the packet has TCP or UDP layer to get the destination port + if TCP in packet: + port_dst = packet[TCP].dport + elif UDP in packet: + port_dst = packet[UDP].dport + if self.is_external_ip(ip_dst): - ip_destinations.add(ip_dst) - return ip_destinations + outbound_conns.add((ip_dst, port_dst)) + + return outbound_conns def is_external_ip(self, ip): """Check if the IP is an external (non-private) IP address."""
Destination IPPort
{ip}{port}