From 917639e3d56f772d7f18cf5d0e25c8848c600500 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Thu, 11 Jan 2024 12:28:03 -0700 Subject: [PATCH] Implement PR223 for clean merge into release --- modules/test/conn/python/requirements.txt | 3 +- .../test/conn/python/src/connection_module.py | 131 ++++++++---------- modules/test/conn/python/src/dhcp_util.py | 67 ++++++--- 3 files changed, 110 insertions(+), 91 deletions(-) diff --git a/modules/test/conn/python/requirements.txt b/modules/test/conn/python/requirements.txt index c2275b3e0..7244e9e75 100644 --- a/modules/test/conn/python/requirements.txt +++ b/modules/test/conn/python/requirements.txt @@ -1,2 +1,3 @@ pyOpenSSL -scapy \ No newline at end of file +scapy +python-dateutil \ No newline at end of file diff --git a/modules/test/conn/python/src/connection_module.py b/modules/test/conn/python/src/connection_module.py index 3fc0e6765..9429541de 100644 --- a/modules/test/conn/python/src/connection_module.py +++ b/modules/test/conn/python/src/connection_module.py @@ -14,7 +14,6 @@ """Connection test module""" import util import time -import datetime import traceback from scapy.all import rdpcap, DHCP, Ether, IPv6, ICMPv6ND_NS from test_module import TestModule @@ -85,7 +84,8 @@ def _connection_shared_address(self, config): def _connection_dhcp_address(self): LOGGER.info('Running connection.dhcp_address') - lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec) + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=self._lease_wait_time_sec) if lease is not None: if 'ip' in lease: ip_addr = lease['ip'] @@ -176,7 +176,7 @@ def _connection_target_ping(self): else: return False, 'Device does not respond to ping' - def _connection_ipaddr_ip_change(self,config): + def _connection_ipaddr_ip_change(self, config): result = None LOGGER.info('Running connection.ipaddr.ip_change') # Resolve the configured lease wait time @@ -184,7 +184,8 @@ def _connection_ipaddr_ip_change(self,config): self._lease_wait_time_sec = config['lease_wait_time_sec'] if self._dhcp_util.setup_single_dhcp_server(): - lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec) + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=self._lease_wait_time_sec) if lease is not None: LOGGER.info('Current device lease resolved') LOGGER.debug(str(lease)) @@ -192,7 +193,8 @@ def _connection_ipaddr_ip_change(self,config): ip_address = '10.10.10.30' if self._dhcp_util.add_reserved_lease(lease['hostname'], lease['hw_addr'], ip_address): - self._dhcp_util.wait_for_lease_expire(lease) + self._dhcp_util.wait_for_lease_expire(lease, + self._lease_wait_time_sec) LOGGER.info('Checking device accepted new ip') for _ in range(5): LOGGER.info('Pinging device at IP: ' + ip_address) @@ -216,12 +218,13 @@ def _connection_ipaddr_ip_change(self,config): self._dhcp_util.restore_failover_dhcp_server() LOGGER.info('Waiting 30 seconds for reserved lease to expire') time.sleep(30) - self._dhcp_util.get_cur_lease(mac_address=self._device_mac,timeout=self._lease_wait_time_sec) + self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=self._lease_wait_time_sec) else: result = None, 'Failed to configure network for test' return result - def _connection_ipaddr_dhcp_failover(self,config): + def _connection_ipaddr_dhcp_failover(self, config): result = None LOGGER.info('Running connection.ipaddr.dhcp_failover') @@ -235,17 +238,20 @@ def _connection_ipaddr_dhcp_failover(self,config): secondary_status = self._dhcp_util.get_dhcp_server_status( dhcp_server_primary=False) if primary_status and secondary_status: - lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec) + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=self._lease_wait_time_sec) if lease is not None: LOGGER.info('Current device lease resolved') if self._dhcp_util.is_lease_active(lease): # Shutdown the primary server if self._dhcp_util.stop_dhcp_server(dhcp_server_primary=True): # Wait until the current lease is expired - self._dhcp_util.wait_for_lease_expire(lease) + self._dhcp_util.wait_for_lease_expire(lease, + self._lease_wait_time_sec) # Make sure the device has received a new lease from the # secondary server - if self._dhcp_util.get_cur_lease(mac_address=self._device_mac,timeout=self._lease_wait_time_sec): + if self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=self._lease_wait_time_sec): if self._dhcp_util.is_lease_active(lease): result = True, ('Secondary DHCP server lease confirmed active ' 'in device') @@ -294,9 +300,8 @@ def _connection_ipv6_slaac(self): return result def _has_slaac_addres(self): - packet_capture = (rdpcap(STARTUP_CAPTURE_FILE) - + rdpcap(MONITOR_CAPTURE_FILE) - + rdpcap(DHCP_CAPTURE_FILE)) + packet_capture = (rdpcap(STARTUP_CAPTURE_FILE) + + rdpcap(MONITOR_CAPTURE_FILE) + rdpcap(DHCP_CAPTURE_FILE)) sends_ipv6 = False for packet_number, packet in enumerate(packet_capture, start=1): if IPv6 in packet and packet.src == self._device_mac: @@ -366,7 +371,8 @@ def setup_single_dhcp_server(self): if response.code == 200: LOGGER.info('Secondary DHCP server stopped') LOGGER.info('Configuring primary DHCP server') - # Move primary DHCP server from failover into a single DHCP server config + # Move primary DHCP server from failover into + # a single DHCP server config response = self.dhcp1_client.disable_failover() if response.code == 200: LOGGER.info('Primary DHCP server failover disabled') @@ -410,7 +416,7 @@ def _run_subnet_test(self, config): # Resolve the configured lease wait time if 'lease_wait_time_sec' in config: - self._lease_wait_time_sec = config['lease_wait_time_sec'] + self._lease_wait_time_sec = config['lease_wait_time_sec'] response = self.dhcp1_client.get_dhcp_range() cur_range = {} @@ -428,12 +434,13 @@ def _run_subnet_test(self, config): dhcp_setup = self.setup_single_dhcp_server() if dhcp_setup[0]: LOGGER.info(dhcp_setup[1]) - lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec) + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=self._lease_wait_time_sec) if lease is not None: if self._dhcp_util.is_lease_active(lease): results = self.test_subnets(ranges) else: - LOGGER.info("Failed to confirm a valid active lease for the device") + LOGGER.info('Failed to confirm a valid active lease for the device') return None, 'Failed to confirm a valid active lease for the device' else: LOGGER.error(dhcp_setup[1]) @@ -459,15 +466,18 @@ def _run_subnet_test(self, config): self.restore_failover_dhcp_server(cur_range) # Wait for the current lease to expire - self._wait_for_lease_expire(self._dhcp_util.get_cur_lease( - self._device_mac,timeout=self._lease_wait_time_sec)) + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=self._lease_wait_time_sec) + self._dhcp_util.wait_for_lease_expire(lease, self._lease_wait_time_sec) # Wait for a new lease to be provided before exiting test # to prevent other test modules from failing LOGGER.info('Checking for new lease') # Subnet changes tend to take longer to pick up so we'll allow # for twice the lease wait time - lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,timeout=2*self._lease_wait_time_sec) + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=2 * + self._lease_wait_time_sec) if lease is not None: LOGGER.info('Validating subnet for new lease...') in_range = self.is_ip_in_range(lease['ip'], cur_range['start'], @@ -482,47 +492,28 @@ def _run_subnet_test(self, config): return final_result, final_result_details def _test_subnet(self, subnet, lease): - LOGGER.info("Testing subnet: " + str(subnet)) + LOGGER.info('Testing subnet: ' + str(subnet)) if self._change_subnet(subnet): - expiration = datetime.datetime.strptime( - lease['expires'], '%Y-%m-%d %H:%M:%S') - now_utc = datetime.datetime.now( - datetime.timezone.utc).replace(tzinfo=None) - time_to_expire = (expiration - now_utc).total_seconds() - LOGGER.debug('Time until lease expiration: ' + str(time_to_expire)) - LOGGER.info('Waiting for current lease to expire: ' + str(expiration)) - if time_to_expire > 0: - # Wait until the expiration time and add 5 seconds - time.sleep(time_to_expire + 5) - LOGGER.debug('Current lease expired. Checking for new lease') - LOGGER.debug('Checking for new lease') - # Subnet changes tend to take longer to pick up so we'll allow - # for twice the lease wait time - lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,timeout=2*self._lease_wait_time_sec) - if lease is not None: - LOGGER.debug('New lease found: ' + str(lease)) - LOGGER.debug('Validating subnet for new lease...') - in_range = self.is_ip_in_range(lease['ip'], subnet['start'], - subnet['end']) - LOGGER.info('Lease within subnet: ' + str(in_range)) - return in_range - else: - LOGGER.info('Device did not receive lease in subnet') - return False + self._dhcp_util.wait_for_lease_expire(lease, self._lease_wait_time_sec) + LOGGER.debug('Checking for new lease') + # Subnet changes tend to take longer to pick up so we'll allow + # for twice the lease wait time + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=2 * + self._lease_wait_time_sec) + if lease is not None: + LOGGER.debug('New lease found: ' + str(lease)) + LOGGER.debug('Validating subnet for new lease...') + in_range = self.is_ip_in_range(lease['ip'], subnet['start'], + subnet['end']) + LOGGER.info('Lease within subnet: ' + str(in_range)) + return in_range + else: + LOGGER.info('Device did not receive lease in subnet') + return False else: LOGGER.error('Failed to change subnet') - def _wait_for_lease_expire(self, lease): - expiration = datetime.datetime.strptime( - lease['expires'], '%Y-%m-%d %H:%M:%S') - time_to_expire = expiration - datetime.datetime.now() - LOGGER.info('Time until lease expiration: ' + str(time_to_expire)) - LOGGER.info('Waiting for current lease to expire: ' + str(expiration)) - if time_to_expire.total_seconds() > 0: - time.sleep(time_to_expire.total_seconds() + - 5) # Wait until the expiration time and padd 5 seconds - LOGGER.info('Current lease expired.') - def _change_subnet(self, subnet): LOGGER.info('Changing subnet to: ' + str(subnet)) response = self.dhcp1_client.set_dhcp_range(subnet['start'], subnet['end']) @@ -549,25 +540,25 @@ def test_subnets(self, subnets): result = self._test_subnet(subnet, lease) if result: result = { - 'result': - True, - 'details': - 'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' passed' + 'result': + True, + 'details': + 'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' passed' } else: result = { - 'result': - False, - 'details': - 'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' failed' + 'result': + False, + 'details': + 'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' failed' } else: result = { - 'result': - None, - 'details': - 'Device does not have active lease, cannot test subnet change. ' + - 'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' skipped' + 'result': + None, + 'details': + 'Device does not have active lease, cannot test subnet change. ' + + 'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' skipped' } except Exception as e: # pylint: disable=W0718 LOGGER.error('Subnet test failed: ' + str(e)) diff --git a/modules/test/conn/python/src/dhcp_util.py b/modules/test/conn/python/src/dhcp_util.py index 6d063d62a..7dc543820 100644 --- a/modules/test/conn/python/src/dhcp_util.py +++ b/modules/test/conn/python/src/dhcp_util.py @@ -17,6 +17,7 @@ import time from datetime import datetime import util +from dateutil import tz LOG_NAME = 'dhcp_util' LOGGER = None @@ -125,17 +126,22 @@ def get_cur_lease(self, mac_address, timeout): Retrieve the current lease for a given MAC address with retries. Args: - mac_address (str): The MAC address of the client whose lease is being queried. - timeout (int): The maximum time (in seconds) to wait for a lease to be found. + mac_address (str): The MAC address of the client whose + lease is being queried. + timeout (int): The maximum time (in seconds) to wait + for a lease to be found. Returns: - str or None: The lease information as a string if found, or None if no lease is found within the timeout. + str or None: The lease information as a string if found, + or None if no lease is found within the timeout. Note: - This method will attempt to query both primary and secondary DHCP servers for the lease, - with a 5-second pause between retries until the `timeout` is reached. + This method will attempt to query both primary and secondary + DHCP servers for the lease, with a 5-second pause between + retries until the `timeout` is reached. """ - LOGGER.info('Resolving current lease with max wait time of ' + str(timeout) + ' seconds') + LOGGER.info('Resolving current lease with max wait time of ' + + str(timeout) + ' seconds') start_time = time.time() while True: @@ -146,24 +152,27 @@ def get_cur_lease(self, mac_address, timeout): def _get_cur_lease(self, mac_address): """ - Retrieve the current lease for a given MAC address from both primary and secondary DHCP servers. + Retrieve the current lease for a given MAC address from both + primary and secondary DHCP servers. Args: - mac_address (str): The MAC address of the client whose lease is being queried. + mac_address (str): The MAC address of the client whose + lease is being queried. Returns: - str or None: The lease information as a string if found, or None if no lease is found. + str or None: The lease information as a string if found, + or None if no lease is found. """ primary = False lease = self._get_cur_lease_from_server(mac_address=mac_address, dhcp_server_primary=True) if lease is not None: - primary=True + primary = True else: lease = self._get_cur_lease_from_server(mac_address=mac_address, dhcp_server_primary=False) if lease is not None: - lease['primary']=primary + lease['primary'] = primary log_msg = 'DHCP lease resolved from ' log_msg += 'primary' if lease['primary'] else 'secondary' log_msg += ' server' @@ -176,7 +185,8 @@ def _get_cur_lease_from_server(self, mac_address, dhcp_server_primary=True): # Check if the server is online first, old lease files can still return # lease information that is no longer valid when a dhcp server is shutdown if self.get_dhcp_server_status(dhcp_server_primary): - response = self.get_dhcp_client(dhcp_server_primary).get_lease(mac_address) + response = self.get_dhcp_client(dhcp_server_primary).get_lease( + mac_address) if response.code == 200: lease_resp = eval(response.message) # pylint: disable=W0123 if lease_resp: # Check if non-empty lease @@ -245,12 +255,29 @@ def setup_single_dhcp_server(self): LOGGER.error('Failed to stop secondary DHCP server') return False - def wait_for_lease_expire(self, lease): - expiration = datetime.strptime(lease['expires'], '%Y-%m-%d %H:%M:%S') - time_to_expire = expiration - datetime.now() - LOGGER.info('Time until lease expiration: ' + str(time_to_expire)) + def wait_for_lease_expire(self, lease, max_wait_time=30): + expiration_utc = datetime.strptime(lease['expires'], '%Y-%m-%d %H:%M:%S') + # lease information stored in UTC so we need to convert to local time + expiration = self.utc_to_local(expiration_utc) + time_to_expire = expiration - datetime.now(tz=tz.tzlocal()) + # Wait until the expiration time and padd 5 seconds + # If wait time is longer than max_wait_time, only wait + # for the max wait time + wait_time = min(max_wait_time, + time_to_expire.total_seconds() + + 5) if time_to_expire.total_seconds() > 0 else 0 + LOGGER.info('Time until lease expiration: ' + str(wait_time)) LOGGER.info('Waiting for current lease to expire: ' + str(expiration)) - if time_to_expire.total_seconds() > 0: - time.sleep(time_to_expire.total_seconds() + - 5) # Wait until the expiration time and padd 5 seconds - LOGGER.info('Current lease expired.') + if wait_time > 0: + time.sleep(wait_time) + LOGGER.info('Current lease expired.') + + # Convert from a UTC datetime to the local time zone + def utc_to_local(self, utc_datetime): + # Set the time zone for the UTC datetime + utc = utc_datetime.replace(tzinfo=tz.tzutc()) + + # Convert to local time zone + local_datetime = utc.astimezone(tz.tzlocal()) + + return local_datetime \ No newline at end of file