Skip to content
Merged

Nmap #38

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions net_orc/python/src/network_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def _device_discovered(self, mac_addr):
LOGGER.info(
f'Device with mac addr {device.mac_addr} has obtained IP address '
f'{device.ip_addr}')

self._start_device_monitor(device)

def _dhcp_lease_ack(self, packet):
Expand Down
3 changes: 2 additions & 1 deletion test_orc/modules/nmap/conf/module_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"tcp_ports": {
"22": {
"allowed": true,
"description": "Secure Shell (SSH) server"
"description": "Secure Shell (SSH) server",
"version": "2.0"
}
},
"description": "Check TELNET port 23 is disabled and TELNET is not running on any port",
Expand Down
170 changes: 143 additions & 27 deletions test_orc/modules/nmap/python/src/nmap_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

class NmapModule(TestModule):
"""NMAP Test module"""

def __init__(self, module):
super().__init__(module_name=module, log_name=LOG_NAME)
self._unallowed_ports = []
Expand Down Expand Up @@ -67,33 +68,87 @@ def _security_nmap_ports(self, config):
LOGGER.debug("UDP scan results: " + str(self._scan_udp_results))
LOGGER.debug("Service scan results: " + str(self._script_scan_results))
self._process_port_results(tests=config)
LOGGER.info("Unallowed Ports Detected: " + str(self._unallowed_ports))
self._check_unallowed_port(self._unallowed_ports,config)
LOGGER.info("Unallowed Ports: " + str(self._unallowed_ports))
LOGGER.info("Script scan results:\n" +
json.dumps(self._script_scan_results))
return len(self._unallowed_ports) == 0
else:
LOGGER.info("Device ip address not resolved, skipping")
return None

def _process_port_results(self, tests):
scan_results = {}
if self._scan_tcp_results is not None:
scan_results.update(self._scan_tcp_results)
if self._scan_udp_results is not None:
scan_results.update(self._scan_udp_results)
if self._script_scan_results is not None:
scan_results.update(self._script_scan_results)

self._check_unknown_ports(tests=tests,scan_results=scan_results)

for test in tests:
LOGGER.info("Checking results for test: " + str(test))
self._check_scan_results(test_config=tests[test])
self._check_scan_results(test_config=tests[test],scan_results=scan_results)

def _check_unknown_ports(self,tests,scan_results):
""" Check if any of the open ports detected are not defined
in the test configurations. If an open port is detected
without a configuration associated with it, the default behavior
is to mark it as an unallowed port.
"""
known_ports = []
for test in tests:
if "tcp_ports" in tests[test]:
for port in tests[test]['tcp_ports']:
known_ports.append(port)
if "udp_ports" in tests[test]:
for port in tests[test]['udp_ports']:
known_ports.append(port)

for port_result in scan_results:
if not port_result in known_ports:
LOGGER.info("Unknown port detected: " + port_result)
unallowed_port = {'port':port_result,
'service':scan_results[port_result]['service'],
'tcp_udp':scan_results[port_result]['tcp_udp']}
#self._unallowed_ports.append(unallowed_port)
self._add_unknown_ports(tests,unallowed_port)

def _add_unknown_ports(self,tests,unallowed_port):
known_service = False
result = {'description':"Undefined port",'allowed':False}
if unallowed_port['tcp_udp'] == 'tcp':
port_style = 'tcp_ports'
elif unallowed_port['tcp_udp'] == 'udp':
port_style = 'udp_ports'
for test in tests:
if unallowed_port['service'] in test:
known_service=True
for test_port in tests[test][port_style]:
if "version" in tests[test][port_style][test_port]:
result['version'] = tests[test][port_style][test_port]['version']
if "description" in tests[test][port_style][test_port]:
result['description'] = tests[test][port_style][test_port]['description']
result['inherited_from'] = test_port
if tests[test][port_style][test_port]['allowed']:
result['allowed'] = True
break

tests[test][port_style][unallowed_port['port']]=result

if not known_service:
service_name = "security.services.unknown." + str(unallowed_port['port'])
unknown_service = {port_style:{unallowed_port['port']:result}}
tests[service_name]=unknown_service

def _check_scan_results(self, test_config):
def _check_scan_results(self, test_config,scan_results):
port_config = {}
if "tcp_ports" in test_config:
port_config.update(test_config["tcp_ports"])
elif "udp_ports" in test_config:
port_config.update(test_config["udp_ports"])

scan_results = {}
if self._scan_tcp_results is not None:
scan_results.update(self._scan_tcp_results)
if self._scan_udp_results is not None:
scan_results.update(self._scan_udp_results)
if self._script_scan_results is not None:
scan_results.update(self._script_scan_results)
if port_config is not None:
for port, config in port_config.items():
result = None
Expand All @@ -103,11 +158,23 @@ def _check_scan_results(self, test_config):
if scan_results[port]["state"] == "open":
if not config["allowed"]:
LOGGER.info("Unallowed port open")
self._unallowed_ports.append(str(port))
self._unallowed_ports.append(
{"port":str(port),
"service":str(scan_results[port]["service"]),
'tcp_udp':scan_results[port]['tcp_udp']}
)
result = False
else:
LOGGER.info("Allowed port open")
result = True
if "version" in config and "version" in scan_results[port]:
version_check = self._check_version(scan_results[port]["service"],
scan_results[port]["version"],config["version"])
if version_check is not None:
result = version_check
else:
result = True
else:
result = True
else:
LOGGER.info("Port is closed")
result = True
Expand All @@ -120,6 +187,64 @@ def _check_scan_results(self, test_config):
else:
config["result"] = "skipped"

def _check_unallowed_port(self,unallowed_ports,tests):
service_allowed=False
allowed = False
version = None
service = None
for port in unallowed_ports:
LOGGER.info('Checking unallowed port: ' + port['port'])
LOGGER.info('Looking for service: ' + port['service'])
LOGGER.info('Unallowed Port Config: ' + str(port))
if port['tcp_udp'] == 'tcp':
port_style = 'tcp_ports'
elif port['tcp_udp'] == 'udp':
port_style = 'udp_ports'
for test in tests:
LOGGER.info('Checking test: ' + str(test))
if port['service'] in test:
service_config = tests[test]
service = port['service']
for service_port in service_config[port_style]:
port_config = service_config[port_style][service_port]
service_allowed |= port_config['allowed']
version = port_config['version'] if 'version' in port_config else None
if service_allowed:
LOGGER.info("Unallowed port detected for allowed service: " + service)
if version is not None:
allowed = self._check_version(service=service,
version_detected=self._scan_tcp_results[port['port']]['version'],
version_expected=version)
else:
allowed = True
if allowed:
LOGGER.info("Unallowed port exception for approved service: " + port['port'])
for u_port in self._unallowed_ports:
if port['port'] in u_port['port']:
self._unallowed_ports.remove(u_port)
break
break

def _check_version(self,service,version_detected,version_expected):
"""Check if the version specified for the service matches what was
detected by nmap. Since there is no consistency in how nmap service
results are returned, each service that needs a checked must be
implemented individually. If a service version is requested
that is not implemented, this test will provide a skip (None)
result.
"""
LOGGER.info("Checking version for service: " + service)
LOGGER.info("NMAP Version Detected: " + version_detected)
LOGGER.info("Version Expected: " + version_expected)
version_check = None
match service:
case "ssh":
version_check = f"protocol {version_expected}" in version_detected
case _:
LOGGER.info("No version check implemented for service: " + service + ". Skipping")
LOGGER.info("Version check result: " + str(version_check))
return version_check

def _scan_scripts(self, tests):
scan_results = {}
LOGGER.info("Checing for scan scripts")
Expand Down Expand Up @@ -169,25 +294,15 @@ def _scan_udp_with_script(self, script_name, ports=None):
nmap_results = util.run_command("nmap " + nmap_options +
self._device_ipv4_addr)[0]
LOGGER.info("Nmap UDP script scan complete")
LOGGER.info("nmap script results\n" + str(nmap_results))
return self._process_nmap_results(nmap_results=nmap_results)

def _scan_tcp_ports(self, tests):
max_port = 1000
ports = []
for test in tests:
test_config = tests[test]
if "tcp_ports" in test_config:
for port in test_config["tcp_ports"]:
if int(port) > max_port:
ports.append(port)
ports_to_scan = "1-" + str(max_port)
if len(ports) > 0:
ports_to_scan += "," + ",".join(ports)
LOGGER.info("Running nmap TCP port scan")
LOGGER.info("TCP ports: " + str(ports_to_scan))
nmap_results = util.run_command(f"""nmap -sT -sV -Pn -v -p {ports_to_scan}
nmap_results = util.run_command(
f"""nmap --open -sT -sV -Pn -v -p 1-{max_port}
--version-intensity 7 -T4 {self._device_ipv4_addr}""")[0]

LOGGER.info("TCP port scan complete")
self._scan_tcp_results = self._process_nmap_results(
nmap_results=nmap_results)
Expand All @@ -213,7 +328,7 @@ def _process_nmap_results(self, nmap_results):
results = {}
LOGGER.info("nmap results\n" + str(nmap_results))
if nmap_results:
if "Service Info" in nmap_results:
if "Service Info" in nmap_results and "MAC Address" not in nmap_results:
rows = nmap_results.split("PORT")[1].split("Service Info")[0].split(
"\n")
elif "PORT" in nmap_results:
Expand All @@ -232,6 +347,7 @@ def _process_nmap_results(self, nmap_results):
version = " ".join(cols[3:])
port_result = {
cols[0].split("/")[0]: {
"tcp_udp":cols[0].split("/")[1],
"state": cols[1],
"service": cols[2],
"version": version
Expand Down