Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ venv/
error
pylint.out
__pycache__/
build/
build/
testing/unit_test/temp
3 changes: 2 additions & 1 deletion local/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
system.json
devices
devices
root_certs
22 changes: 13 additions & 9 deletions modules/test/base/python/src/test_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def _get_device_tests(self, device_test_module):
if 'tests' in device_test_module:
if test['name'] in device_test_module['tests']:
dev_test_config = device_test_module['tests'][test['name']]
if 'enabled' in dev_test_config:
test['enabled'] = dev_test_config['enabled']
if 'config' in test and 'config' in dev_test_config:
test['config'].update(dev_test_config['config'])
return module_tests
Expand All @@ -81,17 +83,19 @@ def run_tests(self):
test_method_name = '_' + test['name'].replace('.', '_')
result = None
test['start'] = datetime.now().isoformat()
LOGGER.info('Attempting to run test: ' + test['name'])
# Resolve the correct python method by test name and run test
if hasattr(self, test_method_name):
if 'config' in test:
result = getattr(self, test_method_name)(config=test['config'])
if ('enabled' in test and test['enabled']) or 'enabled' not in test:
LOGGER.info('Attempting to run test: ' + test['name'])
# Resolve the correct python method by test name and run test
if hasattr(self, test_method_name):
if 'config' in test:
result = getattr(self, test_method_name)(config=test['config'])
else:
result = getattr(self, test_method_name)()
else:
result = getattr(self, test_method_name)()
LOGGER.info(f'Test {test["name"]} not resolved. Skipping')
result = None
else:
LOGGER.info(f'Test {test["name"]} not resolved. Skipping')
result = None

LOGGER.info(f'Test {test["name"]} disabled. Skipping')
if result is not None:
if isinstance(result, bool):
test['result'] = 'compliant' if result else 'non-compliant'
Expand Down
10 changes: 10 additions & 0 deletions modules/test/conn/conf/module_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@
"description": "The device under test responds to an ICMP echo (ping) request.",
"expected_behavior": "The device under test responds to an ICMP echo (ping) request."
},
{
"name": "connection.ipaddr.ip_change",
"description": "The device responds to a ping (ICMP echo request) to the new IP address it has received after the initial dHCP lease has expired.",
"expected_behavior": "If the lease expires before the client receiveds a DHCPACK, the client moves to INIT state, MUST immediately stop any other network processing and requires network initialization parameters as if the client were uninitialized. If the client then receives a DHCPACK allocating the client its previous network addres, the client SHOULD continue network processing. If the client is given a new network address, it MUST NOT continue using the previous network address and SHOULD notify the local users of the problem."
},
{
"name": "connection.ipaddr.dhcp_failover",
"description": "The device has requested a DHCPREQUEST/REBIND to the DHCP failover server after the primary DHCP server has been brought down.",
"expected_behavior": ""
},
{
"name": "connection.ipv6_slaac",
"description": "The device forms a valid IPv6 address as a combination of the IPv6 router prefix and the device interface identifier",
Expand Down
99 changes: 90 additions & 9 deletions modules/test/conn/python/src/connection_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from test_module import TestModule
from dhcp1.client import Client as DHCPClient1
from dhcp2.client import Client as DHCPClient2
from dhcp_util import DHCPUtil

LOG_NAME = 'test_connection'
LOGGER = None
Expand All @@ -39,6 +40,7 @@ def __init__(self, module):
LOGGER = self._get_logger()
self.dhcp1_client = DHCPClient1()
self.dhcp2_client = DHCPClient2()
self._dhcp_util = DHCPUtil(self.dhcp1_client, self.dhcp2_client, LOGGER)

# ToDo: Move this into some level of testing, leave for
# reference until tests are implemented with these calls
Expand Down Expand Up @@ -131,7 +133,7 @@ def _connection_single_ip(self):
LOGGER.info('Inspecting: ' + str(len(packets)) + ' packets')
for packet in packets:
# Option[1] = message-type, option 3 = DHCPREQUEST
if DHCP in packet and packet[DHCP].options[0][1] == 3:
if DHCP in packet and packet[DHCP].options[0][1] == 3:
mac_address = packet[Ether].src
if not mac_address.startswith(TR_CONTAINER_MAC_PREFIX):
mac_addresses.add(mac_address.upper())
Expand All @@ -151,14 +153,93 @@ def _connection_target_ping(self):

# If the ipv4 address wasn't resolved yet, try again
if self._device_ipv4_addr is None:
self._device_ipv4_addr = self._get_device_ipv4(self)
self._device_ipv4_addr = self._get_device_ipv4()

if self._device_ipv4_addr is None:
LOGGER.error('No device IP could be resolved')
sys.exit(1)
else:
return self._ping(self._device_ipv4_addr)

def _connection_ipaddr_ip_change(self):
result = None
LOGGER.info('Running connection.ipaddr.ip_change')
if self._dhcp_util.setup_single_dhcp_server():
lease = self._dhcp_util.get_cur_lease(self._device_mac)
if lease is not None:
LOGGER.info('Current device lease resolved: ' + str(lease))
# Figure out how to calculate a valid IP address
ip_address = '10.10.10.30'
if self._dhcp_util.add_reserved_lease(lease['hostname'],
lease['hw_addr'], ip_address):
self._dhcp_util.wait_for_lease_expire(lease)
LOGGER.info('Checking device accepted new ip')
for _ in range(5):
LOGGER.info('Pinging device at IP: ' + ip_address)
if self._ping(ip_address):
LOGGER.info('Ping Success')
LOGGER.info('Reserved lease confirmed active in device')
result = True, 'Device has accepted an IP address change'
LOGGER.info('Restoring DHCP failover configuration')
break
else:
LOGGER.info('Device did not respond to ping')
result = False, 'Device did not accept IP address change'
time.sleep(5) # Wait 5 seconds before trying again
self._dhcp_util.delete_reserved_lease(lease['hw_addr'])
else:
result = None, 'Failed to create reserved lease for device'
else:
result = None, 'Device has no current DHCP lease'
# Restore the network
self._dhcp_util.restore_failover_dhcp_server()
LOGGER.info("Waiting 30 seconds for reserved lease to expire")
time.sleep(30)
self._dhcp_util.get_new_lease(self._device_mac)
else:
result = None, 'Failed to configure network for test'
return result

def _connection_ipaddr_dhcp_failover(self):
result = None
# Confirm that both servers are online
primary_status = self._dhcp_util.get_dhcp_server_status(
dhcp_server_primary=True)
secondary_status = self._dhcp_util.get_dhcp_server_status(
dhcp_server_primary=False)
if primary_status and secondary_status:
lease = self._dhcp_util.get_cur_lease(self._device_mac)
if lease is not None:
LOGGER.info('Current device lease resolved: ' + str(lease))
if self._dhcp_util.is_lease_active(lease):
# Shutdown the primary server
if self._dhcp_util.stop_dhcp_server(dhcp_server_primary=True):
# Wait until the current lease is expired
self._dhcp_util.wait_for_lease_expire(lease)
# Make sure the device has received a new lease from the
# secondary server
if self._dhcp_util.get_new_lease(self._device_mac,
dhcp_server_primary=False):
if self._dhcp_util.is_lease_active(lease):
result = True, ('Secondary DHCP server lease confirmed active '
'in device')
else:
result = False, 'Could not validate lease is active in device'
else:
result = False, ('Device did not recieve a new lease from '
'secondary DHCP server')
self._dhcp_util.start_dhcp_server(dhcp_server_primary=True)
else:
result = None, 'Failed to shutdown primary DHCP server'
else:
result = False, 'Device did not respond to ping'
else:
result = None, 'Device has no current DHCP lease'
else:
LOGGER.error('Network is not ready for this test. Skipping')
result = None, 'Network is not ready for this test'
return result

def _get_oui_manufacturer(self, mac_address):
# Do some quick fixes on the format of the mac_address
# to match the oui file pattern
Expand Down Expand Up @@ -207,7 +288,7 @@ def _connection_ipv6_ping(self):
return False

def _ping(self, host):
cmd = "ping -c 1 " + str(host)
cmd = 'ping -c 1 ' + str(host)
success = util.run_command(cmd, output=False)
return success

Expand Down Expand Up @@ -275,7 +356,7 @@ def is_ip_in_range(self, ip, start_ip, end_ip):

return start_int <= ip_int <= end_int

def _run_subnet_test(self,config):
def _run_subnet_test(self, config):
# Resolve the configured dhcp subnet ranges
ranges = None
if 'ranges' in config:
Expand All @@ -292,9 +373,9 @@ def _run_subnet_test(self,config):
LOGGER.info('Current DHCP subnet range: ' + str(cur_range))
else:
LOGGER.error('Failed to resolve current subnet range required '
'for restoring network')
'for restoring network')
return None, ('Failed to resolve current subnet range required '
'for restoring network')
'for restoring network')

results = []
dhcp_setup = self.setup_single_dhcp_server()
Expand Down Expand Up @@ -343,7 +424,7 @@ def _run_subnet_test(self,config):
LOGGER.info('New lease not found. Waiting to check again')
time.sleep(5)

except Exception as e: # pylint: disable=W0718
except Exception as e: # pylint: disable=W0718
LOGGER.error('Failed to restore DHCP server configuration: ' + str(e))

return final_result, final_result_details
Expand Down Expand Up @@ -401,7 +482,7 @@ def _get_cur_lease(self):
LOGGER.info('Checking current device lease')
response = self.dhcp1_client.get_lease(self._device_mac)
if response.code == 200:
lease = eval(response.message) # pylint: disable=W0123
lease = eval(response.message) # pylint: disable=W0123
if lease: # Check if non-empty lease
return lease
else:
Expand Down Expand Up @@ -439,7 +520,7 @@ def test_subnets(self, subnets):
'details':
'Subnet ' + subnet['start'] + '-' + subnet['end'] + ' failed'
}
except Exception as e: # pylint: disable=W0718
except Exception as e: # pylint: disable=W0718
result = {'result': False, 'details': 'Subnet test failed: ' + str(e)}
results.append(result)
return results
Loading