diff --git a/test_orc/modules/nmap/nmap.Dockerfile b/test_orc/modules/nmap/nmap.Dockerfile index 12f23dde7..3a8728d9f 100644 --- a/test_orc/modules/nmap/nmap.Dockerfile +++ b/test_orc/modules/nmap/nmap.Dockerfile @@ -1,6 +1,12 @@ # Image name: test-run/baseline-test FROM test-run/base-test:latest +#Load the requirements file +COPY modules/nmap/python/requirements.txt /testrun/python + +#Install all python requirements for the module +RUN pip3 install -r /testrun/python/requirements.txt + # Copy over all configuration files COPY modules/nmap/conf /testrun/conf diff --git a/test_orc/modules/nmap/python/requirements.txt b/test_orc/modules/nmap/python/requirements.txt new file mode 100644 index 000000000..42669b12c --- /dev/null +++ b/test_orc/modules/nmap/python/requirements.txt @@ -0,0 +1 @@ +xmltodict \ No newline at end of file diff --git a/test_orc/modules/nmap/python/src/nmap_module.py b/test_orc/modules/nmap/python/src/nmap_module.py index 6b5477489..ea013f413 100644 --- a/test_orc/modules/nmap/python/src/nmap_module.py +++ b/test_orc/modules/nmap/python/src/nmap_module.py @@ -17,6 +17,8 @@ import util import json import threading +import xmltodict +import re from test_module import TestModule LOG_NAME = "test_nmap" @@ -35,6 +37,7 @@ def __init__(self, module): global LOGGER LOGGER = self._get_logger() + def _security_nmap_ports(self, config): LOGGER.info("Running security.nmap.ports test") @@ -88,7 +91,7 @@ def _process_port_results(self, tests): self._check_unknown_ports(tests=tests,scan_results=scan_results) for test in tests: - LOGGER.info("Checking results for test: " + str(test)) + LOGGER.info("Checking scan results for test: " + str(test)) self._check_scan_results(test_config=tests[test],scan_results=scan_results) def _check_unknown_ports(self,tests,scan_results): @@ -122,8 +125,16 @@ def _add_unknown_ports(self,tests,unallowed_port): port_style = 'tcp_ports' elif unallowed_port['tcp_udp'] == 'udp': port_style = 'udp_ports' + + LOGGER.info("Unknown Port Service: " + unallowed_port['service']) for test in tests: - if unallowed_port['service'] in test: + LOGGER.debug("Checking for known service: " + test) + # Create a regular expression pattern to match the variable at the + # end of the string + port_service = r"\b" + re.escape(unallowed_port['service']) + r"\b$" + service_match = re.search(port_service, test) + if service_match: + LOGGER.info("Service Matched: " + test) known_service=True for test_port in tests[test][port_style]: if "version" in tests[test][port_style][test_port]: @@ -134,8 +145,8 @@ def _add_unknown_ports(self,tests,unallowed_port): if tests[test][port_style][test_port]['allowed']: result['allowed'] = True break - tests[test][port_style][unallowed_port['port']]=result + break if not known_service: service_name = "security.services.unknown." + str(unallowed_port['port']) @@ -195,14 +206,19 @@ def _check_unallowed_port(self,unallowed_ports,tests): for port in unallowed_ports: LOGGER.info('Checking unallowed port: ' + port['port']) LOGGER.info('Looking for service: ' + port['service']) - LOGGER.info('Unallowed Port Config: ' + str(port)) + LOGGER.debug('Unallowed Port Config: ' + str(port)) if port['tcp_udp'] == 'tcp': port_style = 'tcp_ports' elif port['tcp_udp'] == 'udp': port_style = 'udp_ports' for test in tests: - LOGGER.info('Checking test: ' + str(test)) - if port['service'] in test: + LOGGER.debug('Checking test: ' + str(test)) + # Create a regular expression pattern to match the variable at the + # end of the string + port_service = r"\b" + re.escape(port['service']) + r"\b$" + service_match = re.search(port_service, test) + if service_match: + LOGGER.info("Service Matched: " + test) service_config = tests[test] service = port['service'] for service_port in service_config[port_style]: @@ -247,7 +263,7 @@ def _check_version(self,service,version_detected,version_expected): def _scan_scripts(self, tests): scan_results = {} - LOGGER.info("Checing for scan scripts") + LOGGER.info("Checking for scan scripts") for test in tests: test_config = tests[test] if "tcp_ports" in test_config: @@ -256,14 +272,15 @@ def _scan_scripts(self, tests): if "service_scan" in port_config: LOGGER.info("Service Scan Detected for: " + str(port)) svc = port_config["service_scan"] - scan_results.update(self._scan_tcp_with_script(svc["script"])) + result = self._scan_tcp_with_script(svc["script"]) + scan_results.update(result) if "udp_ports" in test_config: for port in test_config["udp_ports"]: if "service_scan" in port: LOGGER.info("Service Scan Detected for: " + str(port)) svc = port["service_scan"] - self._scan_udp_with_script(svc["script"], port) - scan_results.update(self._scan_tcp_with_script(svc["script"])) + result = self._scan_udp_with_script(svc["script"], port) + scan_results.update(result) self._script_scan_results = scan_results def _scan_tcp_with_script(self, script_name, ports=None): @@ -275,12 +292,12 @@ def _scan_tcp_with_script(self, script_name, ports=None): else: port_options += " -p" + ports + " " results_file = f"/runtime/output/{self._module_name}-script_name.log" - nmap_options = scan_options + port_options + " -oG " + results_file + nmap_options = scan_options + port_options + " " + results_file + " -oX -" nmap_results = util.run_command("nmap " + nmap_options + " " + self._device_ipv4_addr)[0] LOGGER.info("Nmap TCP script scan complete") - LOGGER.info("nmap script results\n" + str(nmap_results)) - return self._process_nmap_results(nmap_results=nmap_results) + nmap_results_json = self._nmap_results_to_json(nmap_results) + return self._process_nmap_json_results(nmap_results_json=nmap_results_json) def _scan_udp_with_script(self, script_name, ports=None): LOGGER.info("Running UDP nmap scan with script " + script_name) @@ -290,22 +307,24 @@ def _scan_udp_with_script(self, script_name, ports=None): port_options += " -p- " else: port_options += " -p" + ports + " " - nmap_options = scan_options + port_options + nmap_options = scan_options + port_options + " -oX - " nmap_results = util.run_command("nmap " + nmap_options + self._device_ipv4_addr)[0] LOGGER.info("Nmap UDP script scan complete") - return self._process_nmap_results(nmap_results=nmap_results) + nmap_results_json = self._nmap_results_to_json(nmap_results) + return self._process_nmap_json_results(nmap_results_json=nmap_results_json) def _scan_tcp_ports(self, tests): - max_port = 1000 + max_port = 65535 LOGGER.info("Running nmap TCP port scan") nmap_results = util.run_command( f"""nmap --open -sT -sV -Pn -v -p 1-{max_port} - --version-intensity 7 -T4 {self._device_ipv4_addr}""")[0] + --version-intensity 7 -T4 -oX - {self._device_ipv4_addr}""")[0] LOGGER.info("TCP port scan complete") - self._scan_tcp_results = self._process_nmap_results( - nmap_results=nmap_results) + nmap_results_json = self._nmap_results_to_json(nmap_results) + self._scan_tcp_results = self._process_nmap_json_results( + nmap_results_json=nmap_results_json) def _scan_udp_ports(self, tests): ports = [] @@ -319,39 +338,44 @@ def _scan_udp_ports(self, tests): LOGGER.info("Running nmap UDP port scan") LOGGER.info("UDP ports: " + str(port_list)) nmap_results = util.run_command( - f"nmap -sU -sV -p {port_list} {self._device_ipv4_addr}")[0] + f"nmap -sU -sV -p {port_list} -oX - {self._device_ipv4_addr}")[0] LOGGER.info("UDP port scan complete") - self._scan_udp_results = self._process_nmap_results( - nmap_results=nmap_results) + nmap_results_json = self._nmap_results_to_json(nmap_results) + self._scan_udp_results = self._process_nmap_json_results( + nmap_results_json=nmap_results_json) - def _process_nmap_results(self, nmap_results): + def _nmap_results_to_json(self,nmap_results): + try: + xml_data = xmltodict.parse(nmap_results) + json_data = json.dumps(xml_data, indent=4) + return json.loads(json_data) + + except Exception as e: + LOGGER.error(f"Error parsing Nmap output: {e}") + + def _process_nmap_json_results(self,nmap_results_json): + LOGGER.debug("nmap results\n" + json.dumps(nmap_results_json,indent=2)) results = {} - LOGGER.info("nmap results\n" + str(nmap_results)) - if nmap_results: - if "Service Info" in nmap_results and "MAC Address" not in nmap_results: - rows = nmap_results.split("PORT")[1].split("Service Info")[0].split( - "\n") - elif "PORT" in nmap_results: - rows = nmap_results.split("PORT")[1].split("MAC Address")[0].split("\n") - if rows: - for result in rows[1:-1]: # Iterate skipping the header and tail rows - cols = result.split() - port = cols[0].split("/")[0] - # If results do not start with a a port number, - # it is likely a bleed over from previous result so - # we need to ignore it - if port.isdigit(): - version = "" - if len(cols) > 3: - # recombine full version information that may contain spaces - version = " ".join(cols[3:]) - port_result = { - cols[0].split("/")[0]: { - "tcp_udp":cols[0].split("/")[1], - "state": cols[1], - "service": cols[2], - "version": version - } - } - results.update(port_result) + if "ports" in nmap_results_json["nmaprun"]["host"]: + ports = nmap_results_json["nmaprun"]["host"]["ports"] + # Checking if an object is a JSON object + if isinstance(ports["port"], dict): + results.update(self._json_port_to_dict(ports["port"])) + elif isinstance(ports["port"], list): + for port in ports["port"]: + results.update(self._json_port_to_dict(port)) return results + + def _json_port_to_dict(self,port_json): + port_result = {} + port = {} + port["tcp_udp"] = port_json["@protocol"] + port["state"] = port_json["state"]["@state"] + port["service"] = port_json["service"]["@name"] + port["version"] = "" + if "@version" in port_json["service"]: + port["version"] += port_json["service"]["@version"] + if "@extrainfo" in port_json["service"]: + port["version"] += " " + port_json["service"]["@extrainfo"] + port_result = {port_json["@portid"]:port} + return port_result \ No newline at end of file