Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion modules/test/conn/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pyOpenSSL
scapy
scapy
python-dateutil
131 changes: 61 additions & 70 deletions modules/test/conn/python/src/connection_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""Connection test module"""
import util
import time
import datetime
import traceback
from scapy.all import rdpcap, DHCP, Ether, IPv6, ICMPv6ND_NS
from test_module import TestModule
Expand Down Expand Up @@ -85,7 +84,8 @@ def _connection_shared_address(self, config):

def _connection_dhcp_address(self):
LOGGER.info('Running connection.dhcp_address')
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec)
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,
timeout=self._lease_wait_time_sec)
if lease is not None:
if 'ip' in lease:
ip_addr = lease['ip']
Expand Down Expand Up @@ -176,23 +176,25 @@ def _connection_target_ping(self):
else:
return False, 'Device does not respond to ping'

def _connection_ipaddr_ip_change(self,config):
def _connection_ipaddr_ip_change(self, config):
result = None
LOGGER.info('Running connection.ipaddr.ip_change')
# Resolve the configured lease wait time
if 'lease_wait_time_sec' in config:
self._lease_wait_time_sec = config['lease_wait_time_sec']

if self._dhcp_util.setup_single_dhcp_server():
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec)
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,
timeout=self._lease_wait_time_sec)
if lease is not None:
LOGGER.info('Current device lease resolved')
LOGGER.debug(str(lease))
# Figure out how to calculate a valid IP address
ip_address = '10.10.10.30'
if self._dhcp_util.add_reserved_lease(lease['hostname'],
lease['hw_addr'], ip_address):
self._dhcp_util.wait_for_lease_expire(lease)
self._dhcp_util.wait_for_lease_expire(lease,
self._lease_wait_time_sec)
LOGGER.info('Checking device accepted new ip')
for _ in range(5):
LOGGER.info('Pinging device at IP: ' + ip_address)
Expand All @@ -216,12 +218,13 @@ def _connection_ipaddr_ip_change(self,config):
self._dhcp_util.restore_failover_dhcp_server()
LOGGER.info('Waiting 30 seconds for reserved lease to expire')
time.sleep(30)
self._dhcp_util.get_cur_lease(mac_address=self._device_mac,timeout=self._lease_wait_time_sec)
self._dhcp_util.get_cur_lease(mac_address=self._device_mac,
timeout=self._lease_wait_time_sec)
else:
result = None, 'Failed to configure network for test'
return result

def _connection_ipaddr_dhcp_failover(self,config):
def _connection_ipaddr_dhcp_failover(self, config):
result = None
LOGGER.info('Running connection.ipaddr.dhcp_failover')

Expand All @@ -235,17 +238,20 @@ def _connection_ipaddr_dhcp_failover(self,config):
secondary_status = self._dhcp_util.get_dhcp_server_status(
dhcp_server_primary=False)
if primary_status and secondary_status:
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec)
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,
timeout=self._lease_wait_time_sec)
if lease is not None:
LOGGER.info('Current device lease resolved')
if self._dhcp_util.is_lease_active(lease):
# Shutdown the primary server
if self._dhcp_util.stop_dhcp_server(dhcp_server_primary=True):
# Wait until the current lease is expired
self._dhcp_util.wait_for_lease_expire(lease)
self._dhcp_util.wait_for_lease_expire(lease,
self._lease_wait_time_sec)
# Make sure the device has received a new lease from the
# secondary server
if self._dhcp_util.get_cur_lease(mac_address=self._device_mac,timeout=self._lease_wait_time_sec):
if self._dhcp_util.get_cur_lease(mac_address=self._device_mac,
timeout=self._lease_wait_time_sec):
if self._dhcp_util.is_lease_active(lease):
result = True, ('Secondary DHCP server lease confirmed active '
'in device')
Expand Down Expand Up @@ -294,9 +300,8 @@ def _connection_ipv6_slaac(self):
return result

def _has_slaac_addres(self):
packet_capture = (rdpcap(STARTUP_CAPTURE_FILE)
+ rdpcap(MONITOR_CAPTURE_FILE)
+ rdpcap(DHCP_CAPTURE_FILE))
packet_capture = (rdpcap(STARTUP_CAPTURE_FILE) +
rdpcap(MONITOR_CAPTURE_FILE) + rdpcap(DHCP_CAPTURE_FILE))
sends_ipv6 = False
for packet_number, packet in enumerate(packet_capture, start=1):
if IPv6 in packet and packet.src == self._device_mac:
Expand Down Expand Up @@ -366,7 +371,8 @@ def setup_single_dhcp_server(self):
if response.code == 200:
LOGGER.info('Secondary DHCP server stopped')
LOGGER.info('Configuring primary DHCP server')
# Move primary DHCP server from failover into a single DHCP server config
# Move primary DHCP server from failover into
# a single DHCP server config
response = self.dhcp1_client.disable_failover()
if response.code == 200:
LOGGER.info('Primary DHCP server failover disabled')
Expand Down Expand Up @@ -410,7 +416,7 @@ def _run_subnet_test(self, config):

# Resolve the configured lease wait time
if 'lease_wait_time_sec' in config:
self._lease_wait_time_sec = config['lease_wait_time_sec']
self._lease_wait_time_sec = config['lease_wait_time_sec']

response = self.dhcp1_client.get_dhcp_range()
cur_range = {}
Expand All @@ -428,12 +434,13 @@ def _run_subnet_test(self, config):
dhcp_setup = self.setup_single_dhcp_server()
if dhcp_setup[0]:
LOGGER.info(dhcp_setup[1])
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec)
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,
timeout=self._lease_wait_time_sec)
if lease is not None:
if self._dhcp_util.is_lease_active(lease):
results = self.test_subnets(ranges)
else:
LOGGER.info("Failed to confirm a valid active lease for the device")
LOGGER.info('Failed to confirm a valid active lease for the device')
return None, 'Failed to confirm a valid active lease for the device'
else:
LOGGER.error(dhcp_setup[1])
Expand All @@ -459,15 +466,18 @@ def _run_subnet_test(self, config):
self.restore_failover_dhcp_server(cur_range)

# Wait for the current lease to expire
self._wait_for_lease_expire(self._dhcp_util.get_cur_lease(
self._device_mac,timeout=self._lease_wait_time_sec))
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,
timeout=self._lease_wait_time_sec)
self._dhcp_util.wait_for_lease_expire(lease, self._lease_wait_time_sec)

# Wait for a new lease to be provided before exiting test
# to prevent other test modules from failing
LOGGER.info('Checking for new lease')
# Subnet changes tend to take longer to pick up so we'll allow
# for twice the lease wait time
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,timeout=2*self._lease_wait_time_sec)
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,
timeout=2 *
self._lease_wait_time_sec)
if lease is not None:
LOGGER.info('Validating subnet for new lease...')
in_range = self.is_ip_in_range(lease['ip'], cur_range['start'],
Expand All @@ -482,47 +492,28 @@ def _run_subnet_test(self, config):
return final_result, final_result_details

def _test_subnet(self, subnet, lease):
LOGGER.info("Testing subnet: " + str(subnet))
LOGGER.info('Testing subnet: ' + str(subnet))
if self._change_subnet(subnet):
expiration = datetime.datetime.strptime(
lease['expires'], '%Y-%m-%d %H:%M:%S')
now_utc = datetime.datetime.now(
datetime.timezone.utc).replace(tzinfo=None)
time_to_expire = (expiration - now_utc).total_seconds()
LOGGER.debug('Time until lease expiration: ' + str(time_to_expire))
LOGGER.info('Waiting for current lease to expire: ' + str(expiration))
if time_to_expire > 0:
# Wait until the expiration time and add 5 seconds
time.sleep(time_to_expire + 5)
LOGGER.debug('Current lease expired. Checking for new lease')
LOGGER.debug('Checking for new lease')
# Subnet changes tend to take longer to pick up so we'll allow
# for twice the lease wait time
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,timeout=2*self._lease_wait_time_sec)
if lease is not None:
LOGGER.debug('New lease found: ' + str(lease))
LOGGER.debug('Validating subnet for new lease...')
in_range = self.is_ip_in_range(lease['ip'], subnet['start'],
subnet['end'])
LOGGER.info('Lease within subnet: ' + str(in_range))
return in_range
else:
LOGGER.info('Device did not receive lease in subnet')
return False
self._dhcp_util.wait_for_lease_expire(lease, self._lease_wait_time_sec)
LOGGER.debug('Checking for new lease')
# Subnet changes tend to take longer to pick up so we'll allow
# for twice the lease wait time
lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac,
timeout=2 *
self._lease_wait_time_sec)
if lease is not None:
LOGGER.debug('New lease found: ' + str(lease))
LOGGER.debug('Validating subnet for new lease...')
in_range = self.is_ip_in_range(lease['ip'], subnet['start'],
subnet['end'])
LOGGER.info('Lease within subnet: ' + str(in_range))
return in_range
else:
LOGGER.info('Device did not receive lease in subnet')
return False
else:
LOGGER.error('Failed to change subnet')

def _wait_for_lease_expire(self, lease):
expiration = datetime.datetime.strptime(
lease['expires'], '%Y-%m-%d %H:%M:%S')
time_to_expire = expiration - datetime.datetime.now()
LOGGER.info('Time until lease expiration: ' + str(time_to_expire))
LOGGER.info('Waiting for current lease to expire: ' + str(expiration))
if time_to_expire.total_seconds() > 0:
time.sleep(time_to_expire.total_seconds() +
5) # Wait until the expiration time and padd 5 seconds
LOGGER.info('Current lease expired.')

def _change_subnet(self, subnet):
LOGGER.info('Changing subnet to: ' + str(subnet))
response = self.dhcp1_client.set_dhcp_range(subnet['start'], subnet['end'])
Expand All @@ -549,25 +540,25 @@ def test_subnets(self, subnets):
result = self._test_subnet(subnet, lease)
if result:
result = {
'result':
True,
'details':
'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' passed'
'result':
True,
'details':
'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' passed'
}
else:
result = {
'result':
False,
'details':
'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' failed'
'result':
False,
'details':
'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' failed'
}
else:
result = {
'result':
None,
'details':
'Device does not have active lease, cannot test subnet change. ' +
'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' skipped'
'result':
None,
'details':
'Device does not have active lease, cannot test subnet change. ' +
'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' skipped'
}
except Exception as e: # pylint: disable=W0718
LOGGER.error('Subnet test failed: ' + str(e))
Expand Down
67 changes: 47 additions & 20 deletions modules/test/conn/python/src/dhcp_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import time
from datetime import datetime
import util
from dateutil import tz

LOG_NAME = 'dhcp_util'
LOGGER = None
Expand Down Expand Up @@ -125,17 +126,22 @@ def get_cur_lease(self, mac_address, timeout):
Retrieve the current lease for a given MAC address with retries.

Args:
mac_address (str): The MAC address of the client whose lease is being queried.
timeout (int): The maximum time (in seconds) to wait for a lease to be found.
mac_address (str): The MAC address of the client whose
lease is being queried.
timeout (int): The maximum time (in seconds) to wait
for a lease to be found.

Returns:
str or None: The lease information as a string if found, or None if no lease is found within the timeout.
str or None: The lease information as a string if found,
or None if no lease is found within the timeout.

Note:
This method will attempt to query both primary and secondary DHCP servers for the lease,
with a 5-second pause between retries until the `timeout` is reached.
This method will attempt to query both primary and secondary
DHCP servers for the lease, with a 5-second pause between
retries until the `timeout` is reached.
"""
LOGGER.info('Resolving current lease with max wait time of ' + str(timeout) + ' seconds')
LOGGER.info('Resolving current lease with max wait time of ' +
str(timeout) + ' seconds')
start_time = time.time()

while True:
Expand All @@ -146,24 +152,27 @@ def get_cur_lease(self, mac_address, timeout):

def _get_cur_lease(self, mac_address):
"""
Retrieve the current lease for a given MAC address from both primary and secondary DHCP servers.
Retrieve the current lease for a given MAC address from both
primary and secondary DHCP servers.

Args:
mac_address (str): The MAC address of the client whose lease is being queried.
mac_address (str): The MAC address of the client whose
lease is being queried.

Returns:
str or None: The lease information as a string if found, or None if no lease is found.
str or None: The lease information as a string if found,
or None if no lease is found.
"""
primary = False
lease = self._get_cur_lease_from_server(mac_address=mac_address,
dhcp_server_primary=True)
if lease is not None:
primary=True
primary = True
else:
lease = self._get_cur_lease_from_server(mac_address=mac_address,
dhcp_server_primary=False)
if lease is not None:
lease['primary']=primary
lease['primary'] = primary
log_msg = 'DHCP lease resolved from '
log_msg += 'primary' if lease['primary'] else 'secondary'
log_msg += ' server'
Expand All @@ -176,7 +185,8 @@ def _get_cur_lease_from_server(self, mac_address, dhcp_server_primary=True):
# Check if the server is online first, old lease files can still return
# lease information that is no longer valid when a dhcp server is shutdown
if self.get_dhcp_server_status(dhcp_server_primary):
response = self.get_dhcp_client(dhcp_server_primary).get_lease(mac_address)
response = self.get_dhcp_client(dhcp_server_primary).get_lease(
mac_address)
if response.code == 200:
lease_resp = eval(response.message) # pylint: disable=W0123
if lease_resp: # Check if non-empty lease
Expand Down Expand Up @@ -245,12 +255,29 @@ def setup_single_dhcp_server(self):
LOGGER.error('Failed to stop secondary DHCP server')
return False

def wait_for_lease_expire(self, lease):
expiration = datetime.strptime(lease['expires'], '%Y-%m-%d %H:%M:%S')
time_to_expire = expiration - datetime.now()
LOGGER.info('Time until lease expiration: ' + str(time_to_expire))
def wait_for_lease_expire(self, lease, max_wait_time=30):
expiration_utc = datetime.strptime(lease['expires'], '%Y-%m-%d %H:%M:%S')
# lease information stored in UTC so we need to convert to local time
expiration = self.utc_to_local(expiration_utc)
time_to_expire = expiration - datetime.now(tz=tz.tzlocal())
# Wait until the expiration time and padd 5 seconds
# If wait time is longer than max_wait_time, only wait
# for the max wait time
wait_time = min(max_wait_time,
time_to_expire.total_seconds() +
5) if time_to_expire.total_seconds() > 0 else 0
LOGGER.info('Time until lease expiration: ' + str(wait_time))
LOGGER.info('Waiting for current lease to expire: ' + str(expiration))
if time_to_expire.total_seconds() > 0:
time.sleep(time_to_expire.total_seconds() +
5) # Wait until the expiration time and padd 5 seconds
LOGGER.info('Current lease expired.')
if wait_time > 0:
time.sleep(wait_time)
LOGGER.info('Current lease expired.')

# Convert from a UTC datetime to the local time zone
def utc_to_local(self, utc_datetime):
# Set the time zone for the UTC datetime
utc = utc_datetime.replace(tzinfo=tz.tzutc())

# Convert to local time zone
local_datetime = utc.astimezone(tz.tzlocal())

return local_datetime