Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions tofupilot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
ENDPOINT,
FILE_MAX_SIZE,
CLIENT_MAX_ATTACHMENTS,
SECONDS_BEFORE_TIMEOUT,
)
from .models import SubUnit, UnitUnderTest, Step, Phase, Log
from .utils import (
Expand All @@ -26,7 +25,6 @@
setup_logger,
timedelta_to_iso,
datetime_to_iso,
handle_response,
handle_http_error,
handle_network_error,
api_request,
Expand Down Expand Up @@ -480,20 +478,13 @@ def get_connection_credentials(self) -> dict:
a dict containing the emqx server url, the topic to connect to, and the JWT token required to connect
other fields as set in handle_http_error and handle_network_error
"""
try:
response = requests.get(
f"{self._url}/streaming",
headers=self._headers,
verify=self._verify,
timeout=SECONDS_BEFORE_TIMEOUT,
)
response.raise_for_status()
values = handle_response(self._logger, response)
return {"success": True, "values": values}
except requests.exceptions.HTTPError as http_err:
return handle_http_error(self._logger, http_err)
except requests.RequestException as e:
return handle_network_error(self._logger, e)
return api_request(
self._logger,
"GET",
f"{self._url}/streaming",
self._headers,
verify=self._verify,
)


def print_version_banner(current_version: str):
Expand Down
2 changes: 2 additions & 0 deletions tofupilot/openhtf/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def __call__(self, test_record: TestRecord) -> str:
data=attachment_data,
headers={"Content-Type": attachment.mimetype},
timeout=SECONDS_BEFORE_TIMEOUT,
verify=self._verify,
)
except Exception as e:
self._logger.error(f"Error uploading data: {str(e)}")
Expand All @@ -220,6 +221,7 @@ def __call__(self, test_record: TestRecord) -> str:
upload_id,
run_id,
logger=self._logger,
verify=self._verify,
)

# Use LoggerStateManager to temporarily activate the logger
Expand Down
111 changes: 65 additions & 46 deletions tofupilot/utils/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ..constants.requests import SECONDS_BEFORE_TIMEOUT
from .logger import LoggerStateManager
from .network import prepare_verify_setting, cleanup_temp_cert_bundle


def log_and_raise(logger: Logger, error_message: str):
Expand Down Expand Up @@ -60,45 +61,50 @@ def upload_file(
Returns:
str: The ID of the created upload
"""
# Upload initialization
initialize_url = f"{url}/uploads/initialize"
file_name = os.path.basename(file_path)
payload = {"name": file_name}

response = requests.post(
initialize_url,
data=json.dumps(payload),
headers=headers,
timeout=SECONDS_BEFORE_TIMEOUT,
verify=verify,
)

# Check for API key errors before raising for status
if response.status_code == 401:
error_data = response.json()
error_message = error_data.get("error", {}).get("message", "Authentication failed")
# Create a proper HTTPError with the response
http_error = requests.exceptions.HTTPError(response=response)
http_error.response = response
raise http_error
verify_setting = prepare_verify_setting(verify)

response.raise_for_status()
response_json = response.json()
upload_url = response_json.get("uploadUrl")
upload_id = response_json.get("id")

# File storing
with open(file_path, "rb") as file:
content_type, _ = mimetypes.guess_type(file_path) or "application/octet-stream"
requests.put(
upload_url,
data=file,
headers={"Content-Type": content_type},
try:
# Upload initialization
initialize_url = f"{url}/uploads/initialize"
file_name = os.path.basename(file_path)
payload = {"name": file_name}

response = requests.post(
initialize_url,
data=json.dumps(payload),
headers=headers,
timeout=SECONDS_BEFORE_TIMEOUT,
verify=verify,
verify=verify_setting,
)

return upload_id
# Check for API key errors before raising for status
if response.status_code == 401:
error_data = response.json()
error_message = error_data.get("error", {}).get("message", "Authentication failed")
# Create a proper HTTPError with the response
http_error = requests.exceptions.HTTPError(response=response)
http_error.response = response
raise http_error

response.raise_for_status()
response_json = response.json()
upload_url = response_json.get("uploadUrl")
upload_id = response_json.get("id")

# File storing
with open(file_path, "rb") as file:
content_type, _ = mimetypes.guess_type(file_path) or "application/octet-stream"
requests.put(
upload_url,
data=file,
headers={"Content-Type": content_type},
timeout=SECONDS_BEFORE_TIMEOUT,
verify=verify_setting,
)

return upload_id
finally:
cleanup_temp_cert_bundle(verify_setting, verify)


def notify_server(
Expand All @@ -107,7 +113,7 @@ def notify_server(
upload_id: str,
run_id: str,
logger = None,
verify = None, # str | None
verify: Optional[str] = None,
) -> bool:
"""Tells TP server to sync upload with newly created run

Expand All @@ -122,16 +128,18 @@ def notify_server(
Returns:
bool: True if successful
"""
sync_url = f"{url}/uploads/sync"
sync_payload = {"upload_id": upload_id, "run_id": run_id}

verify_setting = prepare_verify_setting(verify)

try:
sync_url = f"{url}/uploads/sync"
sync_payload = {"upload_id": upload_id, "run_id": run_id}

response = requests.post(
sync_url,
data=json.dumps(sync_payload),
headers=headers,
timeout=SECONDS_BEFORE_TIMEOUT,
verify=verify,
verify=verify_setting,
)
response.raise_for_status()

Expand All @@ -142,6 +150,8 @@ def notify_server(
with LoggerStateManager(logger):
logger.error(f"Failed to sync attachment: {str(e)}")
return False
finally:
cleanup_temp_cert_bundle(verify_setting, verify)


def upload_attachment_data(
Expand All @@ -152,13 +162,15 @@ def upload_attachment_data(
data,
mimetype: str,
run_id: str,
verify, #: str | None,
verify: Optional[str],
) -> bool:
"""
Uploads binary data as an attachment and links it to a run

Uses LoggerStateManager to ensure proper logging, similar to OpenHTF implementation.
"""
verify_setting = prepare_verify_setting(verify)

try:
initialize_url = f"{url}/uploads/initialize"
payload = {"name": name}
Expand All @@ -168,7 +180,7 @@ def upload_attachment_data(
data=json.dumps(payload),
headers=headers,
timeout=SECONDS_BEFORE_TIMEOUT,
verify=verify,
verify=verify_setting,
)
response.raise_for_status()

Expand All @@ -184,12 +196,12 @@ def upload_attachment_data(
data=data,
headers={"Content-Type": content_type},
timeout=SECONDS_BEFORE_TIMEOUT,
verify=verify,
verify=verify_setting,
)
upload_response.raise_for_status()

# Link attachment to run
notify_server(headers, url, upload_id, run_id, logger)
# Link attachment to run (uses its own verify handling)
notify_server(headers, url, upload_id, run_id, verify=verify, logger=logger)

# Log success with LoggerStateManager for visibility
with LoggerStateManager(logger):
Expand All @@ -199,7 +211,14 @@ def upload_attachment_data(
# Log error with LoggerStateManager for visibility
with LoggerStateManager(logger):
logger.error(f"Upload failed: {name} - {str(e)}")

# Provide specific guidance for SSL errors with storage service
if "storage." in str(e) and "certificate is not valid for" in str(e):
logger.warning("Certificate must include storage subdomain")
logger.warning("Generate wildcard certificate or add storage hostname to SAN")
return False
finally:
cleanup_temp_cert_bundle(verify_setting, verify)

def upload_attachments(
logger: Logger,
Expand Down Expand Up @@ -265,7 +284,7 @@ def process_openhtf_attachments(
max_attachments: int,
max_file_size: int,
needs_base64_decode: bool = True,
verify = None, #: str | None = None,
verify: Optional[str] = None,
) -> None:
"""
Process attachments from an OpenHTF test record and upload them.
Expand Down
65 changes: 58 additions & 7 deletions tofupilot/utils/network.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,51 @@
from typing import Dict, List, Optional, Any
import tempfile
import os
import hashlib

import requests
import certifi
from ..constants.requests import SECONDS_BEFORE_TIMEOUT

# Cache for certificate bundles to avoid recreating them
_cert_bundle_cache = {}

def prepare_verify_setting(verify: Optional[str]) -> str:
"""Prepare verify setting for self-signed certificates by creating cached certificate bundle."""
if verify and isinstance(verify, str) and verify.endswith('.crt'):
try:
cert_mtime = os.path.getmtime(verify)
cache_key = f"{verify}:{cert_mtime}"

if cache_key in _cert_bundle_cache:
cached_path = _cert_bundle_cache[cache_key]
if os.path.exists(cached_path):
return cached_path
else:
del _cert_bundle_cache[cache_key]

with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as temp_bundle:
with open(certifi.where(), 'r') as ca_bundle:
temp_bundle.write(ca_bundle.read())

with open(verify, 'r') as custom_cert:
temp_bundle.write('\n')
temp_bundle.write(custom_cert.read())

bundle_path = temp_bundle.name
_cert_bundle_cache[cache_key] = bundle_path
return bundle_path

except (OSError, IOError):
return verify

return verify


def cleanup_temp_cert_bundle(verify_setting: str, original_verify: Optional[str]):
"""Clean up temporary certificate bundle if it was created."""
pass


def parse_error_message(response: requests.Response) -> str:
"""Extract error message from response"""
Expand All @@ -20,24 +63,30 @@ def api_request(
data: Optional[Dict] = None,
params: Optional[Dict] = None,
timeout: int = SECONDS_BEFORE_TIMEOUT,
verify = None, #: str | None = None,
verify: Optional[str] = None,
) -> Dict:
"""Unified API request handler with consistent error handling"""
verify_setting = prepare_verify_setting(verify)

try:
response = requests.request(
method, url,
json=data,
headers=headers,
params=params,
timeout=timeout,
verify=verify,
verify=verify_setting,
)
response.raise_for_status()
return handle_response(logger, response)
result = handle_response(logger, response)

return result
except requests.exceptions.HTTPError as http_err:
return handle_http_error(logger, http_err)
except requests.RequestException as e:
return handle_network_error(logger, e)
finally:
cleanup_temp_cert_bundle(verify_setting, verify)


def handle_response(
Expand Down Expand Up @@ -152,11 +201,13 @@ def handle_network_error(logger, e: requests.RequestException) -> Dict[str, Any]
error_message = f"Network error: {str(e)}"
logger.error(error_message)

# Provide SSL-specific guidance
if isinstance(e, requests.exceptions.SSLError) or "SSL" in str(e) or "certificate verify failed" in str(e):
logger.warning("SSL certificate verification error detected")
logger.warning("This is typically caused by missing or invalid SSL certificates")
logger.warning("Try: 1) pip install certifi 2) /Applications/Python*/Install Certificates.command")
if "storage." in str(e) and "certificate is not valid for" in str(e):
logger.warning("Certificate must include storage subdomain")
logger.warning("Generate wildcard certificate or add storage hostname to SAN")
else:
logger.warning("SSL certificate verification error detected")
logger.warning("Try: 1) pip install certifi 2) /Applications/Python*/Install Certificates.command")
finally:
# Restore logger state if needed
if was_resumed and hasattr(logger, 'pause'):
Expand Down
Loading