diff --git a/framework/python/src/api/api.py b/framework/python/src/api/api.py index 87039d40c..409884fd1 100644 --- a/framework/python/src/api/api.py +++ b/framework/python/src/api/api.py @@ -433,7 +433,7 @@ async def delete_report(self, request: Request, response: Response): if len(body_raw) == 0: response.status_code = 400 - return self._generate_msg(False, "Invalid request received") + return self._generate_msg(False, "Invalid request received, missing body") try: body_json = json.loads(body_raw) @@ -445,12 +445,18 @@ async def delete_report(self, request: Request, response: Response): if "mac_addr" not in body_json or "timestamp" not in body_json: response.status_code = 400 - return self._generate_msg(False, "Invalid request received") + return self._generate_msg(False, "Missing mac address or timestamp") mac_addr = body_json.get("mac_addr").lower() timestamp = body_json.get("timestamp") - parsed_timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") - timestamp_formatted = parsed_timestamp.strftime("%Y-%m-%dT%H:%M:%S") + + try: + parsed_timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") + timestamp_formatted = parsed_timestamp.strftime("%Y-%m-%dT%H:%M:%S") + + except ValueError: + response.status_code = 400 + return self._generate_msg(False, "Incorrect timestamp format") # Get device from MAC address device = self._session.get_device(mac_addr) @@ -541,7 +547,7 @@ async def save_device(self, request: Request, response: Response): ) # Check if device folder exists - device_folder = os.path.join(self._test_run.get_root_dir(), + device_folder = os.path.join(self._testrun.get_root_dir(), DEVICES_PATH, device_json.get(DEVICE_MANUFACTURER_KEY) + " " + @@ -647,6 +653,12 @@ async def edit_device(self, request: Request, response: Response): async def get_report(self, response: Response, device_name, timestamp): device = self._session.get_device_by_name(device_name) + # If the device not found + if device is None: + LOGGER.info("Device not found, returning 404") + response.status_code = 404 + return self._generate_msg(False, "Device not found") + # 1.3 file path file_path = os.path.join( DEVICES_PATH, @@ -700,16 +712,34 @@ async def get_results(self, request: Request, response: Response, device_name, return self._generate_msg(False, "A device with that name could not be found") - file_path = self._get_testrun().get_test_orc().zip_results( + # Check if report exists (1.3 file path) + report_file_path = os.path.join( + DEVICES_PATH, + device_name, + "reports", + timestamp,"test", + device.mac_addr.replace(":","")) + + if not os.path.isdir(report_file_path): + # pre 1.3 file path + report_file_path = os.path.join(DEVICES_PATH, device_name, "reports", + timestamp) + + if not os.path.isdir(report_file_path): + LOGGER.info("Report could not be found, returning 404") + response.status_code = 404 + return self._generate_msg(False, "Report could not be found") + + zip_file_path = self._get_testrun().get_test_orc().zip_results( device, timestamp, profile) - if file_path is None: + if zip_file_path is None: response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR return self._generate_msg( False, "An error occurred whilst archiving test results") - if os.path.isfile(file_path): - return FileResponse(file_path) + if os.path.isfile(zip_file_path): + return FileResponse(zip_file_path) else: LOGGER.info("Test results could not be found, returning 404") response.status_code = 404 diff --git a/framework/requirements.txt b/framework/requirements.txt index 0484905ee..402009ef9 100644 --- a/framework/requirements.txt +++ b/framework/requirements.txt @@ -21,8 +21,6 @@ pydantic==2.7.1 # Requirements for testing pytest==7.4.4 pytest-timeout==2.2.0 -responses==0.25.3 - # Requirements for the report markdown==3.5.2 diff --git a/testing/api/certificates/WR2.pem b/testing/api/certificates/WR2.pem new file mode 100644 index 000000000..f82f4d12d --- /dev/null +++ b/testing/api/certificates/WR2.pem @@ -0,0 +1,29 @@ +-----BEGIN CERTIFICATE----- +MIIFCzCCAvOgAwIBAgIQf/AFoHxM3tEArZ1mpRB7mDANBgkqhkiG9w0BAQsFADBH +MQswCQYDVQQGEwJVUzEiMCAGA1UEChMZR29vZ2xlIFRydXN0IFNlcnZpY2VzIExM +QzEUMBIGA1UEAxMLR1RTIFJvb3QgUjEwHhcNMjMxMjEzMDkwMDAwWhcNMjkwMjIw +MTQwMDAwWjA7MQswCQYDVQQGEwJVUzEeMBwGA1UEChMVR29vZ2xlIFRydXN0IFNl +cnZpY2VzMQwwCgYDVQQDEwNXUjIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQCp/5x/RR5wqFOfytnlDd5GV1d9vI+aWqxG8YSau5HbyfsvAfuSCQAWXqAc ++MGr+XgvSszYhaLYWTwO0xj7sfUkDSbutltkdnwUxy96zqhMt/TZCPzfhyM1IKji +aeKMTj+xWfpgoh6zySBTGYLKNlNtYE3pAJH8do1cCA8Kwtzxc2vFE24KT3rC8gIc +LrRjg9ox9i11MLL7q8Ju26nADrn5Z9TDJVd06wW06Y613ijNzHoU5HEDy01hLmFX +xRmpC5iEGuh5KdmyjS//V2pm4M6rlagplmNwEmceOuHbsCFx13ye/aoXbv4r+zgX +FNFmp6+atXDMyGOBOozAKql2N87jAgMBAAGjgf4wgfswDgYDVR0PAQH/BAQDAgGG +MB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjASBgNVHRMBAf8ECDAGAQH/ +AgEAMB0GA1UdDgQWBBTeGx7teRXUPjckwyG77DQ5bUKyMDAfBgNVHSMEGDAWgBTk +rysmcRorSCeFL1JmLO/wiRNxPjA0BggrBgEFBQcBAQQoMCYwJAYIKwYBBQUHMAKG +GGh0dHA6Ly9pLnBraS5nb29nL3IxLmNydDArBgNVHR8EJDAiMCCgHqAchhpodHRw +Oi8vYy5wa2kuZ29vZy9yL3IxLmNybDATBgNVHSAEDDAKMAgGBmeBDAECATANBgkq +hkiG9w0BAQsFAAOCAgEARXWL5R87RBOWGqtY8TXJbz3S0DNKhjO6V1FP7sQ02hYS +TL8Tnw3UVOlIecAwPJQl8hr0ujKUtjNyC4XuCRElNJThb0Lbgpt7fyqaqf9/qdLe +SiDLs/sDA7j4BwXaWZIvGEaYzq9yviQmsR4ATb0IrZNBRAq7x9UBhb+TV+PfdBJT +DhEl05vc3ssnbrPCuTNiOcLgNeFbpwkuGcuRKnZc8d/KI4RApW//mkHgte8y0YWu +ryUJ8GLFbsLIbjL9uNrizkqRSvOFVU6xddZIMy9vhNkSXJ/UcZhjJY1pXAprffJB +vei7j+Qi151lRehMCofa6WBmiA4fx+FOVsV2/7R6V2nyAiIJJkEd2nSi5SnzxJrl +Xdaqev3htytmOPvoKWa676ATL/hzfvDaQBEcXd2Ppvy+275W+DKcH0FBbX62xevG +iza3F4ydzxl6NJ8hk8R+dDXSqv1MbRT1ybB5W0k8878XSOjvmiYTDIfyc9acxVJr +Y/cykHipa+te1pOhv7wYPYtZ9orGBV5SGOJm4NrB3K1aJar0RfzxC3ikr7Dyc6Qw +qDTBU39CluVIQeuQRgwG3MuSxl7zRERDRilGoKb8uY45JzmxWuKxrfwT/478JuHU +/oTxUFqOl2stKnn7QGTq8z29W+GgBLCXSBxC9epaHM0myFH/FJlniXJfHeytWt0= +-----END CERTIFICATE----- diff --git a/testing/api/certificates/crt.pem b/testing/api/certificates/crt.pem new file mode 100644 index 000000000..410b1f104 --- /dev/null +++ b/testing/api/certificates/crt.pem @@ -0,0 +1,31 @@ +-----BEGIN CERTIFICATE----- +MIIFVzCCAz+gAwIBAgINAgPlk28xsBNJiGuiFzANBgkqhkiG9w0BAQwFADBHMQsw +CQYDVQQGEwJVUzEiMCAGA1UEChMZR29vZ2xlIFRydXN0IFNlcnZpY2VzIExMQzEU +MBIGA1UEAxMLR1RTIFJvb3QgUjEwHhcNMTYwNjIyMDAwMDAwWhcNMzYwNjIyMDAw +MDAwWjBHMQswCQYDVQQGEwJVUzEiMCAGA1UEChMZR29vZ2xlIFRydXN0IFNlcnZp +Y2VzIExMQzEUMBIGA1UEAxMLR1RTIFJvb3QgUjEwggIiMA0GCSqGSIb3DQEBAQUA +A4ICDwAwggIKAoICAQC2EQKLHuOhd5s73L+UPreVp0A8of2C+X0yBoJx9vaMf/vo +27xqLpeXo4xL+Sv2sfnOhB2x+cWX3u+58qPpvBKJXqeqUqv4IyfLpLGcY9vXmX7w +Cl7raKb0xlpHDU0QM+NOsROjyBhsS+z8CZDfnWQpJSMHobTSPS5g4M/SCYe7zUjw +TcLCeoiKu7rPWRnWr4+wB7CeMfGCwcDfLqZtbBkOtdh+JhpFAz2weaSUKK0Pfybl +qAj+lug8aJRT7oM6iCsVlgmy4HqMLnXWnOunVmSPlk9orj2XwoSPwLxAwAtcvfaH +szVsrBhQf4TgTM2S0yDpM7xSma8ytSmzJSq0SPly4cpk9+aCEI3oncKKiPo4Zor8 +Y/kB+Xj9e1x3+naH+uzfsQ55lVe0vSbv1gHR6xYKu44LtcXFilWr06zqkUspzBmk +MiVOKvFlRNACzqrOSbTqn3yDsEB750Orp2yjj32JgfpMpf/VjsPOS+C12LOORc92 +wO1AK/1TD7Cn1TsNsYqiA94xrcx36m97PtbfkSIS5r762DL8EGMUUXLeXdYWk70p +aDPvOmbsB4om3xPXV2V4J95eSRQAogB/mqghtqmxlbCluQ0WEdrHbEg8QOB+DVrN +VjzRlwW5y0vtOUucxD/SVRNuJLDWcfr0wbrM7Rv1/oFB2ACYPTrIrnqYNxgFlQID +AQABo0IwQDAOBgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4E +FgQU5K8rJnEaK0gnhS9SZizv8IkTcT4wDQYJKoZIhvcNAQEMBQADggIBAJ+qQibb +C5u+/x6Wki4+omVKapi6Ist9wTrYggoGxval3sBOh2Z5ofmmWJyq+bXmYOfg6LEe +QkEzCzc9zolwFcq1JKjPa7XSQCGYzyI0zzvFIoTgxQ6KfF2I5DUkzps+GlQebtuy +h6f88/qBVRRiClmpIgUxPoLW7ttXNLwzldMXG+gnoot7TiYaelpkttGsN/H9oPM4 +7HLwEXWdyzRSjeZ2axfG34arJ45JK3VmgRAhpuo+9K4l/3wV3s6MJT/KYnAK9y8J +ZgfIPxz88NtFMN9iiMG1D53Dn0reWVlHxYciNuaCp+0KueIHoI17eko8cdLiA6Ef +MgfdG+RCzgwARWGAtQsgWSl4vflVy2PFPEz0tv/bal8xa5meLMFrUKTX5hgUvYU/ +Z6tGn6D/Qqc6f1zLXbBwHSs09dR2CQzreExZBfMzQsNhFRAbd03OIozUhfJFfbdT +6u9AWpQKXCBfTkBdYiJ23//OYb2MI3jSNwLgjt7RETeJ9r/tSQdirpLsQBqvFAnZ +0E6yove+7u7Y/9waLd64NnHi/Hm3lCXRSHNboTXns5lndcEZOitHTtNCjv0xyBZm +2tIMPNuzjsmhDYAPexZ3FL//2wmUspO8IFgV6dtxQ/PeEMMA3KgqlbbC1j+Qa3bb +bP6MvPJwNQzcmRk13NfIRmPVNnGuV/u3gm3c +-----END CERTIFICATE----- diff --git a/testing/api/certificates/invalid.pem b/testing/api/certificates/invalid.pem new file mode 100644 index 000000000..d3f5a12fa --- /dev/null +++ b/testing/api/certificates/invalid.pem @@ -0,0 +1 @@ + diff --git a/testing/api/certificates/invalidname1234567891234.pem b/testing/api/certificates/invalidname1234567891234.pem new file mode 100644 index 000000000..410b1f104 --- /dev/null +++ b/testing/api/certificates/invalidname1234567891234.pem @@ -0,0 +1,31 @@ +-----BEGIN CERTIFICATE----- +MIIFVzCCAz+gAwIBAgINAgPlk28xsBNJiGuiFzANBgkqhkiG9w0BAQwFADBHMQsw +CQYDVQQGEwJVUzEiMCAGA1UEChMZR29vZ2xlIFRydXN0IFNlcnZpY2VzIExMQzEU +MBIGA1UEAxMLR1RTIFJvb3QgUjEwHhcNMTYwNjIyMDAwMDAwWhcNMzYwNjIyMDAw +MDAwWjBHMQswCQYDVQQGEwJVUzEiMCAGA1UEChMZR29vZ2xlIFRydXN0IFNlcnZp +Y2VzIExMQzEUMBIGA1UEAxMLR1RTIFJvb3QgUjEwggIiMA0GCSqGSIb3DQEBAQUA +A4ICDwAwggIKAoICAQC2EQKLHuOhd5s73L+UPreVp0A8of2C+X0yBoJx9vaMf/vo +27xqLpeXo4xL+Sv2sfnOhB2x+cWX3u+58qPpvBKJXqeqUqv4IyfLpLGcY9vXmX7w +Cl7raKb0xlpHDU0QM+NOsROjyBhsS+z8CZDfnWQpJSMHobTSPS5g4M/SCYe7zUjw +TcLCeoiKu7rPWRnWr4+wB7CeMfGCwcDfLqZtbBkOtdh+JhpFAz2weaSUKK0Pfybl +qAj+lug8aJRT7oM6iCsVlgmy4HqMLnXWnOunVmSPlk9orj2XwoSPwLxAwAtcvfaH +szVsrBhQf4TgTM2S0yDpM7xSma8ytSmzJSq0SPly4cpk9+aCEI3oncKKiPo4Zor8 +Y/kB+Xj9e1x3+naH+uzfsQ55lVe0vSbv1gHR6xYKu44LtcXFilWr06zqkUspzBmk +MiVOKvFlRNACzqrOSbTqn3yDsEB750Orp2yjj32JgfpMpf/VjsPOS+C12LOORc92 +wO1AK/1TD7Cn1TsNsYqiA94xrcx36m97PtbfkSIS5r762DL8EGMUUXLeXdYWk70p +aDPvOmbsB4om3xPXV2V4J95eSRQAogB/mqghtqmxlbCluQ0WEdrHbEg8QOB+DVrN +VjzRlwW5y0vtOUucxD/SVRNuJLDWcfr0wbrM7Rv1/oFB2ACYPTrIrnqYNxgFlQID +AQABo0IwQDAOBgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4E +FgQU5K8rJnEaK0gnhS9SZizv8IkTcT4wDQYJKoZIhvcNAQEMBQADggIBAJ+qQibb +C5u+/x6Wki4+omVKapi6Ist9wTrYggoGxval3sBOh2Z5ofmmWJyq+bXmYOfg6LEe +QkEzCzc9zolwFcq1JKjPa7XSQCGYzyI0zzvFIoTgxQ6KfF2I5DUkzps+GlQebtuy +h6f88/qBVRRiClmpIgUxPoLW7ttXNLwzldMXG+gnoot7TiYaelpkttGsN/H9oPM4 +7HLwEXWdyzRSjeZ2axfG34arJ45JK3VmgRAhpuo+9K4l/3wV3s6MJT/KYnAK9y8J +ZgfIPxz88NtFMN9iiMG1D53Dn0reWVlHxYciNuaCp+0KueIHoI17eko8cdLiA6Ef +MgfdG+RCzgwARWGAtQsgWSl4vflVy2PFPEz0tv/bal8xa5meLMFrUKTX5hgUvYU/ +Z6tGn6D/Qqc6f1zLXbBwHSs09dR2CQzreExZBfMzQsNhFRAbd03OIozUhfJFfbdT +6u9AWpQKXCBfTkBdYiJ23//OYb2MI3jSNwLgjt7RETeJ9r/tSQdirpLsQBqvFAnZ +0E6yove+7u7Y/9waLd64NnHi/Hm3lCXRSHNboTXns5lndcEZOitHTtNCjv0xyBZm +2tIMPNuzjsmhDYAPexZ3FL//2wmUspO8IFgV6dtxQ/PeEMMA3KgqlbbC1j+Qa3bb +bP6MvPJwNQzcmRk13NfIRmPVNnGuV/u3gm3c +-----END CERTIFICATE----- diff --git a/testing/api/profiles/new_profile.json b/testing/api/profiles/new_profile.json index d63ecd17c..e2e46f3ab 100644 --- a/testing/api/profiles/new_profile.json +++ b/testing/api/profiles/new_profile.json @@ -2,10 +2,6 @@ "name": "New Profile", "status": "Valid", "questions": [ - { - "question": "What type of device is this?", - "answer": "IoT Gateway" - }, { "question": "How will this device be used at Google?", "answer": "Monitoring" @@ -18,17 +14,11 @@ "question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", "answer": "N/A" }, - { - "question": "Are any of the following statements true about your device?", - "answer": [ - 0 - ] - }, { "question": "Which of the following statements are true about this device?", "answer": [ 0 - ] + ] }, { "question": "Does the network protocol assure server-to-client identity verification?", @@ -38,7 +28,7 @@ "question": "Click the statements that best describe the characteristics of this device.", "answer": [ 0 - ] + ] }, { "question": "Are any of the following statements true about this device?", diff --git a/testing/api/profiles/new_profile_2.json b/testing/api/profiles/new_profile_2.json index 2ac93dc17..963265fe0 100644 --- a/testing/api/profiles/new_profile_2.json +++ b/testing/api/profiles/new_profile_2.json @@ -2,55 +2,35 @@ "name": "New Profile 2", "status": "Draft", "questions": [ - { - "question": "What type of device is this?", - "answer": "IoT Gateway" - }, - { - "question": "How will this device be used at Google?", - "answer": "Installed in a building" - }, - { - "question": "Is this device going to be managed by Google or a third party?", - "answer": "Google" - }, - { - "question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", - "answer": "Yes" - }, - { - "question": "Are any of the following statements true about your device?", - "answer": [ - 0, - 2 - ] - }, - { - "question": "Which of the following statements are true about this device?", - "answer": [ - 0, - 1, - 5 - ] - }, - { - "question": "Does the network protocol assure server-to-client identity verification?", - "answer": "Yes" - }, - { - "question": "Click the statements that best describe the characteristics of this device.", - "answer": [ - 0, - 1, - 2 - ] - }, - { - "question": "Are any of the following statements true about this device?", - "answer": [ - 2, - 3 - ] - } + { + "question": "How will this device be used at Google?", + "answer": "Monitoring" + }, + { + "question": "Is this device going to be managed by Google or a third party?", + "answer": "Google" + }, + { + "question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", + "answer": "N/A" + }, + { + "question": "Which of the following statements are true about this device?", + "answer": [] + }, + { + "question": "Does the network protocol assure server-to-client identity verification?", + "answer": "Yes" + }, + { + "question": "Click the statements that best describe the characteristics of this device.", + "answer": [] + }, + { + "question": "Are any of the following statements true about this device?", + "answer": [ + 0 + ] + } ] } \ No newline at end of file diff --git a/testing/api/profiles/updated_profile.json b/testing/api/profiles/updated_profile.json index 91714bcfa..9184cd07d 100644 --- a/testing/api/profiles/updated_profile.json +++ b/testing/api/profiles/updated_profile.json @@ -3,55 +3,39 @@ "rename": "Updated Profile", "status": "Draft", "questions": [ - { - "question": "What type of device is this?", - "answer": "IoT Gateway" - }, - { - "question": "How will this device be used at Google?", - "answer": "Installed in a building" - }, - { - "question": "Is this device going to be managed by Google or a third party?", - "answer": "Google" - }, - { - "question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", - "answer": "Yes" - }, - { - "question": "Are any of the following statements true about your device?", - "answer": [ - 0, - 2 - ] - }, - { - "question": "Which of the following statements are true about this device?", - "answer": [ - 0, - 1, - 5 - ] - }, - { - "question": "Does the network protocol assure server-to-client identity verification?", - "answer": "Yes" - }, - { - "question": "Click the statements that best describe the characteristics of this device.", - "answer": [ - 0, - 1, - 2 - ] - }, - { - "question": "Are any of the following statements true about this device?", - "answer": [ - 2, - 3 - ] - } + { + "question": "How will this device be used at Google?", + "answer": "Monitoring" + }, + { + "question": "Is this device going to be managed by Google or a third party?", + "answer": "Google" + }, + { + "question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", + "answer": "N/A" + }, + { + "question": "Which of the following statements are true about this device?", + "answer": [] + }, + { + "question": "Does the network protocol assure server-to-client identity verification?", + "answer": "Yes" + }, + { + "question": "Click the statements that best describe the characteristics of this device.", + "answer": [] + }, + { + "question": "Are any of the following statements true about this device?", + "answer": [ + 0 + ] + }, + { + "question": "Comments", + "answer": "" + } ] } \ No newline at end of file diff --git a/testing/api/reports/report.json b/testing/api/reports/report.json new file mode 100644 index 000000000..bd697654d --- /dev/null +++ b/testing/api/reports/report.json @@ -0,0 +1,134 @@ +{ + "testrun": { + "version": "1.3.1" + }, + "mac_addr": null, + "device": { + "mac_addr": "00:1e:42:35:73:c4", + "manufacturer": "Teltonika", + "model": "TRB140", + "firmware": "1.2.3", + "test_modules": { + "protocol": { + "enabled": true + }, + "services": { + "enabled": false + }, + "connection": { + "enabled": false + }, + "tls": { + "enabled": true + }, + "ntp": { + "enabled": true + }, + "dns": { + "enabled": true + } + } + }, + "status": "Non-Compliant", + "started": "2024-08-05 13:37:53", + "finished": "2024-08-05 13:39:35", + "tests": { + "total": 12, + "results": [ + { + "name": "protocol.valid_bacnet", + "description": "BACnet device could not be discovered", + "expected_behavior": "BACnet traffic can be seen on the network and packets are valid and not malformed", + "required_result": "Recommended", + "result": "Feature Not Detected" + }, + { + "name": "protocol.bacnet.version", + "description": "Device did not respond to BACnet discovery", + "expected_behavior": "The BACnet client implements an up to date version of BACnet", + "required_result": "Recommended", + "result": "Feature Not Detected" + }, + { + "name": "protocol.valid_modbus", + "description": "Device did not respond to Modbus connection", + "expected_behavior": "Any Modbus functionality works as expected and valid Modbus traffic can be observed", + "required_result": "Recommended", + "result": "Feature Not Detected" + }, + { + "name": "security.tls.v1_2_server", + "description": "TLS 1.2 certificate is invalid", + "expected_behavior": "TLS 1.2 certificate is issued to the web browser client when accessed", + "required_result": "Required if Applicable", + "result": "Non-Compliant", + "recommendations": [ + "Enable TLS 1.2 support in the web server configuration", + "Disable TLS 1.0 and 1.1", + "Sign the certificate used by the web server" + ] + }, + { + "name": "security.tls.v1_2_client", + "description": "TLS 1.2 client connections valid", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.2 and support for ECDH and ECDSA ciphers", + "required_result": "Required if Applicable", + "result": "Compliant" + }, + { + "name": "security.tls.v1_3_server", + "description": "TLS 1.3 certificate is invalid", + "expected_behavior": "TLS 1.3 certificate is issued to the web browser client when accessed", + "required_result": "Informational", + "result": "Informational" + }, + { + "name": "security.tls.v1_3_client", + "description": "TLS 1.3 client connections valid", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.3", + "required_result": "Informational", + "result": "Informational" + }, + { + "name": "ntp.network.ntp_support", + "description": "Device sent NTPv3 packets. NTPv3 is not allowed", + "expected_behavior": "The device sends an NTPv4 request to the configured NTP server.", + "required_result": "Required", + "result": "Non-Compliant", + "recommendations": [ + "Set the NTP version to v4 in the NTP client", + "Install an NTP client that supports NTPv4" + ] + }, + { + "name": "ntp.network.ntp_dhcp", + "description": "Device sent NTP request to non-DHCP provided server", + "expected_behavior": "Device can accept NTP server address, provided by the DHCP server (DHCP OFFER PACKET)", + "required_result": "Roadmap", + "result": "Feature Not Detected" + }, + { + "name": "dns.network.hostname_resolution", + "description": "DNS traffic detected from device", + "expected_behavior": "The device sends DNS requests.", + "required_result": "Required", + "result": "Compliant" + }, + { + "name": "dns.network.from_dhcp", + "description": "DNS traffic detected only to DHCP provided server", + "expected_behavior": "The device sends DNS requests to the DNS server provided by the DHCP server", + "required_result": "Informational", + "result": "Informational" + }, + { + "name": "dns.mdns", + "description": "No MDNS traffic detected from the device", + "expected_behavior": "Device may send MDNS requests", + "required_result": "Informational", + "result": "Informational" + } + ] + }, + "report": "http://localhost:8000/report/Teltonika TRB140/2024-08-05T13:37:53" +} \ No newline at end of file diff --git a/testing/api/reports/report.pdf b/testing/api/reports/report.pdf new file mode 100644 index 000000000..0e449f196 Binary files /dev/null and b/testing/api/reports/report.pdf differ diff --git a/testing/api/test_api.py b/testing/api/test_api.py index cea760576..49f894c86 100644 --- a/testing/api/test_api.py +++ b/testing/api/test_api.py @@ -29,7 +29,6 @@ import pytest import requests - ALL_DEVICES = "*" API = "http://127.0.0.1:8000" LOG_PATH = "/tmp/testrun.log" @@ -39,12 +38,17 @@ TESTING_DEVICES = "../device_configs" PROFILES_DIRECTORY = "local/risk_profiles" SYSTEM_CONFIG_PATH = "local/system.json" +CERTS_DIRECTORY = "local/root_certs" + SYSTEM_CONFIG_RESTORE_PATH = "testing/api/system.json" +CERTS_PATH = "testing/api/certificates" PROFILES_PATH = "testing/api/profiles" +REPORTS_PATH = "testing/api/reports" BASELINE_MAC_ADDR = "02:42:aa:00:01:01" ALL_MAC_ADDR = "02:42:aa:00:00:01" +TIMESTAMP = "2024-01-01 00:00:00" DEVICE_PROFILE_QUESTIONS = "resources/devices/device_profile.json" def pretty_print(dictionary: dict): @@ -101,9 +105,9 @@ def docker_logs(device_name): def load_json(file_name, directory): """Utility method to load json files' """ # Construct the base path relative to the main folder - base_path = Path(__file__).resolve().parent.parent.parent + base_path = os.path.abspath(os.path.join(__file__, "../../..")) # Construct the full file path - file_path = base_path / directory / file_name + file_path = os.path.join(base_path, directory, file_name) # Open the file in read mode with open(file_path, "r", encoding="utf-8") as file: @@ -112,7 +116,13 @@ def load_json(file_name, directory): @pytest.fixture def empty_devices_dir(): - """ Use e,pty devices directory """ + """ Fixture to empty devices directory """ + # Empty the directory before the test + local_delete_devices(ALL_DEVICES) + + yield + + # Empty the directory after the test local_delete_devices(ALL_DEVICES) @pytest.fixture @@ -352,7 +362,7 @@ def test_get_system_config(testrun): # pylint: disable=W0613 == api_config["network"]["internet_intf"] ) -def test_start_testrun_started_successfully(testing_devices, testrun): # pylint: disable=W0613 +def test_start_testrun_success(testing_devices, testrun): # pylint: disable=W0613 """Test for testrun started successfully """ # Payload with device details @@ -465,7 +475,7 @@ def test_start_testrun_device_not_found(testing_devices, testrun): # pylint: dis # Currently not working due to blocking during monitoring period @pytest.mark.skip() -def test_status_in_progress(testing_devices, testrun): # pylint: disable=W0613 +def test_status_in_progress(testing_devices, testrun): # pylint: disable=W0613 payload = {"device": {"mac_addr": BASELINE_MAC_ADDR, "firmware": "asd"}} r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) @@ -682,7 +692,7 @@ def test_system_shutdown(testrun): # pylint: disable=W0613 # Check if the response status code is 200 (OK) assert r.status_code == 200, f"Expected status code 200, got {r.status_code}" -def test_system_shutdown_in_progress(testrun): # pylint: disable=W0613 +def test_system_shutdown_in_progress(testrun): # pylint: disable=W0613 """Test system shutdown during an in-progress test""" # Payload with device details to start a test payload = { @@ -712,24 +722,110 @@ def test_system_shutdown_in_progress(testrun): # pylint: disable=W0613 # Check if the response status code is 400 (test in progress) assert r.status_code == 400 -def test_system_latest_version(testrun): # pylint: disable=W0613 - """Test for testrun version when the latest version is installed""" +def test_system_version(testrun): # pylint: disable=W0613 + """Test for testrun version endpoint""" # Send the get request to the API r = requests.get(f"{API}/system/version", timeout=5) + # Check if status code is 200 (ok) + assert r.status_code == 200 + # Parse the response response = r.json() - # Check if status code is 200 (update available) - assert r.status_code == 200 - # Check if an update is available - assert response["update_available"] is False + # Assign the json response keys and expected types + expected_keys = { + "installed_version":str, + "update_available":bool, + "latest_version":str, + "latest_version_url":str + } + + # Iterate over the dict keys and values + for key, key_type in expected_keys.items(): + + # Check if the key is in the JSON response + assert key in response + + # Check if the key has the expected data type + assert isinstance(response[key], key_type) # Tests for reports endpoints +@pytest.fixture +def create_report_folder(): # pylint: disable=W0613 + """Fixture to create the reports folder in local/devices""" + def _create_report_folder(device_name, mac_address, timestamp): + + # Create the device folder path + main_folder = os.path.join(DEVICES_DIRECTORY, device_name) + + # Remove the ":" from mac address for the folder structure + mac_address = mac_address.replace(":", "") + + # Change the timestamp format for the folder structure + timestamp = timestamp.replace(" ", "T") + + # Create the report folder path + report_folder = os.path.join(main_folder, "reports", timestamp, + "test", mac_address) + + # Ensure the report folder exists + os.makedirs(report_folder, exist_ok=True) + + # Iterate over the files from 'testing/api/reports' folder + for file in os.listdir(REPORTS_PATH): + + # Construct full path of the file from 'testing/api/reports' folder + source_path = os.path.join(REPORTS_PATH, file) + + # Construct full path where the file will be copied + target_path = os.path.join(report_folder, file) + + # Copy the file + shutil.copy(source_path, target_path) + + return report_folder + + return _create_report_folder + +def create_and_get_device(): + """Utility method to create a device""" + + # Payload with device details + device = { + "mac_addr": "00:1e:42:28:9e:4a", + "manufacturer": "Teltonika", + "model": "TRB140" + } + + # Send a POST request to create a new device + requests.post(f"{API}/device", data=json.dumps(device), timeout=5) + + # Send the GET request to retrieve the created device's folder name + r = requests.get(f"{API}/devices", timeout=5) + + # Parse the json response + response = r.json() + + # Extract the device name and MAC address from response + device_name = f'{response[0]["manufacturer"]} {response[0]["model"]}' + mac_address = response[0]["mac_addr"] + + # Return only the device name and MAC address + return device_name, mac_address + + + +@pytest.fixture() +def add_device(): + """Fixture to add device during tests""" + # Returning the reference to create_device + return create_and_get_device + def test_get_reports_no_reports(testrun): # pylint: disable=W0613 - """Test get reports when no reports exist.""" + """Test get reports when no reports exist""" # Send a GET request to the /reports endpoint r = requests.get(f"{API}/reports", timeout=5) @@ -746,6 +842,370 @@ def test_get_reports_no_reports(testrun): # pylint: disable=W0613 # Check if the response is an empty list assert response == [] +def test_delete_report_success(empty_devices_dir, testrun, # pylint: disable=W0613 + add_device, create_report_folder): + """Test for succesfully delete a report (200)""" + + r = requests.get(f"{API}/devices", timeout=5) + + # Load the device_name and mac_address from add_device fixture + device_name, mac_address = add_device() + + # Create the report directory + report_folder = create_report_folder(device_name, mac_address, TIMESTAMP) + + # Payload + delete_data = { + "mac_addr": mac_address, + "timestamp": TIMESTAMP + } + + # Send a DELETE request to remove the report + r = requests.delete(f"{API}/report", data=json.dumps(delete_data), timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + + # Parse the json response + response = r.json() + + # Check if "success" in response + assert "success" in response + + # Check if report folder has been deleted + assert not os.path.exists(report_folder) + +def test_delete_report_no_payload(empty_devices_dir, testrun, # pylint: disable=W0613 + add_device, create_report_folder): # pylint: disable=W0613 + """Test delete report bad request when the payload is missing (400)""" + + # Send a DELETE request to remove the report without the payload + r = requests.delete(f"{API}/report", timeout=5) + + # Check if status code is 400 (bad request) + assert r.status_code == 400 + + # Parse the json response + response = r.json() + + # Check if "error" in response + assert "error" in response + + # Check if the correct error message returned + assert "Invalid request received, missing body" in response["error"] + +def test_delete_report_invalid_payload(empty_devices_dir, testrun, # pylint: disable=W0613 + add_device, create_report_folder): + """Test delete report bad request, mac addr and timestamp are missing (400)""" + + # Load the device_name and mac_address from add_device fixture + device_name, mac_address = add_device() + + # Create the report directory + create_report_folder(device_name, mac_address, TIMESTAMP) + + # Empty payload + delete_data = {} + + # Send a DELETE request to remove the report + r = requests.delete(f"{API}/report", data=json.dumps(delete_data), timeout=5) + + # Check if status code is 400 (bad request) + assert r.status_code == 400 + + # Parse the json response + response = r.json() + + # Check if "error" in response + assert "error" in response + + # Check if the correct error message returned + assert "Missing mac address or timestamp" in response["error"] + +def test_delete_report_invalid_timestamp(empty_devices_dir, testrun, # pylint: disable=W0613 + add_device, create_report_folder): + """Test delete report bad request when timestamp format is not valid (400)""" + + # Load the device_name and mac_address from add_device fixture + device_name, mac_address = add_device() + + # Assign the incorrect timestamp format + timestamp = "2024-01-01 invalid" + + # Create the report.json + create_report_folder(device_name, mac_address, timestamp) + + # Payload + delete_data = { + "mac_addr": mac_address, + "timestamp": timestamp + } + + # Send a DELETE request to remove the report + r = requests.delete(f"{API}/report", data=json.dumps(delete_data), timeout=5) + + # Check if status code is 400 (bad request) + assert r.status_code == 400 + + # Parse the json response + response = r.json() + + # Check if "error" in response + assert "error" in response + + # Check if the correct error message returned + assert "Incorrect timestamp format" in response["error"] + +def test_delete_report_no_report(empty_devices_dir, testrun): # pylint: disable=W0613 + """Test delete report when report does not exist (404)""" + + # Payload to be deleted for a non existing device + delete_data = { + "mac_addr": "00:1e:42:35:73:c4", + "timestamp": TIMESTAMP + } + + # Send the delete request to the endpoint + r = requests.delete(f"{API}/report", data=json.dumps(delete_data), timeout=5) + + # Check if status is 404 (not found) + assert r.status_code == 404 + + # Parse the response json + response = r.json() + + # Check if "error" in response + assert "error" in response + +def test_delete_report_server_error(empty_devices_dir, testrun, # pylint: disable=W0613 + add_device, create_report_folder): + """Test for delete report causing internal server error (500)""" + + # Load the device_name and mac_address from add_device fixture + device_name, mac_address = add_device() + + # Create the report folder and JSON file + create_report_folder(device_name, mac_address, TIMESTAMP) + + # Construct the device report path + device_directory = os.path.join(DEVICES_DIRECTORY, device_name) + + # Remove the folder and all its content before delete request + shutil.rmtree(device_directory) + + # Prepare the payload for the DELETE request + delete_data = { + "mac_addr": mac_address, + "timestamp": TIMESTAMP + } + + # Send the delete request to delete the report + r = requests.delete(f"{API}/report", + data=json.dumps(delete_data), + timeout=5) + + # Check if status code is 500 (Internal Server Error) + assert r.status_code == 500 + + # Parse the JSON response + response = r.json() + + # Check if error is present in the response + assert "error" in response + + # Check if the correct error message is returned + assert "Error occured whilst deleting report" in response["error"] + +def test_get_report_success(empty_devices_dir, testrun, # pylint: disable=W0613 + add_device, create_report_folder): + """Test get report when report exists (200)""" + + # Load the device_name and mac_address from add_device fixture + device_name, mac_address = add_device() + + # Assign the timestamp and change the format + timestamp = TIMESTAMP.replace(" ", "T") + + # Create the report for the device + create_report_folder(device_name, mac_address, timestamp) + + # Send the get request + r = requests.get(f"{API}/report/{device_name}/{timestamp}", timeout=5) + + # Check if status code is 200 (ok) + assert r.status_code == 200 + + # Check if the response is a PDF + assert r.headers["Content-Type"] == "application/pdf" + +def test_get_report_not_found(empty_devices_dir, testrun, add_device): # pylint: disable=W0613 + """Test get report when report doesn't exist (404)""" + + # Load the device_name and ignore mac_address from add_device fixture + device_name, _ = add_device() + + # Send the get request + r = requests.get(f"{API}/report/{device_name}/{TIMESTAMP}", timeout=5) + + # Check if status code is 404 (not found) + assert r.status_code == 404 + + # Parse the response json + response = r.json() + + # Check if "error" in response + assert "error" in response + + # Check if the correct error message returned + assert "Report could not be found" in response["error"] + +def test_get_report_device_not_found(empty_devices_dir, testrun): # pylint: disable=W0613 + """Test getting a report when the device is not found (404)""" + + # Assign device name and timestamp + device_name = "nonexistent_device" + + # Send the get request + r = requests.get(f"{API}/report/{device_name}/{TIMESTAMP}", timeout=5) + + # Check if is 404 (not found) + assert r.status_code == 404 + + # Parse the json response + response = r.json() + + # Check if "error" in response + assert "error" in response + + # Check if the correct error message is returned + assert "Device not found" in response["error"] + +def test_export_report_device_not_found(empty_devices_dir, testrun, # pylint: disable=W0613 + create_report_folder): + """Test for export the report result when the device could not be found""" + + # Assign the non-existing device name, mac_address + device_name = "non existing device" + mac_address = "00:1e:42:35:73:c4" + + # Create the report for the non-existing device + create_report_folder(device_name, mac_address, TIMESTAMP) + + # Send the post request + r = requests.post(f"{API}/export/{device_name}/{TIMESTAMP}", timeout=5) + + # Check if is 404 (not found) + assert r.status_code == 404 + + # Parse the json response + response = r.json() + + # Check if "error" in response + assert "error" in response + + # Check if the correct error message returned + assert "A device with that name could not be found" in response["error"] + +def test_export_report_profile_not_found(empty_devices_dir, testrun, # pylint: disable=W0613 + add_device, create_report_folder): + """Test for export report result when the profile is not found""" + + # Load the device_name and mac_address from add_device fixture + device_name, mac_address = add_device() + + # Create the report for the device + create_report_folder(device_name, mac_address, TIMESTAMP) + + # Add a non existing profile into the payload + payload = {"profile": "non_existent_profile"} + + # Send the post request + r = requests.post(f"{API}/export/{device_name}/{TIMESTAMP}", + json=payload, + timeout=5) + + # Check if is 404 (not found) + assert r.status_code == 404 + + # Parse the json response + response = r.json() + + # Check if "error" in response + assert "error" in response + + # Check if the correct error message returned + assert "A profile with that name could not be found" in response["error"] + +def test_export_report_not_found(empty_devices_dir, testrun, add_device): # pylint: disable=W0613 + """Test for export the report result when the report could not be found""" + + # Load the device_name and ignore mac_address from add_device fixture + device_name, _ = add_device() + + # Send the post request to trigger the zipping process + r = requests.post(f"{API}/export/{device_name}/{TIMESTAMP}", timeout=10) + + # Check if status code is 500 (Internal Server Error) + assert r.status_code == 404 + + # Parse the json response + response = r.json() + + # Check if "error" in response + assert "error" in response + + # Check if the correct error message is returned + assert "Report could not be found" in response["error"] + +def test_export_report_with_profile(empty_devices_dir, testrun, add_device, # pylint: disable=W0613 + create_report_folder, reset_profiles, add_profile): # pylint: disable=W0613 + """Test export results with existing profile when report exists (200)""" + + # Create a profile using the add_profile fixture + profile = add_profile("new_profile.json") + + # Load the device_name and mac_address from add_device fixture + device_name, mac_address = add_device() + + # Assign the timestamp and change the format + timestamp = TIMESTAMP.replace(" ", "T") + + # Create the report for the device + create_report_folder(device_name, mac_address, timestamp) + + # Send the post request + r = requests.post(f"{API}/export/{device_name}/{timestamp}", + json=profile, + timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + + # Check if the response is a zip file + assert r.headers["Content-Type"] == "application/zip" + +def test_export_results_with_no_profile(empty_devices_dir, testrun, # pylint: disable=W0613 + add_device, create_report_folder): + """Test export results with no profile when report exists (200)""" + + # Load the device_name and mac_address from add_device fixture + device_name, mac_address = add_device() + + # Assign the timestamp and change the format + timestamp = TIMESTAMP.replace(" ", "T") + + # Create the report for the device + create_report_folder(device_name, mac_address, timestamp) + + # Send the post request + r = requests.post(f"{API}/export/{device_name}/{timestamp}", timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + + # Check if the response is a zip file + assert r.headers["Content-Type"] == "application/zip" + # Tests for device endpoints @pytest.mark.skip() @@ -1049,7 +1509,6 @@ def test_start_system_not_configured_correctly( r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) - print(r.text) payload = {"device": {"mac_addr": None, "firmware": "asd"}} r = requests.post(f"{API}/system/start", @@ -1076,7 +1535,6 @@ def test_start_device_not_found(empty_devices_dir, # pylint: disable=W0613 r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) - print(r.text) r = requests.delete(f"{API}/device/", data=json.dumps(device_1), @@ -1109,7 +1567,6 @@ def test_start_missing_device_information( r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) - print(r.text) payload = {} r = requests.post(f"{API}/system/start", @@ -1136,14 +1593,12 @@ def test_create_device_already_exists( r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) - print(r.text) assert r.status_code == 201 assert len(local_get_devices()) == 1 r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) - print(r.text) assert r.status_code == 409 def test_create_device_invalid_json( @@ -1155,7 +1610,6 @@ def test_create_device_invalid_json( r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) - print(r.text) assert r.status_code == 400 def test_create_device_invalid_request( @@ -1165,7 +1619,6 @@ def test_create_device_invalid_request( r = requests.post(f"{API}/device", data=None, timeout=5) - print(r.text) assert r.status_code == 400 def test_device_edit_device( @@ -1235,7 +1688,6 @@ def test_device_edit_device_not_found( r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) - print(r.text) assert r.status_code == 201 assert len(local_get_devices()) == 1 @@ -1272,7 +1724,6 @@ def test_device_edit_device_incorrect_json_format( r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) - print(r.text) assert r.status_code == 201 assert len(local_get_devices()) == 1 @@ -1304,7 +1755,6 @@ def test_device_edit_device_with_mac_already_exists( r = requests.post(f"{API}/device", data=json.dumps(device_1), timeout=5) - print(r.text) assert r.status_code == 201 assert len(local_get_devices()) == 1 @@ -1394,30 +1844,414 @@ def test_get_test_modules(testrun): # pylint: disable=W0613 # Check if the response is a list assert isinstance(response, list) +# Tests for certificates endpoints + +def delete_all_certs(): + """Utility method to delete all certificates from root_certs folder""" + + try: + + # Check if the profile_path (local/root_certs) exists and is a folder + if os.path.exists(CERTS_DIRECTORY) and os.path.isdir(CERTS_DIRECTORY): + + # Iterate over all certificates from root_certs folder + for item in os.listdir(CERTS_DIRECTORY): + + # Combine the directory path with the item name to create the full path + item_path = os.path.join(CERTS_DIRECTORY, item) + + # Check if item is a file + if os.path.isfile(item_path): + + #If True remove file + os.unlink(item_path) + + else: + + # If item is a folder remove it + shutil.rmtree(item_path) + + except PermissionError: + + # Permission related issues + print(f"Permission Denied: {item}") + + except OSError as err: + + # System related issues + print(f"Error removing {item}: {err}") + +def load_certificate_file(cert_filename): + """Utility method to load a certificate file in binary read mode.""" + + # Construct the full file path + cert_path = os.path.join(CERTS_PATH, cert_filename) + + # Open the certificate file in binary read mode + with open(cert_path, "rb") as cert_file: + + # Return the certificate file + return cert_file.read() + +def upload_cert(filename): + """Utility method to upload a certificate""" + + # Load the certificate using the utility method + cert_file = load_certificate_file(filename) + + # Send a POST request to the API endpoint to upload the certificate + response = requests.post( + f"{API}/system/config/certs", + files={"file": (filename, cert_file, "application/x-x509-ca-cert")}, + timeout=5) + + # Return the response + return response + +@pytest.fixture() +def reset_certs(): + """Delete the certificates before and after each test""" + + # Delete before the test + delete_all_certs() + + yield + + # Delete after the test + delete_all_certs() + +@pytest.fixture() +def add_cert(): + """Fixture to upload certificates during tests.""" + + # Returning the reference to upload_certificate + return upload_cert + +def test_get_certificates_no_certificates(testrun, reset_certs): # pylint: disable=W0613 + """Test for get certificates when no certificates have been uploaded""" + + # Send the get request to "/system/config/certs" endpoint + r = requests.get(f"{API}/system/config/certs", timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + + # Parse the response (certificates) + response = r.json() + + # Check if response is a list + assert isinstance(response, list) + + # Check if the list is empty + assert len(response) == 0 + +def test_get_certificates(testrun, reset_certs, add_cert): # pylint: disable=W0613 + """Test for get certificates when two certificates have been uploaded""" + + # Use add_cert fixture to upload the first certificate + add_cert("crt.pem") + + # Send the get request to "/system/config/certs" endpoint + r = requests.get(f"{API}/system/config/certs", timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + + # Parse the response (certificates) + response = r.json() + + # Check if response is a list + assert isinstance(response, list) + + # Check if response contains one certificate + assert len(response) == 1 + + # Use add_cert fixture to upload the second certificate + add_cert("WR2.pem") + + # Send the get request to "/system/config/certs" endpoint + r = requests.get(f"{API}/system/config/certs", timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + + # Parse the response (certificates) + response = r.json() + + # Check if response is a list + assert isinstance(response, list) + + # Check if response contains two certificates + assert len(response) == 2 + +def test_upload_certificate(testrun, reset_certs): # pylint: disable=W0613 + """Test for upload certificate successfully""" + + # Load the first certificate file content using the utility method + cert_file = load_certificate_file("crt.pem") + + # Send a POST request to the API endpoint to upload the certificate + r = requests.post( + f"{API}/system/config/certs", + files={"file": ("crt.pem", cert_file, "application/x-x509-ca-cert")}, + timeout=5 + ) + + # Check if status code is 201 (Created) + assert r.status_code == 201 + + # Parse the response + response = r.json() + + # Check if 'filename' field is in the response + assert "filename" in response + + # Check if the certificate filename is 'crt.pem' + assert response["filename"] == "crt.pem" + + # Load the second certificate file using the utility method + cert_file = load_certificate_file("WR2.pem") + + # Send a POST request to the API endpoint to upload the second certificate + r = requests.post( + f"{API}/system/config/certs", + files={"file": ("WR2.pem", cert_file, "application/x-x509-ca-cert")}, + timeout=5 + ) + + # Check if status code is 201 (Created) + assert r.status_code == 201 + + # Parse the response + response = r.json() + + # Check if 'filename' field is in the response + assert "filename" in response + + # Check if the certificate filename is 'WR2.pem' + assert response["filename"] == "WR2.pem" + + # Send get request to check that the certificates are listed + r = requests.get(f"{API}/system/config/certs", timeout=5) + + # Parse the response + response = r.json() + + # Check if "crt.pem" exists + assert any(cert["filename"] == "crt.pem" for cert in response) + + # Check if "WR2.pem" exists + assert any(cert["filename"] == "WR2.pem" for cert in response) + +def test_upload_invalid_certificate_format(testrun, reset_certs): # pylint: disable=W0613 + """Test for upload an invalid certificate format """ + + # Load the first certificate file content using the utility method + cert_file = load_certificate_file("invalid.pem") + + # Send a POST request to the API endpoint to upload the certificate + r = requests.post( + f"{API}/system/config/certs", + files={"file": ("invalid.pem", cert_file, "application/x-x509-ca-cert")}, + timeout=5 + ) + + # Check if status code is 400 (bad request) + assert r.status_code == 400 + + # Parse the response + response = r.json() + + # Check if "error" key is in response + assert "error" in response + +def test_upload_invalid_certificate_name(testrun, reset_certs): # pylint: disable=W0613 + """Test for upload a valid certificate with invalid filename""" + + # Assign the invalid certificate name to a variable + cert_name = "invalidname1234567891234.pem" + + # Load the first certificate file content using the utility method + cert_file = load_certificate_file(cert_name) + + # Send a POST request to the API endpoint to upload the certificate + r = requests.post( + f"{API}/system/config/certs", + files={"file": (cert_name, cert_file, "application/x-x509-ca-cert")}, + timeout=5 + ) + + # Check if status code is 400 (bad request) + assert r.status_code == 400 + + # Parse the response + response = r.json() + + # Check if "error" key is in response + assert "error" in response + +def test_upload_existing_certificate(testrun, reset_certs): # pylint: disable=W0613 + """Test for upload an existing certificate""" + + # Load the first certificate file content using the utility method + cert_file = load_certificate_file("crt.pem") + + # Send a POST request to the API endpoint to upload the certificate + r = requests.post( + f"{API}/system/config/certs", + files={"file": ("crt.pem", cert_file, "application/x-x509-ca-cert")}, + timeout=5 + ) + + # Check if status code is 201 (Created) + assert r.status_code == 201 + + # Parse the response + response = r.json() + + # Check if 'filename' field is in the response + assert "filename" in response + + # Check if the certificate name is 'crt.pem' + assert response["filename"] == "crt.pem" + + # Load the same certificate file content using the utility method + cert_file = load_certificate_file("crt.pem") + + # Send a POST request to the API endpoint to upload the second certificate + r = requests.post( + f"{API}/system/config/certs", + files={"file": ("crt.pem", cert_file, "application/x-x509-ca-cert")}, + timeout=5 + ) + + # Check if status code is 409 (conflict) + assert r.status_code == 409 + + # Parse the json response + response = r.json() + + # Check if "error" key is in response + assert "error" in response + +def test_delete_certificate_success(testrun, reset_certs, add_cert): # pylint: disable=W0613 + """Test for successfully deleting an existing certificate""" + + # Use the add_cert fixture to upload the first certificate + add_cert("crt.pem") + + # Retrieve the uploaded certificate's details + r = requests.get(f"{API}/system/config/certs", timeout=5) + + # Parse the json response + response = r.json() + + # Extract the name of the uploaded certificate + uploaded_cert = next( + (cert for cert in response if cert["filename"] == "crt.pem") + ) + + # Assign the certificate name + cert_name = uploaded_cert["name"] + + # Send delete certificate request + delete_payload = {"name": cert_name} + r = requests.delete(f"{API}/system/config/certs", + data=json.dumps(delete_payload), + timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + + # Send the get request to display all certificates + r = requests.get(f"{API}/system/config/certs", timeout=5) + + # Parse the json response + response = r.json() + + # Check that the certificate is no longer listed + assert not any(cert["filename"] == "crt.pem" for cert in response) + +def test_delete_certificate_bad_request(testrun, reset_certs, add_cert): # pylint: disable=W0613 + """Test for delete a certificate without providing the name""" + + # Use the add_cert fixture to upload the first certificate + add_cert("crt.pem") + + # Empty payload + delete_payload = {} + + # Send the delete request + r = requests.delete(f"{API}/system/config/certs", + data=json.dumps(delete_payload), + timeout=5) + + # Check if status code is 400 (Bad Request) + assert r.status_code == 400 + + # parse the json response + response = r.json() + + # Check if error in response + assert "error" in response + +def test_delete_certificate_not_found(testrun, reset_certs): # pylint: disable=W0613 + """Test for delete certificate when does not exist""" + + # Attempt to delete a certificate with a name that doesn't exist + delete_payload = {"name": "non_existing"} + + # Send the delete request + r = requests.delete(f"{API}/system/config/certs", + data=json.dumps(delete_payload), + timeout=5) + + # Check if status code is 404 (Not Found) + assert r.status_code == 404 + + # parse the json response + response = r.json() + + # Check if error in response + assert "error" in response + # Tests for profile endpoints + def delete_all_profiles(): """Utility method to delete all profiles from risk_profiles folder""" # Assign the profiles directory - profiles_path = Path(PROFILES_DIRECTORY) + profiles_path = os.path.join(PROFILES_DIRECTORY) try: + # Check if the profile_path (local/risk_profiles) exists and is a folder - if profiles_path.exists() and profiles_path.is_dir(): + if os.path.exists(profiles_path) and os.path.isdir(profiles_path): + # Iterate over all profiles from risk_profiles folder - for item in profiles_path.iterdir(): + for item in os.listdir(profiles_path): + + # Create the full path + item_path = os.path.join(profiles_path, item) + # Check if item is a file - if item.is_file(): + if os.path.isfile(item_path): + #If True remove file - item.unlink() + os.unlink(item_path) + else: + # If item is a folder remove it - shutil.rmtree(item) + shutil.rmtree(item_path) except PermissionError: + # Permission related issues print(f"Permission Denied: {item}") + except OSError as err: + # System related issues print(f"Error removing {item}: {err}") @@ -1458,23 +2292,28 @@ def reset_profiles(): @pytest.fixture() def add_profile(): - """Fixture to create profiles during tests.""" + """Fixture to create profiles during tests""" + # Returning the reference to create_profile return create_profile def profile_exists(profile_name): """Utility method to check if profile exists""" + # Send the get request r = requests.get(f"{API}/profiles", timeout=5) + # Check if status code is not 200 (OK) if r.status_code != 200: raise ValueError(f"Api request failed with code: {r.status_code}") + # Parse the JSON response to get the list of profiles profiles = r.json() + # Return if name is in the list of profiles return any(p["name"] == profile_name for p in profiles) -def test_get_profiles_format(testrun): # pylint: disable=W0613 +def test_get_profiles_format(testrun): # pylint: disable=W0613 """Test profiles format""" # Send the get request @@ -1494,10 +2333,10 @@ def test_get_profiles_format(testrun): # pylint: disable=W0613 assert "question" in item assert "type" in item -def test_get_profiles(testrun, reset_profiles, add_profile): # pylint: disable=W0613 +def test_get_profiles(testrun, reset_profiles, add_profile): # pylint: disable=W0613 """Test for get profiles (no profile, one profile, two profiles)""" - # Test for no profiles + # Test for no profile # Send the get request to "/profiles" endpoint r = requests.get(f"{API}/profiles", timeout=5) @@ -1539,11 +2378,15 @@ def test_get_profiles(testrun, reset_profiles, add_profile): # pylint: disable= for field in ["name", "status", "created", "version", "questions", "risk"]: assert field in profile + # Assign profile["questions"] + profile_questions = profile["questions"] + # Check if "questions" value is a list - assert isinstance(profile["questions"], list) + assert isinstance(profile_questions, list) # Check that "questions" value has the expected fields - for element in profile["questions"]: + for element in profile_questions: + # Check if each element is dict assert isinstance(element, dict) @@ -1574,7 +2417,7 @@ def test_get_profiles(testrun, reset_profiles, add_profile): # pylint: disable= assert len(response) == 2 def test_create_profile(testrun, reset_profiles): # pylint: disable=W0613 - """Test for create profile if not exists""" + """Test for create profile when profile does not exist""" # Load the profile new_profile = load_json("new_profile.json", directory=PROFILES_PATH) @@ -1616,7 +2459,7 @@ def test_create_profile(testrun, reset_profiles): # pylint: disable=W0613 assert created_profile is not None def test_update_profile(testrun, reset_profiles, add_profile): # pylint: disable=W0613 - """Test for update profile when exists""" + """Test for update profile when profile already exists""" # Load the new profile using add_profile fixture new_profile = add_profile("new_profile.json") @@ -1673,7 +2516,7 @@ def test_update_profile_invalid_json(testrun, reset_profiles, add_profile): # py # Load the new profile using add_profile fixture add_profile("new_profile.json") - # invalid JSON + # Invalid JSON updated_profile = {} # Send the post request to update the profile @@ -1694,7 +2537,7 @@ def test_update_profile_invalid_json(testrun, reset_profiles, add_profile): # py def test_create_profile_invalid_json(testrun, reset_profiles): # pylint: disable=W0613 """Test for create profile invalid JSON payload """ - # invalid JSON + # Invalid JSON new_profile = {} # Send the post request to update the profile @@ -1721,7 +2564,7 @@ def test_delete_profile(testrun, reset_profiles, add_profile): # pylint: disable # Assign the profile name profile_name = profile_to_delete["name"] - # Delete the profile + # Send the delete request r = requests.delete( f"{API}/profiles", data=json.dumps(profile_to_delete), @@ -1750,6 +2593,7 @@ def test_delete_profile(testrun, reset_profiles, add_profile): # pylint: disable (p for p in profiles if p["name"] == profile_name), None ) + # Check if profile was deleted assert deleted_profile is None @@ -1771,6 +2615,7 @@ def test_delete_profile_no_profile(testrun, reset_profiles): # pylint: disable=W def test_delete_profile_invalid_json(testrun, reset_profiles): # pylint: disable=W0613 """Test for delete profile wrong JSON payload""" + # Invalid payload profile_to_delete = {} # Delete the profile @@ -1788,7 +2633,9 @@ def test_delete_profile_invalid_json(testrun, reset_profiles): # pylint: disable # Check if "error" key in response assert "error" in response + # Invalid payload profile_to_delete_2 = {"status": "Draft"} + # Delete the profile r = requests.delete( f"{API}/profiles", @@ -1804,9 +2651,8 @@ def test_delete_profile_invalid_json(testrun, reset_profiles): # pylint: disable # Check if "error" key in response assert "error" in response -def test_delete_profile_internal_server_error(testrun, # pylint: disable=W0613 - reset_profiles, # pylint: disable=W0613 - add_profile ): +def test_delete_profile_server_error(testrun, reset_profiles, # pylint: disable=W0613 + add_profile): """Test for delete profile causing internal server error""" # Assign the profile from the fixture