From 5f2b3018affa2cff7a853199d1270af2172468db Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Wed, 5 Jun 2024 09:25:14 +0100 Subject: [PATCH 1/6] Work towards creating profiles --- framework/python/src/api/api.py | 42 +++++++ framework/python/src/common/risk_profile.py | 37 +++---- framework/python/src/common/session.py | 117 ++++++++++++++++++++ 3 files changed, 176 insertions(+), 20 deletions(-) diff --git a/framework/python/src/api/api.py b/framework/python/src/api/api.py index 6931df36c..2f39592ba 100644 --- a/framework/python/src/api/api.py +++ b/framework/python/src/api/api.py @@ -119,6 +119,9 @@ def __init__(self, test_run): self.get_profiles_format) self._router.add_api_route("/profiles", self.get_profiles) + self._router.add_api_route("/profiles", + self.update_profile, + methods=["POST"]) # Allow all origins to access the API origins = ["*"] @@ -626,6 +629,45 @@ def get_profiles_format(self, response: Response): def get_profiles(self): return self.get_session().get_profiles() + async def update_profile(self, request: Request, response: Response): + + LOGGER.debug("Received profile update request") + + req_raw = (await request.body()).decode("UTF-8") + req_json = json.loads(req_raw) + + # Check that profile is valid + valid_profile = self.get_session().validate_profile(req_json) + if not valid_profile: + response.status_code = status.HTTP_400_BAD_REQUEST + return self._generate_msg(False, "Invalid profile request received") + + profile_name = req_json.get("name") + + # Check if profile exists + profile = self.get_session().get_profile(profile_name) + + if profile is None: + # Create new profile + profile = self.get_session().update_profile(req_json) + + if profile is not None: + response.status_code = status.HTTP_201_CREATED + return self._generate_msg(True, "Successfully created a new profile") + LOGGER.error("An error occurred whilst creating a new profile") + + else: + # Update existing profile + profile = self.get_session().update_profile(req_json) + + if profile is not None: + response.status_code = status.HTTP_200_OK + return self._generate_msg(True, "Successfully updated that profile") + LOGGER.error("An error occurred whilst updating a profile") + + response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + return self._generate_msg(False, "An error occurred whilst creating or updating a profile") + # Certificates def get_certs(self): LOGGER.debug("Received certs list request") diff --git a/framework/python/src/common/risk_profile.py b/framework/python/src/common/risk_profile.py index bd82c076c..06ea41a43 100644 --- a/framework/python/src/common/risk_profile.py +++ b/framework/python/src/common/risk_profile.py @@ -15,30 +15,27 @@ from datetime import datetime -SECONDS_IN_YEAR = 31536000 - class RiskProfile(): def __init__(self, json_data): self.name = json_data['name'] - self.status = json_data['status'] - self.created = json_data['created'] - self.version = json_data['version'] - self.questions = json_data['questions'] - - # Check the profile has not expired - self.check_status() - def check_status(self): - if self.status == 'Valid': + if 'status' in json_data: + self.status = json_data['status'] + else: + self.status = 'Draft' - # Check expiry - created_date = datetime.strptime( - self.created, "%Y-%m-%d %H:%M:%S").timestamp() - - today = datetime.now().timestamp() - - if created_date < (today - SECONDS_IN_YEAR): - self.status = 'Expired' + self.created = datetime.strptime(json_data['created'], + '%Y-%m-%d') + self.version = json_data['version'] + self.questions = json_data['questions'] - return self.status + def to_json(self): + json = { + 'name': self.name, + 'version': self.version, + 'created': self.created.strftime('%Y-%m-%d'), + 'status': self.status, + 'questions': self.questions + } + return json \ No newline at end of file diff --git a/framework/python/src/common/session.py b/framework/python/src/common/session.py index ca5015c52..a58411478 100644 --- a/framework/python/src/common/session.py +++ b/framework/python/src/common/session.py @@ -37,6 +37,7 @@ MAX_DEVICE_REPORTS_KEY = 'max_device_reports' CERTS_PATH = 'local/root_certs' CONFIG_FILE_PATH = 'local/system.json' +SECONDS_IN_YEAR = 31536000 PROFILE_FORMAT_PATH = 'resources/risk_assessment.json' PROFILES_DIR = 'local/risk_profiles' @@ -372,7 +373,9 @@ def _load_profiles(self): ), encoding='utf-8') as f: json_data = json.load(f) risk_profile = RiskProfile(json_data) + risk_profile.status = self.check_profile_status(risk_profile) self._profiles.append(risk_profile) + except Exception as e: LOGGER.error('An error occurred whilst loading risk profiles') @@ -383,6 +386,120 @@ def get_profiles_format(self): def get_profiles(self): return self._profiles + + def get_profile(self, name): + for profile in self._profiles: + if profile.name.lower() == name.lower(): + return profile + return None + + def validate_profile(self, profile_json): + + # Check name field is present + if 'name' not in profile_json: + return False + + # Check questions field is present + if 'questions' not in profile_json: + return False + + # Check all questions are present + for format_q in self.get_profiles_format(): + if self._get_profile_question(profile_json, format_q.get('question')) is None: + print('Missing question: ' + format_q.get('question')) + return False + + return True + + def _get_profile_question(self, profile_json, question): + + for q in profile_json.get('questions'): + if question.lower() == q.get('question').lower(): + return question + + return None + + def update_profile(self, profile_json): + + profile_name = profile_json['name'] + + # Add version, timestamp and status + profile_json['version'] = self.get_version() + profile_json['created'] = datetime.datetime.now().strftime('%Y-%m-%d') + + if 'status' in profile_json and profile_json.get('status') == 'Valid': + # Attempting to submit a risk profile, we need to check it + + # Check all questions have been answered + all_questions_answered = True + + for question in self.get_profiles_format(): + + # Check question is present + profile_question = self._get_profile_question( + profile_json, question.get('question') + ) + + if profile_question is not None: + + # Check answer is present + if 'answer' not in profile_question: + print(profile_question) + print("Missing answer for question: " + question.get('question')) + all_questions_answered = False + + else: + print("Missing question: " + question.get('question')) + all_questions_answered = False + + if not all_questions_answered: + print('Not all questions answered') + return None + + else: + profile_json['status'] = 'Draft' + + risk_profile = self.get_profile(profile_name) + + if risk_profile is None: + + # Create a new risk profile + risk_profile = RiskProfile(profile_json) + self._profiles.append(risk_profile) + + else: + + # Check if name has changed + if 'rename' in profile_json: + new_name = profile_json.get('rename') + + # Delete the original file + os.remove(os.path.join(PROFILES_DIR, risk_profile.name + '.json')) + + risk_profile.name = new_name + + # Update questions and answers + risk_profile.questions = profile_json.get('questions') + + # Write file to disk + with open(os.path.join(PROFILES_DIR, risk_profile.name + '.json'), 'w') as f: + f.write(json.dumps(risk_profile.to_json())) + + return risk_profile + + def check_profile_status(self, profile): + + if profile.status == 'Valid': + + # Check expiry + created_date = profile.created.timestamp() + + today = datetime.datetime.now().timestamp() + + if created_date < (today - SECONDS_IN_YEAR): + profile.status = 'Expired' + + return profile.status def reset(self): self.set_status('Idle') From b661c3bc06490b4a5fd3d2c21b876f1125d15eaf Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Thu, 6 Jun 2024 21:01:52 +0100 Subject: [PATCH 2/6] Add delete profile endpoint --- framework/python/src/api/api.py | 60 ++++++++++++++++++++++++-- framework/python/src/common/session.py | 29 ++++++++++--- 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/framework/python/src/api/api.py b/framework/python/src/api/api.py index 2f39592ba..8a5de24ec 100644 --- a/framework/python/src/api/api.py +++ b/framework/python/src/api/api.py @@ -122,6 +122,9 @@ def __init__(self, test_run): self._router.add_api_route("/profiles", self.update_profile, methods=["POST"]) + self._router.add_api_route("/profiles", + self.delete_profile, + methods=["DELETE"]) # Allow all origins to access the API origins = ["*"] @@ -381,7 +384,12 @@ async def delete_report(self, request: Request, response: Response): response.status_code = 400 return self._generate_msg(False, "Invalid request received") - body_json = json.loads(body_raw) + try: + body_json = json.loads(body_raw) + except JSONDecodeError as e: + response.status_code = status.HTTP_400_BAD_REQUEST + return self._generate_msg(False, + "Invalid request received") if "mac_addr" not in body_json or "timestamp" not in body_json: response.status_code = 400 @@ -633,8 +641,13 @@ async def update_profile(self, request: Request, response: Response): LOGGER.debug("Received profile update request") - req_raw = (await request.body()).decode("UTF-8") - req_json = json.loads(req_raw) + try: + req_raw = (await request.body()).decode("UTF-8") + req_json = json.loads(req_raw) + except JSONDecodeError as e: + response.status_code = status.HTTP_400_BAD_REQUEST + return self._generate_msg(False, + "Invalid request received") # Check that profile is valid valid_profile = self.get_session().validate_profile(req_json) @@ -667,6 +680,47 @@ async def update_profile(self, request: Request, response: Response): response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR return self._generate_msg(False, "An error occurred whilst creating or updating a profile") + + async def delete_profile(self, request: Request, response: Response): + + LOGGER.debug("Received profile delete request") + + try: + req_raw = (await request.body()).decode("UTF-8") + req_json = json.loads(req_raw) + except JSONDecodeError as e: + response.status_code = status.HTTP_400_BAD_REQUEST + return self._generate_msg(False, + "Invalid request received") + + # Check name included in request + if 'name' not in req_json: + response.status_code = status.HTTP_400_BAD_REQUEST + return self._generate_msg(False, + "Invalid request received") + + # Get profile name + profile_name = req_json.get("name") + + # Fetch profile + profile = self.get_session().get_profile(profile_name) + + # Check if profile exists + if profile is None: + response.status_code = status.HTTP_404_NOT_FOUND + return self._generate_msg(False, + "A profile with that name could not be found") + + # Attempt to delete the profile + success = self.get_session().delete_profile(profile) + + if not success: + response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + return self._generate_msg(False, + "An error occurred whilst deleting that profile") + + return self._generate_msg(True, + "Successfully deleted that profile") # Certificates def get_certs(self): diff --git a/framework/python/src/common/session.py b/framework/python/src/common/session.py index a58411478..cd138bca0 100644 --- a/framework/python/src/common/session.py +++ b/framework/python/src/common/session.py @@ -406,7 +406,7 @@ def validate_profile(self, profile_json): # Check all questions are present for format_q in self.get_profiles_format(): if self._get_profile_question(profile_json, format_q.get('question')) is None: - print('Missing question: ' + format_q.get('question')) + LOGGER.error('Missing question: ' + format_q.get('question')) return False return True @@ -415,7 +415,7 @@ def _get_profile_question(self, profile_json, question): for q in profile_json.get('questions'): if question.lower() == q.get('question').lower(): - return question + return q return None @@ -444,16 +444,15 @@ def update_profile(self, profile_json): # Check answer is present if 'answer' not in profile_question: - print(profile_question) - print("Missing answer for question: " + question.get('question')) + LOGGER.error("Missing answer for question: " + question.get('question')) all_questions_answered = False else: - print("Missing question: " + question.get('question')) + LOGGER.error("Missing question: " + question.get('question')) all_questions_answered = False if not all_questions_answered: - print('Not all questions answered') + LOGGER.error('Not all questions answered') return None else: @@ -500,6 +499,24 @@ def check_profile_status(self, profile): profile.status = 'Expired' return profile.status + + def delete_profile(self, profile): + + try: + profile_name = profile.name + file_name = profile_name + '.json' + + profile_path = os.path.join(PROFILES_DIR, file_name) + + os.remove(profile_path) + self._profiles.remove(profile) + + return True + + except Exception as e: + LOGGER.error('An error occurred whilst deleting a profile') + LOGGER.debug(e) + return False def reset(self): self.set_status('Idle') From bd39867ea896b9742ef87bb9f1e5654ea6f7c221 Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Mon, 10 Jun 2024 10:12:48 +0100 Subject: [PATCH 3/6] Work on attaching profile to zip --- framework/python/src/api/api.py | 49 ++++++++++-- framework/python/src/common/risk_profile.py | 9 ++- framework/python/src/common/session.py | 6 ++ .../python/src/test_orc/test_orchestrator.py | 74 ++++++++++++------- 4 files changed, 105 insertions(+), 33 deletions(-) diff --git a/framework/python/src/api/api.py b/framework/python/src/api/api.py index 8a5de24ec..d2c8f785b 100644 --- a/framework/python/src/api/api.py +++ b/framework/python/src/api/api.py @@ -580,13 +580,52 @@ async def get_report(self, response: Response, response.status_code = 404 return self._generate_msg(False, "Report could not be found") - async def get_results(self, response: Response, - device_name, timestamp): - - file_path = os.path.join(DEVICES_PATH, device_name, "reports", - timestamp, "results.zip") + async def get_results(self, request: Request, + response: Response, + device_name, timestamp): + LOGGER.debug("Received get results " + f"request for {device_name} / {timestamp}") + + profile = None + + try: + req_raw = (await request.body()).decode("UTF-8") + req_json = json.loads(req_raw) + + # Check if profile has been specified + if "profile" in req_json: + profile_name = req_json.get("profile") + profile = self.get_session().get_profile(profile_name) + + if profile is None: + response.status_code = status.HTTP_404_NOT_FOUND + return self._generate_msg(False, + "A profile with that name could not be found") + + except JSONDecodeError as e: + # Profile not specified + pass + + # Check if device exists + device = self.get_session().get_device_by_name(device_name) + if device is None: + response.status_code = status.HTTP_404_NOT_FOUND + return self._generate_msg(False, + "A device with that name could not be found") + + file_path = self._get_test_run().get_test_orc().zip_results( + device, + timestamp, + profile + ) + + if file_path is None: + response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + return self._generate_msg( + False, + "An error occurred whilst archiving test results") + if os.path.isfile(file_path): return FileResponse(file_path) else: diff --git a/framework/python/src/common/risk_profile.py b/framework/python/src/common/risk_profile.py index 06ea41a43..42a730a75 100644 --- a/framework/python/src/common/risk_profile.py +++ b/framework/python/src/common/risk_profile.py @@ -14,6 +14,9 @@ """Stores additional information about a device's risk""" from datetime import datetime +import os + +PROFILES_PATH = 'local/risk_profiles' class RiskProfile(): @@ -38,4 +41,8 @@ def to_json(self): 'status': self.status, 'questions': self.questions } - return json \ No newline at end of file + return json + + def get_file_path(self): + return os.path.join(PROFILES_PATH, + self.name + '.json') diff --git a/framework/python/src/common/session.py b/framework/python/src/common/session.py index cd138bca0..04844ac77 100644 --- a/framework/python/src/common/session.py +++ b/framework/python/src/common/session.py @@ -256,6 +256,12 @@ def set_target_device(self, device): def get_target_device(self): return self._device + + def get_device_by_name(self, device_name): + for device in self._device_repository: + if device.device_folder.lower() == device_name.lower(): + return device + return None def get_device_repository(self): return self._device_repository diff --git a/framework/python/src/test_orc/test_orchestrator.py b/framework/python/src/test_orc/test_orchestrator.py index 32135254d..0b6a56af6 100644 --- a/framework/python/src/test_orc/test_orchestrator.py +++ b/framework/python/src/test_orc/test_orchestrator.py @@ -125,9 +125,6 @@ def run_test_modules(self): # Move testing output from runtime to local device folder timestamp_dir = self._timestamp_results(device) - # Archive the runtime directory - self._zip_results(timestamp_dir) - LOGGER.debug("Cleaning old test results...") self._cleanup_old_test_results(device) @@ -257,30 +254,53 @@ def _timestamp_results(self, device): return completed_results_dir - def _zip_results(self, dest_path): - - try: - LOGGER.debug("Archiving test results") - - # Define where to save the zip file - zip_location = os.path.join(dest_path, "results") - - # The runtime directory to include in ZIP - path_to_zip = os.path.join( - self._root_path, - RUNTIME_DIR) - - # Create ZIP file - shutil.make_archive(zip_location, "zip", path_to_zip) - - # Check that the ZIP was successfully created - zip_file = zip_location + ".zip" - LOGGER.info(f'''Archive {'created at ' + zip_file - if os.path.exists(zip_file) - else'creation failed'}''') - - except Exception as error: # pylint: disable=W0703 - LOGGER.error(f"Failed to create zip file: {error}") + def zip_results(self, + device, + timestamp, + profile): + + # try: + LOGGER.debug("Archiving test results") + + src_path = os.path.join(LOCAL_DEVICE_REPORTS.replace( + '{device_folder}', + device.device_folder), + timestamp) + + print("src path = " + src_path) + + # Define where to save the zip file + zip_location = os.path.join(LOCAL_DEVICE_REPORTS.replace( + '{device_folder}', device.device_folder), timestamp + ) + + print("zip location = " + zip_location) + + # Include profile if specified + if profile is not None: + LOGGER.debug( + f'Copying profile {profile.name} to results directory') + shutil.copy(profile.get_file_path(), + os.path.join( + src_path, + "profile.json")) + + # Create ZIP file + if not os.path.exists(zip_location + ".zip"): + shutil.make_archive(zip_location, "zip", src_path) + + # Check that the ZIP was successfully created + zip_file = zip_location + ".zip" + LOGGER.info(f'''Archive {'created at ' + zip_file + if os.path.exists(zip_file) + else'creation failed'}''') + + return zip_file + + # except Exception as error: # pylint: disable=W0703 + # LOGGER.error("Failed to create zip file") + # LOGGER.debug(error) + # return None def test_in_progress(self): return self._test_in_progress From be7faecb29670dcef73705fa7438b21e6641fae8 Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Thu, 13 Jun 2024 11:26:45 +0100 Subject: [PATCH 4/6] Pylint --- framework/python/src/common/testreport.py | 4 ++-- .../python/src/test_orc/test_orchestrator.py | 15 ++++++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/framework/python/src/common/testreport.py b/framework/python/src/common/testreport.py index 427d83793..aae36f759 100644 --- a/framework/python/src/common/testreport.py +++ b/framework/python/src/common/testreport.py @@ -429,13 +429,13 @@ def generate_footer(self, page_num): def generate_results(self, json_data, page_num): successful_tests = 0 - for test in self._results: + for test in json_data['tests']['results']: if test['result'] != 'Error': successful_tests += 1 result_list = f'''
-

Results List ({len(successful_tests)}/{self._total_tests})

+

Results List ({(successful_tests)}/{self._total_tests})

Name
Description
diff --git a/framework/python/src/test_orc/test_orchestrator.py b/framework/python/src/test_orc/test_orchestrator.py index 0b6a56af6..4f5503a06 100644 --- a/framework/python/src/test_orc/test_orchestrator.py +++ b/framework/python/src/test_orc/test_orchestrator.py @@ -263,15 +263,15 @@ def zip_results(self, LOGGER.debug("Archiving test results") src_path = os.path.join(LOCAL_DEVICE_REPORTS.replace( - '{device_folder}', + "{device_folder}", device.device_folder), timestamp) - + print("src path = " + src_path) # Define where to save the zip file zip_location = os.path.join(LOCAL_DEVICE_REPORTS.replace( - '{device_folder}', device.device_folder), timestamp + "{device_folder}", device.device_folder), timestamp ) print("zip location = " + zip_location) @@ -279,22 +279,27 @@ def zip_results(self, # Include profile if specified if profile is not None: LOGGER.debug( - f'Copying profile {profile.name} to results directory') + f"Copying profile {profile.name} to results directory") + print("Copying from " + profile.get_file_path()) + print("Copying to " + os.path.join(src_path, "profile.json")) shutil.copy(profile.get_file_path(), os.path.join( src_path, "profile.json")) + print("Done copying") # Create ZIP file if not os.path.exists(zip_location + ".zip"): + print("Making zip file") shutil.make_archive(zip_location, "zip", src_path) + print("Finished making zip file") # Check that the ZIP was successfully created zip_file = zip_location + ".zip" LOGGER.info(f'''Archive {'created at ' + zip_file if os.path.exists(zip_file) else'creation failed'}''') - + return zip_file # except Exception as error: # pylint: disable=W0703 From ce1ca882f04c5ccbe2f0d1ce721ec4df664fdbf4 Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Thu, 13 Jun 2024 11:34:14 +0100 Subject: [PATCH 5/6] Fix zipping --- .../src/net_orc/network_orchestrator.py | 2 + .../python/src/test_orc/test_orchestrator.py | 85 +++++++++---------- 2 files changed, 40 insertions(+), 47 deletions(-) diff --git a/framework/python/src/net_orc/network_orchestrator.py b/framework/python/src/net_orc/network_orchestrator.py index f5e43cb32..2099455c5 100644 --- a/framework/python/src/net_orc/network_orchestrator.py +++ b/framework/python/src/net_orc/network_orchestrator.py @@ -21,6 +21,7 @@ import subprocess import sys import docker +import time from docker.types import Mount from common import logger, util from net_orc.listener import Listener @@ -281,6 +282,7 @@ def _start_device_monitor(self, device): sniffer.start() while sniffer.running: + time.sleep(1) if not self._ip_ctrl.check_interface_status( self._session.get_device_interface()): sniffer.stop() diff --git a/framework/python/src/test_orc/test_orchestrator.py b/framework/python/src/test_orc/test_orchestrator.py index 4f5503a06..11b61ec47 100644 --- a/framework/python/src/test_orc/test_orchestrator.py +++ b/framework/python/src/test_orc/test_orchestrator.py @@ -259,53 +259,44 @@ def zip_results(self, timestamp, profile): - # try: - LOGGER.debug("Archiving test results") - - src_path = os.path.join(LOCAL_DEVICE_REPORTS.replace( - "{device_folder}", - device.device_folder), - timestamp) - - print("src path = " + src_path) - - # Define where to save the zip file - zip_location = os.path.join(LOCAL_DEVICE_REPORTS.replace( - "{device_folder}", device.device_folder), timestamp - ) - - print("zip location = " + zip_location) - - # Include profile if specified - if profile is not None: - LOGGER.debug( - f"Copying profile {profile.name} to results directory") - print("Copying from " + profile.get_file_path()) - print("Copying to " + os.path.join(src_path, "profile.json")) - shutil.copy(profile.get_file_path(), - os.path.join( - src_path, - "profile.json")) - print("Done copying") - - # Create ZIP file - if not os.path.exists(zip_location + ".zip"): - print("Making zip file") - shutil.make_archive(zip_location, "zip", src_path) - print("Finished making zip file") - - # Check that the ZIP was successfully created - zip_file = zip_location + ".zip" - LOGGER.info(f'''Archive {'created at ' + zip_file - if os.path.exists(zip_file) - else'creation failed'}''') - - return zip_file - - # except Exception as error: # pylint: disable=W0703 - # LOGGER.error("Failed to create zip file") - # LOGGER.debug(error) - # return None + try: + LOGGER.debug("Archiving test results") + + src_path = os.path.join(LOCAL_DEVICE_REPORTS.replace( + "{device_folder}", + device.device_folder), + timestamp) + + # Define where to save the zip file + zip_location = os.path.join(LOCAL_DEVICE_REPORTS.replace( + "{device_folder}", device.device_folder), timestamp + ) + + # Include profile if specified + if profile is not None: + LOGGER.debug( + f"Copying profile {profile.name} to results directory") + shutil.copy(profile.get_file_path(), + os.path.join( + src_path, + "profile.json")) + + # Create ZIP file + if not os.path.exists(zip_location + ".zip"): + shutil.make_archive(zip_location, "zip", src_path) + + # Check that the ZIP was successfully created + zip_file = zip_location + ".zip" + LOGGER.info(f'''Archive {'created at ' + zip_file + if os.path.exists(zip_file) + else'creation failed'}''') + + return zip_file + + except Exception as error: # pylint: disable=W0703 + LOGGER.error("Failed to create zip file") + LOGGER.debug(error) + return None def test_in_progress(self): return self._test_in_progress From db92fece14dd8315ef49cfa77930b1e079a72c15 Mon Sep 17 00:00:00 2001 From: Jacob Boddey Date: Thu, 13 Jun 2024 11:39:44 +0100 Subject: [PATCH 6/6] Pylint fixes --- framework/python/src/api/api.py | 12 +- framework/python/src/common/risk_profile.py | 2 +- framework/python/src/common/session.py | 147 ++------------------ framework/python/src/core/testrun.py | 2 +- testing/api/test_api.py | 2 +- 5 files changed, 17 insertions(+), 148 deletions(-) diff --git a/framework/python/src/api/api.py b/framework/python/src/api/api.py index c4bc5795f..19a008450 100644 --- a/framework/python/src/api/api.py +++ b/framework/python/src/api/api.py @@ -587,7 +587,6 @@ async def get_report(self, response: Response, async def get_results(self, request: Request, response: Response, device_name, timestamp): - LOGGER.debug("Received get results " + f"request for {device_name} / {timestamp}") @@ -604,13 +603,14 @@ async def get_results(self, request: Request, if profile is None: response.status_code = status.HTTP_404_NOT_FOUND - return self._generate_msg(False, - "A profile with that name could not be found") + return self._generate_msg( + False, + "A profile with that name could not be found") - except JSONDecodeError as e: + except JSONDecodeError: # Profile not specified pass - + # Check if device exists device = self.get_session().get_device_by_name(device_name) if device is None: @@ -629,7 +629,7 @@ async def get_results(self, request: Request, return self._generate_msg( False, "An error occurred whilst archiving test results") - + if os.path.isfile(file_path): return FileResponse(file_path) else: diff --git a/framework/python/src/common/risk_profile.py b/framework/python/src/common/risk_profile.py index 3c832dbfd..021a1c5b5 100644 --- a/framework/python/src/common/risk_profile.py +++ b/framework/python/src/common/risk_profile.py @@ -43,7 +43,7 @@ def to_json(self): 'questions': self.questions } return json - + def get_file_path(self): return os.path.join(PROFILES_PATH, self.name + '.json') diff --git a/framework/python/src/common/session.py b/framework/python/src/common/session.py index 0738e0f73..5127cc221 100644 --- a/framework/python/src/common/session.py +++ b/framework/python/src/common/session.py @@ -256,7 +256,7 @@ def set_target_device(self, device): def get_target_device(self): return self._device - + def get_device_by_name(self, device_name): for device in self._device_repository: if device.device_folder.lower() == device_name.lower(): @@ -380,8 +380,7 @@ def _load_profiles(self): json_data = json.load(f) risk_profile = RiskProfile(json_data) risk_profile.status = self.check_profile_status(risk_profile) - self._profiles.append(risk_profile) - + self._profiles.append(risk_profile) except Exception as e: LOGGER.error('An error occurred whilst loading risk profiles') @@ -392,137 +391,6 @@ def get_profiles_format(self): def get_profiles(self): return self._profiles - - def get_profile(self, name): - for profile in self._profiles: - if profile.name.lower() == name.lower(): - return profile - return None - - def validate_profile(self, profile_json): - - # Check name field is present - if 'name' not in profile_json: - return False - - # Check questions field is present - if 'questions' not in profile_json: - return False - - # Check all questions are present - for format_q in self.get_profiles_format(): - if self._get_profile_question(profile_json, format_q.get('question')) is None: - LOGGER.error('Missing question: ' + format_q.get('question')) - return False - - return True - - def _get_profile_question(self, profile_json, question): - - for q in profile_json.get('questions'): - if question.lower() == q.get('question').lower(): - return q - - return None - - def update_profile(self, profile_json): - - profile_name = profile_json['name'] - - # Add version, timestamp and status - profile_json['version'] = self.get_version() - profile_json['created'] = datetime.datetime.now().strftime('%Y-%m-%d') - - if 'status' in profile_json and profile_json.get('status') == 'Valid': - # Attempting to submit a risk profile, we need to check it - - # Check all questions have been answered - all_questions_answered = True - - for question in self.get_profiles_format(): - - # Check question is present - profile_question = self._get_profile_question( - profile_json, question.get('question') - ) - - if profile_question is not None: - - # Check answer is present - if 'answer' not in profile_question: - LOGGER.error("Missing answer for question: " + question.get('question')) - all_questions_answered = False - - else: - LOGGER.error("Missing question: " + question.get('question')) - all_questions_answered = False - - if not all_questions_answered: - LOGGER.error('Not all questions answered') - return None - - else: - profile_json['status'] = 'Draft' - - risk_profile = self.get_profile(profile_name) - - if risk_profile is None: - - # Create a new risk profile - risk_profile = RiskProfile(profile_json) - self._profiles.append(risk_profile) - - else: - - # Check if name has changed - if 'rename' in profile_json: - new_name = profile_json.get('rename') - - # Delete the original file - os.remove(os.path.join(PROFILES_DIR, risk_profile.name + '.json')) - - risk_profile.name = new_name - - # Update questions and answers - risk_profile.questions = profile_json.get('questions') - - # Write file to disk - with open(os.path.join(PROFILES_DIR, risk_profile.name + '.json'), 'w') as f: - f.write(json.dumps(risk_profile.to_json())) - - return risk_profile - - def check_profile_status(self, profile): - - if profile.status == 'Valid': - - # Check expiry - created_date = profile.created.timestamp() - - today = datetime.datetime.now().timestamp() - - if created_date < (today - SECONDS_IN_YEAR): - profile.status = 'Expired' - - return profile.status - - def delete_profile(self, profile): - - try: - profile_name = profile.name - file_name = profile_name + '.json' - - profile_path = os.path.join(PROFILES_DIR, file_name) - - os.remove(profile_path) - self._profiles.remove(profile) - - return True - - except Exception as e: - LOGGER.error('An error occurred whilst deleting a profile') - LOGGER.debug(e) - return False def get_profile(self, name): for profile in self._profiles: @@ -544,8 +412,7 @@ def validate_profile(self, profile_json): for format_q in self.get_profiles_format(): if self._get_profile_question( profile_json, format_q.get('question')) is None: - LOGGER.error( - 'Missing question: ' + format_q.get('question')) + LOGGER.error('Missing question: ' + format_q.get('question')) return False return True @@ -621,9 +488,11 @@ def update_profile(self, profile_json): risk_profile.questions = profile_json.get('questions') # Write file to disk - with open(os.path.join( - PROFILES_DIR, risk_profile.name + '.json'), 'w', - encoding='utf-8') as f: + with open( + os.path.join( + PROFILES_DIR, + risk_profile.name + '.json' + ), 'w', encoding='utf-8') as f: f.write(json.dumps(risk_profile.to_json())) return risk_profile diff --git a/framework/python/src/core/testrun.py b/framework/python/src/core/testrun.py index 9ce9ef417..5ca763c93 100644 --- a/framework/python/src/core/testrun.py +++ b/framework/python/src/core/testrun.py @@ -479,4 +479,4 @@ def _stop_ui(self): if container is not None: container.kill() except docker.errors.NotFound: - return \ No newline at end of file + return diff --git a/testing/api/test_api.py b/testing/api/test_api.py index e19f2ddf6..75811e3bb 100644 --- a/testing/api/test_api.py +++ b/testing/api/test_api.py @@ -426,7 +426,7 @@ def test_delete_device_success(empty_devices_dir, testrun): # pylint: disable=W0 print(mockito) # Validate structure - assert all([isinstance(x, dict) for x in all_devices]) + assert all(isinstance(x, dict) for x in all_devices) # TOOO uncomment when is done # assert set(dict_paths(mockito[0])) == set(dict_paths(all_devices[0]))