Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions modules/network/dhcp-1/python/src/grpc_server/dhcp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ def enable_failover(self):
for subnet in self._subnets:
subnet.enable_peer()

def get_peer(self):
return self._peer

def get_subnets(self):
return self._subnets

def get_reserved_host(self, hw_addr):
for host in self._reserved_hosts:
if hw_addr == host.hw_addr:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ def test_resolve_config(self):
def test_disable_failover(self):
DHCP_CONFIG.disable_failover()
print('Test Disable Config:\n' + str(DHCP_CONFIG))
config_lines = str(DHCP_CONFIG._peer).split('\n')
config_lines = str(DHCP_CONFIG.get_peer()).split('\n')
for line in config_lines:
self.assertTrue(line.startswith('#'))

def test_enable_failover(self):
DHCP_CONFIG.enable_failover()
print('Test Enable Config:\n' + str(DHCP_CONFIG))
config_lines = str(DHCP_CONFIG._peer).split('\n')
config_lines = str(DHCP_CONFIG.get_peer()).split('\n')
for line in config_lines:
self.assertFalse(line.startswith('#'))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def GetDHCPRange(self, request, context): # pylint: disable=W0613
"""
LOGGER.info('Get DHCP range called')
try:
pool = self._get_dhcp_config()._subnets[0].pools[0]
pool = self._get_dhcp_config().get_subnets()[0].pools[0]
return pb2.DHCPRange(code=200, start=pool.range_start, end=pool.range_end)
except Exception as e: # pylint: disable=W0718
fail_message = 'Failed to get DHCP range: ' + str(e)
Expand Down
6 changes: 6 additions & 0 deletions modules/network/dhcp-2/python/src/grpc_server/dhcp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ def enable_failover(self):
for subnet in self._subnets:
subnet.enable_peer()

def get_peer(self):
return self._peer

def get_subnets(self):
return self._subnets

def get_reserved_host(self, hw_addr):
for host in self._reserved_hosts:
if hw_addr == host.hw_addr:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ def test_resolve_config(self):
def test_disable_failover(self):
DHCP_CONFIG.disable_failover()
print('Test Disable Config:\n' + str(DHCP_CONFIG))
config_lines = str(DHCP_CONFIG._peer).split('\n')
config_lines = str(DHCP_CONFIG.get_peer()).split('\n')
for line in config_lines:
self.assertTrue(line.startswith('#'))

def test_enable_failover(self):
DHCP_CONFIG.enable_failover()
print('Test Enable Config:\n' + str(DHCP_CONFIG))
config_lines = str(DHCP_CONFIG._peer).split('\n')
config_lines = str(DHCP_CONFIG.get_peer()).split('\n')
for line in config_lines:
self.assertFalse(line.startswith('#'))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def GetDHCPRange(self, request, context): # pylint: disable=W0613
"""
LOGGER.info('Get DHCP range called')
try:
pool = self._get_dhcp_config()._subnets[0].pools[0]
pool = self._get_dhcp_config().get_subnets()[0].pools[0]
return pb2.DHCPRange(code=200, start=pool.range_start, end=pool.range_end)
except Exception as e: # pylint: disable=W0718
fail_message = 'Failed to get DHCP range: ' + str(e)
Expand Down
3 changes: 2 additions & 1 deletion modules/network/ntp/python/src/chronyd.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ def is_running(self):
LOGGER.info('Checking chronyd server')
running = os.path.exists(PID_FILE)
LOGGER.info('chronyd server status: ' + str(running))
return running
return running

2 changes: 1 addition & 1 deletion modules/network/ntp/python/src/ntp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from common import logger
from chronyd import ChronydServer
import time

LOGGER = None
LOG_NAME = 'ntp_server'

class NTPServer:
Expand Down
35 changes: 22 additions & 13 deletions modules/test/base/python/src/test_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,31 @@
class TestModule:
"""An example test module."""

def __init__(self, module_name, log_name, log_dir=None,conf_file=CONF_FILE,results_dir=RESULTS_DIR):
def __init__(self,
module_name,
log_name,
log_dir=None,
conf_file=CONF_FILE,
results_dir=RESULTS_DIR):
self._module_name = module_name
self._results_dir=results_dir if results_dir is not None else RESULTS_DIR
self._device_mac = os.environ.get('DEVICE_MAC','')
self._ipv4_addr = os.environ.get('IPV4_ADDR','')
self._ipv4_subnet = os.environ.get('IPV4_SUBNET','')
self._ipv6_subnet = os.environ.get('IPV6_SUBNET','')
self._add_logger(log_name=log_name, module_name=module_name, log_dir=log_dir)
self._config = self._read_config(conf_file=conf_file if conf_file is not None else CONF_FILE)
self._results_dir = results_dir if results_dir is not None else RESULTS_DIR
self._device_mac = os.environ.get('DEVICE_MAC', '')
self._ipv4_addr = os.environ.get('IPV4_ADDR', '')
self._ipv4_subnet = os.environ.get('IPV4_SUBNET', '')
self._ipv6_subnet = os.environ.get('IPV6_SUBNET', '')
self._add_logger(log_name=log_name,
module_name=module_name,
log_dir=log_dir)
self._config = self._read_config(
conf_file=conf_file if conf_file is not None else CONF_FILE)
self._device_ipv4_addr = None
self._device_ipv6_addr = None

def _add_logger(self, log_name, module_name, log_dir=None):
global LOGGER
LOGGER = logger.get_logger(log_name, module_name, log_dir=log_dir)
LOGGER = logger.get_logger(name=log_name,
log_file=module_name,
log_dir=log_dir)

def generate_module_report(self):
pass
Expand Down Expand Up @@ -100,7 +110,7 @@ def run_tests(self):
result = getattr(self, test_method_name)(config=test['config'])
else:
result = getattr(self, test_method_name)()
except Exception as e:
except Exception as e: # pylint: disable=W0718
LOGGER.error(f'An error occurred whilst running {test["name"]}')
LOGGER.error(e)
else:
Expand Down Expand Up @@ -141,8 +151,7 @@ def run_tests(self):
test['description'] = 'An error occured whilst running this test'

# Remove the steps to resolve if compliant already
if (test['result'] == 'Compliant' and
'recommendations' in test):
if (test['result'] == 'Compliant' and 'recommendations' in test):
test.pop('recommendations')

test['end'] = datetime.now().isoformat()
Expand All @@ -153,7 +162,7 @@ def run_tests(self):
json_results = json.dumps({'results': tests}, indent=2)
self._write_results(json_results)

def _read_config(self,conf_file=CONF_FILE):
def _read_config(self, conf_file=CONF_FILE):
with open(conf_file, encoding='utf-8') as f:
config = json.load(f)
return config
Expand Down
19 changes: 8 additions & 11 deletions modules/test/conn/python/src/connection_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,8 @@ def _connection_switch_arp_inspection(self):
no_arp = False

# Check MAC address matches IP address
if (arp_packet.hwsrc == self._device_mac and
(arp_packet.psrc != self._device_ipv4_addr
and arp_packet.psrc != '0.0.0.0')):
if (arp_packet.hwsrc == self._device_mac
and (arp_packet.psrc not in (self._device_ipv4_addr, '0.0.0.0'))):
LOGGER.info(f'Bad ARP packet detected for MAC: {self._device_mac}')
LOGGER.info(f'''ARP packet from IP {arp_packet.psrc}
does not match {self._device_ipv4_addr}''')
Expand Down Expand Up @@ -205,13 +204,11 @@ def _connection_single_ip(self):
LOGGER.info('Inspecting: ' + str(len(packets)) + ' packets')
for packet in packets:
if DHCP in packet:
for option in packet[DHCP].options:
# message-type, option 3 = DHCPREQUEST
if self._get_dhcp_type(packet) == 3:
mac_address = packet[Ether].src
LOGGER.info('DHCPREQUEST detected MAC address: ' + mac_address)
if not mac_address.startswith(TR_CONTAINER_MAC_PREFIX):
mac_addresses.add(mac_address.upper())
if self._get_dhcp_type(packet) == 3:
mac_address = packet[Ether].src
LOGGER.info('DHCPREQUEST detected MAC address: ' + mac_address)
if not mac_address.startswith(TR_CONTAINER_MAC_PREFIX):
mac_addresses.add(mac_address.upper())

# Check if the device mac address is in the list of DHCPREQUESTs
result = self._device_mac.upper() in mac_addresses
Expand Down Expand Up @@ -637,4 +634,4 @@ def test_subnets(self, subnets):
LOGGER.error(traceback.format_exc())
result = {'result': False, 'details': 'Subnet test failed: ' + str(e)}
results.append(result)
return results
return results
12 changes: 6 additions & 6 deletions modules/test/dns/python/src/dns_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ def __init__(self,
log_dir=None,
conf_file=None,
results_dir=None,
DNS_SERVER_CAPTURE_FILE=DNS_SERVER_CAPTURE_FILE,
STARTUP_CAPTURE_FILE=STARTUP_CAPTURE_FILE,
MONITOR_CAPTURE_FILE=MONITOR_CAPTURE_FILE):
dns_server_capture_file=DNS_SERVER_CAPTURE_FILE,
startup_capture_file=STARTUP_CAPTURE_FILE,
monitor_capture_file=MONITOR_CAPTURE_FILE):
super().__init__(module_name=module,
log_name=LOG_NAME,
log_dir=log_dir,
conf_file=conf_file,
results_dir=results_dir)
self.dns_server_capture_file=DNS_SERVER_CAPTURE_FILE
self.startup_capture_file=STARTUP_CAPTURE_FILE
self.monitor_capture_file=MONITOR_CAPTURE_FILE
self.dns_server_capture_file=dns_server_capture_file
self.startup_capture_file=startup_capture_file
self.monitor_capture_file=monitor_capture_file
self._dns_server = '10.10.10.4'
global LOGGER
LOGGER = self._get_logger()
Expand Down
Loading