Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions modules/test/tls/bin/get_client_hello_packets.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@ TLS_VERSION="$3"
TSHARK_OUTPUT="-T json -e ip.src -e tcp.dstport -e ip.dst"
TSHARK_FILTER="ssl.handshake.type==1 and ip.src==$SRC_IP"

if [[ $TLS_VERSION == '1.2' || -z $TLS_VERSION ]];then
if [[ $TLS_VERSION == '1.0' ]]; then
TSHARK_FILTER="$TSHARK_FILTER and ssl.handshake.version==0x0301"
elif [[ $TLS_VERSION == '1.1' ]]; then
TSHARK_FILTER="$TSHARK_FILTER and ssl.handshake.version==0x0302"
elif [[ $TLS_VERSION == '1.2' || -z $TLS_VERSION ]]; then
TSHARK_FILTER="$TSHARK_FILTER and ssl.handshake.version==0x0303"
elif [ $TLS_VERSION == '1.3' ];then
elif [[ $TLS_VERSION == '1.3' ]]; then
TSHARK_FILTER="$TSHARK_FILTER and (ssl.handshake.version==0x0304 or tls.handshake.extensions.supported_version==0x0304)"
else
echo "Unsupported TLS version: $TLS_VERSION"
exit 1
fi

response=$(tshark -r "$CAPTURE_FILE" $TSHARK_OUTPUT $TSHARK_FILTER)

echo "$response"


72 changes: 39 additions & 33 deletions modules/test/tls/bin/get_handshake_complete.sh
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
#!/bin/bash

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

CAPTURE_FILE="$1"
SRC_IP="$2"
DST_IP="$3"
TLS_VERSION="$4"

TSHARK_FILTER="ip.src==$SRC_IP and ip.dst==$DST_IP "

if [[ $TLS_VERSION == '1.2' || -z $TLS_VERSION ]];then
TSHARK_FILTER=$TSHARK_FILTER " and ssl.handshake.type==2 and tls.handshake.type==14 "
elif [ $TLS_VERSION == '1.2' ];then
TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.type==2 and tls.handshake.extensions.supported_version==0x0304"
fi

response=$(tshark -r "$CAPTURE_FILE" $TSHARK_FILTER)

echo "$response"

#!/bin/bash

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

CAPTURE_FILE="$1"
SRC_IP="$2"
DST_IP="$3"
TLS_VERSION="$4"

TSHARK_FILTER="ip.src==$SRC_IP and ip.dst==$DST_IP"

if [[ $TLS_VERSION == '1.0' ]]; then
TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.type==2 and tls.handshake.type==14"
elif [[ $TLS_VERSION == '1.1' ]]; then
TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.type==2 and tls.handshake.type==14"
elif [[ $TLS_VERSION == '1.2' || -z $TLS_VERSION ]]; then
TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.type==2 and tls.handshake.type==14"
elif [[ $TLS_VERSION == '1.3' ]]; then
TSHARK_FILTER=$TSHARK_FILTER "and ssl.handshake.type==2 and tls.handshake.extensions.supported_version==0x0304"
else
echo "Unsupported TLS version: $TLS_VERSION"
exit 1
fi

response=$(tshark -r "$CAPTURE_FILE" $TSHARK_FILTER)

echo "$response"
9 changes: 9 additions & 0 deletions modules/test/tls/conf/module_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@
"timeout": 300
},
"tests":[
{
"name": "security.tls.v1_0_client",
"test_description": "Device uses TLS with connection to an external service on port 443 (or any other port which could be running the webserver-HTTPS)",
"expected_behavior": "The packet indicates a TLS connection with at least TLS 1.0 and support",
"recommendations": [
"Disable connections to unsecure services",
"Ensure any URLs connected to are secure (https)"
]
},
{
"name": "security.tls.v1_2_server",
"test_description": "Check the device web server TLS 1.2 & certificate is valid",
Expand Down
64 changes: 55 additions & 9 deletions modules/test/tls/python/src/tls_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
GATEWAY_CAPTURE_FILE = '/runtime/network/gateway.pcap'
LOGGER = None


class TLSModule(TestModule):
"""The TLS testing module."""

Expand Down Expand Up @@ -234,8 +235,8 @@ def _security_tls_v1_2_server(self):
self._device_ipv4_addr, tls_version='1.2')
tls_1_3_results = self._tls_util.validate_tls_server(
self._device_ipv4_addr, tls_version='1.3')
results = self._tls_util.process_tls_server_results(tls_1_2_results,
tls_1_3_results)
results = self._tls_util.process_tls_server_results(
tls_1_2_results, tls_1_3_results)
# Determine results and return proper messaging and details
description = ''
if results[0] is None:
Expand All @@ -244,7 +245,7 @@ def _security_tls_v1_2_server(self):
description = 'TLS 1.2 certificate is valid'
else:
description = 'TLS 1.2 certificate is invalid'
return results[0], description,results[1]
return results[0], description, results[1]

else:
LOGGER.error('Could not resolve device IP address. Skipping')
Expand All @@ -256,7 +257,7 @@ def _security_tls_v1_3_server(self):
# If the ipv4 address wasn't resolved yet, try again
if self._device_ipv4_addr is not None:
results = self._tls_util.validate_tls_server(self._device_ipv4_addr,
tls_version='1.3')
tls_version='1.3')
# Determine results and return proper messaging and details
description = ''
if results[0] is None:
Expand All @@ -265,18 +266,57 @@ def _security_tls_v1_3_server(self):
description = 'TLS 1.3 certificate is valid'
else:
description = 'TLS 1.3 certificate is invalid'
return results[0], description,results[1]
return results[0], description, results[1]

else:
LOGGER.error('Could not resolve device IP address')
return 'Error', 'Could not resolve device IP address'

def _security_tls_v1_0_client(self):
LOGGER.info('Running security.tls.v1_0_client')
self._resolve_device_ip()
# If the ipv4 address wasn't resolved yet, try again
if self._device_ipv4_addr is not None:
tls_1_0_valid = self._validate_tls_client(self._device_ipv4_addr, '1.0')
tls_1_1_valid = self._validate_tls_client(self._device_ipv4_addr, '1.1')
tls_1_2_valid = self._validate_tls_client(self._device_ipv4_addr, '1.2')
tls_1_3_valid = self._validate_tls_client(self._device_ipv4_addr, '1.3')
states = [
tls_1_0_valid[0], tls_1_1_valid[0], tls_1_2_valid[0], tls_1_3_valid[0]
]
if any(state is True for state in states):
# If any state is True, return True
result_state = True
result_message = 'TLS 1.0 or higher detected'
elif all(state == 'Feature Not Detected' for state in states):
# If all states are "Feature not Detected"
result_state = 'Feature Not Detected'
result_message = tls_1_0_valid[1]
elif all(state == 'Error' for state in states):
# If all states are "Error"
result_state = 'Error'
result_message = ''
else:
result_state = False
result_message = 'TLS 1.0 or higher was not detected'
result_details = tls_1_0_valid[2] + tls_1_1_valid[2] + tls_1_2_valid[
2] + tls_1_3_valid[2]
result_tags = list(
set(tls_1_0_valid[3] + tls_1_1_valid[3] + tls_1_2_valid[3] +
tls_1_3_valid[3]))
return result_state, result_message, result_details, result_tags
else:
LOGGER.error('Could not resolve device IP address. Skipping')
return 'Error', 'Could not resolve device IP address'

def _security_tls_v1_2_client(self):
LOGGER.info('Running security.tls.v1_2_client')
self._resolve_device_ip()
# If the ipv4 address wasn't resolved yet, try again
if self._device_ipv4_addr is not None:
return self._validate_tls_client(self._device_ipv4_addr, '1.2')
return self._validate_tls_client(self._device_ipv4_addr,
'1.2',
unsupported_versions=['1.0', '1.1'])
else:
LOGGER.error('Could not resolve device IP address. Skipping')
return 'Error', 'Could not resolve device IP address'
Expand All @@ -286,18 +326,24 @@ def _security_tls_v1_3_client(self):
self._resolve_device_ip()
# If the ipv4 address wasn't resolved yet, try again
if self._device_ipv4_addr is not None:
return self._validate_tls_client(self._device_ipv4_addr, '1.3')
return self._validate_tls_client(self._device_ipv4_addr,
'1.3',
unsupported_versions=['1.0', '1.1'])
else:
LOGGER.error('Could not resolve device IP address. Skipping')
return 'Error', 'Could not resolve device IP address'

def _validate_tls_client(self, client_ip, tls_version):
def _validate_tls_client(self,
client_ip,
tls_version,
unsupported_versions=None):
client_results = self._tls_util.validate_tls_client(
client_ip=client_ip,
tls_version=tls_version,
capture_files=[
MONITOR_CAPTURE_FILE, STARTUP_CAPTURE_FILE, TLS_CAPTURE_FILE
])
],
unsupported_versions=unsupported_versions)

# Generate results based on the state
result_state = None
Expand Down
62 changes: 31 additions & 31 deletions modules/test/tls/python/src/tls_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ def process_hello_packets(self,
f'\nAllowing {protocol_name} traffic to {packet["dst_ip"]}')
client_hello_results['valid'].append(packet)
else:
# No cipher check for TLS 1.3
# No cipher check for TLS 1.0, 1.1 or TLS 1.3
client_hello_results['valid'] = hello_packets
return client_hello_results

Expand Down Expand Up @@ -589,36 +589,31 @@ def get_non_tls_client_connection_ips(self, client_ip, capture_files):
# we will assume any local connections using the same IP subnet as our
# local network are approved and only connections to IP addresses outside
# our network will be flagged.
def get_unsupported_tls_ips(self, client_ip, capture_files):
def get_unsupported_tls_ips(self,
client_ip,
capture_files,
unsupported_versions=None):
LOGGER.info('Checking client for unsupported TLS client connections')
tls_1_0_packets = self.get_tls_packets(capture_files, client_ip, '1.0')
tls_1_1_packets = self.get_tls_packets(capture_files, client_ip, '1.1')

unsupported_tls_dst_ips = {}
if len(tls_1_0_packets) > 0:
for packet in tls_1_0_packets:
dst_ip = packet['dst_ip']
tls_version = '1.0'
if dst_ip not in unsupported_tls_dst_ips:
LOGGER.info(f'''Unsupported TLS {tls_version}
connections detected to {dst_ip}''')
unsupported_tls_dst_ips[dst_ip] = [tls_version]

if len(tls_1_1_packets) > 0:
for packet in tls_1_1_packets:
dst_ip = packet['dst_ip']
tls_version = '1.1'
# Check if the IP is already in the dictionary
if dst_ip in unsupported_tls_dst_ips:
# If the IP is already present, append the new TLS version to the
# list
unsupported_tls_dst_ips[dst_ip].append(tls_version)
else:
# If the IP is not present, create a new list with the current
# TLS version
LOGGER.info(f'''Unsupported TLS {tls_version} connections detected
to {dst_ip}''')
unsupported_tls_dst_ips[dst_ip] = [tls_version]
if unsupported_versions is not None:
for unsupported_version in unsupported_versions:
tls_packets = self.get_tls_packets(capture_files, client_ip, '1.0')
if len(tls_packets) > 0:
for packet in tls_packets:
dst_ip = packet['dst_ip']
tls_version = unsupported_version
if dst_ip not in unsupported_tls_dst_ips:
# If the IP is already present, append the new TLS version to the
# list
LOGGER.info(f'''Unsupported TLS {tls_version}
connections detected to {dst_ip}''')
unsupported_tls_dst_ips[dst_ip] = [tls_version]
else:
# If the IP is not present, create a new list with the current
# TLS version
LOGGER.info(f'''Unsupported TLS {tls_version} connections detected
to {dst_ip}''')
unsupported_tls_dst_ips[dst_ip] = [tls_version]
return unsupported_tls_dst_ips

# Check if the device has made any outbound connections that use any
Expand Down Expand Up @@ -657,7 +652,11 @@ def is_private_ip(self, ip):
return True
return False

def validate_tls_client(self, client_ip, tls_version, capture_files):
def validate_tls_client(self,
client_ip,
tls_version,
capture_files,
unsupported_versions=None):
LOGGER.info('Validating client for TLS: ' + tls_version)
hello_packets = self.get_hello_packets(capture_files, client_ip,
tls_version)
Expand Down Expand Up @@ -760,7 +759,8 @@ def validate_tls_client(self, client_ip, tls_version, capture_files):
LOGGER.info(f'''TLS connection detected to {ip}.
Ignoring non-TLS traffic detected to this IP''')

unsupported_tls_ips = self.get_unsupported_tls_ips(client_ip, capture_files)
unsupported_tls_ips = self.get_unsupported_tls_ips(client_ip, capture_files,
unsupported_versions)
if len(unsupported_tls_ips) > 0:
tls_client_valid = False
for ip, tls_versions in unsupported_tls_ips.items():
Expand Down
2 changes: 1 addition & 1 deletion modules/ws/conf/mosquitto.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
log_dest stdout
log_type all
log_timestamp true
connection_messages true
connection_messages false

## MQTT Listener

Expand Down
4 changes: 4 additions & 0 deletions resources/test_packs/qualification.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@
"name": "ntp.network.ntp_server",
"required_result": "Required"
},
{
"name": "security.tls.v1_0_client",
"required_result": "Informational"
},
{
"name": "security.tls.v1_2_server",
"required_result": "Required if Applicable"
Expand Down