diff --git a/net_orc/python/src/network_orchestrator.py b/net_orc/python/src/network_orchestrator.py index bb8d77f3d..2852f1565 100644 --- a/net_orc/python/src/network_orchestrator.py +++ b/net_orc/python/src/network_orchestrator.py @@ -190,6 +190,7 @@ def _device_discovered(self, mac_addr): LOGGER.info( f'Device with mac addr {device.mac_addr} has obtained IP address ' f'{device.ip_addr}') + self._start_device_monitor(device) def _dhcp_lease_ack(self, packet): diff --git a/test_orc/modules/nmap/conf/module_config.json b/test_orc/modules/nmap/conf/module_config.json index 5449327a1..aafde4c03 100644 --- a/test_orc/modules/nmap/conf/module_config.json +++ b/test_orc/modules/nmap/conf/module_config.json @@ -34,7 +34,8 @@ "tcp_ports": { "22": { "allowed": true, - "description": "Secure Shell (SSH) server" + "description": "Secure Shell (SSH) server", + "version": "2.0" } }, "description": "Check TELNET port 23 is disabled and TELNET is not running on any port", diff --git a/test_orc/modules/nmap/python/src/nmap_module.py b/test_orc/modules/nmap/python/src/nmap_module.py index 028471bb9..6b5477489 100644 --- a/test_orc/modules/nmap/python/src/nmap_module.py +++ b/test_orc/modules/nmap/python/src/nmap_module.py @@ -25,6 +25,7 @@ class NmapModule(TestModule): """NMAP Test module""" + def __init__(self, module): super().__init__(module_name=module, log_name=LOG_NAME) self._unallowed_ports = [] @@ -67,33 +68,87 @@ def _security_nmap_ports(self, config): LOGGER.debug("UDP scan results: " + str(self._scan_udp_results)) LOGGER.debug("Service scan results: " + str(self._script_scan_results)) self._process_port_results(tests=config) + LOGGER.info("Unallowed Ports Detected: " + str(self._unallowed_ports)) + self._check_unallowed_port(self._unallowed_ports,config) LOGGER.info("Unallowed Ports: " + str(self._unallowed_ports)) - LOGGER.info("Script scan results:\n" + - json.dumps(self._script_scan_results)) return len(self._unallowed_ports) == 0 else: LOGGER.info("Device ip address not resolved, skipping") return None def _process_port_results(self, tests): + scan_results = {} + if self._scan_tcp_results is not None: + scan_results.update(self._scan_tcp_results) + if self._scan_udp_results is not None: + scan_results.update(self._scan_udp_results) + if self._script_scan_results is not None: + scan_results.update(self._script_scan_results) + + self._check_unknown_ports(tests=tests,scan_results=scan_results) + for test in tests: LOGGER.info("Checking results for test: " + str(test)) - self._check_scan_results(test_config=tests[test]) + self._check_scan_results(test_config=tests[test],scan_results=scan_results) + + def _check_unknown_ports(self,tests,scan_results): + """ Check if any of the open ports detected are not defined + in the test configurations. If an open port is detected + without a configuration associated with it, the default behavior + is to mark it as an unallowed port. + """ + known_ports = [] + for test in tests: + if "tcp_ports" in tests[test]: + for port in tests[test]['tcp_ports']: + known_ports.append(port) + if "udp_ports" in tests[test]: + for port in tests[test]['udp_ports']: + known_ports.append(port) + + for port_result in scan_results: + if not port_result in known_ports: + LOGGER.info("Unknown port detected: " + port_result) + unallowed_port = {'port':port_result, + 'service':scan_results[port_result]['service'], + 'tcp_udp':scan_results[port_result]['tcp_udp']} + #self._unallowed_ports.append(unallowed_port) + self._add_unknown_ports(tests,unallowed_port) + + def _add_unknown_ports(self,tests,unallowed_port): + known_service = False + result = {'description':"Undefined port",'allowed':False} + if unallowed_port['tcp_udp'] == 'tcp': + port_style = 'tcp_ports' + elif unallowed_port['tcp_udp'] == 'udp': + port_style = 'udp_ports' + for test in tests: + if unallowed_port['service'] in test: + known_service=True + for test_port in tests[test][port_style]: + if "version" in tests[test][port_style][test_port]: + result['version'] = tests[test][port_style][test_port]['version'] + if "description" in tests[test][port_style][test_port]: + result['description'] = tests[test][port_style][test_port]['description'] + result['inherited_from'] = test_port + if tests[test][port_style][test_port]['allowed']: + result['allowed'] = True + break + + tests[test][port_style][unallowed_port['port']]=result + + if not known_service: + service_name = "security.services.unknown." + str(unallowed_port['port']) + unknown_service = {port_style:{unallowed_port['port']:result}} + tests[service_name]=unknown_service - def _check_scan_results(self, test_config): + def _check_scan_results(self, test_config,scan_results): port_config = {} if "tcp_ports" in test_config: port_config.update(test_config["tcp_ports"]) elif "udp_ports" in test_config: port_config.update(test_config["udp_ports"]) - scan_results = {} - if self._scan_tcp_results is not None: - scan_results.update(self._scan_tcp_results) - if self._scan_udp_results is not None: - scan_results.update(self._scan_udp_results) - if self._script_scan_results is not None: - scan_results.update(self._script_scan_results) if port_config is not None: for port, config in port_config.items(): result = None @@ -103,11 +158,23 @@ def _check_scan_results(self, test_config): if scan_results[port]["state"] == "open": if not config["allowed"]: LOGGER.info("Unallowed port open") - self._unallowed_ports.append(str(port)) + self._unallowed_ports.append( + {"port":str(port), + "service":str(scan_results[port]["service"]), + 'tcp_udp':scan_results[port]['tcp_udp']} + ) result = False else: LOGGER.info("Allowed port open") - result = True + if "version" in config and "version" in scan_results[port]: + version_check = self._check_version(scan_results[port]["service"], + scan_results[port]["version"],config["version"]) + if version_check is not None: + result = version_check + else: + result = True + else: + result = True else: LOGGER.info("Port is closed") result = True @@ -120,6 +187,64 @@ def _check_scan_results(self, test_config): else: config["result"] = "skipped" + def _check_unallowed_port(self,unallowed_ports,tests): + service_allowed=False + allowed = False + version = None + service = None + for port in unallowed_ports: + LOGGER.info('Checking unallowed port: ' + port['port']) + LOGGER.info('Looking for service: ' + port['service']) + LOGGER.info('Unallowed Port Config: ' + str(port)) + if port['tcp_udp'] == 'tcp': + port_style = 'tcp_ports' + elif port['tcp_udp'] == 'udp': + port_style = 'udp_ports' + for test in tests: + LOGGER.info('Checking test: ' + str(test)) + if port['service'] in test: + service_config = tests[test] + service = port['service'] + for service_port in service_config[port_style]: + port_config = service_config[port_style][service_port] + service_allowed |= port_config['allowed'] + version = port_config['version'] if 'version' in port_config else None + if service_allowed: + LOGGER.info("Unallowed port detected for allowed service: " + service) + if version is not None: + allowed = self._check_version(service=service, + version_detected=self._scan_tcp_results[port['port']]['version'], + version_expected=version) + else: + allowed = True + if allowed: + LOGGER.info("Unallowed port exception for approved service: " + port['port']) + for u_port in self._unallowed_ports: + if port['port'] in u_port['port']: + self._unallowed_ports.remove(u_port) + break + break + + def _check_version(self,service,version_detected,version_expected): + """Check if the version specified for the service matches what was + detected by nmap. Since there is no consistency in how nmap service + results are returned, each service that needs a checked must be + implemented individually. If a service version is requested + that is not implemented, this test will provide a skip (None) + result. + """ + LOGGER.info("Checking version for service: " + service) + LOGGER.info("NMAP Version Detected: " + version_detected) + LOGGER.info("Version Expected: " + version_expected) + version_check = None + match service: + case "ssh": + version_check = f"protocol {version_expected}" in version_detected + case _: + LOGGER.info("No version check implemented for service: " + service + ". Skipping") + LOGGER.info("Version check result: " + str(version_check)) + return version_check + def _scan_scripts(self, tests): scan_results = {} LOGGER.info("Checing for scan scripts") @@ -169,25 +294,15 @@ def _scan_udp_with_script(self, script_name, ports=None): nmap_results = util.run_command("nmap " + nmap_options + self._device_ipv4_addr)[0] LOGGER.info("Nmap UDP script scan complete") - LOGGER.info("nmap script results\n" + str(nmap_results)) return self._process_nmap_results(nmap_results=nmap_results) def _scan_tcp_ports(self, tests): max_port = 1000 - ports = [] - for test in tests: - test_config = tests[test] - if "tcp_ports" in test_config: - for port in test_config["tcp_ports"]: - if int(port) > max_port: - ports.append(port) - ports_to_scan = "1-" + str(max_port) - if len(ports) > 0: - ports_to_scan += "," + ",".join(ports) LOGGER.info("Running nmap TCP port scan") - LOGGER.info("TCP ports: " + str(ports_to_scan)) - nmap_results = util.run_command(f"""nmap -sT -sV -Pn -v -p {ports_to_scan} + nmap_results = util.run_command( + f"""nmap --open -sT -sV -Pn -v -p 1-{max_port} --version-intensity 7 -T4 {self._device_ipv4_addr}""")[0] + LOGGER.info("TCP port scan complete") self._scan_tcp_results = self._process_nmap_results( nmap_results=nmap_results) @@ -213,7 +328,7 @@ def _process_nmap_results(self, nmap_results): results = {} LOGGER.info("nmap results\n" + str(nmap_results)) if nmap_results: - if "Service Info" in nmap_results: + if "Service Info" in nmap_results and "MAC Address" not in nmap_results: rows = nmap_results.split("PORT")[1].split("Service Info")[0].split( "\n") elif "PORT" in nmap_results: @@ -232,6 +347,7 @@ def _process_nmap_results(self, nmap_results): version = " ".join(cols[3:]) port_result = { cols[0].split("/")[0]: { + "tcp_udp":cols[0].split("/")[1], "state": cols[1], "service": cols[2], "version": version