Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions modules/test/conn/conf/module_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@
]
}
},
{
"name": "connection.shared_address",
"description": "Ensure the device supports RFC 6598 IANA-Reserved IPv4 Prefix for Shared Address Space",
"expected_behavior": "The device under test accepts IP addresses within the ranges specified in RFC 6598 and communicates using these addresses",
"config": {
"ranges": [
{
"start": "100.64.0.1",
"end": "100.64.255.254"
}
]
}
},
{
"name": "connection.single_ip",
"description": "The network switch port connected to the device reports only one IP address for the device under test.",
Expand Down
150 changes: 78 additions & 72 deletions modules/test/conn/python/src/connection_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,80 +68,12 @@ def __init__(self, module):
# print("Set Range: " + str(response))

def _connection_private_address(self, config):
# Shutdown the secondary DHCP Server
LOGGER.info('Running connection.private_address')
return self._run_subnet_test(config)

# Resolve the configured dhcp subnet ranges
ranges = None
if 'ranges' in config:
ranges = config['ranges']
else:
LOGGER.error('No subnet ranges configured for test. Skipping')
return None, 'No subnet ranges configured for test. Skipping'

response = self.dhcp1_client.get_dhcp_range()
cur_range = {}
if response.code == 200:
cur_range['start'] = response.start
cur_range['end'] = response.end
LOGGER.info('Current DHCP subnet range: ' + str(cur_range))
else:
LOGGER.error('Failed to resolve current subnet range required '
'for restoring network')
return None, ('Failed to resolve current subnet range required '
'for restoring network')

results = []
dhcp_setup = self.setup_single_dhcp_server()
if dhcp_setup[0]:
LOGGER.info(dhcp_setup[1])
lease = self._get_cur_lease()
if lease is not None:
if self._is_lease_active(lease):
results = self.test_subnets(ranges)
else:
return None, 'Failed to confirm a valid active lease for the device'
else:
LOGGER.error(dhcp_setup[1])
return None, 'Failed to setup DHCP server for test'

# Process and return final results
final_result = None
final_result_details = ''
for result in results:
if final_result is None:
final_result = result['result']
else:
final_result &= result['result']
final_result_details += result['details'] + '\n'

try:
# Restore failover configuration of DHCP servers
self.restore_failover_dhcp_server(cur_range)

# Wait for the current lease to expire
self._wait_for_lease_expire(self._get_cur_lease())

# Wait for a new lease to be provided before exiting test
# to prevent other test modules from failing
for _ in range(5):
LOGGER.info('Checking for new lease')
lease = self._get_cur_lease()
if lease is not None:
LOGGER.info('New Lease found: ' + str(lease))
LOGGER.info('Validating subnet for new lease...')
in_range = self.is_ip_in_range(lease['ip'], cur_range['start'],
cur_range['end'])
LOGGER.info('Lease within subnet: ' + str(in_range))
break
else:
LOGGER.info('New lease not found. Waiting to check again')
time.sleep(5)

except Exception as e: # pylint: disable=W0718
LOGGER.error('Failed to restore DHCP server configuration: ' + str(e))

return final_result, final_result_details
def _connection_shared_address(self, config):
LOGGER.info('Running connection.shared_address')
return self._run_subnet_test(config)

def _connection_dhcp_address(self):
LOGGER.info('Running connection.dhcp_address')
Expand Down Expand Up @@ -343,6 +275,80 @@ def is_ip_in_range(self, ip, start_ip, end_ip):

return start_int <= ip_int <= end_int

def _run_subnet_test(self,config):
# Resolve the configured dhcp subnet ranges
ranges = None
if 'ranges' in config:
ranges = config['ranges']
else:
LOGGER.error('No subnet ranges configured for test. Skipping')
return None, 'No subnet ranges configured for test. Skipping'

response = self.dhcp1_client.get_dhcp_range()
cur_range = {}
if response.code == 200:
cur_range['start'] = response.start
cur_range['end'] = response.end
LOGGER.info('Current DHCP subnet range: ' + str(cur_range))
else:
LOGGER.error('Failed to resolve current subnet range required '
'for restoring network')
return None, ('Failed to resolve current subnet range required '
'for restoring network')

results = []
dhcp_setup = self.setup_single_dhcp_server()
if dhcp_setup[0]:
LOGGER.info(dhcp_setup[1])
lease = self._get_cur_lease()
if lease is not None:
if self._is_lease_active(lease):
results = self.test_subnets(ranges)
else:
return None, 'Failed to confirm a valid active lease for the device'
else:
LOGGER.error(dhcp_setup[1])
return None, 'Failed to setup DHCP server for test'

# Process and return final results
final_result = None
final_result_details = ''
for result in results:
if final_result is None:
final_result = result['result']
else:
final_result &= result['result']
final_result_details += result['details'] + '\n'

try:
# Restore failover configuration of DHCP servers
self.restore_failover_dhcp_server(cur_range)

# Wait for the current lease to expire
self._wait_for_lease_expire(self._get_cur_lease())

# Wait for a new lease to be provided before exiting test
# to prevent other test modules from failing
for _ in range(5):
LOGGER.info('Checking for new lease')
lease = self._get_cur_lease()
if lease is not None:
LOGGER.info('New Lease found: ' + str(lease))
LOGGER.info('Validating subnet for new lease...')
in_range = self.is_ip_in_range(lease['ip'], cur_range['start'],
cur_range['end'])
LOGGER.info('Lease within subnet: ' + str(in_range))
break
else:
LOGGER.info('New lease not found. Waiting to check again')
time.sleep(5)

except Exception as e: # pylint: disable=W0718
LOGGER.error('Failed to restore DHCP server configuration: ' + str(e))

return final_result, final_result_details


def _test_subnet(self, subnet, lease):
if self._change_subnet(subnet):
expiration = datetime.strptime(lease['expires'], '%Y-%m-%d %H:%M:%S')
Expand Down