From 17f3ac7b06fbbc69bb42f091ac37517b02d0859e Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Thu, 28 Mar 2024 09:19:05 +0000 Subject: [PATCH 1/2] Fix pylint score --- .../src/net_orc/network_orchestrator.py | 3 +- framework/python/src/net_orc/ovs_control.py | 33 +++++++++++-------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/framework/python/src/net_orc/network_orchestrator.py b/framework/python/src/net_orc/network_orchestrator.py index 5fad51ebc..e36a75624 100644 --- a/framework/python/src/net_orc/network_orchestrator.py +++ b/framework/python/src/net_orc/network_orchestrator.py @@ -211,7 +211,8 @@ def _device_discovered(self, mac_addr): LOGGER.info( f'Device with mac addr {device.mac_addr} has obtained IP address ' f'{device.ip_addr}') - # self._ovs.add_arp_inspection_filter(ip_address=device.ip_addr,mac_address=device.mac_addr) + self._ovs.add_arp_inspection_filter(ip_address=device.ip_addr, + mac_address=device.mac_addr) self._start_device_monitor(device) diff --git a/framework/python/src/net_orc/ovs_control.py b/framework/python/src/net_orc/ovs_control.py index 015d6b7a2..9f0373345 100644 --- a/framework/python/src/net_orc/ovs_control.py +++ b/framework/python/src/net_orc/ovs_control.py 
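Review bot: `device.ip_addr` and `device.mac_addr` are passed straight into `add_arp_inspection_filter`, which formats them into an OpenFlow match string (see the ovs_control.py hunk), and no format check is visible in these lines. The IP is whatever the device obtained over DHCP, so I would validate both values before the call, e.g. `ipaddress.ip_address(device.ip_addr)` plus a strict MAC pattern, and log and skip the filter on failure. Nothing shown here removes the earlier allow flow when a device is rediscovered with a new address, so stale MAC-to-IP entries presumably remain; confirm against the rest of `_device_discovered`, which continues outside the hunk.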
@@ -155,30 +155,36 @@ def add_dhcp_filters(self,dhcp_server_primary_ip,dhcp_server_secondary_ip): # Allow DHCP traffic from primary server allow_primary_dhcp_server = ( - f'table=0, dl_type=0x800, priority=65535, tp_src=67, tp_dst=68, nw_src={dhcp_server_primary_ip}, actions=normal') + f'''table=0, dl_type=0x800, priority=65535, + tp_src=67, tp_dst=68, nw_src={dhcp_server_primary_ip}, actions=normal''') self.add_flow(bridge_name=DEVICE_BRIDGE,flow=allow_primary_dhcp_server) # Allow DHCP traffic from secondary server allow_secondary_dhcp_server = ( - f'table=0, dl_type=0x800, priority=65535, tp_src=67, tp_dst=68, nw_src={dhcp_server_secondary_ip}, actions=normal') + f'''table=0, dl_type=0x800, priority=65535, tp_src=67, + tp_dst=68, nw_src={dhcp_server_secondary_ip}, actions=normal''') self.add_flow(bridge_name=DEVICE_BRIDGE,flow=allow_secondary_dhcp_server) # Drop DHCP packets not associated with known servers - drop_dhcp_flow = 'table=0, dl_type=0x800, priority=0, tp_src=67, tp_dst=68, actions=drop' + drop_dhcp_flow = '''table=0, dl_type=0x800, priority=0, tp_src=67, + tp_dst=68, actions=drop''' self.add_flow(bridge_name=DEVICE_BRIDGE,flow=drop_dhcp_flow) def add_arp_inspection_filter(self,ip_address,mac_address): # Allow ARP packets with known MAC-to-IP mappings - allow_known_arps= f'table=0, cookie={DEVICER_ARP_COOKIE}, priority=65535, arp, arp_tpa={ip_address}, arp_tha={mac_address}, action=normal' + allow_known_arps= f'''table=0, cookie={DEVICER_ARP_COOKIE}, priority=65535, + arp, arp_tpa={ip_address}, arp_tha={mac_address}, action=normal''' self.add_flow(bridge_name=DEVICE_BRIDGE,flow=allow_known_arps) - DHCP1_MAC = f'{CONTAINER_MAC_PREFIX}:02' - DHCP2_MAC = f'{CONTAINER_MAC_PREFIX}:03' - DHCP1_IP = '10.10.10.2' - DHCP2_IP = '10.10.10.3' + dhcp1_mac = f'{CONTAINER_MAC_PREFIX}:02' + dhcp2_mac = f'{CONTAINER_MAC_PREFIX}:03' + dhcp1_ip = '10.10.10.2' + dhcp2_ip = '10.10.10.3' - dhcp_1_arps= f'table=0, priority=65535, arp, arp_tpa={DHCP1_IP}, arp_tha={DHCP1_MAC}, 
action=normal' - dhcp_2_arps= f'table=0, priority=65535, arp, arp_tpa={DHCP2_IP}, arp_tha={DHCP2_MAC}, action=normal' + dhcp_1_arps= f'''table=0, priority=65535, arp, arp_tpa={dhcp1_ip}, + arp_tha={dhcp1_mac}, action=normal''' + dhcp_2_arps= f'''table=0, priority=65535, arp, arp_tpa={dhcp2_ip}, + arp_tha={dhcp2_mac}, action=normal''' self.add_flow(bridge_name=DEVICE_BRIDGE,flow=dhcp_1_arps) self.add_flow(bridge_name=DEVICE_BRIDGE,flow=dhcp_2_arps) @@ -190,9 +196,10 @@ def add_arp_inspection_filter(self,ip_address,mac_address): self.add_flow(bridge_name=DEVICE_BRIDGE,flow=drop_unknown_arps) def delete_arp_inspection_filter(self): - self.delete_flow(bridge_name=DEVICE_BRIDGE,flow=f'cookie={DEVICER_ARP_COOKIE}/-1') - self.delete_flow(bridge_name=DEVICE_BRIDGE,flow=f'cookie={UNKNOWN_ARP_COOKIE}/-1') - + self.delete_flow(bridge_name=DEVICE_BRIDGE, + flow=f'cookie={DEVICER_ARP_COOKIE}/-1') + self.delete_flow(bridge_name=DEVICE_BRIDGE, + flow=f'cookie={UNKNOWN_ARP_COOKIE}/-1') def delete_bridge(self, bridge_name): LOGGER.debug('Deleting OVS Bridge: ' + bridge_name) From 68c31264a4e68e1af60008afbb5a22c8bc5b4801 Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Thu, 28 Mar 2024 14:37:09 +0000 Subject: [PATCH 2/2] Fix pylint and statuses --- .../src/net_orc/network_orchestrator.py | 4 +-- framework/python/src/net_orc/ovs_control.py | 26 ++++++++++--------- modules/test/base/bin/start_module | 4 +-- modules/test/base/python/src/test_module.py | 20 ++++++++++---- .../test/conn/python/src/connection_module.py | 15 ++++++++--- .../protocol/python/src/protocol_module.py | 8 ++++++ modules/test/tls/python/src/tls_module.py | 8 +++--- 7 files changed, 57 insertions(+), 28 deletions(-) diff --git a/framework/python/src/net_orc/network_orchestrator.py b/framework/python/src/net_orc/network_orchestrator.py index e36a75624..90dce8500 100644 --- a/framework/python/src/net_orc/network_orchestrator.py +++ b/framework/python/src/net_orc/network_orchestrator.py 
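Review bot: The triple-quoted literals in the `@@ -155,30 +155,36 @@` hunk put a newline and the continuation line's indentation inside every flow specification (`allow_primary_dhcp_server`, `allow_secondary_dhcp_server`, `drop_dhcp_flow`, `allow_known_arps`, `dhcp_1_arps`, `dhcp_2_arps`), so the string handed to `add_flow` is no longer the single-line value it was. `add_flow` is not shown, so whether the embedded whitespace is tolerated is unverified; if it is not, the DHCP and ARP filters would not be installed. To satisfy the line limit without changing the value, use adjacent literals inside parentheses, e.g. `'table=0, dl_type=0x800, priority=0, '` followed by `'tp_src=67, tp_dst=68, actions=drop'`.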
@@ -211,8 +211,8 @@ def _device_discovered(self, mac_addr): LOGGER.info( f'Device with mac addr {device.mac_addr} has obtained IP address ' f'{device.ip_addr}') - self._ovs.add_arp_inspection_filter(ip_address=device.ip_addr, - mac_address=device.mac_addr) + #self._ovs.add_arp_inspection_filter(ip_address=device.ip_addr, + # mac_address=device.mac_addr) self._start_device_monitor(device) diff --git a/framework/python/src/net_orc/ovs_control.py b/framework/python/src/net_orc/ovs_control.py index 9f0373345..08faa52c1 100644 --- a/framework/python/src/net_orc/ovs_control.py +++ b/framework/python/src/net_orc/ovs_control.py 
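Review bot: Commenting the `add_arp_inspection_filter` call back out turns off the per-device ARP inspection flow on the device bridge, and neither the line nor the commit title ("Fix pylint and statuses") says why. If this is a temporary workaround, state the reason on the line, e.g. `# TODO(owner): ARP inspection disabled because <reason>; re-enable when fixed`, or gate the call behind a configuration option rather than leaving dead code. It is also worth confirming that the `connection.switch.arp_inspection` test in connection_module.py still gives a meaningful result with the filter off. `_device_discovered` starts and ends outside the hunk.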
@@ -155,25 +155,27 @@ def add_dhcp_filters(self,dhcp_server_primary_ip,dhcp_server_secondary_ip): # Allow DHCP traffic from primary server allow_primary_dhcp_server = ( - f'''table=0, dl_type=0x800, priority=65535, - tp_src=67, tp_dst=68, nw_src={dhcp_server_primary_ip}, actions=normal''') + 'table=0, dl_type=0x800, priority=65535, tp_src=67, ' + + f'tp_dst=68, nw_src={dhcp_server_primary_ip}, actions=normal') self.add_flow(bridge_name=DEVICE_BRIDGE,flow=allow_primary_dhcp_server) # Allow DHCP traffic from secondary server allow_secondary_dhcp_server = ( - f'''table=0, dl_type=0x800, priority=65535, tp_src=67, - tp_dst=68, nw_src={dhcp_server_secondary_ip}, actions=normal''') + 'table=0, dl_type=0x800, priority=65535, ' + + f'tp_src=67, tp_dst=68, nw_src={dhcp_server_secondary_ip},' + + ' actions=normal''') self.add_flow(bridge_name=DEVICE_BRIDGE,flow=allow_secondary_dhcp_server) # Drop DHCP packets not associated with known servers - drop_dhcp_flow = '''table=0, dl_type=0x800, priority=0, tp_src=67, - tp_dst=68, actions=drop''' + drop_dhcp_flow = ('table=0, dl_type=0x800, priority=0, ' + + 'tp_src=67, tp_dst=68, actions=drop') self.add_flow(bridge_name=DEVICE_BRIDGE,flow=drop_dhcp_flow) def add_arp_inspection_filter(self,ip_address,mac_address): # Allow ARP packets with known MAC-to-IP mappings - allow_known_arps= f'''table=0, cookie={DEVICER_ARP_COOKIE}, priority=65535, - arp, arp_tpa={ip_address}, arp_tha={mac_address}, action=normal''' + allow_known_arps= (f'table=0, cookie={DEVICER_ARP_COOKIE}, ' + + f'priority=65535, arp, arp_tpa={ip_address}, ' + + f'arp_tha={mac_address}, action=normal') self.add_flow(bridge_name=DEVICE_BRIDGE,flow=allow_known_arps) dhcp1_mac = f'{CONTAINER_MAC_PREFIX}:02' 
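Review bot: In `allow_secondary_dhcp_server` the last fragment reads `' actions=normal''')`, which is `' actions=normal'` followed by a stray empty literal `''` left over from the triple-quoted version. Implicit concatenation makes it harmless at runtime, but it reads like an unbalanced quote; write `' actions=normal')`. The explicit `+` between the fragments is also unnecessary inside the parentheses, since adjacent string literals are joined at compile time.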
@@ -181,10 +183,10 @@ def add_arp_inspection_filter(self,ip_address,mac_address): dhcp1_ip = '10.10.10.2' dhcp2_ip = '10.10.10.3' - dhcp_1_arps= f'''table=0, priority=65535, arp, arp_tpa={dhcp1_ip}, - arp_tha={dhcp1_mac}, action=normal''' - dhcp_2_arps= f'''table=0, priority=65535, arp, arp_tpa={dhcp2_ip}, - arp_tha={dhcp2_mac}, action=normal''' + dhcp_1_arps= ('table=0, priority=65535, arp, ' + + f'arp_tpa={dhcp1_ip}, arp_tha={dhcp1_mac}, action=normal') + dhcp_2_arps= ('table=0, priority=65535, arp, ' + + f'arp_tpa={dhcp2_ip}, arp_tha={dhcp2_mac}, action=normal') self.add_flow(bridge_name=DEVICE_BRIDGE,flow=dhcp_1_arps) self.add_flow(bridge_name=DEVICE_BRIDGE,flow=dhcp_2_arps) diff --git a/modules/test/base/bin/start_module b/modules/test/base/bin/start_module index 69f399feb..0ee68fa6a 100644 --- a/modules/test/base/bin/start_module +++ b/modules/test/base/bin/start_module @@ -98,5 +98,5 @@ fi # Small pause to let all core services stabalize sleep 3 -# Start the networking service -$BIN_DIR/start_test_module $MODULE_NAME $IFACE > /runtime/output/container.log \ No newline at end of file +# Start the test module +$BIN_DIR/start_test_module $MODULE_NAME $IFACE \ No newline at end of file diff --git a/modules/test/base/python/src/test_module.py b/modules/test/base/python/src/test_module.py index beedaf7bf..a81b97caf 100644 --- a/modules/test/base/python/src/test_module.py +++ b/modules/test/base/python/src/test_module.py @@ -82,7 +82,7 @@ def run_tests(self): if self._config['config']['network']: self._device_ipv4_addr = self._get_device_ipv4() - LOGGER.info('Device IP Resolved: ' + str(self._device_ipv4_addr)) + LOGGER.info('Resolved device IP: ' + str(self._device_ipv4_addr)) tests = self._get_tests() for test in tests: 
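Review bot: In the start_module hunk, `$BIN_DIR/start_test_module $MODULE_NAME $IFACE` expands three variables unquoted, so an empty value or one containing spaces changes the argument list; quote them as `"$BIN_DIR/start_test_module" "$MODULE_NAME" "$IFACE"`. With the redirect to `/runtime/output/container.log` removed, the module's output now goes only to the container's stdout. If anything collects that file for later review, it has to read the container log instead.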
@@ -109,22 +109,32 @@ def run_tests(self): LOGGER.debug(f'Test {test["name"]} is disabled') if result is not None: - # Compliant or non-compliant + # Compliant or non-compliant as a boolean only if isinstance(result, bool): test['result'] = 'Compliant' if result else 'Non-Compliant' test['description'] = 'No description was provided for this test' else: + # Skipped result if result[0] is None: test['result'] = 'Skipped' if len(result) > 1: test['description'] = result[1] else: test['description'] = 'An error occured whilst running this test' - else: + # Compliant / Non-Compliant result + elif isinstance(result[0], bool): test['result'] = 'Compliant' if result[0] else 'Non-Compliant' - test['description'] = result[1] + # Result may be a string, e.g error + elif result[0] == 'Error': + test['result'] = result[0] + + # Check that description is a string + if isinstance(result[1], str): + test['description'] = result[1] + else: + test['description'] = 'No description was provided for this test' else: - test['result'] = 'Skipped' + test['result'] = 'Error' test['description'] = 'An error occured whilst running this test' # Remove the steps to resolve if compliant already diff --git a/modules/test/conn/python/src/connection_module.py b/modules/test/conn/python/src/connection_module.py index 05d10e992..e0c7f1d1a 100644 --- a/modules/test/conn/python/src/connection_module.py +++ b/modules/test/conn/python/src/connection_module.py @@ -77,6 +77,14 @@ def __init__(self, module): def _connection_switch_arp_inspection(self): LOGGER.info('Running connection.switch.arp_inspection') + # If the ipv4 address wasn't resolved yet, try again + if self._device_ipv4_addr is None: + self._device_ipv4_addr = self._get_device_ipv4() + + if self._device_ipv4_addr is None: + LOGGER.error('No device IP could be resolved') + return 'Error', 'Could not resolve device IP address' + no_arp = True # Read all the pcap files 
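Review bot: In the test_module.py hunk, a tuple whose first element is neither `None`, a bool, nor `'Error'` falls through the `if`/`elif` chain without this block assigning `test['result']`; add a final `else: test['result'] = 'Error'` so an unrecognised status is never reported as something else. The description check that follows appears to run for every tuple (indentation is lost in this view) and reads `result[1]` unconditionally, which raises IndexError for a one-element tuple and bypasses the `len(result) > 1` guard in the skipped branch; use `if len(result) > 1 and isinstance(result[1], str):`. If that check does run for all tuples, the skipped branch's own description assignments are dead code. `run_tests` starts and ends outside the hunk.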
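Review bot: The retry-then-fail block for `self._device_ipv4_addr` added to `_connection_switch_arp_inspection` is repeated verbatim in `_protocol_valid_bacnet` (protocol_module.py hunk), and the same `'Error', 'Could not resolve device IP address'` tuple is returned from `_connection_target_ping` and the four TLS tests. One helper next to `_get_device_ipv4` (presumably in the base test module) that returns the resolved address or `None` would keep the message and status in one place. The first tuple slot now mixes `bool`, `None` and `str`, so a short docstring listing the allowed values (`True`, `False`, `None`, `'Error'`) would document the contract `run_tests` relies on. The body of `_connection_switch_arp_inspection` continues outside the hunk.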
@@ -99,7 +107,8 @@ def _connection_switch_arp_inspection(self): if (arp_packet.hwsrc == self._device_mac and arp_packet.psrc != self._device_ipv4_addr): LOGGER.info(f'Bad ARP packet detected for MAC: {self._device_mac}') - LOGGER.info(f'ARP packet IP {arp_packet.psrc} does not match {self._device_ipv4_addr}') + LOGGER.info(f'''ARP packet from IP {arp_packet.psrc} does not match + {self._device_ipv4_addr}''') return False, 'Device is sending false ARP response' if no_arp: @@ -127,7 +136,7 @@ def _connection_switch_dhcp_snooping(self): dhcp_type = self._get_dhcp_type(packet) if dhcp_type in disallowed_dhcp_types: return False, 'Device has sent disallowed DHCP message' - + return True, 'Device does not act as a DHCP server' def _connection_private_address(self, config): @@ -230,7 +239,7 @@ def _connection_target_ping(self): if self._device_ipv4_addr is None: LOGGER.error('No device IP could be resolved') - return False, 'Could not resolve device IP' + return 'Error', 'Could not resolve device IP address' else: if self._ping(self._device_ipv4_addr): return True, 'Device responds to ping' diff --git a/modules/test/protocol/python/src/protocol_module.py b/modules/test/protocol/python/src/protocol_module.py index 7cbdabb2d..0c9936524 100644 --- a/modules/test/protocol/python/src/protocol_module.py +++ b/modules/test/protocol/python/src/protocol_module.py @@ -35,6 +35,14 @@ def _protocol_valid_bacnet(self): result = None interface_name = 'veth0' + # If the ipv4 address wasn't resolved yet, try again + if self._device_ipv4_addr is None: + self._device_ipv4_addr = self._get_device_ipv4() + + if self._device_ipv4_addr is None: + LOGGER.error('No device IP could be resolved') + return 'Error', 'Could not resolve device IP address' + # Resolve the appropriate IP for BACnet comms local_address = self.get_local_ip(interface_name) if local_address: diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index e5c87bf16..472d403b2 100644 
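Review bot: The triple-quoted f-string in the `@@ -99,7 +107,8 @@` hunk writes a newline and the source indentation into the log record, splitting one event across two lines. Use adjacent literals instead: `f'ARP packet from IP {arp_packet.psrc} does not match '` followed by `f'{self._device_ipv4_addr}'`. `arp_packet.psrc` is read from captured traffic, so keeping each record on a single line also keeps the log unambiguous for whoever parses it later. A detected mismatch arguably belongs at warning level rather than info. The enclosing loop starts outside the hunk.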
--- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -241,7 +241,7 @@ def _security_tls_v1_2_server(self): tls_1_3_results) else: LOGGER.error('Could not resolve device IP address. Skipping') - return None, 'Could not resolve device IP address' + return 'Error', 'Could not resolve device IP address' def _security_tls_v1_3_server(self): LOGGER.info('Running security.tls.v1_3_server') @@ -252,7 +252,7 @@ def _security_tls_v1_3_server(self): tls_version='1.3') else: LOGGER.error('Could not resolve device IP address. Skipping') - return None, 'Could not resolve device IP address' + return 'Error', 'Could not resolve device IP address' def _security_tls_v1_2_client(self): LOGGER.info('Running security.tls.v1_2_client') @@ -262,7 +262,7 @@ def _security_tls_v1_2_client(self): return self._validate_tls_client(self._device_ipv4_addr, '1.2') else: LOGGER.error('Could not resolve device IP address. Skipping') - return None, 'Could not resolve device IP address' + return 'Error', 'Could not resolve device IP address' def _security_tls_v1_3_client(self): LOGGER.info('Running security.tls.v1_3_client') @@ -272,7 +272,7 @@ def _security_tls_v1_3_client(self): return self._validate_tls_client(self._device_ipv4_addr, '1.3') else: LOGGER.error('Could not resolve device IP address. Skipping') - return None, 'Could not resolve device IP address' + return 'Error', 'Could not resolve device IP address' def _validate_tls_client(self, client_ip, tls_version): client_results = self._tls_util.validate_tls_client(
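Review bot: The log line still reads `'Could not resolve device IP address. Skipping'` while `_security_tls_v1_2_server` now returns `'Error'`, so the log and the reported status disagree; drop the word, e.g. `LOGGER.error('Could not resolve device IP address')`. The same applies to `_security_tls_v1_3_server`, `_security_tls_v1_2_client` and `_security_tls_v1_3_client` in the following hunks. Unlike the connection and protocol tests changed in this commit, these four do not appear to retry `_get_device_ipv4()` before giving up. The condition lines are outside the hunks, so confirm.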