Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions test_orc/modules/nmap/nmap.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Image name: test-run/baseline-test
FROM test-run/base-test:latest

#Load the requirements file
COPY modules/nmap/python/requirements.txt /testrun/python

#Install all python requirements for the module
RUN pip3 install -r /testrun/python/requirements.txt

# Copy over all configuration files
COPY modules/nmap/conf /testrun/conf

Expand Down
1 change: 1 addition & 0 deletions test_orc/modules/nmap/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
xmltodict
126 changes: 75 additions & 51 deletions test_orc/modules/nmap/python/src/nmap_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import util
import json
import threading
import xmltodict
import re
from test_module import TestModule

LOG_NAME = "test_nmap"
Expand All @@ -35,6 +37,7 @@ def __init__(self, module):
global LOGGER
LOGGER = self._get_logger()


def _security_nmap_ports(self, config):
LOGGER.info("Running security.nmap.ports test")

Expand Down Expand Up @@ -88,7 +91,7 @@ def _process_port_results(self, tests):
self._check_unknown_ports(tests=tests,scan_results=scan_results)

for test in tests:
LOGGER.info("Checking results for test: " + str(test))
LOGGER.info("Checking scan results for test: " + str(test))
self._check_scan_results(test_config=tests[test],scan_results=scan_results)

def _check_unknown_ports(self,tests,scan_results):
Expand Down Expand Up @@ -122,8 +125,16 @@ def _add_unknown_ports(self,tests,unallowed_port):
port_style = 'tcp_ports'
elif unallowed_port['tcp_udp'] == 'udp':
port_style = 'udp_ports'

LOGGER.info("Unknown Port Service: " + unallowed_port['service'])
for test in tests:
if unallowed_port['service'] in test:
LOGGER.debug("Checking for known service: " + test)
# Create a regular expression pattern to match the variable at the
# end of the string
port_service = r"\b" + re.escape(unallowed_port['service']) + r"\b$"
service_match = re.search(port_service, test)
if service_match:
LOGGER.info("Service Matched: " + test)
known_service=True
for test_port in tests[test][port_style]:
if "version" in tests[test][port_style][test_port]:
Expand All @@ -134,8 +145,8 @@ def _add_unknown_ports(self,tests,unallowed_port):
if tests[test][port_style][test_port]['allowed']:
result['allowed'] = True
break

tests[test][port_style][unallowed_port['port']]=result
break

if not known_service:
service_name = "security.services.unknown." + str(unallowed_port['port'])
Expand Down Expand Up @@ -195,14 +206,19 @@ def _check_unallowed_port(self,unallowed_ports,tests):
for port in unallowed_ports:
LOGGER.info('Checking unallowed port: ' + port['port'])
LOGGER.info('Looking for service: ' + port['service'])
LOGGER.info('Unallowed Port Config: ' + str(port))
LOGGER.debug('Unallowed Port Config: ' + str(port))
if port['tcp_udp'] == 'tcp':
port_style = 'tcp_ports'
elif port['tcp_udp'] == 'udp':
port_style = 'udp_ports'
for test in tests:
LOGGER.info('Checking test: ' + str(test))
if port['service'] in test:
LOGGER.debug('Checking test: ' + str(test))
# Create a regular expression pattern to match the variable at the
# end of the string
port_service = r"\b" + re.escape(port['service']) + r"\b$"
service_match = re.search(port_service, test)
if service_match:
LOGGER.info("Service Matched: " + test)
service_config = tests[test]
service = port['service']
for service_port in service_config[port_style]:
Expand Down Expand Up @@ -247,7 +263,7 @@ def _check_version(self,service,version_detected,version_expected):

def _scan_scripts(self, tests):
scan_results = {}
LOGGER.info("Checing for scan scripts")
LOGGER.info("Checking for scan scripts")
for test in tests:
test_config = tests[test]
if "tcp_ports" in test_config:
Expand All @@ -256,14 +272,15 @@ def _scan_scripts(self, tests):
if "service_scan" in port_config:
LOGGER.info("Service Scan Detected for: " + str(port))
svc = port_config["service_scan"]
scan_results.update(self._scan_tcp_with_script(svc["script"]))
result = self._scan_tcp_with_script(svc["script"])
scan_results.update(result)
if "udp_ports" in test_config:
for port in test_config["udp_ports"]:
if "service_scan" in port:
LOGGER.info("Service Scan Detected for: " + str(port))
svc = port["service_scan"]
self._scan_udp_with_script(svc["script"], port)
scan_results.update(self._scan_tcp_with_script(svc["script"]))
result = self._scan_udp_with_script(svc["script"], port)
scan_results.update(result)
self._script_scan_results = scan_results

def _scan_tcp_with_script(self, script_name, ports=None):
Expand All @@ -275,12 +292,12 @@ def _scan_tcp_with_script(self, script_name, ports=None):
else:
port_options += " -p" + ports + " "
results_file = f"/runtime/output/{self._module_name}-script_name.log"
nmap_options = scan_options + port_options + " -oG " + results_file
nmap_options = scan_options + port_options + " " + results_file + " -oX -"
nmap_results = util.run_command("nmap " + nmap_options + " " +
self._device_ipv4_addr)[0]
LOGGER.info("Nmap TCP script scan complete")
LOGGER.info("nmap script results\n" + str(nmap_results))
return self._process_nmap_results(nmap_results=nmap_results)
nmap_results_json = self._nmap_results_to_json(nmap_results)
return self._process_nmap_json_results(nmap_results_json=nmap_results_json)

def _scan_udp_with_script(self, script_name, ports=None):
LOGGER.info("Running UDP nmap scan with script " + script_name)
Expand All @@ -290,22 +307,24 @@ def _scan_udp_with_script(self, script_name, ports=None):
port_options += " -p- "
else:
port_options += " -p" + ports + " "
nmap_options = scan_options + port_options
nmap_options = scan_options + port_options + " -oX - "
nmap_results = util.run_command("nmap " + nmap_options +
self._device_ipv4_addr)[0]
LOGGER.info("Nmap UDP script scan complete")
return self._process_nmap_results(nmap_results=nmap_results)
nmap_results_json = self._nmap_results_to_json(nmap_results)
return self._process_nmap_json_results(nmap_results_json=nmap_results_json)

def _scan_tcp_ports(self, tests):
max_port = 1000
max_port = 65535
LOGGER.info("Running nmap TCP port scan")
nmap_results = util.run_command(
f"""nmap --open -sT -sV -Pn -v -p 1-{max_port}
--version-intensity 7 -T4 {self._device_ipv4_addr}""")[0]
--version-intensity 7 -T4 -oX - {self._device_ipv4_addr}""")[0]

LOGGER.info("TCP port scan complete")
self._scan_tcp_results = self._process_nmap_results(
nmap_results=nmap_results)
nmap_results_json = self._nmap_results_to_json(nmap_results)
self._scan_tcp_results = self._process_nmap_json_results(
nmap_results_json=nmap_results_json)

def _scan_udp_ports(self, tests):
ports = []
Expand All @@ -319,39 +338,44 @@ def _scan_udp_ports(self, tests):
LOGGER.info("Running nmap UDP port scan")
LOGGER.info("UDP ports: " + str(port_list))
nmap_results = util.run_command(
f"nmap -sU -sV -p {port_list} {self._device_ipv4_addr}")[0]
f"nmap -sU -sV -p {port_list} -oX - {self._device_ipv4_addr}")[0]
LOGGER.info("UDP port scan complete")
self._scan_udp_results = self._process_nmap_results(
nmap_results=nmap_results)
nmap_results_json = self._nmap_results_to_json(nmap_results)
self._scan_udp_results = self._process_nmap_json_results(
nmap_results_json=nmap_results_json)

def _process_nmap_results(self, nmap_results):
def _nmap_results_to_json(self,nmap_results):
try:
xml_data = xmltodict.parse(nmap_results)
json_data = json.dumps(xml_data, indent=4)
return json.loads(json_data)

except Exception as e:
LOGGER.error(f"Error parsing Nmap output: {e}")

def _process_nmap_json_results(self,nmap_results_json):
LOGGER.debug("nmap results\n" + json.dumps(nmap_results_json,indent=2))
results = {}
LOGGER.info("nmap results\n" + str(nmap_results))
if nmap_results:
if "Service Info" in nmap_results and "MAC Address" not in nmap_results:
rows = nmap_results.split("PORT")[1].split("Service Info")[0].split(
"\n")
elif "PORT" in nmap_results:
rows = nmap_results.split("PORT")[1].split("MAC Address")[0].split("\n")
if rows:
for result in rows[1:-1]: # Iterate skipping the header and tail rows
cols = result.split()
port = cols[0].split("/")[0]
# If results do not start with a a port number,
# it is likely a bleed over from previous result so
# we need to ignore it
if port.isdigit():
version = ""
if len(cols) > 3:
# recombine full version information that may contain spaces
version = " ".join(cols[3:])
port_result = {
cols[0].split("/")[0]: {
"tcp_udp":cols[0].split("/")[1],
"state": cols[1],
"service": cols[2],
"version": version
}
}
results.update(port_result)
if "ports" in nmap_results_json["nmaprun"]["host"]:
ports = nmap_results_json["nmaprun"]["host"]["ports"]
# Checking if an object is a JSON object
if isinstance(ports["port"], dict):
results.update(self._json_port_to_dict(ports["port"]))
elif isinstance(ports["port"], list):
for port in ports["port"]:
results.update(self._json_port_to_dict(port))
return results

def _json_port_to_dict(self,port_json):
port_result = {}
port = {}
port["tcp_udp"] = port_json["@protocol"]
port["state"] = port_json["state"]["@state"]
port["service"] = port_json["service"]["@name"]
port["version"] = ""
if "@version" in port_json["service"]:
port["version"] += port_json["service"]["@version"]
if "@extrainfo" in port_json["service"]:
port["version"] += " " + port_json["service"]["@extrainfo"]
port_result = {port_json["@portid"]:port}
return port_result