@@ -264,8 +264,8 @@ def generate_header(self, json_data):
tr_img_b64 = base64.b64encode(f.read()).decode('utf-8')
return f'''
'''
@@ -299,7 +299,7 @@ def generate_summary(self, json_data):
summary += '''
- Device Configuration
+
Device Configuration
'''
@@ -379,7 +379,7 @@ def generate_result_summary_item(self, key, value, style=None):
def generate_device_summary_label(self, key, value, trailing_space=True):
label = f'''
-
{key}
+
{key}
{value}
'''
if trailing_space:
@@ -457,18 +457,30 @@ def generate_css(self):
position: relative;
}
- .header-text {
+ h1 {
margin: 0 0 8px 0;
font-size: 20px;
font-weight: 400;
}
- .header-title {
+ h2 {
margin: 0px;
font-size: 48px;
font-weight: 700;
}
+ h3 {
+ font-size: 24px;
+ }
+
+ h4 {
+ font-size: 12px;
+ font-weight: 500;
+ color: #5F6368;
+ margin-bottom: 0;
+ margin-top: 0;
+ }
+
/* Define the summary related css elements*/
.summary-content {
position: relative;
@@ -480,9 +492,6 @@ def generate_css(self):
.summary-item-label {
position: relative;
- font-size: 12px;
- font-weight: 500;
- color: #5F6368;
}
.summary-item-value {
diff --git a/framework/python/src/test_orc/test_orchestrator.py b/framework/python/src/test_orc/test_orchestrator.py
index d9b25108d..488bc880a 100644
--- a/framework/python/src/test_orc/test_orchestrator.py
+++ b/framework/python/src/test_orc/test_orchestrator.py
@@ -24,6 +24,7 @@
from common.testreport import TestReport
from test_orc.module import TestModule
from test_orc.test_case import TestCase
+import threading
LOG_NAME = "test_orc"
LOGGER = logger.get_logger("test_orc")
@@ -35,7 +36,6 @@
LOCAL_DEVICE_REPORTS = "local/devices/{device_folder}/reports"
DEVICE_ROOT_CERTS = "local/root_certs"
TESTRUN_DIR = "/usr/local/testrun"
-API_URL = "http://localhost:8000"
class TestOrchestrator:
@@ -43,7 +43,10 @@ class TestOrchestrator:
def __init__(self, session, net_orc):
self._test_modules = []
+ self._container_logs = []
self._session = session
+ self._api_url = (self._session.get_api_url() + ":" +
+ str(self._session.get_api_port()))
self._net_orc = net_orc
self._test_in_progress = False
self._path = os.path.dirname(
@@ -88,7 +91,7 @@ def run_test_modules(self):
test_modules = []
for module in self._test_modules:
- if module is None or not module.enable_container or not module.enabled:
+ if module is None or not module.enable_container:
continue
if not self._is_module_enabled(module, device):
@@ -133,15 +136,15 @@ def _write_reports(self, test_report):
LOGGER.debug(f"Writing reports to {out_dir}")
# Write the json report
- with open(os.path.join(out_dir,"report.json"),"w", encoding="utf-8") as f:
+ with open(os.path.join(out_dir, "report.json"), "w", encoding="utf-8") as f:
json.dump(test_report.to_json(), f, indent=2)
# Write the html report
- with open(os.path.join(out_dir,"report.html"),"w", encoding="utf-8") as f:
+ with open(os.path.join(out_dir, "report.html"), "w", encoding="utf-8") as f:
f.write(test_report.to_html())
# Write the pdf report
- with open(os.path.join(out_dir,"report.pdf"),"wb") as f:
+ with open(os.path.join(out_dir, "report.pdf"), "wb") as f:
f.write(test_report.to_pdf().getvalue())
util.run_command(f"chown -R {self._host_user} {out_dir}")
@@ -157,11 +160,10 @@ def _generate_report(self):
report["status"] = self._calculate_result()
report["tests"] = self.get_session().get_report_tests()
report["report"] = (
- API_URL + "/" +
- SAVED_DEVICE_REPORTS.replace("{device_folder}",
- self.get_session().get_target_device().device_folder) +
- self.get_session().get_started().strftime("%Y-%m-%dT%H:%M:%S")
- )
+ self._api_url + "/" + SAVED_DEVICE_REPORTS.replace(
+ "{device_folder}",
+ self.get_session().get_target_device().device_folder) +
+ self.get_session().get_started().strftime("%Y-%m-%dT%H:%M:%S"))
return report
@@ -229,18 +231,14 @@ def _find_oldest_test(self, completed_tests_dir):
def _timestamp_results(self, device):
# Define the current device results directory
- cur_results_dir = os.path.join(
- self._root_path,
- RUNTIME_DIR,
- device.mac_addr.replace(":", "")
- )
+ cur_results_dir = os.path.join(self._root_path, RUNTIME_DIR,
+ device.mac_addr.replace(":", ""))
- # Define the directory
+ # Define the directory
completed_results_dir = os.path.join(
- self._root_path,
- LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder),
- self.get_session().get_started().strftime("%Y-%m-%dT%H:%M:%S")
- )
+ self._root_path,
+ LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder),
+ self.get_session().get_started().strftime("%Y-%m-%dT%H:%M:%S"))
# Copy the results to the timestamp directory
# leave current copy in place for quick reference to
@@ -254,12 +252,18 @@ def test_in_progress(self):
return self._test_in_progress
def _is_module_enabled(self, module, device):
+
+ # Enable module as fallback
enabled = True
if device.test_modules is not None:
test_modules = device.test_modules
if module.name in test_modules:
if "enabled" in test_modules[module.name]:
enabled = test_modules[module.name]["enabled"]
+ else:
+ # Module has not been specified in the device config
+ enabled = module.enabled
+
return enabled
def _run_test_module(self, module):
@@ -271,19 +275,20 @@ def _run_test_module(self, module):
device = self._session.get_target_device()
- LOGGER.info("Running test module " + module.name)
+ LOGGER.info(f"Running test module {module.name}")
try:
device_test_dir = os.path.join(self._root_path, RUNTIME_DIR,
device.mac_addr.replace(":", ""))
- root_certs_dir = os.path.join(self._root_path,DEVICE_ROOT_CERTS)
-
+ root_certs_dir = os.path.join(self._root_path, DEVICE_ROOT_CERTS)
container_runtime_dir = os.path.join(device_test_dir, module.name)
os.makedirs(container_runtime_dir, exist_ok=True)
+ container_log_file = os.path.join(container_runtime_dir, "module.log")
+
network_runtime_dir = os.path.join(self._root_path, "runtime/network")
device_startup_capture = os.path.join(device_test_dir, "startup.pcap")
@@ -324,13 +329,13 @@ def _run_test_module(self, module):
read_only=True)
],
environment={
- "TZ": self.get_session().get_timezone(),
- "HOST_USER": self._host_user,
- "DEVICE_MAC": device.mac_addr,
- "IPV4_ADDR": device.ip_addr,
- "DEVICE_TEST_MODULES": json.dumps(device.test_modules),
- "IPV4_SUBNET": self._net_orc.network_config.ipv4_network,
- "IPV6_SUBNET": self._net_orc.network_config.ipv6_network
+ "TZ": self.get_session().get_timezone(),
+ "HOST_USER": self._host_user,
+ "DEVICE_MAC": device.mac_addr,
+ "IPV4_ADDR": device.ip_addr,
+ "DEVICE_TEST_MODULES": json.dumps(device.test_modules),
+ "IPV4_SUBNET": self._net_orc.network_config.ipv4_network,
+ "IPV6_SUBNET": self._net_orc.network_config.ipv6_network
})
except (docker.errors.APIError,
docker.errors.ContainerError) as container_error:
@@ -347,20 +352,25 @@ def _run_test_module(self, module):
test_module_timeout = time.time() + module.timeout
status = self._get_module_status(module)
+ # Resolving container logs is blocking so we need to spawn a new thread
log_stream = module.container.logs(stream=True, stdout=True, stderr=True)
+ log_thread = threading.Thread(target=self._get_container_logs,
+ args=(log_stream, ))
+ log_thread.daemon = True
+ log_thread.start()
+
while (status == "running" and self._session.get_status() == "In Progress"):
if time.time() > test_module_timeout:
LOGGER.error("Module timeout exceeded, killing module: " + module.name)
- self._stop_module(module=module,kill=True)
+ self._stop_module(module=module, kill=True)
break
- try:
- line = next(log_stream).decode("utf-8").strip()
- if re.search(LOG_REGEX, line):
- print(line)
- except Exception: # pylint: disable=W0718
- time.sleep(1)
status = self._get_module_status(module)
+ # Save all container logs to file
+ with open(container_log_file, "w", encoding="utf-8") as f:
+ for line in self._container_logs:
+ f.write(line + "\n")
+
# Check that Testrun has not been stopped whilst this module was running
if self.get_session().get_status() == "Stopping":
# Discard results for this module
@@ -386,6 +396,20 @@ def _run_test_module(self, module):
LOGGER.info(f"Test module {module.name} has finished")
+ # Resolve all current log data in the containers log_stream
+ # this method is blocking so should be called in
+ # a thread or within a proper blocking context
+ def _get_container_logs(self, log_stream):
+ self._container_logs = []
+ for log_chunk in log_stream:
+ lines = log_chunk.decode("utf-8").splitlines()
+ # Process each line and strip blank space
+ processed_lines = [line.strip() for line in lines if line.strip()]
+ self._container_logs.extend(processed_lines)
+ for line in lines:
+ if re.search(LOG_REGEX, line):
+ print(line)
+
def _get_module_status(self, module):
container = self._get_module_container(module)
if container is not None:
@@ -459,13 +483,12 @@ def _load_test_module(self, module_dir):
for test_case_json in module_json["config"]["tests"]:
try:
test_case = TestCase(
- name=test_case_json["name"],
- description=test_case_json["test_description"],
- expected_behavior=test_case_json["expected_behavior"],
- required_result=test_case_json["required_result"]
- )
+ name=test_case_json["name"],
+ description=test_case_json["test_description"],
+ expected_behavior=test_case_json["expected_behavior"],
+ required_result=test_case_json["required_result"])
module.tests.append(test_case)
- except Exception as error:
+ except Exception as error: # pylint: disable=W0718
LOGGER.error("Failed to load test case. See error for details")
LOGGER.error(error)
diff --git a/framework/requirements.txt b/framework/requirements.txt
index 3ce048ba3..8b4854763 100644
--- a/framework/requirements.txt
+++ b/framework/requirements.txt
@@ -11,7 +11,7 @@ scapy
weasyprint
# Requirements for the API
-fastapi==0.99.1
+fastapi==0.109.1
psutil
uvicorn
pydantic==1.10.11
diff --git a/make/DEBIAN/control b/make/DEBIAN/control
index 6fa63afeb..d85397606 100644
--- a/make/DEBIAN/control
+++ b/make/DEBIAN/control
@@ -1,5 +1,5 @@
Package: Testrun
-Version: 1.1.1
+Version: 1.1.2
Architecture: amd64
Maintainer: Google
Homepage: https://github.com/google/testrun
diff --git a/modules/network/base/base.Dockerfile b/modules/network/base/base.Dockerfile
index a9317e85c..b30f6a7d9 100644
--- a/modules/network/base/base.Dockerfile
+++ b/modules/network/base/base.Dockerfile
@@ -13,7 +13,7 @@
# limitations under the License.
# Image name: test-run/base
-FROM ubuntu:jammy
+FROM ubuntu@sha256:e6173d4dc55e76b87c4af8db8821b1feae4146dd47341e4d431118c7dd060a74
RUN apt-get update
diff --git a/modules/network/gateway/gateway.Dockerfile b/modules/network/gateway/gateway.Dockerfile
index d15d31610..885e4a9f0 100644
--- a/modules/network/gateway/gateway.Dockerfile
+++ b/modules/network/gateway/gateway.Dockerfile
@@ -19,7 +19,7 @@ ARG MODULE_NAME=gateway
ARG MODULE_DIR=modules/network/$MODULE_NAME
# Install required packages
-RUN apt-get install -y iptables isc-dhcp-client
+RUN apt-get update && apt-get install -y iptables isc-dhcp-client
# Copy over all configuration files
COPY $MODULE_DIR/conf /testrun/conf
diff --git a/modules/test/base/base.Dockerfile b/modules/test/base/base.Dockerfile
index 878273055..9c9f095cf 100644
--- a/modules/test/base/base.Dockerfile
+++ b/modules/test/base/base.Dockerfile
@@ -13,7 +13,7 @@
# limitations under the License.
# Image name: test-run/base-test
-FROM ubuntu:jammy
+FROM ubuntu@sha256:e6173d4dc55e76b87c4af8db8821b1feae4146dd47341e4d431118c7dd060a74
ARG MODULE_NAME=base
ARG MODULE_DIR=modules/test/$MODULE_NAME
diff --git a/modules/test/base/python/src/test_module.py b/modules/test/base/python/src/test_module.py
index a462f15fe..9cc51fba3 100644
--- a/modules/test/base/python/src/test_module.py
+++ b/modules/test/base/python/src/test_module.py
@@ -91,10 +91,15 @@ def run_tests(self):
LOGGER.debug('Attempting to run test: ' + test['name'])
# Resolve the correct python method by test name and run test
if hasattr(self, test_method_name):
- if 'config' in test:
- result = getattr(self, test_method_name)(config=test['config'])
- else:
- result = getattr(self, test_method_name)()
+ try:
+ if 'config' in test:
+ result = getattr(self, test_method_name)(config=test['config'])
+ else:
+ result = getattr(self, test_method_name)()
+ except Exception as e:
+ LOGGER.info(f'An error occurred whilst running {test["name"]}')
+ LOGGER.error(e)
+ return None
else:
LOGGER.info(f'Test {test["name"]} not implemented. Skipping')
result = None
diff --git a/modules/test/tls/bin/check_cert_signature.sh b/modules/test/tls/bin/check_cert_signature.sh
index ebd4a7549..37ea0f187 100644
--- a/modules/test/tls/bin/check_cert_signature.sh
+++ b/modules/test/tls/bin/check_cert_signature.sh
@@ -1,5 +1,19 @@
#!/bin/bash
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
ROOT_CERT=$1
DEVICE_CERT=$2
diff --git a/modules/test/tls/bin/get_ciphers.sh b/modules/test/tls/bin/get_ciphers.sh
index e82bbc180..3896af388 100644
--- a/modules/test/tls/bin/get_ciphers.sh
+++ b/modules/test/tls/bin/get_ciphers.sh
@@ -1,5 +1,19 @@
#!/bin/bash
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
CAPTURE_FILE=$1
DST_IP=$2
DST_PORT=$3
diff --git a/modules/test/tls/bin/get_client_hello_packets.sh b/modules/test/tls/bin/get_client_hello_packets.sh
index 13e42f791..03cfe903c 100644
--- a/modules/test/tls/bin/get_client_hello_packets.sh
+++ b/modules/test/tls/bin/get_client_hello_packets.sh
@@ -1,5 +1,19 @@
#!/bin/bash
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
CAPTURE_FILE=$1
SRC_IP=$2
TLS_VERSION=$3
@@ -8,9 +22,9 @@ TSHARK_OUTPUT="-T json -e ip.src -e tcp.dstport -e ip.dst"
TSHARK_FILTER="ssl.handshake.type==1 and ip.src==$SRC_IP"
if [[ $TLS_VERSION == '1.2' || -z $TLS_VERSION ]];then
- TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.version==0x0303"
-elif [ $TLS_VERSION == '1.2' ];then
- TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.version==0x0304"
+ TSHARK_FILTER="$TSHARK_FILTER and ssl.handshake.version==0x0303"
+elif [ $TLS_VERSION == '1.3' ];then
+ TSHARK_FILTER="$TSHARK_FILTER and (ssl.handshake.version==0x0304 or tls.handshake.extensions.supported_version==0x0304)"
fi
response=$(tshark -r $CAPTURE_FILE $TSHARK_OUTPUT $TSHARK_FILTER)
diff --git a/modules/test/tls/bin/get_handshake_complete.sh b/modules/test/tls/bin/get_handshake_complete.sh
index de1eb887d..a2a6dc222 100644
--- a/modules/test/tls/bin/get_handshake_complete.sh
+++ b/modules/test/tls/bin/get_handshake_complete.sh
@@ -1,5 +1,19 @@
#!/bin/bash
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
CAPTURE_FILE=$1
SRC_IP=$2
DST_IP=$3
diff --git a/modules/test/tls/bin/get_non_tls_client_connections.sh b/modules/test/tls/bin/get_non_tls_client_connections.sh
new file mode 100644
index 000000000..08ed19090
--- /dev/null
+++ b/modules/test/tls/bin/get_non_tls_client_connections.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CAPTURE_FILE=$1
+SRC_IP=$2
+
+TSHARK_OUTPUT="-T json -e ip.src -e tcp.dstport -e ip.dst"
+# Filter out TLS, DNS and NTP, ICMP (ping), braodcast and multicast packets
+# - NTP and DNS traffic is not encrypted and if invalid NTP and/or DNS traffic has been detected
+# this will be handled by their respective test modules.
+# - Multicast and braodcast protocols are not typically encrypted so we aren't expecting them to
+# be over TLS connections
+# - ICMP (ping) requests are not encrypted so we also need to ignore these
+TSHARK_FILTER="ip.src == $SRC_IP and not tls and not dns and not ntp and not icmp and not(ip.dst == 224.0.0.0/4 or ip.dst == 255.255.255.255)"
+
+response=$(tshark -r $CAPTURE_FILE $TSHARK_OUTPUT $TSHARK_FILTER)
+
+echo "$response"
+
\ No newline at end of file
diff --git a/modules/test/tls/bin/get_tls_client_connections.sh b/modules/test/tls/bin/get_tls_client_connections.sh
new file mode 100644
index 000000000..2486a5f9a
--- /dev/null
+++ b/modules/test/tls/bin/get_tls_client_connections.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CAPTURE_FILE=$1
+SRC_IP=$2
+
+TSHARK_OUTPUT="-T json -e ip.src -e tcp.dstport -e ip.dst"
+TSHARK_FILTER="ip.src == $SRC_IP and tls"
+
+response=$(tshark -r $CAPTURE_FILE $TSHARK_OUTPUT $TSHARK_FILTER)
+
+echo "$response"
+
\ No newline at end of file
diff --git a/modules/test/tls/bin/get_tls_packets.sh b/modules/test/tls/bin/get_tls_packets.sh
new file mode 100644
index 000000000..e06a51571
--- /dev/null
+++ b/modules/test/tls/bin/get_tls_packets.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+CAPTURE_FILE=$1
+SRC_IP=$2
+TLS_VERSION=$3
+
+TSHARK_OUTPUT="-T json -e ip.src -e tcp.dstport -e ip.dst"
+# Handshakes will still report TLS version 1 even for TLS 1.2 connections
+# so we need to filter thes out
+TSHARK_FILTER="ip.src==$SRC_IP and ssl.handshake.type!=1"
+
+if [ $TLS_VERSION == '1.0' ];then
+ TSHARK_FILTER="$TSHARK_FILTER and ssl.record.version==0x0301"
+elif [ $TLS_VERSION == '1.1' ];then
+ TSHARK_FILTER="$TSHARK_FILTER and ssl.record.version==0x0302"
+elif [ $TLS_VERSION == '1.2' ];then
+ TSHARK_FILTER="$TSHARK_FILTER and ssl.record.version==0x0303"
+elif [ $TLS_VERSION == '1.3' ];then
+ TSHARK_FILTER="$TSHARK_FILTER and ssl.record.version==0x0304"
+fi
+
+response=$(tshark -r $CAPTURE_FILE $TSHARK_OUTPUT $TSHARK_FILTER)
+
+echo "$response"
+
\ No newline at end of file
diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py
index 970152768..1d4824d31 100644
--- a/modules/test/tls/python/src/tls_module.py
+++ b/modules/test/tls/python/src/tls_module.py
@@ -19,6 +19,7 @@
LOGGER = None
STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap'
MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap'
+GATEWAY_CAPTURE_FILE = '/runtime/network/gateway.pcap'
class TLSModule(TestModule):
@@ -77,30 +78,24 @@ def _security_tls_v1_3_client(self):
return None, 'Could not resolve device IP address'
def _validate_tls_client(self, client_ip, tls_version):
- monitor_result = self._tls_util.validate_tls_client(
+ client_results = self._tls_util.validate_tls_client(
client_ip=client_ip,
tls_version=tls_version,
- capture_file=MONITOR_CAPTURE_FILE)
- startup_result = self._tls_util.validate_tls_client(
- client_ip=client_ip,
- tls_version=tls_version,
- capture_file=STARTUP_CAPTURE_FILE)
-
- LOGGER.info('Montor: ' + str(monitor_result))
- LOGGER.info('Startup: ' + str(startup_result))
+ capture_files=[MONITOR_CAPTURE_FILE,STARTUP_CAPTURE_FILE,
+ GATEWAY_CAPTURE_FILE])
- if (not monitor_result[0] and monitor_result[0] is not None) or (
- not startup_result[0] and startup_result[0] is not None):
- result = False, startup_result[1] + monitor_result[1]
- elif monitor_result[0] and startup_result[0]:
- result = True, startup_result[1] + monitor_result[1]
- elif monitor_result[0] and startup_result[0] is None:
- result = True, monitor_result[1]
- elif startup_result[0] and monitor_result[0] is None:
- result = True, startup_result[1]
+ # Generate results based on the state
+ result_message = 'No outbound connections were found.'
+ result_state = None
+ #If any of the packetes detect failed client comms, fail the test
+ if not client_results[0] and client_results[0] is not None:
+ result_state = False
+ result_message = client_results[1]
else:
- result = None, startup_result[1]
- return result
+ if client_results[0]:
+ result_state = True
+ result_message = client_results[1]
+ return result_state, result_message
def _resolve_device_ip(self):
# If the ipv4 address wasn't resolved yet, try again
diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py
index ff7a79c44..85ddcc012 100644
--- a/modules/test/tls/python/src/tls_util.py
+++ b/modules/test/tls/python/src/tls_util.py
@@ -19,6 +19,7 @@
import json
import os
from common import util
+import ipaddress
LOG_NAME = 'tls_util'
LOGGER = None
@@ -177,39 +178,35 @@ def validate_signature(self, host):
return True, 'Device signed by cert:' + root_cert
else:
LOGGER.info('Device not signed by cert: ' + root_cert)
- except Exception as e: # pylint: disable=W0718
+ except Exception as e: # pylint: disable=W0718
LOGGER.error('Failed to check cert:' + root_cert)
LOGGER.error(str(e))
return False, 'Device certificate has not been signed'
def process_tls_server_results(self, tls_1_2_results, tls_1_3_results):
results = ''
- if tls_1_2_results[0] is None and tls_1_3_results[0]:
- results = True, 'TLS 1.3 validated: ' + tls_1_3_results[1]
- elif tls_1_3_results[0] is None and tls_1_2_results[0]:
- results = True, 'TLS 1.2 validated: ' + tls_1_2_results[1]
- elif tls_1_2_results[0] and tls_1_3_results[0]:
- description = 'TLS 1.2 validated: ' + tls_1_2_results[1] + '. '
- description += '\nTLS 1.3 validated: ' + tls_1_3_results[1] + '. '
- results = True, description
- elif tls_1_2_results[0] and not tls_1_3_results[0]:
- description = 'TLS 1.2 validated: ' + tls_1_2_results[1] + '. '
- description += '\nTLS 1.3 not validated: ' + tls_1_3_results[1] + '. '
- results = True, description
- elif tls_1_3_results[0] and not tls_1_2_results[0]:
- description = 'TLS 1.2 not validated: ' + tls_1_2_results[1] + '. '
- description += 'TLS 1.3 validated: ' + tls_1_3_results[1] + '. '
- results = True, description
- elif not tls_1_3_results[0] and not tls_1_2_results[0] and tls_1_2_results[
- 0] is not None and tls_1_3_results is not None:
- description = 'TLS 1.2 not validated:' + tls_1_2_results[1] + '. '
- description += 'TLS 1.3 not validated: ' + tls_1_3_results[1] + '. '
- results = False, description
+ if tls_1_2_results[0] is None and tls_1_3_results[0] is not None:
+ # Validate only TLS 1.3 results
+ description = 'TLS 1.3' + (' not' if not tls_1_3_results[
+ 0] else '') + ' validated: ' + tls_1_3_results[1]
+ results = tls_1_3_results[0], description
+ elif tls_1_3_results[0] is None and tls_1_2_results[0] is not None:
+ # Vaidate only TLS 1.2 results
+ description = 'TLS 1.2' + (' not' if not tls_1_2_results[
+ 0] else '') + ' validated: ' + tls_1_2_results[1]
+ results = tls_1_2_results[0], description
+ elif tls_1_3_results[0] is not None and tls_1_2_results[0] is not None:
+ # Validate both results
+ description = 'TLS 1.2' + (' not' if not tls_1_2_results[
+ 0] else '') + ' validated: ' + tls_1_2_results[1]
+ description += '\nTLS 1.3' + (' not' if not tls_1_3_results[
+ 0] else '') + ' validated: ' + tls_1_3_results[1]
+ results = tls_1_2_results[0] or tls_1_3_results[0], description
else:
- description = 'TLS 1.2 not validated: ' + tls_1_2_results[1] + '. '
- description += 'TLS 1.3 not validated: ' + tls_1_3_results[1] + '. '
+ description = f'TLS 1.2 not validated: {tls_1_2_results[1]}'
+ description += f'\nTLS 1.3 not validated: {tls_1_3_results[1]}'
results = None, description
- LOGGER.info('TLS 1.2 server test results: ' + str(results))
+ LOGGER.info('TLS server test results: ' + str(results))
return results
def validate_tls_server(self, host, tls_version):
@@ -261,22 +258,74 @@ def get_ciphers(self, capture_file, dst_ip, dst_port):
ciphers = response[0].split('\n')
return ciphers
- def get_hello_packets(self, capture_file, src_ip, tls_version):
- bin_file = self._bin_dir + '/get_client_hello_packets.sh'
- args = f'{capture_file} {src_ip} {tls_version}'
- command = f'{bin_file} {args}'
- response = util.run_command(command)
- packets = response[0].strip()
- return self.parse_hello_packets(json.loads(packets), capture_file)
-
- def get_handshake_complete(self, capture_file, src_ip, dst_ip, tls_version):
- bin_file = self._bin_dir + '/get_handshake_complete.sh'
- args = f'{capture_file} {src_ip} {dst_ip} {tls_version}'
- command = f'{bin_file} {args}'
- response = util.run_command(command)
- return response
-
- def parse_hello_packets(self, packets, capture_file):
+ def get_hello_packets(self, capture_files, src_ip, tls_version):
+ combined_results = []
+ for capture_file in capture_files:
+ bin_file = self._bin_dir + '/get_client_hello_packets.sh'
+ args = f'{capture_file} {src_ip} {tls_version}'
+ command = f'{bin_file} {args}'
+ response = util.run_command(command)
+ packets = response[0].strip()
+ if len(packets) > 0:
+ # Parse each packet and append key-value pairs to combined_results
+ result = self.parse_packets(json.loads(packets), capture_file)
+ combined_results.extend(result)
+ return combined_results
+
+ def get_handshake_complete(self, capture_files, src_ip, dst_ip, tls_version):
+ combined_results = ''
+ for capture_file in capture_files:
+ bin_file = self._bin_dir + '/get_handshake_complete.sh'
+ args = f'{capture_file} {src_ip} {dst_ip} {tls_version}'
+ command = f'{bin_file} {args}'
+ response = util.run_command(command)
+ if len(response) > 0:
+ combined_results += response[0]
+ return combined_results
+
+ # Resolve all connections from the device that don't use TLS
+ def get_non_tls_packetes(self, client_ip, capture_files):
+ combined_packets = []
+ for capture_file in capture_files:
+ bin_file = self._bin_dir + '/get_non_tls_client_connections.sh'
+ args = f'{capture_file} {client_ip}'
+ command = f'{bin_file} {args}'
+ response = util.run_command(command)
+ if len(response) > 0:
+ packets = json.loads(response[0].strip())
+ combined_packets.extend(packets)
+ return combined_packets
+
+ # Resolve all connections from the device that use TLS
+ def get_tls_client_connection_packetes(self, client_ip, capture_files):
+ combined_packets = []
+ for capture_file in capture_files:
+ bin_file = self._bin_dir + '/get_tls_client_connections.sh'
+ args = f'{capture_file} {client_ip}'
+ command = f'{bin_file} {args}'
+ response = util.run_command(command)
+ packets = json.loads(response[0].strip())
+ combined_packets.extend(packets)
+ return combined_packets
+
+ # Resolve any TLS packets for the specified version. Does not care if the
+ # connections are established or any other validation only
+ # that there is some level of connection attempt from the device
+ # using the TLS version specified.
+ def get_tls_packets(self, capture_files, src_ip, tls_version):
+ combined_results = []
+ for capture_file in capture_files:
+ bin_file = self._bin_dir + '/get_tls_packets.sh'
+ args = f'{capture_file} {src_ip} {tls_version}'
+ command = f'{bin_file} {args}'
+ response = util.run_command(command)
+ packets = response[0].strip()
+ # Parse each packet and append key-value pairs to combined_results
+ result = self.parse_packets(json.loads(packets), capture_file)
+ combined_results.extend(result)
+ return combined_results
+
+ def parse_packets(self, packets, capture_file):
hello_packets = []
for packet in packets:
# Extract all the basic IP information about the packet
@@ -300,7 +349,7 @@ def parse_hello_packets(self, packets, capture_file):
hello_packets.append(hello_packet)
return hello_packets
- def process_hello_packets(self,hello_packets, tls_version = '1.2'):
+ def process_hello_packets(self, hello_packets, tls_version='1.2'):
# Validate the ciphers only for tls 1.2
client_hello_results = {'valid': [], 'invalid': []}
if tls_version == '1.2':
@@ -327,10 +376,87 @@ def process_hello_packets(self,hello_packets, tls_version = '1.2'):
client_hello_results['valid'] = hello_packets
return client_hello_results
- def validate_tls_client(self, client_ip, tls_version, capture_file):
+ # Check if the device has made any outbound connections that don't
+ # use TLS. Since some protocols do use non-encrypted methods (NTP, DHCP, etc.)
+ # we will assume any local connections using the same IP subnet as our
+ # local network are approved and only connections to IP addresses outside
+ # our network will be flagged.
+ def get_non_tls_client_connection_ips(self, client_ip, capture_files):
+ LOGGER.info('Checking client for non-TLS client connections')
+ packets = self.get_non_tls_packetes(client_ip=client_ip,
+ capture_files=capture_files)
+
+ # Extract the subnet from the client IP address
+ src_ip = ipaddress.ip_address(client_ip)
+ src_subnet = ipaddress.ip_network(src_ip, strict=False)
+ subnet_with_mask = ipaddress.ip_network(
+ src_subnet, strict=False).supernet(new_prefix=24)
+
+ non_tls_dst_ips = set() # Store unique destination IPs
+ for packet in packets:
+ # Check if an IP address is within the specified subnet.
+ dst_ip = ipaddress.ip_address(packet['_source']['layers']['ip.dst'][0])
+ if not dst_ip in subnet_with_mask:
+ non_tls_dst_ips.add(str(dst_ip))
+
+ return non_tls_dst_ips
+
+ # Check if the device has made any outbound connections that don't
+ # use TLS. Since some protocols do use non-encrypted methods (NTP, DHCP, etc.)
+ # we will assume any local connections using the same IP subnet as our
+ # local network are approved and only connections to IP addresses outside
+ # our network will be flagged.
+ def get_unsupported_tls_ips(self, client_ip, capture_files):
+ LOGGER.info('Checking client for unsupported TLS client connections')
+ tls_1_0_packets = self.get_tls_packets(capture_files, client_ip, '1.0')
+ tls_1_1_packets = self.get_tls_packets(capture_files, client_ip, '1.1')
+
+ unsupported_tls_dst_ips = {}
+ if len(tls_1_0_packets) > 0:
+ for packet in tls_1_0_packets:
+ dst_ip = packet['dst_ip']
+ tls_version = '1.0'
+ if dst_ip not in unsupported_tls_dst_ips:
+ LOGGER.info(f'''Unsupported TLS {tls_version}
+ connections detected to {dst_ip}''')
+ unsupported_tls_dst_ips[dst_ip] = [tls_version]
+
+ if len(tls_1_1_packets) > 0:
+ for packet in tls_1_1_packets:
+ dst_ip = packet['dst_ip']
+ tls_version = '1.1'
+ # Check if the IP is already in the dictionary
+ if dst_ip in unsupported_tls_dst_ips:
+ # If the IP is already present, append the new TLS version to the
+ # list
+ unsupported_tls_dst_ips[dst_ip].append(tls_version)
+ else:
+ # If the IP is not present, create a new list with the current
+ # TLS version
+ LOGGER.info(f'''Unsupported TLS {tls_version} connections detected
+ to {dst_ip}''')
+ unsupported_tls_dst_ips[dst_ip] = [tls_version]
+ return unsupported_tls_dst_ips
+
+ # Check if the device has made any outbound connections that use any
+ # version of TLS.
+ def get_tls_client_connection_ips(self, client_ip, capture_files):
+ LOGGER.info('Checking client for TLS client connections')
+ packets = self.get_tls_client_connection_packetes(
+ client_ip=client_ip, capture_files=capture_files)
+
+ tls_dst_ips = set() # Store unique destination IPs
+ for packet in packets:
+ dst_ip = ipaddress.ip_address(packet['_source']['layers']['ip.dst'][0])
+ tls_dst_ips.add(str(dst_ip))
+ return tls_dst_ips
+
+ def validate_tls_client(self, client_ip, tls_version, capture_files):
LOGGER.info('Validating client for TLS: ' + tls_version)
- hello_packets = self.get_hello_packets(capture_file, client_ip, tls_version)
- client_hello_results = self.process_hello_packets(hello_packets,tls_version)
+ hello_packets = self.get_hello_packets(capture_files, client_ip,
+ tls_version)
+ client_hello_results = self.process_hello_packets(hello_packets,
+ tls_version)
handshakes = {'complete': [], 'incomplete': []}
for packet in client_hello_results['valid']:
@@ -338,7 +464,7 @@ def validate_tls_client(self, client_ip, tls_version, capture_file):
if not packet['dst_ip'] in handshakes['complete'] and not packet[
'dst_ip'] in handshakes['incomplete']:
handshake_complete = self.get_handshake_complete(
- capture_file, packet['src_ip'], packet['dst_ip'], tls_version)
+ capture_files, packet['src_ip'], packet['dst_ip'], tls_version)
# One of the responses will be a complaint about running as root so
# we have to have at least 2 entries to consider a completed handshake
@@ -382,6 +508,35 @@ def validate_tls_client(self, client_ip, tls_version, capture_file):
else:
LOGGER.info('No client hello packets detected')
tls_client_details = 'No client hello packets detected'
+
+ # Resolve all non-TLS related client connections
+ non_tls_client_ips = self.get_non_tls_client_connection_ips(
+ client_ip, capture_files)
+
+ # Resolve all TLS related client connections
+ tls_client_ips = self.get_tls_client_connection_ips(client_ip,
+ capture_files)
+
+ # Filter out all outbound TLS connections regardless on whether
+ # or not they were validated. If they were not validated,
+ # they will already be failed by those tests and we only
+ # need to report true unencrypted oubound connections
+ if len(non_tls_client_ips) > 0:
+ for ip in non_tls_client_ips:
+ if ip not in tls_client_ips:
+ tls_client_valid = False
+ tls_client_details += f'''\nNon-TLS connection detected to {ip}'''
+ else:
+ LOGGER.info(f'''TLS connection detected to {ip}.
+ Ignoring non-TLS traffic detected to this IP''')
+
+ unsupported_tls_ips = self.get_unsupported_tls_ips(client_ip, capture_files)
+ if len(unsupported_tls_ips) > 0:
+ tls_client_valid = False
+ for ip, tls_versions in unsupported_tls_ips.items():
+ for tls_version in tls_versions:
+ tls_client_details += f'''\nUnsupported TLS {tls_version}
+ connection detected to {ip}'''
return tls_client_valid, tls_client_details
def is_ecdh_and_ecdsa(self, ciphers):
diff --git a/modules/ui/package-lock.json b/modules/ui/package-lock.json
index f5a5b5e2f..cf8b3e958 100644
--- a/modules/ui/package-lock.json
+++ b/modules/ui/package-lock.json
@@ -24,7 +24,7 @@
"zone.js": "~0.13.3"
},
"devDependencies": {
- "@angular-devkit/build-angular": "^16.2.11",
+ "@angular-devkit/build-angular": "^16.2.12",
"@angular-eslint/builder": "16.2.0",
"@angular-eslint/eslint-plugin": "16.2.0",
"@angular-eslint/eslint-plugin-template": "16.2.0",
@@ -72,12 +72,12 @@
}
},
"node_modules/@angular-devkit/architect": {
- "version": "0.1602.11",
- "resolved": "https://registry.npmjs.org/@angular-devkit/architect/-/architect-0.1602.11.tgz",
- "integrity": "sha512-qC1tPL/82gxqCS1z9pTpLn5NQH6uqbV6UNjbkFEQpTwEyWEK6VLChAJsybHHfbpssPS2HWf31VoUzX7RqDjoQQ==",
+ "version": "0.1602.12",
+ "resolved": "https://registry.npmjs.org/@angular-devkit/architect/-/architect-0.1602.12.tgz",
+ "integrity": "sha512-19Fwwfx+KvJ01SyI6cstRgqT9+cwer8Ro1T27t1JqlGyOX8tY3pV78ulwxy2+wCzPjR18V6W7cb7Cv6fyK4xog==",
"dev": true,
"dependencies": {
- "@angular-devkit/core": "16.2.11",
+ "@angular-devkit/core": "16.2.12",
"rxjs": "7.8.1"
},
"engines": {
@@ -87,15 +87,15 @@
}
},
"node_modules/@angular-devkit/build-angular": {
- "version": "16.2.11",
- "resolved": "https://registry.npmjs.org/@angular-devkit/build-angular/-/build-angular-16.2.11.tgz",
- "integrity": "sha512-yNzUiAeg1WHMsFG9IBg4S/7dsMcEAMYQ1I360ib80c0T/IwRb8pHhOokrl5Mu8zfNqZ/dxH4ItKY1uIMDmuMGQ==",
+ "version": "16.2.12",
+ "resolved": "https://registry.npmjs.org/@angular-devkit/build-angular/-/build-angular-16.2.12.tgz",
+ "integrity": "sha512-VVGKZ0N3gyR0DP7VrcZl4io3ruWYT94mrlyJsJMLlrYy/EX8JCvqrJC9c+dscrtKjhZzjwdyhszkJQY4JfwACA==",
"dev": true,
"dependencies": {
"@ampproject/remapping": "2.2.1",
- "@angular-devkit/architect": "0.1602.11",
- "@angular-devkit/build-webpack": "0.1602.11",
- "@angular-devkit/core": "16.2.11",
+ "@angular-devkit/architect": "0.1602.12",
+ "@angular-devkit/build-webpack": "0.1602.12",
+ "@angular-devkit/core": "16.2.12",
"@babel/core": "7.22.9",
"@babel/generator": "7.22.9",
"@babel/helper-annotate-as-pure": "7.22.5",
@@ -107,7 +107,7 @@
"@babel/runtime": "7.22.6",
"@babel/template": "7.22.5",
"@discoveryjs/json-ext": "0.5.7",
- "@ngtools/webpack": "16.2.11",
+ "@ngtools/webpack": "16.2.12",
"@vitejs/plugin-basic-ssl": "1.0.1",
"ansi-colors": "4.1.3",
"autoprefixer": "10.4.14",
@@ -150,7 +150,7 @@
"text-table": "0.2.0",
"tree-kill": "1.2.2",
"tslib": "2.6.1",
- "vite": "4.5.1",
+ "vite": "4.5.2",
"webpack": "5.88.2",
"webpack-dev-middleware": "6.1.1",
"webpack-dev-server": "4.15.1",
@@ -215,12 +215,12 @@
"dev": true
},
"node_modules/@angular-devkit/build-webpack": {
- "version": "0.1602.11",
- "resolved": "https://registry.npmjs.org/@angular-devkit/build-webpack/-/build-webpack-0.1602.11.tgz",
- "integrity": "sha512-2Au6xRMxNugFkXP0LS1TwNE5gAfGW4g6yxC9P5j5p3kdGDnAVaZRTOKB9dg73i3uXtJHUMciYOThV0b78XRxwA==",
+ "version": "0.1602.12",
+ "resolved": "https://registry.npmjs.org/@angular-devkit/build-webpack/-/build-webpack-0.1602.12.tgz",
+ "integrity": "sha512-1lmR4jCkxPJuAFXReesEY3CB+/5jSebGE5ry6qJJvNm6kuSc9bzfTytrcwosVY+Q7kAA2ij7kAYw0loGbTjLWA==",
"dev": true,
"dependencies": {
- "@angular-devkit/architect": "0.1602.11",
+ "@angular-devkit/architect": "0.1602.12",
"rxjs": "7.8.1"
},
"engines": {
@@ -234,9 +234,9 @@
}
},
"node_modules/@angular-devkit/core": {
- "version": "16.2.11",
- "resolved": "https://registry.npmjs.org/@angular-devkit/core/-/core-16.2.11.tgz",
- "integrity": "sha512-u3cEQHqhSMWyAFIaPdRukCJwEUJt7Fy3C02gTlTeCB4F/OnftVFIm2e5vmCqMo9rgbfdvjWj9V+7wWiCpMrzAQ==",
+ "version": "16.2.12",
+ "resolved": "https://registry.npmjs.org/@angular-devkit/core/-/core-16.2.12.tgz",
+ "integrity": "sha512-o6ziQs+EcEonFezrsA46jbZqkQrs4ckS1bAQj93g5ZjGtieUz8l/U3lclvKpL/iEzWkGVViSYuP2KyW2oqTDiQ==",
"dev": true,
"dependencies": {
"ajv": "8.12.0",
@@ -3820,9 +3820,9 @@
}
},
"node_modules/@ngtools/webpack": {
- "version": "16.2.11",
- "resolved": "https://registry.npmjs.org/@ngtools/webpack/-/webpack-16.2.11.tgz",
- "integrity": "sha512-4ndXJ4s94ZsryVGSDk/waIDrUqXqdGWftoOEn81Zu+nkL9ncI/G1fNUlSJ5OqeKmMLxMFouoy+BuJfvT+gEgnQ==",
+ "version": "16.2.12",
+ "resolved": "https://registry.npmjs.org/@ngtools/webpack/-/webpack-16.2.12.tgz",
+ "integrity": "sha512-f9R9Qsk8v+ffDxryl6PQ7Wnf2JCNd4dDXOH+d/AuF06VFiwcwGDRDZpmqkAXbFxQfcWTbT1FFvfoJ+SFcJgXLA==",
"dev": true,
"engines": {
"node": "^16.14.0 || >=18.10.0",
@@ -4412,9 +4412,9 @@
}
},
"node_modules/@types/express-serve-static-core": {
- "version": "4.17.41",
- "resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.17.41.tgz",
- "integrity": "sha512-OaJ7XLaelTgrvlZD8/aa0vvvxZdUmlCn6MtWeB7TkiKW70BQLc9XEPpDLPdbo52ZhXUCrznlWdCHWxJWtdyajA==",
+ "version": "4.17.42",
+ "resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.17.42.tgz",
+ "integrity": "sha512-ckM3jm2bf/MfB3+spLPWYPUH573plBFwpOhqQ2WottxYV85j1HQFlxmnTq57X1yHY9awZPig06hL/cLMgNWHIQ==",
"dev": true,
"dependencies": {
"@types/node": "*",
@@ -14543,9 +14543,9 @@
}
},
"node_modules/vite": {
- "version": "4.5.1",
- "resolved": "https://registry.npmjs.org/vite/-/vite-4.5.1.tgz",
- "integrity": "sha512-AXXFaAJ8yebyqzoNB9fu2pHoo/nWX+xZlaRwoeYUxEqBO+Zj4msE5G+BhGBll9lYEKv9Hfks52PAF2X7qDYXQA==",
+ "version": "4.5.2",
+ "resolved": "https://registry.npmjs.org/vite/-/vite-4.5.2.tgz",
+ "integrity": "sha512-tBCZBNSBbHQkaGyhGCDUGqeo2ph8Fstyp6FMSvTtsXeZSPpSMGlviAOav2hxVTqFcx8Hj/twtWKsMJXNY0xI8w==",
"dev": true,
"dependencies": {
"esbuild": "^0.18.10",
diff --git a/modules/ui/package.json b/modules/ui/package.json
index 46f775272..180ecb841 100644
--- a/modules/ui/package.json
+++ b/modules/ui/package.json
@@ -33,7 +33,7 @@
"zone.js": "~0.13.3"
},
"devDependencies": {
- "@angular-devkit/build-angular": "^16.2.11",
+ "@angular-devkit/build-angular": "^16.2.12",
"@angular-eslint/builder": "16.2.0",
"@angular-eslint/eslint-plugin": "16.2.0",
"@angular-eslint/eslint-plugin-template": "16.2.0",
diff --git a/modules/ui/src/app/components/general-settings/general-settings.component.html b/modules/ui/src/app/components/general-settings/general-settings.component.html
index c698e167f..3f951dcc9 100644
--- a/modules/ui/src/app/components/general-settings/general-settings.component.html
+++ b/modules/ui/src/app/components/general-settings/general-settings.component.html
@@ -149,7 +149,7 @@
class="save-button"
color="primary"
(click)="saveSetting()"
- [disabled]="!isFormValues || isLessThanTwoInterfaces">
+ [disabled]="!isFormValues || isLessThanOneInterface">
Save
diff --git a/modules/ui/src/app/components/general-settings/general-settings.component.spec.ts b/modules/ui/src/app/components/general-settings/general-settings.component.spec.ts
index e9b82ddca..08f9309e5 100644
--- a/modules/ui/src/app/components/general-settings/general-settings.component.spec.ts
+++ b/modules/ui/src/app/components/general-settings/general-settings.component.spec.ts
@@ -249,13 +249,9 @@ describe('GeneralSettingsComponent', () => {
});
});
- describe('with intefaces lenght less then two', () => {
+ describe('with interfaces length less than one', () => {
beforeEach(() => {
- component.interfaces = { mockDeviceValue: 'mockDeviceValue' };
- testRunServiceMock.systemConfig$ = of(MOCK_SYSTEM_CONFIG_WITH_DATA);
- testRunServiceMock.getSystemConfig.and.returnValue(
- of(MOCK_SYSTEM_CONFIG_WITH_DATA)
- );
+ component.interfaces = {};
fixture.detectChanges();
});
diff --git a/modules/ui/src/app/components/general-settings/general-settings.component.ts b/modules/ui/src/app/components/general-settings/general-settings.component.ts
index 5f34ff98c..d9bbecea1 100644
--- a/modules/ui/src/app/components/general-settings/general-settings.component.ts
+++ b/modules/ui/src/app/components/general-settings/general-settings.component.ts
@@ -79,8 +79,8 @@ export class GeneralSettingsComponent implements OnInit, OnDestroy {
return this.settingForm.hasError('hasSameValues');
}
- get isLessThanTwoInterfaces(): boolean {
- return Object.keys(this.interfaces).length < 2;
+ get isLessThanOneInterface(): boolean {
+ return Object.keys(this.interfaces).length < 1;
}
constructor(
diff --git a/modules/ui/src/app/services/test-run.service.ts b/modules/ui/src/app/services/test-run.service.ts
index 37d46f23b..918204f17 100644
--- a/modules/ui/src/app/services/test-run.service.ts
+++ b/modules/ui/src/app/services/test-run.service.ts
@@ -28,7 +28,7 @@ import {
} from '../model/testrun-status';
import { Version } from '../model/version';
-const API_URL = 'http://localhost:8000';
+const API_URL = `http://${window.location.hostname}:8000`;
export type SystemInterfaces = {
[key: string]: string;
diff --git a/modules/ui/ui.Dockerfile b/modules/ui/ui.Dockerfile
index 2b4081470..da56be93e 100644
--- a/modules/ui/ui.Dockerfile
+++ b/modules/ui/ui.Dockerfile
@@ -13,14 +13,14 @@
# limitations under the License.
# Image name: test-run/ui
-FROM node:20 as build
+FROM node@sha256:ffebb4405810c92d267a764b21975fb2d96772e41877248a37bf3abaa0d3b590 as build
WORKDIR /modules/ui
COPY modules/ui/ /modules/ui
RUN npm install
RUN npm run build
-FROM nginx:1.25.1
+FROM nginx@sha256:4c0fdaa8b6341bfdeca5f18f7837462c80cff90527ee35ef185571e1c327beac
COPY --from=build /modules/ui/dist/ /usr/share/nginx/html
diff --git a/testing/api/test_api.py b/testing/api/test_api.py
index c6368148c..44daef335 100644
--- a/testing/api/test_api.py
+++ b/testing/api/test_api.py
@@ -231,7 +231,7 @@ def test_get_system_interfaces(testrun):
r = requests.get(f"{API}/system/interfaces")
response = json.loads(r.text)
local_interfaces = get_network_interfaces()
- assert set(response) == set(local_interfaces)
+ assert set(response.keys()) == set(local_interfaces)
# schema expects a flat list
assert all([isinstance(x, str) for x in response])
@@ -262,13 +262,18 @@ def test_modify_device(testing_devices, testrun):
}
updated_device["test_modules"] = new_test_modules
+ updated_device_payload = {}
+ updated_device_payload["device"] = updated_device
+ updated_device_payload["mac_addr"] = mac_addr
+
print("updated_device")
pretty_print(updated_device)
print("api_device")
pretty_print(api_device)
# update device
- r = requests.post(f"{API}/device", data=json.dumps(updated_device))
+ r = requests.post(f"{API}/device/edit",
+ data=json.dumps(updated_device_payload))
assert r.status_code == 200
diff --git a/testing/baseline/test_baseline b/testing/baseline/test_baseline
index 7ecc6ba19..c52c08aef 100755
--- a/testing/baseline/test_baseline
+++ b/testing/baseline/test_baseline
@@ -23,7 +23,7 @@ ifconfig
sudo apt-get update
sudo apt-get install openvswitch-common openvswitch-switch tcpdump jq moreutils coreutils isc-dhcp-client
-pip3 install pytest
+pip3 install pytest==7.4.4
# Setup device network
sudo ip link add dev endev0a type veth peer name endev0b
diff --git a/testing/docker/ci_baseline/Dockerfile b/testing/docker/ci_baseline/Dockerfile
index 468c6f7a0..93ad905f9 100644
--- a/testing/docker/ci_baseline/Dockerfile
+++ b/testing/docker/ci_baseline/Dockerfile
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM ubuntu:jammy
+FROM ubuntu@sha256:e6173d4dc55e76b87c4af8db8821b1feae4146dd47341e4d431118c7dd060a74
# Update and get all additional requirements not contained in the base image
RUN apt-get update && apt-get -y upgrade
diff --git a/testing/docker/ci_test_device1/Dockerfile b/testing/docker/ci_test_device1/Dockerfile
index 1c62d231d..0279df5ef 100644
--- a/testing/docker/ci_test_device1/Dockerfile
+++ b/testing/docker/ci_test_device1/Dockerfile
@@ -1,5 +1,5 @@
-FROM ubuntu:jammy
+FROM ubuntu@sha256:e6173d4dc55e76b87c4af8db8821b1feae4146dd47341e4d431118c7dd060a74
ENV DEBIAN_FRONTEND=noninteractive
diff --git a/testing/pylint/test_pylint b/testing/pylint/test_pylint
index 6d28226cc..82949d909 100755
--- a/testing/pylint/test_pylint
+++ b/testing/pylint/test_pylint
@@ -19,7 +19,7 @@ ERROR_LIMIT=175
sudo cmd/install
source venv/bin/activate
-sudo pip3 install pylint
+sudo pip3 install pylint==3.0.3
files=$(find ./framework -path ./venv -prune -o -name '*.py' -print)
diff --git a/testing/tests/test_tests b/testing/tests/test_tests
index 9fad12d80..3b509f702 100755
--- a/testing/tests/test_tests
+++ b/testing/tests/test_tests
@@ -27,7 +27,7 @@ mkdir -p $TEST_DIR
sudo apt-get update
sudo apt-get install -y openvswitch-common openvswitch-switch tcpdump jq moreutils coreutils isc-dhcp-client
-pip3 install pytest
+pip3 install pytest==7.4.4
# Start OVS
# Setup device network
diff --git a/testing/unit/run_tests.sh b/testing/unit/run_tests.sh
index 5fa1179b1..ce43bbf8f 100644
--- a/testing/unit/run_tests.sh
+++ b/testing/unit/run_tests.sh
@@ -1,5 +1,19 @@
#!/bin/bash -e
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# This script should be run from within the unit_test directory. If
# it is run outside this directory, paths will not be resolved correctly.
@@ -8,15 +22,21 @@ pushd ../../ >/dev/null 2>&1
echo "Root Dir: $PWD"
-# Setup the python path
-export PYTHONPATH="$PWD/framework/python/src"
+# Add the framework sources
+PYTHONPATH="$PWD/framework/python/src"
+
+# Add the test module sources
+PYTHONPATH="$PYTHONPATH:$PWD/modules/test/tls/python/src"
+
+# Set the python path with all sources
+export PYTHONPATH
# Run the DHCP Unit tests
python3 -u $PWD/modules/network/dhcp-1/python/src/grpc_server/dhcp_config_test.py
python3 -u $PWD/modules/network/dhcp-2/python/src/grpc_server/dhcp_config_test.py
-# Run the Security Module Unit Tests
-python3 -u $PWD/modules/test/tls/python/src/tls_module_test.py
+# Run the TLS Module Unit Tests
+python3 -u $PWD/testing/unit/tls/tls_module_test.py
popd >/dev/null 2>&1
\ No newline at end of file
diff --git a/testing/unit/tls/monitor.pcap b/testing/unit/tls/monitor.pcap
new file mode 100644
index 000000000..52f5034de
Binary files /dev/null and b/testing/unit/tls/monitor.pcap differ
diff --git a/testing/unit/tls/no_tls.pcap b/testing/unit/tls/no_tls.pcap
new file mode 100644
index 000000000..557e66b1a
Binary files /dev/null and b/testing/unit/tls/no_tls.pcap differ
diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py
new file mode 100644
index 000000000..baed9c395
--- /dev/null
+++ b/testing/unit/tls/tls_module_test.py
@@ -0,0 +1,404 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Module run all the TLS related unit tests"""
+from tls_util import TLSUtil
+import unittest
+from common import logger
+from scapy.all import sniff, wrpcap
+import os
+import threading
+import time
+import netifaces
+import ssl
+import http.client
+
+MODULE = 'tls'
+# Define the file paths
+TEST_FILES_DIR = 'testing/unit/' + MODULE
+OUTPUT_DIR = TEST_FILES_DIR + '/output'
+
+TLS_UTIL = None
+PACKET_CAPTURE = None
+
+
+class TLSModuleTest(unittest.TestCase):
+ """Contains and runs all the unit tests concerning TLS behaviors"""
+
+ @classmethod
+ def setUpClass(cls):
+ log = logger.get_logger('test_' + MODULE)
+ global TLS_UTIL
+ TLS_UTIL = TLSUtil(log,
+ bin_dir='modules/test/tls/bin',
+ cert_out_dir=OUTPUT_DIR,
+ root_certs_dir='local/root_certs')
+
+ # Test 1.2 server when only 1.2 connection is established
+ def security_tls_v1_2_server_test(self):
+ tls_1_2_results = TLS_UTIL.validate_tls_server('google.com',
+ tls_version='1.2')
+ tls_1_3_results = None, 'No TLS 1.3'
+ test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertTrue(test_results[0])
+
+ # Test 1.2 server when 1.3 connection is established
+ def security_tls_v1_2_for_1_3_server_test(self):
+ tls_1_2_results = None, 'No TLS 1.2'
+ tls_1_3_results = TLS_UTIL.validate_tls_server('google.com',
+ tls_version='1.3')
+ test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertTrue(test_results[0])
+
+ # Test 1.2 server when 1.2 and 1.3 connection is established
+ def security_tls_v1_2_for_1_2_and_1_3_server_test(self):
+ tls_1_2_results = TLS_UTIL.validate_tls_server('google.com',
+ tls_version='1.2')
+ tls_1_3_results = TLS_UTIL.validate_tls_server('google.com',
+ tls_version='1.3')
+ test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertTrue(test_results[0])
+
+ # Test 1.2 server when 1.2 and failed 1.3 connection is established
+ def security_tls_v1_2_for_1_2_and_1_3_fail_server_test(self):
+ tls_1_2_results = TLS_UTIL.validate_tls_server('google.com',
+ tls_version='1.2')
+ tls_1_3_results = False, 'Signature faild'
+ test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertTrue(test_results[0])
+
+ # Test 1.2 server when 1.3 and failed 1.2 connection is established
+ def security_tls_v1_2_for_1_3_and_1_2_fail_server_test(self):
+ tls_1_3_results = TLS_UTIL.validate_tls_server('google.com',
+ tls_version='1.3')
+ tls_1_2_results = False, 'Signature faild'
+ test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertTrue(test_results[0])
+
+ def security_tls_server_results_test(self, ):
+ # Generic messages to test they are passing through
+ # to the results as expected
+ fail_message = 'Certificate not validated'
+ success_message = 'Certificate validated'
+ none_message = 'Failed to resolve public certificate'
+
+ # Both None
+ tls_1_2_results = None, none_message
+ tls_1_3_results = None, none_message
+ expected = None, (f'TLS 1.2 not validated: {none_message}\n'
+ f'TLS 1.3 not validated: {none_message}')
+ result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertEqual(result, expected)
+
+ # TLS 1.2 Pass and TLS 1.3 None
+ tls_1_2_results = True, success_message
+ expected = True, f'TLS 1.2 validated: {success_message}'
+ result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertEqual(result, expected)
+
+ # TLS 1.2 Fail and TLS 1.3 None
+ tls_1_2_results = False, fail_message
+ expected = False, f'TLS 1.2 not validated: {fail_message}'
+ result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertEqual(result, expected)
+
+ # TLS 1.3 Pass and TLS 1.2 None
+ tls_1_2_results = None, fail_message
+ tls_1_3_results = True, success_message
+ expected = True, f'TLS 1.3 validated: {success_message}'
+ result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertEqual(result, expected)
+
+ # TLS 1.3 Fail and TLS 1.2 None
+ tls_1_3_results = False, fail_message
+ expected = False, f'TLS 1.3 not validated: {fail_message}'
+ result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertEqual(result, expected)
+
+ # TLS 1.2 Pass and TLS 1.3 Pass
+ tls_1_2_results = True, success_message
+ tls_1_3_results = True, success_message
+ expected = True, (f'TLS 1.2 validated: {success_message}\n'
+ f'TLS 1.3 validated: {success_message}')
+ result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertEqual(result, expected)
+
+ # TLS 1.2 Pass and TLS 1.3 Fail
+ tls_1_2_results = True, success_message
+ tls_1_3_results = False, fail_message
+ expected = True, (f'TLS 1.2 validated: {success_message}\n'
+ f'TLS 1.3 not validated: {fail_message}')
+ result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertEqual(result, expected)
+
+ # TLS 1.2 Fail and TLS 1.2 Pass
+ tls_1_2_results = False, fail_message
+ tls_1_3_results = True, success_message
+ expected = True, (f'TLS 1.2 not validated: {fail_message}\n'
+ f'TLS 1.3 validated: {success_message}')
+ result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertEqual(result, expected)
+
+
+ # TLS 1.2 Fail and TLS 1.2 Fail
+ tls_1_3_results = False, fail_message
+ expected = False, (f'TLS 1.2 not validated: {fail_message}\n'
+ f'TLS 1.3 not validated: {fail_message}')
+ result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertEqual(result, expected)
+
+ # Test 1.2 server when 1.3 and 1.2 failed connection is established
+ def security_tls_v1_2_fail_server_test(self):
+ tls_1_2_results = False, 'Signature faild'
+ tls_1_3_results = False, 'Signature faild'
+ test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertFalse(test_results[0])
+
+ # Test 1.2 server when 1.3 and 1.2 failed connection is established
+ def security_tls_v1_2_none_server_test(self):
+ tls_1_2_results = None, 'No cert'
+ tls_1_3_results = None, 'No cert'
+ test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
+ tls_1_3_results)
+ self.assertIsNone(test_results[0])
+
+ def security_tls_v1_3_server_test(self):
+ test_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3')
+ self.assertTrue(test_results[0])
+
+ def security_tls_v1_2_client_test(self):
+ test_results = self.test_client_tls('1.2')
+ print(str(test_results))
+ self.assertTrue(test_results[0])
+
+ def security_tls_v1_2_client_cipher_fail_test(self):
+ test_results = self.test_client_tls('1.2', disable_valid_ciphers=True)
+ print(str(test_results))
+ self.assertFalse(test_results[0])
+
+ # Scan a known capture without any TLS traffic to
+ # generate a skip result
+ def security_tls_client_skip_test(self):
+ print('security_tls_client_skip_test')
+ capture_file = os.path.join(TEST_FILES_DIR, 'no_tls.pcap')
+
+ # Run the client test
+ test_results = TLS_UTIL.validate_tls_client(client_ip='172.27.253.167',
+ tls_version='1.2',
+ capture_files=[capture_file])
+ print(str(test_results))
+ self.assertIsNone(test_results[0])
+
+ def security_tls_v1_3_client_test(self):
+ test_results = self.test_client_tls('1.3')
+ print(str(test_results))
+ self.assertTrue(test_results[0])
+
+ def client_hello_packets_test(self):
+ packet_fail = {
+ 'dst_ip': '10.10.10.1',
+ 'src_ip': '10.10.10.14',
+ 'dst_port': '443',
+ 'cipher_support': {
+ 'ecdh': False,
+ 'ecdsa': True
+ }
+ }
+ packet_success = {
+ 'dst_ip': '10.10.10.1',
+ 'src_ip': '10.10.10.14',
+ 'dst_port': '443',
+ 'cipher_support': {
+ 'ecdh': True,
+ 'ecdsa': True
+ }
+ }
+ hello_packets = [packet_fail, packet_success]
+ hello_results = TLS_UTIL.process_hello_packets(hello_packets, '1.2')
+ print('Hello packets test results: ' + str(hello_results))
+ expected = {'valid': [packet_success], 'invalid': []}
+ self.assertEqual(hello_results, expected)
+
+ def test_client_tls(self,
+ tls_version,
+ tls_generate=None,
+ disable_valid_ciphers=False):
+ # Make the capture file
+ os.makedirs(OUTPUT_DIR, exist_ok=True)
+ capture_file = OUTPUT_DIR + '/client_tls.pcap'
+
+ # Resolve the client ip used
+ client_ip = self.get_interface_ip('eth0')
+
+ # Genrate TLS outbound traffic
+ if tls_generate is None:
+ tls_generate = tls_version
+ self.generate_tls_traffic(capture_file, tls_generate, disable_valid_ciphers)
+
+ # Run the client test
+ return TLS_UTIL.validate_tls_client(client_ip=client_ip,
+ tls_version=tls_version,
+ capture_files=[capture_file])
+
+ def test_client_tls_with_non_tls_client(self):
+ print('\ntest_client_tls_with_non_tls_client')
+ capture_file = os.path.join(TEST_FILES_DIR, 'monitor.pcap')
+
+ # Run the client test
+ test_results = TLS_UTIL.validate_tls_client(client_ip='10.10.10.14',
+ tls_version='1.2',
+ capture_files=[capture_file])
+ print(str(test_results))
+ self.assertFalse(test_results[0])
+
+ # Scan a known capture without u unsupported TLS traffic to
+ # generate a fail result
+ def security_tls_client_unsupported_tls_client(self):
+ print('\nsecurity_tls_client_unsupported_tls_client')
+ capture_file = os.path.join(TEST_FILES_DIR, 'unsupported_tls.pcap')
+
+ # Run the client test
+ test_results = TLS_UTIL.validate_tls_client(client_ip='172.27.253.167',
+ tls_version='1.2',
+ capture_files=[capture_file])
+ print(str(test_results))
+ self.assertFalse(test_results[0])
+
+ def generate_tls_traffic(self,
+ capture_file,
+ tls_version,
+ disable_valid_ciphers=False):
+ capture_thread = self.start_capture_thread(10)
+ print('Capture Started')
+
+ # Generate some TLS 1.2 outbound traffic
+ while capture_thread.is_alive():
+ self.make_tls_connection('www.google.com', 443, tls_version,
+ disable_valid_ciphers)
+ time.sleep(1)
+
+ # Save the captured packets to the file.
+ wrpcap(capture_file, PACKET_CAPTURE)
+
+ def make_tls_connection(self,
+ hostname,
+ port,
+ tls_version,
+ disable_valid_ciphers=False):
+ # Create the SSL context with the desired TLS version and options
+ context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
+ context.check_hostname = False
+ context.verify_mode = ssl.CERT_NONE
+ context.options |= ssl.PROTOCOL_TLS
+
+ if disable_valid_ciphers:
+ # Create a list of ciphers that do not use ECDH or ECDSA
+ ciphers_str = [
+ 'TLS_AES_256_GCM_SHA384', 'TLS_CHACHA20_POLY1305_SHA256',
+ 'TLS_AES_128_GCM_SHA256', 'AES256-GCM-SHA384',
+ 'PSK-AES256-GCM-SHA384', 'PSK-CHACHA20-POLY1305',
+ 'RSA-PSK-AES128-GCM-SHA256', 'DHE-PSK-AES128-GCM-SHA256',
+ 'AES128-GCM-SHA256', 'PSK-AES128-GCM-SHA256', 'AES256-SHA256',
+ 'AES128-SHA'
+ ]
+ context.set_ciphers(':'.join(ciphers_str))
+
+ if tls_version != '1.1':
+ context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0
+ context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1
+ else:
+ context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2
+ context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3
+
+ if tls_version == '1.3':
+ context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2
+ elif tls_version == '1.2':
+ context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3
+
+ # Create the HTTPS connection with the SSL context
+ connection = http.client.HTTPSConnection(hostname, port, context=context)
+
+ # Perform the TLS handshake manually
+ try:
+ connection.connect()
+ except ssl.SSLError as e:
+ print('Failed to make connection: ' + str(e))
+
+ # At this point, the TLS handshake is complete.
+ # You can do any further processing or just close the connection.
+ connection.close()
+
+ def start_capture(self, timeout):
+ global PACKET_CAPTURE
+ PACKET_CAPTURE = sniff(iface='eth0', timeout=timeout)
+
+ def start_capture_thread(self, timeout):
+ # Start the packet capture in a separate thread to avoid blocking.
+ capture_thread = threading.Thread(target=self.start_capture,
+ args=(timeout, ))
+ capture_thread.start()
+
+ return capture_thread
+
+ def get_interface_ip(self, interface_name):
+ try:
+ addresses = netifaces.ifaddresses(interface_name)
+ ipv4 = addresses[netifaces.AF_INET][0]['addr']
+ return ipv4
+ except (ValueError, KeyError) as e:
+ print(f'Error: {e}')
+ return None
+
+if __name__ == '__main__':
+ suite = unittest.TestSuite()
+ suite.addTest(TLSModuleTest('client_hello_packets_test'))
+ # TLS 1.2 server tests
+ suite.addTest(TLSModuleTest('security_tls_v1_2_server_test'))
+ suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_3_server_test'))
+ suite.addTest(TLSModuleTest('security_tls_v1_2_for_1_2_and_1_3_server_test'))
+ suite.addTest(
+ TLSModuleTest('security_tls_v1_2_for_1_2_and_1_3_fail_server_test'))
+ suite.addTest(
+ TLSModuleTest('security_tls_v1_2_for_1_3_and_1_2_fail_server_test'))
+ suite.addTest(TLSModuleTest('security_tls_v1_2_fail_server_test'))
+ suite.addTest(TLSModuleTest('security_tls_v1_2_none_server_test'))
+
+ # TLS 1.3 server tests
+ suite.addTest(TLSModuleTest('security_tls_v1_3_server_test'))
+ # TLS client tests
+ suite.addTest(TLSModuleTest('security_tls_v1_2_client_test'))
+ suite.addTest(TLSModuleTest('security_tls_v1_3_client_test'))
+ suite.addTest(TLSModuleTest('security_tls_client_skip_test'))
+ suite.addTest(TLSModuleTest('security_tls_v1_2_client_cipher_fail_test'))
+ suite.addTest(TLSModuleTest('test_client_tls_with_non_tls_client'))
+ suite.addTest(TLSModuleTest('security_tls_client_unsupported_tls_client'))
+
+ suite.addTest(TLSModuleTest('security_tls_server_results_test'))
+
+ runner = unittest.TextTestRunner()
+ runner.run(suite)
diff --git a/testing/unit/tls/unsupported_tls.pcap b/testing/unit/tls/unsupported_tls.pcap
new file mode 100644
index 000000000..08e6c3c04
Binary files /dev/null and b/testing/unit/tls/unsupported_tls.pcap differ