diff --git a/framework/test_runner.py b/framework/test_runner.py index 95f3e4208..0733d4353 100644 --- a/framework/test_runner.py +++ b/framework/test_runner.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """Wrapper for the TestRun that simplifies virtual testing procedure by allowing direct calling from the command line. @@ -16,11 +15,15 @@ LOGGER = logger.get_logger("runner") + class TestRunner: """Controls and starts the Test Run application.""" - def __init__(self, config_file=None, validate=True, - net_only=False, single_intf=False): + def __init__(self, + config_file=None, + validate=True, + net_only=False, + single_intf=False): self._register_exits() self.test_run = TestRun(config_file=config_file, validate=validate, @@ -50,22 +53,34 @@ def start(self): self.test_run.start() LOGGER.info("Test Run has finished") -def parse_args(argv): - parser = argparse.ArgumentParser(description="Test Run", - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument("-f", "--config-file", default=None, - help="Define the configuration file for Test Run and Network Orchestrator") - parser.add_argument("--no-validate", action="store_true", - help="Turn off the validation of the network after network boot") - parser.add_argument("-net", "--net-only", action="store_true", - help="Run the network only, do not run tests") - parser.add_argument("--single-intf", action="store_true", - help="Single interface mode (experimental)") + +def parse_args(): + parser = argparse.ArgumentParser( + description="Test Run", + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument( + "-f", + "--config-file", + default=None, + help="Define the configuration file for Test Run and Network Orchestrator" + ) + parser.add_argument( + "--no-validate", + action="store_true", + help="Turn off the validation of the network after network boot") + parser.add_argument("-net", + "--net-only", + action="store_true", + help="Run the network only, do not run tests") + parser.add_argument("--single-intf", + action="store_true", + help="Single interface mode (experimental)") parsed_args = parser.parse_known_args()[0] return parsed_args + if __name__ == "__main__": - args = parse_args(sys.argv) + args = parse_args() runner = TestRunner(config_file=args.config_file, validate=not args.no_validate, net_only=args.net_only, diff --git a/net_orc/network/modules/ntp/ntp-server.py b/net_orc/network/modules/ntp/ntp-server.py deleted file mode 100644 index 9d6a6da8e..000000000 --- a/net_orc/network/modules/ntp/ntp-server.py +++ /dev/null @@ -1,307 +0,0 @@ -import datetime -import socket -import struct -import time -import queue - -import threading -import select - -taskQueue = queue.Queue() -stop_flag = False - -def system_to_ntp_time(timestamp): - """Convert a system time to a NTP time. - - Parameters: - timestamp -- timestamp in system time - - Returns: - corresponding NTP time - """ - return timestamp + NTP.NTP_DELTA - -def _to_int(timestamp): - """Return the integral part of a timestamp. - - Parameters: - timestamp -- NTP timestamp - - Retuns: - integral part - """ - return int(timestamp) - -def _to_frac(timestamp, n=32): - """Return the fractional part of a timestamp. - - Parameters: - timestamp -- NTP timestamp - n -- number of bits of the fractional part - - Retuns: - fractional part - """ - return int(abs(timestamp - _to_int(timestamp)) * 2**n) - -def _to_time(integ, frac, n=32): - """Return a timestamp from an integral and fractional part. - - Parameters: - integ -- integral part - frac -- fractional part - n -- number of bits of the fractional part - - Retuns: - timestamp - """ - return integ + float(frac)/2**n - -class NTPException(Exception): - """Exception raised by this module.""" - pass - -class NTP: - """Helper class defining constants.""" - - _SYSTEM_EPOCH = datetime.date(*time.gmtime(0)[0:3]) - """system epoch""" - _NTP_EPOCH = datetime.date(1900, 1, 1) - """NTP epoch""" - NTP_DELTA = (_SYSTEM_EPOCH - _NTP_EPOCH).days * 24 * 3600 - """delta between system and NTP time""" - - REF_ID_TABLE = { - 'DNC': "DNC routing protocol", - 'NIST': "NIST public modem", - 'TSP': "TSP time protocol", - 'DTS': "Digital Time Service", - 'ATOM': "Atomic clock (calibrated)", - 'VLF': "VLF radio (OMEGA, etc)", - 'callsign': "Generic radio", - 'LORC': "LORAN-C radionavidation", - 'GOES': "GOES UHF environment satellite", - 'GPS': "GPS UHF satellite positioning", - } - """reference identifier table""" - - STRATUM_TABLE = { - 0: "unspecified", - 1: "primary reference", - } - """stratum table""" - - MODE_TABLE = { - 0: "unspecified", - 1: "symmetric active", - 2: "symmetric passive", - 3: "client", - 4: "server", - 5: "broadcast", - 6: "reserved for NTP control messages", - 7: "reserved for private use", - } - """mode table""" - - LEAP_TABLE = { - 0: "no warning", - 1: "last minute has 61 seconds", - 2: "last minute has 59 seconds", - 3: "alarm condition (clock not synchronized)", - } - """leap indicator table""" - -class NTPPacket: - """NTP packet class. - - This represents an NTP packet. - """ - - _PACKET_FORMAT = "!B B B b 11I" - """packet format to pack/unpack""" - - def __init__(self, version=4, mode=3, tx_timestamp=0): - """Constructor. - - Parameters: - version -- NTP version - mode -- packet mode (client, server) - tx_timestamp -- packet transmit timestamp - """ - self.leap = 0 - """leap second indicator""" - self.version = version - """version""" - self.mode = mode - """mode""" - self.stratum = 0 - """stratum""" - self.poll = 0 - """poll interval""" - self.precision = 0 - """precision""" - self.root_delay = 0 - """root delay""" - self.root_dispersion = 0 - """root dispersion""" - self.ref_id = 0 - """reference clock identifier""" - self.ref_timestamp = 0 - """reference timestamp""" - self.orig_timestamp = 0 - self.orig_timestamp_high = 0 - self.orig_timestamp_low = 0 - """originate timestamp""" - self.recv_timestamp = 0 - """receive timestamp""" - self.tx_timestamp = tx_timestamp - self.tx_timestamp_high = 0 - self.tx_timestamp_low = 0 - """tansmit timestamp""" - - def to_data(self): - """Convert this NTPPacket to a buffer that can be sent over a socket. - - Returns: - buffer representing this packet - - Raises: - NTPException -- in case of invalid field - """ - try: - packed = struct.pack(NTPPacket._PACKET_FORMAT, - (self.leap << 6 | self.version << 3 | self.mode), - self.stratum, - self.poll, - self.precision, - _to_int(self.root_delay) << 16 | _to_frac(self.root_delay, 16), - _to_int(self.root_dispersion) << 16 | - _to_frac(self.root_dispersion, 16), - self.ref_id, - _to_int(self.ref_timestamp), - _to_frac(self.ref_timestamp), - #Change by lichen, avoid loss of precision - self.orig_timestamp_high, - self.orig_timestamp_low, - _to_int(self.recv_timestamp), - _to_frac(self.recv_timestamp), - _to_int(self.tx_timestamp), - _to_frac(self.tx_timestamp)) - except struct.error: - raise NTPException("Invalid NTP packet fields.") - return packed - - def from_data(self, data): - """Populate this instance from a NTP packet payload received from - the network. - - Parameters: - data -- buffer payload - - Raises: - NTPException -- in case of invalid packet format - """ - try: - unpacked = struct.unpack(NTPPacket._PACKET_FORMAT, - data[0:struct.calcsize(NTPPacket._PACKET_FORMAT)]) - except struct.error: - raise NTPException("Invalid NTP packet.") - - self.leap = unpacked[0] >> 6 & 0x3 - self.version = unpacked[0] >> 3 & 0x7 - self.mode = unpacked[0] & 0x7 - self.stratum = unpacked[1] - self.poll = unpacked[2] - self.precision = unpacked[3] - self.root_delay = float(unpacked[4])/2**16 - self.root_dispersion = float(unpacked[5])/2**16 - self.ref_id = unpacked[6] - self.ref_timestamp = _to_time(unpacked[7], unpacked[8]) - self.orig_timestamp = _to_time(unpacked[9], unpacked[10]) - self.orig_timestamp_high = unpacked[9] - self.orig_timestamp_low = unpacked[10] - self.recv_timestamp = _to_time(unpacked[11], unpacked[12]) - self.tx_timestamp = _to_time(unpacked[13], unpacked[14]) - self.tx_timestamp_high = unpacked[13] - self.tx_timestamp_low = unpacked[14] - - def GetTxTimeStamp(self): - return (self.tx_timestamp_high,self.tx_timestamp_low) - - def SetOriginTimeStamp(self,high,low): - self.orig_timestamp_high = high - self.orig_timestamp_low = low - -class RecvThread(threading.Thread): - - def __init__(self,socket): - threading.Thread.__init__(self) - self.socket = socket - - def run(self): - global t,stop_flag - while True: - if stop_flag == True: - print("RecvThread Ended") - break - rlist,wlist,elist = select.select([self.socket],[],[],1) - if len(rlist) != 0: - print("Received %d packets" % len(rlist)) - for tempSocket in rlist: - try: - data,addr = tempSocket.recvfrom(1024) - recvTimestamp = recvTimestamp = system_to_ntp_time(time.time()) - taskQueue.put((data,addr,recvTimestamp)) - except socket.error as msg: - print(msg) - -class WorkThread(threading.Thread): - - def __init__(self,socket): - threading.Thread.__init__(self) - self.socket = socket - - def run(self): - global taskQueue,stop_flag - while True: - if stop_flag is True: - print("WorkThread Ended") - break - try: - data,addr,recvTimestamp = taskQueue.get(timeout=1) - recvPacket = NTPPacket() - recvPacket.from_data(data) - timeStamp_high,timeStamp_low = recvPacket.GetTxTimeStamp() - sendPacket = NTPPacket(version=4,mode=4) - sendPacket.stratum = 2 - sendPacket.poll = 10 - sendPacket.ref_timestamp = recvTimestamp-5 - sendPacket.SetOriginTimeStamp(timeStamp_high,timeStamp_low) - sendPacket.recv_timestamp = recvTimestamp - sendPacket.tx_timestamp = system_to_ntp_time(time.time()) - socket.sendto(sendPacket.to_data(),addr) - print("Sent to %s:%d" % (addr[0],addr[1])) - except queue.Empty: - continue - -listen_ip = "0.0.0.0" -listen_port = 123 -socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) -socket.bind((listen_ip,listen_port)) -print(f"local socket: {socket.getsockname()}") -recvThread = RecvThread(socket) -recvThread.start() -workThread = WorkThread(socket) -workThread.start() - -while True: - try: - time.sleep(0.5) - except KeyboardInterrupt: - print("Exiting...") - stop_flag = True - recvThread.join() - workThread.join() - #socket.close() - print("Exited") - break diff --git a/net_orc/network/modules/ovs/python/src/logger.py b/net_orc/network/modules/ovs/python/src/logger.py index 566a5c75e..23e697e43 100644 --- a/net_orc/network/modules/ovs/python/src/logger.py +++ b/net_orc/network/modules/ovs/python/src/logger.py @@ -1,14 +1,13 @@ -#!/usr/bin/env python3 - +"""Sets up the logger to be used for the ovs modules.""" import logging LOGGERS = {} -_LOG_FORMAT = "%(asctime)s %(name)-8s %(levelname)-7s %(message)s" +_LOG_FORMAT = '%(asctime)s %(name)-8s %(levelname)-7s %(message)s' _DATE_FORMAT = '%b %02d %H:%M:%S' # Set level to debug if set as runtime flag -logging.basicConfig(format=_LOG_FORMAT, - datefmt=_DATE_FORMAT, +logging.basicConfig(format=_LOG_FORMAT, + datefmt=_DATE_FORMAT, level=logging.INFO) def get_logger(name): diff --git a/net_orc/network/modules/ovs/python/src/ovs_control.py b/net_orc/network/modules/ovs/python/src/ovs_control.py index 53406cef2..765c50f92 100644 --- a/net_orc/network/modules/ovs/python/src/ovs_control.py +++ b/net_orc/network/modules/ovs/python/src/ovs_control.py @@ -1,32 +1,31 @@ -#!/usr/bin/env python3 - +"""OVS Control Module""" import json import logger import util -CONFIG_FILE = "/ovs/conf/system.json" -DEVICE_BRIDGE = "tr-d" -INTERNET_BRIDGE = "tr-c" +CONFIG_FILE = '/ovs/conf/system.json' +DEVICE_BRIDGE = 'tr-d' +INTERNET_BRIDGE = 'tr-c' LOGGER = logger.get_logger('ovs_ctrl') class OVSControl: - + """OVS Control""" def __init__(self): self._int_intf = None self._dev_intf = None self._load_config() def add_bridge(self, bridge_name): - LOGGER.info("Adding OVS Bridge: " + bridge_name) + LOGGER.info('Adding OVS Bridge: ' + bridge_name) # Create the bridge using ovs-vsctl commands # Uses the --may-exist option to prevent failures # if this bridge already exists by this name it won't fail # and will not modify the existing bridge - success=util.run_command("ovs-vsctl --may-exist add-br " + bridge_name) + success=util.run_command('ovs-vsctl --may-exist add-br ' + bridge_name) return success def add_port(self,port, bridge_name): - LOGGER.info("Adding Port " + port + " to OVS Bridge: " + bridge_name) + LOGGER.info('Adding Port ' + port + ' to OVS Bridge: ' + bridge_name) # Add a port to the bridge using ovs-vsctl commands # Uses the --may-exist option to prevent failures # if this port already exists on the bridge and will not @@ -36,7 +35,7 @@ def add_port(self,port, bridge_name): return success def create_net(self): - LOGGER.info("Creating baseline network") + LOGGER.info('Creating baseline network') # Create data plane self.add_bridge(DEVICE_BRIDGE) @@ -45,7 +44,7 @@ def create_net(self): self.add_bridge(INTERNET_BRIDGE) # Remove IP from internet adapter - self.set_interface_ip(self._int_intf,"0.0.0.0") + self.set_interface_ip(self._int_intf,'0.0.0.0') # Add external interfaces to data and control plane self.add_port(self._dev_intf,DEVICE_BRIDGE) @@ -56,48 +55,49 @@ def create_net(self): self.set_bridge_up(INTERNET_BRIDGE) def delete_bridge(self,bridge_name): - LOGGER.info("Deleting OVS Bridge: " + bridge_name) + LOGGER.info('Deleting OVS Bridge: ' + bridge_name) # Delete the bridge using ovs-vsctl commands # Uses the --if-exists option to prevent failures # if this bridge does not exists - success=util.run_command("ovs-vsctl --if-exists del-br " + bridge_name) + success=util.run_command('ovs-vsctl --if-exists del-br ' + bridge_name) return success def _load_config(self): - LOGGER.info("Loading Configuration: " + CONFIG_FILE) - config_json = json.load(open(CONFIG_FILE, "r", encoding="utf-8")) - self._int_intf = config_json["internet_intf"] - self._dev_intf = config_json["device_intf"] - LOGGER.info("Configuration Loaded") - LOGGER.info("Internet Interface: " + self._int_intf) - LOGGER.info("Device Interface: " + self._dev_intf) + LOGGER.info('Loading Configuration: ' + CONFIG_FILE) + with open(CONFIG_FILE, 'r', encoding='utf-8') as conf_file: + config_json = json.load(conf_file) + self._int_intf = config_json['internet_intf'] + self._dev_intf = config_json['device_intf'] + LOGGER.info('Configuration Loaded') + LOGGER.info('Internet Interface: ' + self._int_intf) + LOGGER.info('Device Interface: ' + self._dev_intf) def restore_net(self): - LOGGER.info("Restoring Network...") + LOGGER.info('Restoring Network...') # Delete data plane self.delete_bridge(DEVICE_BRIDGE) # Delete control plane self.delete_bridge(INTERNET_BRIDGE) - LOGGER.info("Network is restored") + LOGGER.info('Network is restored') def show_config(self): - LOGGER.info("Show current config of OVS") - success=util.run_command("ovs-vsctl show") + LOGGER.info('Show current config of OVS') + success=util.run_command('ovs-vsctl show') return success def set_bridge_up(self,bridge_name): - LOGGER.info("Setting Bridge device to up state: " + bridge_name) - success=util.run_command("ip link set dev " + bridge_name + " up") + LOGGER.info('Setting Bridge device to up state: ' + bridge_name) + success=util.run_command('ip link set dev ' + bridge_name + ' up') return success def set_interface_ip(self,interface, ip_addr): - LOGGER.info("Setting interface " + interface + " to " + ip_addr) + LOGGER.info('Setting interface ' + interface + ' to ' + ip_addr) # Remove IP from internet adapter - util.run_command("ifconfig " + interface + " 0.0.0.0") + util.run_command('ifconfig ' + interface + ' 0.0.0.0') -if __name__ == "__main__": +if __name__ == '__main__': ovs = OVSControl() ovs.create_net() ovs.show_config() diff --git a/net_orc/network/modules/ovs/python/src/run.py b/net_orc/network/modules/ovs/python/src/run.py index f91c2dfeb..5787a74e6 100644 --- a/net_orc/network/modules/ovs/python/src/run.py +++ b/net_orc/network/modules/ovs/python/src/run.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 - +"""Run OVS module""" import logger import signal import sys @@ -10,7 +9,7 @@ LOGGER = logger.get_logger('ovs_control_run') class OVSControlRun: - + """Run the OVS module.""" def __init__(self): signal.signal(signal.SIGINT, self.handler) @@ -18,7 +17,7 @@ def __init__(self): signal.signal(signal.SIGABRT, self.handler) signal.signal(signal.SIGQUIT, self.handler) - LOGGER.info("Starting OVS Control") + LOGGER.info('Starting OVS Control') # Get all components ready self._ovs_control = OVSControl() @@ -30,11 +29,11 @@ def __init__(self): self._ovs_control.show_config() # Get network ready (via Network orchestrator) - LOGGER.info("Network is ready. Waiting for device information...") + LOGGER.info('Network is ready. Waiting for device information...') #Loop forever until process is stopped while True: - LOGGER.info("OVS Running") + LOGGER.info('OVS Running') time.sleep(1000) # TODO: This time should be configurable (How long to hold before exiting, @@ -44,11 +43,11 @@ def __init__(self): # Tear down network #self._ovs_control.shutdown() - def handler(self, signum, frame): - LOGGER.info("SigtermEnum: " + str(signal.SIGTERM)) - LOGGER.info("Exit signal received: " + str(signum)) + def handler(self, signum): + LOGGER.info('SigtermEnum: ' + str(signal.SIGTERM)) + LOGGER.info('Exit signal received: ' + str(signum)) if (signum == 2 or signal == signal.SIGTERM): - LOGGER.info("Exit signal received. Restoring network...") + LOGGER.info('Exit signal received. Restoring network...') self._ovs_control.shutdown() sys.exit(1) diff --git a/net_orc/network/modules/ovs/python/src/util.py b/net_orc/network/modules/ovs/python/src/util.py index c9eba39ff..a3ebbb10a 100644 --- a/net_orc/network/modules/ovs/python/src/util.py +++ b/net_orc/network/modules/ovs/python/src/util.py @@ -1,21 +1,23 @@ +"""Provides basic utilities for a ovs module.""" import subprocess import logger +LOGGER = logger.get_logger('util') def run_command(cmd): success = False - LOGGER = logger.get_logger('util') - process = subprocess.Popen(cmd.split(), - stdout=subprocess.PIPE, + process = subprocess.Popen(cmd.split(), + stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() - if process.returncode !=0: - err_msg = "%s. Code: %s" % (stderr.strip(), process.returncode) - LOGGER.error("Command Failed: " + cmd) - LOGGER.error("Error: " + err_msg) + if process.returncode != 0: + err_msg = f'{stderr.strip()}. Code: {process.returncode}' + LOGGER.error('Command Failed: ' + cmd) + LOGGER.error('Error: ' + err_msg) else: - succ_msg = "%s. Code: %s" % (stdout.strip().decode('utf-8'), process.returncode) - LOGGER.info("Command Success: " + cmd) - LOGGER.info("Success: " + succ_msg) + msg = stdout.strip().decode('utf-8') + succ_msg = f'{msg}. Code: {process.returncode}' + LOGGER.info('Command Success: ' + cmd) + LOGGER.info('Success: ' + succ_msg) success = True return success diff --git a/net_orc/python/src/logger.py b/net_orc/python/src/logger.py deleted file mode 100644 index aaf690c8a..000000000 --- a/net_orc/python/src/logger.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Sets up the logger to be used for the network orchestrator.""" -import json -import logging -import os - -LOGGERS = {} -_LOG_FORMAT = '%(asctime)s %(name)-8s %(levelname)-7s %(message)s' -_DATE_FORMAT = '%b %02d %H:%M:%S' -_DEFAULT_LEVEL = logging.INFO -_CONF_DIR = 'conf' -_CONF_FILE_NAME = 'system.json' - -# Set log level -try: - - with open(os.path.join(_CONF_DIR, _CONF_FILE_NAME), - encoding='UTF-8') as config_json_file: - system_conf_json = json.load(config_json_file) - - log_level_str = system_conf_json['log_level'] - LOG_LEVEL = logging.getLevelName(log_level_str) -except OSError: - LOG_LEVEL = _DEFAULT_LEVEL - -logging.basicConfig(format=_LOG_FORMAT, datefmt=_DATE_FORMAT, level=LOG_LEVEL) - - -def get_logger(name): - if name not in LOGGERS: - LOGGERS[name] = logging.getLogger(name) - return LOGGERS[name] diff --git a/test_orc/modules/base/python/src/grpc/start_server.py b/test_orc/modules/base/python/src/grpc/start_server.py index 970da67fc..b4016c831 100644 --- a/test_orc/modules/base/python/src/grpc/start_server.py +++ b/test_orc/modules/base/python/src/grpc/start_server.py @@ -1,38 +1,37 @@ +"""Base class for starting the gRPC server for a network module.""" from concurrent import futures import grpc import proto.grpc_pb2_grpc as pb2_grpc -import proto.grpc_pb2 as pb2 from network_service import NetworkService -import sys import argparse -DEFAULT_PORT = "5001" +DEFAULT_PORT = '5001' -def serve(PORT): +def serve(port): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) pb2_grpc.add_NetworkModuleServicer_to_server(NetworkService(), server) - server.add_insecure_port("[::]:" + PORT) + server.add_insecure_port('[::]:' + port) server.start() server.wait_for_termination() -def run(argv): +def run(): parser = argparse.ArgumentParser( - description="GRPC Server for Network Module", + description='GRPC Server for Network Module', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument("-p", - "--port", + parser.add_argument('-p', + '--port', default=DEFAULT_PORT, - help="Define the default port to run the server on.") + help='Define the default port to run the server on.') args = parser.parse_args() - PORT = args.port + port = args.port - print("gRPC server starting on port " + PORT) - serve(PORT) + print('gRPC server starting on port ' + port) + serve(port) -if __name__ == "__main__": - run(sys.argv) +if __name__ == '__main__': + run() diff --git a/test_orc/modules/base/python/src/test_module.py b/test_orc/modules/base/python/src/test_module.py index 34af4cbb4..8e10a3637 100644 --- a/test_orc/modules/base/python/src/test_module.py +++ b/test_orc/modules/base/python/src/test_module.py @@ -1,3 +1,4 @@ +"""Base class for all core test module functions""" import json import logger import os @@ -91,20 +92,18 @@ def run_tests(self): self._write_results(json_results) def _read_config(self): - f = open(CONF_FILE, encoding='utf-8') - config = json.load(f) - f.close() + with open(CONF_FILE, encoding='utf-8') as f: + config = json.load(f) return config def _write_results(self, results): results_file = RESULTS_DIR + self._module_name + '-result.json' LOGGER.info('Writing results to ' + results_file) - f = open(results_file, 'w', encoding='utf-8') - f.write(results) - f.close() + with open(results_file, 'w', encoding='utf-8') as f: + f.write(results) def _get_device_ipv4(self): - command = f"""/testrun/bin/get_ipv4_addr {self._ipv4_subnet} + command = f"""/testrun/bin/get_ipv4_addr {self._ipv4_subnet} {self._device_mac.upper()}""" text = util.run_command(command)[0] if text: diff --git a/test_orc/modules/base/python/src/util.py b/test_orc/modules/base/python/src/util.py index 557f450a6..d387db796 100644 --- a/test_orc/modules/base/python/src/util.py +++ b/test_orc/modules/base/python/src/util.py @@ -1,7 +1,9 @@ +"""Provides basic utilities for a test module.""" import subprocess import shlex import logger +LOGGER = logger.get_logger('util') # Runs a process at the os level # By default, returns the standard output and error output @@ -11,18 +13,17 @@ # by any return code from the process other than zero. def run_command(cmd, output=True): success = False - LOGGER = logger.get_logger('util') process = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() if process.returncode != 0 and output: - err_msg = "%s. Code: %s" % (stderr.strip(), process.returncode) - LOGGER.error("Command Failed: " + cmd) - LOGGER.error("Error: " + err_msg) + err_msg = f'{stderr.strip()}. Code: {process.returncode}' + LOGGER.error('Command Failed: ' + cmd) + LOGGER.error('Error: ' + err_msg) else: success = True if output: - return stdout.strip().decode("utf-8"), stderr + return stdout.strip().decode('utf-8'), stderr else: return success diff --git a/test_orc/modules/baseline/python/src/baseline_module.py b/test_orc/modules/baseline/python/src/baseline_module.py index 9816bd28a..083123436 100644 --- a/test_orc/modules/baseline/python/src/baseline_module.py +++ b/test_orc/modules/baseline/python/src/baseline_module.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 - +"""Baseline test module""" from test_module import TestModule LOG_NAME = "test_baseline" @@ -27,4 +26,3 @@ def _baseline_fail(self): def _baseline_skip(self): LOGGER.info("Running baseline pass test") LOGGER.info("Baseline pass test finished") - return None diff --git a/test_orc/modules/baseline/python/src/run.py b/test_orc/modules/baseline/python/src/run.py index 89b3a08e4..1892ed8ae 100644 --- a/test_orc/modules/baseline/python/src/run.py +++ b/test_orc/modules/baseline/python/src/run.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 - +"""Run Baseline module""" import argparse import signal import sys @@ -21,29 +20,29 @@ def __init__(self, module): signal.signal(signal.SIGABRT, self._handler) signal.signal(signal.SIGQUIT, self._handler) - LOGGER.info("Starting Baseline Module") + LOGGER.info('Starting Baseline Module') self._test_module = BaselineModule(module) self._test_module.run_tests() - def _handler(self, signum, *other): - LOGGER.debug("SigtermEnum: " + str(signal.SIGTERM)) - LOGGER.debug("Exit signal received: " + str(signum)) + def _handler(self, signum): + LOGGER.debug('SigtermEnum: ' + str(signal.SIGTERM)) + LOGGER.debug('Exit signal received: ' + str(signum)) if signum in (2, signal.SIGTERM): - LOGGER.info("Exit signal received. Stopping test module...") - LOGGER.info("Test module stopped") + LOGGER.info('Exit signal received. Stopping test module...') + LOGGER.info('Test module stopped') sys.exit(1) -def run(argv): +def run(): parser = argparse.ArgumentParser( - description="Baseline Module Help", + description='Baseline Module Help', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument( - "-m", - "--module", - help="Define the module name to be used to create the log file") + '-m', + '--module', + help='Define the module name to be used to create the log file') args = parser.parse_args() @@ -52,5 +51,5 @@ def run(argv): BaselineModuleRunner(args.module.strip()) -if __name__ == "__main__": - run(sys.argv) +if __name__ == '__main__': + run() diff --git a/test_orc/modules/dns/python/src/dns_module.py b/test_orc/modules/dns/python/src/dns_module.py index b161805a5..58ce48123 100644 --- a/test_orc/modules/dns/python/src/dns_module.py +++ b/test_orc/modules/dns/python/src/dns_module.py @@ -1,52 +1,52 @@ -#!/usr/bin/env python3 - +"""DNS test module""" import subprocess from test_module import TestModule -LOG_NAME = "test_dns" -CAPTURE_FILE = "/runtime/network/dns.pcap" +LOG_NAME = 'test_dns' +CAPTURE_FILE = '/runtime/network/dns.pcap' LOGGER = None class DNSModule(TestModule): + """DNS Test module""" def __init__(self, module): super().__init__(module_name=module, log_name=LOG_NAME) - self._dns_server = "10.10.10.4" + self._dns_server = '10.10.10.4' global LOGGER LOGGER = self._get_logger() def _check_dns_traffic(self, tcpdump_filter): to_dns = self._exec_tcpdump(tcpdump_filter) num_query_dns = len(to_dns) - LOGGER.info("DNS queries found: " + str(num_query_dns)) + LOGGER.info('DNS queries found: ' + str(num_query_dns)) dns_traffic_detected = len(to_dns) > 0 - LOGGER.info("DNS traffic detected: " + str(dns_traffic_detected)) + LOGGER.info('DNS traffic detected: ' + str(dns_traffic_detected)) return dns_traffic_detected def _dns_network_from_dhcp(self): - LOGGER.info("Checking DNS traffic for configured DHCP DNS server: " + + LOGGER.info('Checking DNS traffic for configured DHCP DNS server: ' + self._dns_server) # Check if the device DNS traffic is to appropriate server - tcpdump_filter = "dst port 53 and dst host {} and ether src {}".format( - self._dns_server, self._device_mac) + tcpdump_filter = (f'dst port 53 and dst host {self._dns_server}', + f' and ether src {self._device_mac}') result = self._check_dns_traffic(tcpdump_filter=tcpdump_filter) - LOGGER.info("DNS traffic detected to configured DHCP DNS server: " + + LOGGER.info('DNS traffic detected to configured DHCP DNS server: ' + str(result)) return result def _dns_network_from_device(self): - LOGGER.info("Checking DNS traffic from device: " + self._device_mac) + LOGGER.info('Checking DNS traffic from device: ' + self._device_mac) # Check if the device DNS traffic is to appropriate server - tcpdump_filter = "dst port 53 and ether src {}".format(self._device_mac) + tcpdump_filter = f'dst port 53 and ether src {self._device_mac}' result = self._check_dns_traffic(tcpdump_filter=tcpdump_filter) - LOGGER.info("DNS traffic detected from device: " + str(result)) + LOGGER.info('DNS traffic detected from device: ' + str(result)) return result def _exec_tcpdump(self, tcpdump_filter): @@ -57,9 +57,9 @@ def _exec_tcpdump(self, tcpdump_filter): Returns List of packets matching the filter """ - command = "tcpdump -tttt -n -r {} {}".format(CAPTURE_FILE, tcpdump_filter) + command = f'tcpdump -tttt -n -r {CAPTURE_FILE} {tcpdump_filter}' - LOGGER.debug("tcpdump command: " + command) + LOGGER.debug('tcpdump command: ' + command) process = subprocess.Popen(command, universal_newlines=True, @@ -68,9 +68,9 @@ def _exec_tcpdump(self, tcpdump_filter): stderr=subprocess.PIPE) text = str(process.stdout.read()).rstrip() - LOGGER.debug("tcpdump response: " + text) + LOGGER.debug('tcpdump response: ' + text) if text: - return text.split("\n") + return text.split('\n') return [] diff --git a/test_orc/modules/dns/python/src/run.py b/test_orc/modules/dns/python/src/run.py index 06b8aa571..4cd991804 100644 --- a/test_orc/modules/dns/python/src/run.py +++ b/test_orc/modules/dns/python/src/run.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 - +"""Run DNS test module""" import argparse import signal import sys @@ -13,7 +12,7 @@ class DNSModuleRunner: - + """Run the DNS module tests.""" def __init__(self, module): signal.signal(signal.SIGINT, self._handler) @@ -33,7 +32,7 @@ def add_logger(self, module): global LOGGER LOGGER = logger.get_logger(LOG_NAME, module) - def _handler(self, signum, *other): + def _handler(self, signum): LOGGER.debug("SigtermEnum: " + str(signal.SIGTERM)) LOGGER.debug("Exit signal received: " + str(signum)) if signum in (2, signal.SIGTERM): @@ -42,7 +41,7 @@ def _handler(self, signum, *other): sys.exit(1) -def run(argv): +def run(): parser = argparse.ArgumentParser( description="Test Module DNS", formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -60,4 +59,4 @@ def run(argv): if __name__ == "__main__": - run(sys.argv) + run() diff --git a/test_orc/modules/nmap/python/src/nmap_module.py b/test_orc/modules/nmap/python/src/nmap_module.py index cd6ec276b..876343a0f 100644 --- a/test_orc/modules/nmap/python/src/nmap_module.py +++ b/test_orc/modules/nmap/python/src/nmap_module.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 - +"""NMAP test module""" import time import util import json @@ -11,7 +10,7 @@ class NmapModule(TestModule): - + """NMAP Test module""" def __init__(self, module): super().__init__(module_name=module, log_name=LOG_NAME) self._unallowed_ports = [] @@ -82,13 +81,13 @@ def _check_scan_results(self, test_config): if self._script_scan_results is not None: scan_results.update(self._script_scan_results) if port_config is not None: - for port in port_config: + for port, config in port_config.items(): result = None LOGGER.info("Checking port: " + str(port)) - LOGGER.debug("Port config: " + str(port_config[port])) + LOGGER.debug("Port config: " + str(config)) if port in scan_results: if scan_results[port]["state"] == "open": - if not port_config[port]["allowed"]: + if not config["allowed"]: LOGGER.info("Unallowed port open") self._unallowed_ports.append(str(port)) result = False @@ -103,10 +102,9 @@ def _check_scan_results(self, test_config): result = True if result is not None: - port_config[port][ - "result"] = "compliant" if result else "non-compliant" + config["result"] = "compliant" if result else "non-compliant" else: - port_config[port]["result"] = "skipped" + config["result"] = "skipped" def _scan_scripts(self, tests): scan_results = {} @@ -174,7 +172,7 @@ def _scan_tcp_ports(self, tests): ports_to_scan += "," + ",".join(ports) LOGGER.info("Running nmap TCP port scan") LOGGER.info("TCP ports: " + str(ports_to_scan)) - nmap_results = util.run_command(f"""nmap -sT -sV -Pn -v -p {ports_to_scan} + nmap_results = util.run_command(f"""nmap -sT -sV -Pn -v -p {ports_to_scan} --version-intensity 7 -T4 {self._device_ipv4_addr}""")[0] LOGGER.info("TCP port scan complete") self._scan_tcp_results = self._process_nmap_results( diff --git a/test_orc/modules/nmap/python/src/run.py b/test_orc/modules/nmap/python/src/run.py index 4ed1f533c..959e30f87 100644 --- a/test_orc/modules/nmap/python/src/run.py +++ b/test_orc/modules/nmap/python/src/run.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 - +"""Run NMAP test module""" import argparse import signal import sys @@ -20,29 +19,29 @@ def __init__(self, module): signal.signal(signal.SIGABRT, self._handler) signal.signal(signal.SIGQUIT, self._handler) - LOGGER.info("Starting nmap Module") + LOGGER.info('Starting nmap Module') self._test_module = NmapModule(module) self._test_module.run_tests() - def _handler(self, signum, *other): - LOGGER.debug("SigtermEnum: " + str(signal.SIGTERM)) - LOGGER.debug("Exit signal received: " + str(signum)) + def _handler(self, signum): + LOGGER.debug('SigtermEnum: ' + str(signal.SIGTERM)) + LOGGER.debug('Exit signal received: ' + str(signum)) if signum in (2, signal.SIGTERM): - LOGGER.info("Exit signal received. Stopping test module...") - LOGGER.info("Test module stopped") + LOGGER.info('Exit signal received. Stopping test module...') + LOGGER.info('Test module stopped') sys.exit(1) -def run(argv): +def run(): parser = argparse.ArgumentParser( - description="Nmap Module Help", + description='Nmap Module Help', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument( - "-m", - "--module", - help="Define the module name to be used to create the log file") + '-m', + '--module', + help='Define the module name to be used to create the log file') args = parser.parse_args() @@ -51,5 +50,5 @@ def run(argv): NmapModuleRunner(args.module.strip()) -if __name__ == "__main__": - run(sys.argv) +if __name__ == '__main__': + run() diff --git a/test_orc/python/src/test_orchestrator.py b/test_orc/python/src/test_orchestrator.py index 5cc14ae85..4b65bae12 100644 --- a/test_orc/python/src/test_orchestrator.py +++ b/test_orc/python/src/test_orchestrator.py @@ -67,7 +67,7 @@ def _generate_results(self, device): container_runtime_dir = os.path.join( self._root_path, "runtime/test/" + device.mac_addr.replace(":", "") + "/" + module.name) - results_file = container_runtime_dir + "/" + module.name + "-result.json" + results_file = f"{container_runtime_dir}/{module.name}-result.json" try: with open(results_file, "r", encoding="UTF-8") as f: module_results = json.load(f) diff --git a/testing/test_baseline.py b/testing/test_baseline.py index 6f6240c27..b356983dd 100644 --- a/testing/test_baseline.py +++ b/testing/test_baseline.py @@ -23,7 +23,7 @@ def validator_results(): encoding='utf-8') as f: return json.load(f) -@pytest.mark.skip(reason="requires internet") +@pytest.mark.skip(reason='requires internet') def test_internet_connectivity(container_data): assert container_data['network']['internet'] == 200 @@ -47,7 +47,7 @@ def test_dns_server_resolves(container_data): assert re.match(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}', container_data['dns_response']) -@pytest.mark.skip(reason="requires internet") +@pytest.mark.skip(reason='requires internet') def test_validator_results_compliant(validator_results): results = [True if x['result'] == 'compliant' else False for x in validator_results['results']]