diff --git a/.gitignore b/.gitignore index e168ec07a..5a216522f 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ venv/ error pylint.out __pycache__/ -build/ \ No newline at end of file +build/ +testing/unit_test/temp \ No newline at end of file diff --git a/local/.gitignore b/local/.gitignore index 4fb365c03..f13ce8d85 100644 --- a/local/.gitignore +++ b/local/.gitignore @@ -1,2 +1,3 @@ system.json -devices \ No newline at end of file +devices +root_certs \ No newline at end of file diff --git a/modules/test/base/python/src/test_module.py b/modules/test/base/python/src/test_module.py index 6ff4f815b..e949976fa 100644 --- a/modules/test/base/python/src/test_module.py +++ b/modules/test/base/python/src/test_module.py @@ -61,6 +61,8 @@ def _get_device_tests(self, device_test_module): if 'tests' in device_test_module: if test['name'] in device_test_module['tests']: dev_test_config = device_test_module['tests'][test['name']] + if 'enabled' in dev_test_config: + test['enabled'] = dev_test_config['enabled'] if 'config' in test and 'config' in dev_test_config: test['config'].update(dev_test_config['config']) return module_tests @@ -81,17 +83,19 @@ def run_tests(self): test_method_name = '_' + test['name'].replace('.', '_') result = None test['start'] = datetime.now().isoformat() - LOGGER.info('Attempting to run test: ' + test['name']) - # Resolve the correct python method by test name and run test - if hasattr(self, test_method_name): - if 'config' in test: - result = getattr(self, test_method_name)(config=test['config']) + if ('enabled' in test and test['enabled']) or 'enabled' not in test: + LOGGER.info('Attempting to run test: ' + test['name']) + # Resolve the correct python method by test name and run test + if hasattr(self, test_method_name): + if 'config' in test: + result = getattr(self, test_method_name)(config=test['config']) + else: + result = getattr(self, test_method_name)() else: - result = getattr(self, test_method_name)() + LOGGER.info(f'Test {test["name"]} not resolved. Skipping') + result = None else: - LOGGER.info(f'Test {test["name"]} not resolved. Skipping') - result = None - + LOGGER.info(f'Test {test["name"]} disabled. Skipping') if result is not None: if isinstance(result, bool): test['result'] = 'compliant' if result else 'non-compliant' diff --git a/modules/test/conn/conf/module_config.json b/modules/test/conn/conf/module_config.json index 86e1849af..3e06cc891 100644 --- a/modules/test/conn/conf/module_config.json +++ b/modules/test/conn/conf/module_config.json @@ -71,6 +71,16 @@ "description": "The device under test responds to an ICMP echo (ping) request.", "expected_behavior": "The device under test responds to an ICMP echo (ping) request." }, + { + "name": "connection.ipaddr.ip_change", + "description": "The device responds to a ping (ICMP echo request) to the new IP address it has received after the initial dHCP lease has expired.", + "expected_behavior": "If the lease expires before the client receiveds a DHCPACK, the client moves to INIT state, MUST immediately stop any other network processing and requires network initialization parameters as if the client were uninitialized. If the client then receives a DHCPACK allocating the client its previous network addres, the client SHOULD continue network processing. If the client is given a new network address, it MUST NOT continue using the previous network address and SHOULD notify the local users of the problem." + }, + { + "name": "connection.ipaddr.dhcp_failover", + "description": "The device has requested a DHCPREQUEST/REBIND to the DHCP failover server after the primary DHCP server has been brought down.", + "expected_behavior": "" + }, { "name": "connection.ipv6_slaac", "description": "The device forms a valid IPv6 address as a combination of the IPv6 router prefix and the device interface identifier", diff --git a/modules/test/conn/python/src/connection_module.py b/modules/test/conn/python/src/connection_module.py index d432d2131..169fb98c3 100644 --- a/modules/test/conn/python/src/connection_module.py +++ b/modules/test/conn/python/src/connection_module.py @@ -20,6 +20,7 @@ from test_module import TestModule from dhcp1.client import Client as DHCPClient1 from dhcp2.client import Client as DHCPClient2 +from dhcp_util import DHCPUtil LOG_NAME = 'test_connection' LOGGER = None @@ -39,6 +40,7 @@ def __init__(self, module): LOGGER = self._get_logger() self.dhcp1_client = DHCPClient1() self.dhcp2_client = DHCPClient2() + self._dhcp_util = DHCPUtil(self.dhcp1_client, self.dhcp2_client, LOGGER) # ToDo: Move this into some level of testing, leave for # reference until tests are implemented with these calls @@ -131,7 +133,7 @@ def _connection_single_ip(self): LOGGER.info('Inspecting: ' + str(len(packets)) + ' packets') for packet in packets: # Option[1] = message-type, option 3 = DHCPREQUEST - if DHCP in packet and packet[DHCP].options[0][1] == 3: + if DHCP in packet and packet[DHCP].options[0][1] == 3: mac_address = packet[Ether].src if not mac_address.startswith(TR_CONTAINER_MAC_PREFIX): mac_addresses.add(mac_address.upper()) @@ -151,7 +153,7 @@ def _connection_target_ping(self): # If the ipv4 address wasn't resolved yet, try again if self._device_ipv4_addr is None: - self._device_ipv4_addr = self._get_device_ipv4(self) + self._device_ipv4_addr = self._get_device_ipv4() if self._device_ipv4_addr is None: LOGGER.error('No device IP could be resolved') @@ -159,6 +161,85 @@ def _connection_target_ping(self): else: return self._ping(self._device_ipv4_addr) + def _connection_ipaddr_ip_change(self): + result = None + LOGGER.info('Running connection.ipaddr.ip_change') + if self._dhcp_util.setup_single_dhcp_server(): + lease = self._dhcp_util.get_cur_lease(self._device_mac) + if lease is not None: + LOGGER.info('Current device lease resolved: ' + str(lease)) + # Figure out how to calculate a valid IP address + ip_address = '10.10.10.30' + if self._dhcp_util.add_reserved_lease(lease['hostname'], + lease['hw_addr'], ip_address): + self._dhcp_util.wait_for_lease_expire(lease) + LOGGER.info('Checking device accepted new ip') + for _ in range(5): + LOGGER.info('Pinging device at IP: ' + ip_address) + if self._ping(ip_address): + LOGGER.info('Ping Success') + LOGGER.info('Reserved lease confirmed active in device') + result = True, 'Device has accepted an IP address change' + LOGGER.info('Restoring DHCP failover configuration') + break + else: + LOGGER.info('Device did not respond to ping') + result = False, 'Device did not accept IP address change' + time.sleep(5) # Wait 5 seconds before trying again + self._dhcp_util.delete_reserved_lease(lease['hw_addr']) + else: + result = None, 'Failed to create reserved lease for device' + else: + result = None, 'Device has no current DHCP lease' + # Restore the network + self._dhcp_util.restore_failover_dhcp_server() + LOGGER.info("Waiting 30 seconds for reserved lease to expire") + time.sleep(30) + self._dhcp_util.get_new_lease(self._device_mac) + else: + result = None, 'Failed to configure network for test' + return result + + def _connection_ipaddr_dhcp_failover(self): + result = None + # Confirm that both servers are online + primary_status = self._dhcp_util.get_dhcp_server_status( + dhcp_server_primary=True) + secondary_status = self._dhcp_util.get_dhcp_server_status( + dhcp_server_primary=False) + if primary_status and secondary_status: + lease = self._dhcp_util.get_cur_lease(self._device_mac) + if lease is not None: + LOGGER.info('Current device lease resolved: ' + str(lease)) + if self._dhcp_util.is_lease_active(lease): + # Shutdown the primary server + if self._dhcp_util.stop_dhcp_server(dhcp_server_primary=True): + # Wait until the current lease is expired + self._dhcp_util.wait_for_lease_expire(lease) + # Make sure the device has received a new lease from the + # secondary server + if self._dhcp_util.get_new_lease(self._device_mac, + dhcp_server_primary=False): + if self._dhcp_util.is_lease_active(lease): + result = True, ('Secondary DHCP server lease confirmed active ' + 'in device') + else: + result = False, 'Could not validate lease is active in device' + else: + result = False, ('Device did not recieve a new lease from ' + 'secondary DHCP server') + self._dhcp_util.start_dhcp_server(dhcp_server_primary=True) + else: + result = None, 'Failed to shutdown primary DHCP server' + else: + result = False, 'Device did not respond to ping' + else: + result = None, 'Device has no current DHCP lease' + else: + LOGGER.error('Network is not ready for this test. Skipping') + result = None, 'Network is not ready for this test' + return result + def _get_oui_manufacturer(self, mac_address): # Do some quick fixes on the format of the mac_address # to match the oui file pattern @@ -207,7 +288,7 @@ def _connection_ipv6_ping(self): return False def _ping(self, host): - cmd = "ping -c 1 " + str(host) + cmd = 'ping -c 1 ' + str(host) success = util.run_command(cmd, output=False) return success @@ -275,7 +356,7 @@ def is_ip_in_range(self, ip, start_ip, end_ip): return start_int <= ip_int <= end_int - def _run_subnet_test(self,config): + def _run_subnet_test(self, config): # Resolve the configured dhcp subnet ranges ranges = None if 'ranges' in config: @@ -292,9 +373,9 @@ def _run_subnet_test(self,config): LOGGER.info('Current DHCP subnet range: ' + str(cur_range)) else: LOGGER.error('Failed to resolve current subnet range required ' - 'for restoring network') + 'for restoring network') return None, ('Failed to resolve current subnet range required ' - 'for restoring network') + 'for restoring network') results = [] dhcp_setup = self.setup_single_dhcp_server() @@ -343,7 +424,7 @@ def _run_subnet_test(self,config): LOGGER.info('New lease not found. Waiting to check again') time.sleep(5) - except Exception as e: # pylint: disable=W0718 + except Exception as e: # pylint: disable=W0718 LOGGER.error('Failed to restore DHCP server configuration: ' + str(e)) return final_result, final_result_details @@ -401,7 +482,7 @@ def _get_cur_lease(self): LOGGER.info('Checking current device lease') response = self.dhcp1_client.get_lease(self._device_mac) if response.code == 200: - lease = eval(response.message) # pylint: disable=W0123 + lease = eval(response.message) # pylint: disable=W0123 if lease: # Check if non-empty lease return lease else: @@ -439,7 +520,7 @@ def test_subnets(self, subnets): 'details': 'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' failed' } - except Exception as e: # pylint: disable=W0718 + except Exception as e: # pylint: disable=W0718 result = {'result': False, 'details': 'Subnet test failed: ' + str(e)} results.append(result) return results diff --git a/modules/test/conn/python/src/dhcp_util.py b/modules/test/conn/python/src/dhcp_util.py new file mode 100644 index 000000000..6bc4d8401 --- /dev/null +++ b/modules/test/conn/python/src/dhcp_util.py @@ -0,0 +1,214 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module that contains various methods for validating the DHCP +device behaviors""" + +import time +from datetime import datetime +import util + +LOG_NAME = 'dhcp_util' +LOGGER = None + +class DHCPUtil(): + """Helper class for various tests concerning DHCP behavior""" + + def __init__(self, dhcp_primary_client, dhcp_secondary_client, logger): + global LOGGER + LOGGER = logger + self._dhcp1_client = dhcp_primary_client + self._dhcp2_client = dhcp_secondary_client + + # Move primary DHCP server from failover into a single DHCP server config + def disable_failover(self, dhcp_server_primary=True): + LOGGER.info('Disabling primary DHCP server failover') + response = self.get_dhcp_client(dhcp_server_primary).disable_failover() + if response.code == 200: + LOGGER.info('Primary DHCP server failover disabled') + return True + else: + LOGGER.error('Failed to disable primary DHCP server failover') + return False + + # Move primary DHCP server to primary failover + def enable_failover(self, dhcp_server_primary=True): + LOGGER.info('Enabling primary failover DHCP server') + response = self.get_dhcp_client(dhcp_server_primary).enable_failover() + if response.code == 200: + LOGGER.info('Primary DHCP server failover enabled') + return True + else: + LOGGER.error('Failed to enable primary DHCP server failover') + return False + + # Resolve the requested dhcp client + def get_dhcp_client(self, dhcp_server_primary=True): + if dhcp_server_primary: + return self._dhcp1_client + else: + return self._dhcp2_client + + # Read the DHCP range + def get_dhcp_range(self, dhcp_server_primary=True): + response = self.get_dhcp_client(dhcp_server_primary).get_dhcp_range() + cur_range = None + if response.code == 200: + cur_range = {} + cur_range['start'] = response.start + cur_range['end'] = response.end + LOGGER.info('Current DHCP subnet range: ' + str(cur_range)) + else: + LOGGER.error('Failed to resolve current subnet range required ' + 'for restoring network') + return cur_range + + def restore_failover_dhcp_server(self): + if self.enable_failover(): + response = self.get_dhcp_client(False).start_dhcp_server() + if response.code == 200: + LOGGER.info('Secondary DHCP server started') + return True + else: + LOGGER.error('Failed to start secondary DHCP server') + return False + else: + LOGGER.error('Failed to enabled failover in primary DHCP server') + return False + + # Resolve the requested dhcp client + def start_dhcp_server(self, dhcp_server_primary=True): + LOGGER.info('Starting DHCP server') + response = self.get_dhcp_client(dhcp_server_primary).start_dhcp_server() + if response.code == 200: + LOGGER.info('DHCP server start command success') + return True + else: + LOGGER.error('DHCP server start command failed') + return False + + # Resolve the requested dhcp client + def stop_dhcp_server(self, dhcp_server_primary=True): + LOGGER.info('Stopping DHCP server') + response = self.get_dhcp_client(dhcp_server_primary).stop_dhcp_server() + if response.code == 200: + LOGGER.info('DHCP server stop command success') + return True + else: + LOGGER.error('DHCP server stop command failed') + return False + + def get_dhcp_server_status(self, dhcp_server_primary=True): + LOGGER.info('Checking DHCP server status') + response = self.get_dhcp_client(dhcp_server_primary).get_status() + if response.code == 200: + LOGGER.info('DHCP server status: ' + str(response.message)) + status = eval(response.message) # pylint: disable=W0123 + return status['dhcpStatus'] + else: + return False + + def get_cur_lease(self, mac_address, dhcp_server_primary=True): + LOGGER.info('Checking current device lease') + response = self.get_dhcp_client(dhcp_server_primary).get_lease(mac_address) + if response.code == 200: + lease = eval(response.message) # pylint: disable=W0123 + if lease: # Check if non-empty lease + return lease + else: + return None + + def get_new_lease(self, mac_address, dhcp_server_primary=True): + lease = None + for _ in range(5): + LOGGER.info('Checking for new lease') + if lease is None: + lease = self.get_cur_lease(mac_address,dhcp_server_primary) + LOGGER.info('New Lease found: ' + str(lease)) + break + else: + LOGGER.info('New lease not found. Waiting to check again') + time.sleep(5) + return lease + + def is_lease_active(self, lease): + if 'ip' in lease: + ip_addr = lease['ip'] + LOGGER.info('Lease IP Resolved: ' + ip_addr) + LOGGER.info('Attempting to ping device...') + ping_success = self.ping(ip_addr) + LOGGER.info('Ping Success: ' + str(ping_success)) + LOGGER.info('Current lease confirmed active in device') + else: + LOGGER.error('Failed to confirm a valid active lease for the device') + return ping_success + + def ping(self, host): + cmd = 'ping -c 1 ' + str(host) + success = util.run_command(cmd, output=False) + return success + + def add_reserved_lease(self, + hostname, + mac_address, + ip_address, + dhcp_server_primary=True): + response = self.get_dhcp_client(dhcp_server_primary).add_reserved_lease( + hostname, mac_address, ip_address) + if response.code == 200: + LOGGER.info('Reserved lease ' + ip_address + ' added for ' + mac_address) + return True + else: + LOGGER.error('Failed to add reserved lease for ' + mac_address) + return False + + def delete_reserved_lease(self, mac_address, dhcp_server_primary=True): + response = self.get_dhcp_client(dhcp_server_primary).delete_reserved_lease( + mac_address) + if response.code == 200: + LOGGER.info('Reserved lease deleted for ' + mac_address) + return True + else: + LOGGER.error('Failed to delete reserved lease for ' + mac_address) + return False + + def setup_single_dhcp_server(self): + # Shutdown the secondary DHCP Server + LOGGER.info('Stopping secondary DHCP server') + if self.stop_dhcp_server(False): + LOGGER.info('Secondary DHCP server stop command success') + time.sleep(3) # Give some time for the server to stop + if not self.get_dhcp_server_status(False): + LOGGER.info('Secondary DHCP server stopped') + if self.disable_failover(True): + LOGGER.info('Primary DHCP server failover disabled') + return True + else: + LOGGER.error('Failed to disable primary DHCP server failover') + return False + else: + LOGGER.error('Secondary DHCP server still running') + return False + else: + LOGGER.error('Failed to stop secondary DHCP server') + return False + + def wait_for_lease_expire(self, lease): + expiration = datetime.strptime(lease['expires'], '%Y-%m-%d %H:%M:%S') + time_to_expire = expiration - datetime.now() + LOGGER.info('Time until lease expiration: ' + str(time_to_expire)) + LOGGER.info('Waiting for current lease to expire: ' + str(expiration)) + if time_to_expire.total_seconds() > 0: + time.sleep(time_to_expire.total_seconds() + + 5) # Wait until the expiration time and padd 5 seconds + LOGGER.info('Current lease expired.')