Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
403 changes: 403 additions & 0 deletions docs/per-unit-pipeline.md

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,9 @@ async def cancel_sync(self):
async def get_sync_progress(self):
return self._sync_service.get_sync_progress()

async def get_perf_report(self):
return self._sync_service.get_perf_report()

async def sync_heartbeat(self):
return self._sync_service.sync_heartbeat()

Expand All @@ -495,6 +498,9 @@ async def sync_cancel_preview(self):
async def report_sync_results(self, rom_id_to_app_id, removed_rom_ids, cancelled=False):
return await self._sync_service.report_sync_results(rom_id_to_app_id, removed_rom_ids, cancelled)

async def report_unit_results(self, rom_id_to_app_id):
return self._sync_service.report_unit_results(rom_id_to_app_id)

async def get_registry_platforms(self):
return self._sync_service.get_registry_platforms()

Expand Down
29 changes: 28 additions & 1 deletion py_modules/adapters/romm/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ def __init__(self, settings: dict, plugin_dir: str, logger: logging.Logger) -> N
self._settings = settings
self._plugin_dir = plugin_dir
self._logger = logger
self._perf = None # Optional PerfCollector, set via set_perf_collector()

def set_perf_collector(self, perf) -> None:
"""Attach a PerfCollector instance to record HTTP metrics."""
self._perf = perf

# ------------------------------------------------------------------
# Platform map
Expand Down Expand Up @@ -182,6 +187,8 @@ def with_retry(self, fn, *args, max_attempts: int = 3, base_delay: int = 1, **kw
if attempt < max_attempts - 1 and self.is_retryable(exc):
delay = base_delay * (3**attempt)
self._logger.info(f"Retry {attempt + 1}/{max_attempts} after {delay}s: {exc}")
if self._perf is not None:
self._perf.increment("http_retries")
time.sleep(delay)
else:
raise
Expand All @@ -191,20 +198,33 @@ def with_retry(self, fn, *args, max_attempts: int = 3, base_delay: int = 1, **kw
# HTTP request methods
# ------------------------------------------------------------------

def _record_http(self, method: str, path: str, elapsed: float, status: int, nbytes: int) -> None:
"""Record an HTTP request in the attached PerfCollector, if any."""
if self._perf is not None:
self._perf.record_http_request(method, path, elapsed, status, nbytes)

def request(self, path: str):
"""GET a JSON resource from the RomM API."""
url = self._settings["romm_url"].rstrip("/") + path

def _do_request():
req = urllib.request.Request(url, method="GET")
req.add_header("Authorization", self.auth_header())
t0 = time.monotonic()
status = 0
nbytes = 0
try:
with urllib.request.urlopen(req, context=self.ssl_context(), timeout=30) as resp:
return json.loads(resp.read().decode())
data = resp.read()
status = resp.status
nbytes = len(data)
return json.loads(data.decode())
except RommApiError:
raise
except Exception as exc:
raise self.translate_http_error(exc, url, "GET") from exc
finally:
self._record_http("GET", path, time.monotonic() - t0, status, nbytes)

return self.with_retry(_do_request)

Expand Down Expand Up @@ -253,6 +273,9 @@ def _do_download():
req = urllib.request.Request(url, method="GET")
req.add_header("Authorization", self.auth_header())
ctx = self.ssl_context()
t0 = time.monotonic()
status = 0
nbytes = 0
try:
with urllib.request.urlopen(req, context=ctx, timeout=self._CONNECT_TIMEOUT) as resp:
raw_sock = getattr(getattr(getattr(resp, "fp", None), "raw", None), "_sock", None)
Expand All @@ -261,11 +284,15 @@ def _do_download():
total, downloaded = self._stream_to_file(
resp, dest_path, progress_callback, block_size=self._DOWNLOAD_BLOCK_SIZE, url=url
)
status = resp.status
nbytes = downloaded
self._validate_download(total, downloaded)
except RommApiError:
raise
except Exception as exc:
raise self.translate_http_error(exc, url, "GET") from exc
finally:
self._record_http("GET", path, time.monotonic() - t0, status, nbytes)

return self.with_retry(_do_download)

Expand Down
4 changes: 4 additions & 0 deletions py_modules/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ def wire_services(cfg: WiringConfig) -> dict:
# Resolve the circular dependency: point the box at the real sync_state getter.
_sync_state_box[0] = lambda: sync_service.sync_state

# Wire PerfCollector from sync_service into the HTTP adapter so all
# HTTP requests are automatically recorded for performance analysis.
cfg.http_adapter.set_perf_collector(sync_service.perf)

download_service = DownloadService(
romm_api=cfg.romm_api,
resolve_system=cfg.http_adapter.resolve_system,
Expand Down
Loading