diff --git a/framework/python/src/api/api.py b/framework/python/src/api/api.py index 2ba56b056..f50b824b4 100644 --- a/framework/python/src/api/api.py +++ b/framework/python/src/api/api.py @@ -248,7 +248,7 @@ async def start_test_run(self, request: Request, response: Response): return self._generate_msg(False,"Configured interfaces are not " + "ready for use. Ensure required interfaces " + "are connected.") - + device.test_modules = body_json["device"]["test_modules"] LOGGER.info("Starting Testrun with device target " + @@ -389,6 +389,8 @@ async def delete_report(self, request: Request, response: Response): try: body_json = json.loads(body_raw) except JSONDecodeError as e: + LOGGER.error("An error occurred whilst decoding JSON") + LOGGER.debug(e) response.status_code = status.HTTP_400_BAD_REQUEST return self._generate_msg(False, "Invalid request received") @@ -647,6 +649,8 @@ async def update_profile(self, request: Request, response: Response): req_raw = (await request.body()).decode("UTF-8") req_json = json.loads(req_raw) except JSONDecodeError as e: + LOGGER.error("An error occurred whilst decoding JSON") + LOGGER.debug(e) response.status_code = status.HTTP_400_BAD_REQUEST return self._generate_msg(False, "Invalid request received") @@ -656,9 +660,9 @@ async def update_profile(self, request: Request, response: Response): if not valid_profile: response.status_code = status.HTTP_400_BAD_REQUEST return self._generate_msg(False, "Invalid profile request received") - + profile_name = req_json.get("name") - + # Check if profile exists profile = self.get_session().get_profile(profile_name) @@ -674,15 +678,17 @@ async def update_profile(self, request: Request, response: Response): else: # Update existing profile profile = self.get_session().update_profile(req_json) - + if profile is not None: response.status_code = status.HTTP_200_OK return self._generate_msg(True, "Successfully updated that profile") LOGGER.error("An error occurred whilst updating a profile") response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - return self._generate_msg(False, "An error occurred whilst creating or updating a profile") - + return self._generate_msg( + False, + "An error occurred whilst creating or updating a profile") + async def delete_profile(self, request: Request, response: Response): LOGGER.debug("Received profile delete request") @@ -691,16 +697,18 @@ async def delete_profile(self, request: Request, response: Response): req_raw = (await request.body()).decode("UTF-8") req_json = json.loads(req_raw) except JSONDecodeError as e: + LOGGER.error("An error occurred whilst decoding JSON") + LOGGER.debug(e) response.status_code = status.HTTP_400_BAD_REQUEST return self._generate_msg(False, "Invalid request received") # Check name included in request - if 'name' not in req_json: + if "name" not in req_json: response.status_code = status.HTTP_400_BAD_REQUEST return self._generate_msg(False, "Invalid request received") - + # Get profile name profile_name = req_json.get("name") @@ -712,15 +720,16 @@ async def delete_profile(self, request: Request, response: Response): response.status_code = status.HTTP_404_NOT_FOUND return self._generate_msg(False, "A profile with that name could not be found") - + # Attempt to delete the profile success = self.get_session().delete_profile(profile) if not success: response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - return self._generate_msg(False, - "An error occurred whilst deleting that profile") - + return self._generate_msg( + False, + "An error occurred whilst deleting that profile") + return self._generate_msg(True, "Successfully deleted that profile") diff --git a/framework/python/src/common/risk_profile.py b/framework/python/src/common/risk_profile.py index 06ea41a43..eb81ba183 100644 --- a/framework/python/src/common/risk_profile.py +++ b/framework/python/src/common/risk_profile.py @@ -16,6 +16,7 @@ from datetime import datetime class RiskProfile(): + """Python representation of a risk profile""" def __init__(self, json_data): self.name = json_data['name'] @@ -38,4 +39,4 @@ def to_json(self): 'status': self.status, 'questions': self.questions } - return json \ No newline at end of file + return json diff --git a/framework/python/src/common/session.py b/framework/python/src/common/session.py index cd138bca0..783bc5c34 100644 --- a/framework/python/src/common/session.py +++ b/framework/python/src/common/session.py @@ -375,7 +375,6 @@ def _load_profiles(self): risk_profile = RiskProfile(json_data) risk_profile.status = self.check_profile_status(risk_profile) self._profiles.append(risk_profile) - except Exception as e: LOGGER.error('An error occurred whilst loading risk profiles') @@ -386,13 +385,13 @@ def get_profiles_format(self): def get_profiles(self): return self._profiles - + def get_profile(self, name): for profile in self._profiles: if profile.name.lower() == name.lower(): return profile return None - + def validate_profile(self, profile_json): # Check name field is present @@ -405,8 +404,10 @@ def validate_profile(self, profile_json): # Check all questions are present for format_q in self.get_profiles_format(): - if self._get_profile_question(profile_json, format_q.get('question')) is None: - LOGGER.error('Missing question: ' + format_q.get('question')) + if self._get_profile_question( + profile_json, format_q.get('question')) is None: + LOGGER.error( + 'Missing question: ' + format_q.get('question')) return False return True @@ -444,11 +445,12 @@ def update_profile(self, profile_json): # Check answer is present if 'answer' not in profile_question: - LOGGER.error("Missing answer for question: " + question.get('question')) + LOGGER.error( + 'Missing answer for question: ' + question.get('question')) all_questions_answered = False else: - LOGGER.error("Missing question: " + question.get('question')) + LOGGER.error('Missing question: ' + question.get('question')) all_questions_answered = False if not all_questions_answered: @@ -481,9 +483,11 @@ def update_profile(self, profile_json): risk_profile.questions = profile_json.get('questions') # Write file to disk - with open(os.path.join(PROFILES_DIR, risk_profile.name + '.json'), 'w') as f: + with open(os.path.join( + PROFILES_DIR, risk_profile.name + '.json'), 'w', + encoding='utf-8') as f: f.write(json.dumps(risk_profile.to_json())) - + return risk_profile def check_profile_status(self, profile): @@ -499,7 +503,7 @@ def check_profile_status(self, profile): profile.status = 'Expired' return profile.status - + def delete_profile(self, profile): try: diff --git a/framework/python/src/net_orc/network_orchestrator.py b/framework/python/src/net_orc/network_orchestrator.py index b716f2ab4..f5e43cb32 100644 --- a/framework/python/src/net_orc/network_orchestrator.py +++ b/framework/python/src/net_orc/network_orchestrator.py @@ -538,7 +538,7 @@ def _start_network_service(self, net_module): # DNS configuration (/etc/resolv.conf) Re-add when/if # this network is utilized and DNS issue is resolved #network=PRIVATE_DOCKER_NET, - network_mode="none", + network_mode='none', privileged=True, detach=True, mounts=net_module.mounts, diff --git a/modules/devices/faux-dev/python/src/util.py b/modules/devices/faux-dev/python/src/util.py index 8ffc70cbc..81f9d2ced 100644 --- a/modules/devices/faux-dev/python/src/util.py +++ b/modules/devices/faux-dev/python/src/util.py @@ -26,21 +26,23 @@ def run_command(cmd, logger, output=True): by any return code from the process other than zero.""" success = False - process = subprocess.Popen(shlex.split(cmd), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = process.communicate() + with subprocess.Popen( + shlex.split(cmd), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) as process: - if process.returncode != 0: - err_msg = f'{stderr.strip()}. Code: {process.returncode}' - logger.error('Command Failed: ' + cmd) - logger.error('Error: ' + err_msg) - else: - success = True - logger.debug('Command succeeded: ' + cmd) - if output: - out = stdout.strip().decode('utf-8') - logger.debug('Command output: ' + out) - return success, out - else: - return success, None + stdout, stderr = process.communicate() + + if process.returncode != 0: + err_msg = f'{stderr.strip()}. Code: {process.returncode}' + logger.error('Command Failed: ' + cmd) + logger.error('Error: ' + err_msg) + else: + success = True + logger.debug('Command succeeded: ' + cmd) + if output: + out = stdout.strip().decode('utf-8') + logger.debug('Command output: ' + out) + return success, out + else: + return success, None diff --git a/modules/test/tls/python/src/run.py b/modules/test/tls/python/src/run.py index 78296f0c1..89de9f65e 100644 --- a/modules/test/tls/python/src/run.py +++ b/modules/test/tls/python/src/run.py @@ -66,4 +66,4 @@ def run(): if __name__ == '__main__': - run() \ No newline at end of file + run() diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index 544f40ba9..d8c1d7a16 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -771,4 +771,4 @@ def is_ecdh_and_ecdsa(self, ciphers): for cipher in ciphers: ecdh |= 'ECDH' in cipher ecdsa |= 'ECDSA' in cipher - return {'ecdh': ecdh, 'ecdsa': ecdsa} \ No newline at end of file + return {'ecdh': ecdh, 'ecdsa': ecdsa} diff --git a/testing/api/test_api.py b/testing/api/test_api.py index 741d10ca5..e19f2ddf6 100644 --- a/testing/api/test_api.py +++ b/testing/api/test_api.py @@ -119,38 +119,38 @@ def testing_devices(): @pytest.fixture def testrun(request): # pylint: disable=W0613 """ Start intstance of testrun """ - proc = subprocess.Popen( + with subprocess.Popen( "bin/testrun", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8", preexec_fn=os.setsid - ) + ) as proc: + + while True: + try: + outs = proc.communicate(timeout=1)[0] + except subprocess.TimeoutExpired as e: + if e.output is not None: + output = e.output.decode("utf-8") + if re.search("API waiting for requests", output): + break + except Exception: + pytest.fail("testrun terminated") + + time.sleep(2) - while True: + yield + + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) try: - outs = proc.communicate(timeout=1)[0] + outs = proc.communicate(timeout=60)[0] except subprocess.TimeoutExpired as e: - if e.output is not None: - output = e.output.decode("utf-8") - if re.search("API waiting for requests", output): - break - except Exception: - pytest.fail("testrun terminated") - - time.sleep(2) - - yield - - os.killpg(os.getpgid(proc.pid), signal.SIGTERM) - try: - outs = proc.communicate(timeout=60)[0] - except subprocess.TimeoutExpired as e: - print(e.output) - os.killpg(os.getpgid(proc.pid), signal.SIGKILL) - pytest.exit( - "waited 60s but test run did not cleanly exit .. terminating all tests" - ) + print(e.output) + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + pytest.exit( + "waited 60s but Testrun did not cleanly exit .. terminating all tests" + ) print(outs) @@ -232,10 +232,10 @@ def test_get_system_interfaces(testrun): # pylint: disable=W0613 assert set(response.keys()) == set(local_interfaces) # schema expects a flat list - assert all([isinstance(x, str) for x in response]) + assert all(isinstance(x, str) for x in response) -def test_status_idle(testrun): +def test_status_idle(testrun): # pylint: disable=W0613 until_true( lambda: query_system_status().lower() == "idle", "system status is `idle`", @@ -244,7 +244,7 @@ def test_status_idle(testrun): # Currently not working due to blocking during monitoring period @pytest.mark.skip() -def test_status_in_progress(testing_devices, testrun): +def test_status_in_progress(testing_devices, testrun): # pylint: disable=W0613 payload = {"device": {"mac_addr": BASELINE_MAC_ADDR, "firmware": "asd"}} r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) @@ -348,7 +348,7 @@ def test_create_get_devices(empty_devices_dir, testrun): # pylint: disable=W0613 print(mockito) # Validate structure - assert all([isinstance(x, dict) for x in all_devices]) + assert all(isinstance(x, dict) for x in all_devices) # TOOO uncomment when is done # assert set(dict_paths(mockito[0])) == set(dict_paths(all_devices[0])) @@ -360,7 +360,7 @@ def test_create_get_devices(empty_devices_dir, testrun): # pylint: disable=W0613 ) -def test_delete_device_success(empty_devices_dir, testrun): +def test_delete_device_success(empty_devices_dir, testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -375,9 +375,10 @@ def test_delete_device_success(empty_devices_dir, testrun): } # Send create device request - r = requests.post(f"{API}/device", data=json.dumps(device_1)) + r = requests.post(f"{API}/device", + data=json.dumps(device_1), + timeout=5) print(r.text) - device1_response = r.text # Check device has been created assert r.status_code == 201 @@ -395,25 +396,30 @@ def test_delete_device_success(empty_devices_dir, testrun): "nmap": {"enabled": True}, }, } - r = requests.post(f"{API}/device", data=json.dumps(device_2)) - device2_response = json.loads(r.text) + r = requests.post(f"{API}/device", + data=json.dumps(device_2), + timeout=5) assert r.status_code == 201 assert len(local_get_devices()) == 2 # Test that device_1 deletes - r = requests.delete(f"{API}/device/",data=json.dumps(device_1)) + r = requests.delete(f"{API}/device/", + data=json.dumps(device_1), + timeout=5) assert r.status_code == 200 assert len(local_get_devices()) == 1 # Test that returned devices API endpoint matches expected structure - r = requests.get(f"{API}/devices") + r = requests.get(f"{API}/devices", timeout=5) all_devices = json.loads(r.text) pretty_print(all_devices) with open( - os.path.join(os.path.dirname(__file__), "mockito/get_devices.json") + os.path.join(os.path.dirname(__file__), + "mockito/get_devices.json"), + encoding="utf-8" ) as f: mockito = json.load(f) @@ -432,7 +438,7 @@ def test_delete_device_success(empty_devices_dir, testrun): ) -def test_delete_device_not_found(empty_devices_dir, testrun): +def test_delete_device_not_found(empty_devices_dir, testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -447,27 +453,31 @@ def test_delete_device_not_found(empty_devices_dir, testrun): } # Send create device request - r = requests.post(f"{API}/device", data=json.dumps(device_1)) + r = requests.post(f"{API}/device", + data=json.dumps(device_1), + timeout=5) print(r.text) - device1_response = r.text # Check device has been created assert r.status_code == 201 assert len(local_get_devices()) == 1 # Test that device_1 deletes - r = requests.delete(f"{API}/device/",data=json.dumps(device_1)) + r = requests.delete(f"{API}/device/", + data=json.dumps(device_1), + timeout=5) assert r.status_code == 200 assert len(local_get_devices()) == 0 # Test that device_1 is not found - r = requests.delete(f"{API}/device/",data=json.dumps(device_1)) + r = requests.delete(f"{API}/device/", + data=json.dumps(device_1), + timeout=5) assert r.status_code == 404 assert len(local_get_devices()) == 0 - -def test_delete_device_no_mac(empty_devices_dir, testrun): +def test_delete_device_no_mac(empty_devices_dir, testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -482,7 +492,9 @@ def test_delete_device_no_mac(empty_devices_dir, testrun): } # Send create device request - r = requests.post(f"{API}/device", data=json.dumps(device_1)) + r = requests.post(f"{API}/device", + data=json.dumps(device_1), + timeout=5) print(r.text) # Check device has been created @@ -492,14 +504,16 @@ def test_delete_device_no_mac(empty_devices_dir, testrun): device_1.pop("mac_addr") # Test that device_1 can't delete with no mac address - r = requests.delete(f"{API}/device/",data=json.dumps(device_1)) + r = requests.delete(f"{API}/device/", + data=json.dumps(device_1), + timeout=5) assert r.status_code == 400 assert len(local_get_devices()) == 1 # Currently not working due to blocking during monitoring period @pytest.mark.skip() -def test_delete_device_testrun_running(testing_devices, testrun): +def test_delete_device_testrun_running(testing_devices, testrun): # pylint: disable=W0613 payload = {"device": {"mac_addr": BASELINE_MAC_ADDR, "firmware": "asd"}} r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) @@ -531,11 +545,15 @@ def test_delete_device_testrun_running(testing_devices, testrun): "nmap": {"enabled": True}, }, } - r = requests.delete(f"{API}/device/",data=json.dumps(device_1)) + r = requests.delete(f"{API}/device/", + data=json.dumps(device_1), + timeout=5) assert r.status_code == 403 -def test_start_testrun_started_successfully(testing_devices, testrun): +def test_start_testrun_started_successfully( + testing_devices, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 payload = {"device": {"mac_addr": BASELINE_MAC_ADDR, "firmware": "asd"}} r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) assert r.status_code == 200 @@ -543,7 +561,9 @@ def test_start_testrun_started_successfully(testing_devices, testrun): # Currently not working due to blocking during monitoring period @pytest.mark.skip() -def test_start_testrun_already_in_progress(testing_devices, testrun): +def test_start_testrun_already_in_progress( + testing_devices, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 payload = {"device": {"mac_addr": BASELINE_MAC_ADDR, "firmware": "asd"}} r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) @@ -563,7 +583,9 @@ def test_start_testrun_already_in_progress(testing_devices, testrun): r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) assert r.status_code == 409 -def test_start_system_not_configured_correctly(empty_devices_dir, testrun): +def test_start_system_not_configured_correctly( + empty_devices_dir, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -578,15 +600,20 @@ def test_start_system_not_configured_correctly(empty_devices_dir, testrun): } # Send create device request - r = requests.post(f"{API}/device", data=json.dumps(device_1)) + r = requests.post(f"{API}/device", + data=json.dumps(device_1), + timeout=5) print(r.text) payload = {"device": {"mac_addr": None, "firmware": "asd"}} - r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) + r = requests.post(f"{API}/system/start", + data=json.dumps(payload), + timeout=10) assert r.status_code == 500 -def test_start_device_not_found(empty_devices_dir, testrun): +def test_start_device_not_found(empty_devices_dir, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -601,18 +628,26 @@ def test_start_device_not_found(empty_devices_dir, testrun): } # Send create device request - r = requests.post(f"{API}/device", data=json.dumps(device_1)) + r = requests.post(f"{API}/device", + data=json.dumps(device_1), + timeout=5) print(r.text) - r = requests.delete(f"{API}/device/",data=json.dumps(device_1)) + r = requests.delete(f"{API}/device/", + data=json.dumps(device_1), + timeout=5) assert r.status_code == 200 payload = {"device": {"mac_addr": device_1["mac_addr"], "firmware": "asd"}} - r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) + r = requests.post(f"{API}/system/start", + data=json.dumps(payload), + timeout=10) assert r.status_code == 404 -def test_start_missing_device_information(empty_devices_dir, testrun): +def test_start_missing_device_information( + empty_devices_dir, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -627,15 +662,21 @@ def test_start_missing_device_information(empty_devices_dir, testrun): } # Send create device request - r = requests.post(f"{API}/device", data=json.dumps(device_1)) + r = requests.post(f"{API}/device", + data=json.dumps(device_1), + timeout=5) print(r.text) payload = {} - r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) + r = requests.post(f"{API}/system/start", + data=json.dumps(payload), + timeout=10) assert r.status_code == 400 -def test_create_device_already_exists(empty_devices_dir, testrun): +def test_create_device_already_exists( + empty_devices_dir, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -649,37 +690,47 @@ def test_create_device_already_exists(empty_devices_dir, testrun): }, } - r = requests.post(f"{API}/device", data=json.dumps(device_1), + r = requests.post(f"{API}/device", + data=json.dumps(device_1), timeout=5) print(r.text) assert r.status_code == 201 assert len(local_get_devices()) == 1 - r = requests.post(f"{API}/device", data=json.dumps(device_1), + r = requests.post(f"{API}/device", + data=json.dumps(device_1), timeout=5) print(r.text) assert r.status_code == 409 -def test_create_device_invalid_json(empty_devices_dir, testrun): +def test_create_device_invalid_json( + empty_devices_dir, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 device_1 = { } - r = requests.post(f"{API}/device", data=json.dumps(device_1), + r = requests.post(f"{API}/device", + data=json.dumps(device_1), timeout=5) print(r.text) assert r.status_code == 400 -def test_create_device_invalid_request(empty_devices_dir, testrun): +def test_create_device_invalid_request( + empty_devices_dir, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 - r = requests.post(f"{API}/device", data=None, + r = requests.post(f"{API}/device", + data=None, timeout=5) print(r.text) assert r.status_code == 400 -def test_device_edit_device(testing_devices, testrun): # pylint: disable=W0613 +def test_device_edit_device( + testing_devices, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 with open( testing_devices[1], encoding="utf-8" ) as f: @@ -726,7 +777,9 @@ def test_device_edit_device(testing_devices, testrun): # pylint: disable=W0613 assert updated_device_api["test_modules"] == new_test_modules -def test_device_edit_device_not_found(empty_devices_dir, testrun): +def test_device_edit_device_not_found( + empty_devices_dir, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -740,7 +793,8 @@ def test_device_edit_device_not_found(empty_devices_dir, testrun): }, } - r = requests.post(f"{API}/device", data=json.dumps(device_1), + r = requests.post(f"{API}/device", + data=json.dumps(device_1), timeout=5) print(r.text) assert r.status_code == 201 @@ -761,7 +815,9 @@ def test_device_edit_device_not_found(empty_devices_dir, testrun): assert r.status_code == 404 -def test_device_edit_device_incorrect_json_format(empty_devices_dir, testrun): +def test_device_edit_device_incorrect_json_format( + empty_devices_dir, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -775,7 +831,8 @@ def test_device_edit_device_incorrect_json_format(empty_devices_dir, testrun): }, } - r = requests.post(f"{API}/device", data=json.dumps(device_1), + r = requests.post(f"{API}/device", + data=json.dumps(device_1), timeout=5) print(r.text) assert r.status_code == 201 @@ -791,7 +848,9 @@ def test_device_edit_device_incorrect_json_format(empty_devices_dir, testrun): assert r.status_code == 400 -def test_device_edit_device_with_mac_already_exists(empty_devices_dir, testrun): +def test_device_edit_device_with_mac_already_exists( + empty_devices_dir, # pylint: disable=W0613 + testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", "model": "First", @@ -805,7 +864,8 @@ def test_device_edit_device_with_mac_already_exists(empty_devices_dir, testrun): }, } - r = requests.post(f"{API}/device", data=json.dumps(device_1), + r = requests.post(f"{API}/device", + data=json.dumps(device_1), timeout=5) print(r.text) assert r.status_code == 201 @@ -823,7 +883,8 @@ def test_device_edit_device_with_mac_already_exists(empty_devices_dir, testrun): "nmap": {"enabled": True}, }, } - r = requests.post(f"{API}/device", data=json.dumps(device_2), + r = requests.post(f"{API}/device", + data=json.dumps(device_2), timeout=5) assert r.status_code == 201 assert len(local_get_devices()) == 2 @@ -844,7 +905,7 @@ def test_device_edit_device_with_mac_already_exists(empty_devices_dir, testrun): assert r.status_code == 409 -def test_system_latest_version(testrun): +def test_system_latest_version(testrun): # pylint: disable=W0613 r = requests.get(f"{API}/system/version", timeout=5) assert r.status_code == 200 updated_system_version = json.loads(r.text)["update_available"]