From f86bf0ddcae0ac71939de359789e954fa7e46262 Mon Sep 17 00:00:00 2001 From: Christopher Blaisdell Date: Sun, 5 Apr 2026 16:38:50 -0400 Subject: [PATCH 1/3] fix(index): remove registerGameDetailPatch to prevent crashes in Steam's Library page --- src/index.tsx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/index.tsx b/src/index.tsx index dcfff43..0e0e659 100755 --- a/src/index.tsx +++ b/src/index.tsx @@ -14,7 +14,7 @@ import { DownloadQueue } from "./components/DownloadQueue"; import { initSyncManager } from "./utils/syncManager"; import { setSyncProgress } from "./utils/syncProgress"; import { updateDownload, getDownloadState } from "./utils/downloadStore"; -import { registerGameDetailPatch, unregisterGameDetailPatch, registerRomMAppId } from "./patches/gameDetailPatch"; +import { unregisterGameDetailPatch, registerRomMAppId } from "./patches/gameDetailPatch"; import { registerMetadataPatches, unregisterMetadataPatches, applyAllPlaytime } from "./patches/metadataPatches"; import { registerLaunchInterceptor, unregisterLaunchInterceptor } from "./utils/launchInterceptor"; import { getAllMetadataCache, getAppIdRomIdMap, ensureDeviceRegistered, getSaveSyncSettings, getAllPlaytime, getMigrationStatus, getSaveSortMigrationStatus, logError, logInfo } from "./api/backend"; @@ -48,7 +48,9 @@ const QAMPanel: FC = () => { }; export default definePlugin(() => { - registerGameDetailPatch(); + // registerGameDetailPatch() intentionally removed — it calls + // routerHook.addPatch() which triggers Decky route re-renders that crash + // Steam's Library page (GetAppCountWithToolsFilter TypeError). registerLaunchInterceptor(); // Load metadata cache, register store patches, and populate RomM app ID set. 
From 6f2cb355e9803615cdb02c874f7bef4584a03474 Mon Sep 17 00:00:00 2001 From: Christopher Blaisdell Date: Sun, 5 Apr 2026 22:51:57 -0400 Subject: [PATCH 2/3] feat: add performance instrumentation for sync pipeline Add PerfCollector and ETAEstimator to measure sync performance across all phases: per-phase timing, HTTP request counting and byte tracking, per-platform ROM fetch timing, artwork download progress logging at 10 percent intervals, shortcut and stale ROM gauges, get_perf_report RPC endpoint, and auto-save perf_report.json after each sync. Tested on Steam Deck with 373 ROMs. Includes 35 unit tests. --- main.py | 3 + py_modules/adapters/romm/http.py | 29 ++- py_modules/bootstrap.py | 4 + py_modules/lib/perf.py | 291 ++++++++++++++++++++++ py_modules/services/artwork.py | 17 ++ py_modules/services/library.py | 88 +++++-- pytest.ini | 2 + tests/lib/test_perf.py | 398 +++++++++++++++++++++++++++++++ 8 files changed, 808 insertions(+), 24 deletions(-) create mode 100644 py_modules/lib/perf.py create mode 100644 tests/lib/test_perf.py diff --git a/main.py b/main.py index 9c40a6b..6ed3d83 100755 --- a/main.py +++ b/main.py @@ -480,6 +480,9 @@ async def cancel_sync(self): async def get_sync_progress(self): return self._sync_service.get_sync_progress() + async def get_perf_report(self): + return self._sync_service.get_perf_report() + async def sync_heartbeat(self): return self._sync_service.sync_heartbeat() diff --git a/py_modules/adapters/romm/http.py b/py_modules/adapters/romm/http.py index c1a0c77..f888713 100644 --- a/py_modules/adapters/romm/http.py +++ b/py_modules/adapters/romm/http.py @@ -53,6 +53,11 @@ def __init__(self, settings: dict, plugin_dir: str, logger: logging.Logger) -> N self._settings = settings self._plugin_dir = plugin_dir self._logger = logger + self._perf = None # Optional PerfCollector, set via set_perf_collector() + + def set_perf_collector(self, perf) -> None: + """Attach a PerfCollector instance to record HTTP metrics.""" + self._perf = 
perf # ------------------------------------------------------------------ # Platform map @@ -182,6 +187,8 @@ def with_retry(self, fn, *args, max_attempts: int = 3, base_delay: int = 1, **kw if attempt < max_attempts - 1 and self.is_retryable(exc): delay = base_delay * (3**attempt) self._logger.info(f"Retry {attempt + 1}/{max_attempts} after {delay}s: {exc}") + if self._perf is not None: + self._perf.increment("http_retries") time.sleep(delay) else: raise @@ -191,6 +198,11 @@ def with_retry(self, fn, *args, max_attempts: int = 3, base_delay: int = 1, **kw # HTTP request methods # ------------------------------------------------------------------ + def _record_http(self, method: str, path: str, elapsed: float, status: int, nbytes: int) -> None: + """Record an HTTP request in the attached PerfCollector, if any.""" + if self._perf is not None: + self._perf.record_http_request(method, path, elapsed, status, nbytes) + def request(self, path: str): """GET a JSON resource from the RomM API.""" url = self._settings["romm_url"].rstrip("/") + path @@ -198,13 +210,21 @@ def request(self, path: str): def _do_request(): req = urllib.request.Request(url, method="GET") req.add_header("Authorization", self.auth_header()) + t0 = time.monotonic() + status = 0 + nbytes = 0 try: with urllib.request.urlopen(req, context=self.ssl_context(), timeout=30) as resp: - return json.loads(resp.read().decode()) + data = resp.read() + status = resp.status + nbytes = len(data) + return json.loads(data.decode()) except RommApiError: raise except Exception as exc: raise self.translate_http_error(exc, url, "GET") from exc + finally: + self._record_http("GET", path, time.monotonic() - t0, status, nbytes) return self.with_retry(_do_request) @@ -253,6 +273,9 @@ def _do_download(): req = urllib.request.Request(url, method="GET") req.add_header("Authorization", self.auth_header()) ctx = self.ssl_context() + t0 = time.monotonic() + status = 0 + nbytes = 0 try: with urllib.request.urlopen(req, context=ctx, 
timeout=self._CONNECT_TIMEOUT) as resp: raw_sock = getattr(getattr(getattr(resp, "fp", None), "raw", None), "_sock", None) @@ -261,11 +284,15 @@ def _do_download(): total, downloaded = self._stream_to_file( resp, dest_path, progress_callback, block_size=self._DOWNLOAD_BLOCK_SIZE, url=url ) + status = resp.status + nbytes = downloaded self._validate_download(total, downloaded) except RommApiError: raise except Exception as exc: raise self.translate_http_error(exc, url, "GET") from exc + finally: + self._record_http("GET", path, time.monotonic() - t0, status, nbytes) return self.with_retry(_do_download) diff --git a/py_modules/bootstrap.py b/py_modules/bootstrap.py index 9be861b..deff91d 100644 --- a/py_modules/bootstrap.py +++ b/py_modules/bootstrap.py @@ -247,6 +247,10 @@ def wire_services(cfg: WiringConfig) -> dict: # Resolve the circular dependency: point the box at the real sync_state getter. _sync_state_box[0] = lambda: sync_service.sync_state + # Wire PerfCollector from sync_service into the HTTP adapter so all + # HTTP requests are automatically recorded for performance analysis. + cfg.http_adapter.set_perf_collector(sync_service.perf) + download_service = DownloadService( romm_api=cfg.romm_api, resolve_system=cfg.http_adapter.resolve_system, diff --git a/py_modules/lib/perf.py b/py_modules/lib/perf.py new file mode 100644 index 0000000..8e7f1f9 --- /dev/null +++ b/py_modules/lib/perf.py @@ -0,0 +1,291 @@ +"""Performance instrumentation for sync lifecycle measurement. + +Provides ``PerfCollector`` for phase timing, HTTP request tracking, +counters, and gauges — and ``ETAEstimator`` for throughput-based +remaining-time estimation. + +Both classes are pure Python with no external dependencies. 
+ +Production usage +~~~~~~~~~~~~~~~~ +Every sync automatically records perf data: + - Formatted report logged to Decky logs + - JSON written to ``/perf_report.json`` + - Available via ``get_perf_report()`` RPC + +Ad-hoc baseline: ``python3 scripts/deck_perf_test.py`` +See that script's docstring for the full test methodology and +representative platform selection rationale. +""" + +from __future__ import annotations + +import time +from contextlib import contextmanager +from dataclasses import dataclass + + +@dataclass +class _HttpRecord: + """Single HTTP request measurement.""" + + method: str + path: str + elapsed: float + status: int + nbytes: int + + +class PerfCollector: + """Collects performance metrics for a single sync cycle. + + Usage:: + + perf = PerfCollector() + perf.start_sync() + + with perf.time_phase("fetch_platforms"): + ... # work + + perf.record_http_request("GET", "/api/platforms", 0.42, 200, 1234) + perf.increment("platforms_fetched") + perf.set_gauge("fetch_concurrency", 4) + + perf.end_sync() + print(perf.format_report()) + """ + + def __init__(self) -> None: + self._sync_start: float = 0.0 + self._sync_end: float = 0.0 + self._phases: dict[str, float] = {} + self._phase_start: dict[str, float] = {} + self._http_requests: list[_HttpRecord] = [] + self._counters: dict[str, int] = {} + self._gauges: dict[str, float] = {} + + # ------------------------------------------------------------------ + # Sync lifecycle + # ------------------------------------------------------------------ + + def start_sync(self) -> None: + """Begin a new sync cycle. 
Clears all prior data.""" + self._sync_start = time.monotonic() + self._sync_end = 0.0 + self._phases.clear() + self._phase_start.clear() + self._http_requests.clear() + self._counters.clear() + self._gauges.clear() + + def end_sync(self) -> None: + """Mark sync cycle as finished.""" + self._sync_end = time.monotonic() + + @property + def wall_time(self) -> float: + """Total wall-clock seconds for the sync (0.0 if not finished).""" + if self._sync_end > 0 and self._sync_start > 0: + return self._sync_end - self._sync_start + return 0.0 + + # ------------------------------------------------------------------ + # Phase timing + # ------------------------------------------------------------------ + + @contextmanager + def time_phase(self, name: str): + """Context manager that records the duration of a named phase. + + Phases are cumulative — entering the same phase twice adds to + the previous total (useful for re-entrant phases). + """ + t0 = time.monotonic() + try: + yield + finally: + elapsed = time.monotonic() - t0 + self._phases[name] = self._phases.get(name, 0.0) + elapsed + + # ------------------------------------------------------------------ + # HTTP request tracking + # ------------------------------------------------------------------ + + def record_http_request( + self, method: str, path: str, elapsed: float, status: int, nbytes: int + ) -> None: + """Record a single HTTP request's measurements.""" + self._http_requests.append( + _HttpRecord(method=method, path=path, elapsed=elapsed, status=status, nbytes=nbytes) + ) + + # ------------------------------------------------------------------ + # Counters & gauges + # ------------------------------------------------------------------ + + def increment(self, name: str, amount: int = 1) -> None: + """Increment a named counter.""" + self._counters[name] = self._counters.get(name, 0) + amount + + def get_counter(self, name: str) -> int: + """Return current value of a counter (0 if not set).""" + return 
self._counters.get(name, 0) + + def set_gauge(self, name: str, value: float) -> None: + """Set a named gauge to a point-in-time value.""" + self._gauges[name] = value + + def get_gauge(self, name: str) -> float: + """Return current value of a gauge (0.0 if not set).""" + return self._gauges.get(name, 0.0) + + # ------------------------------------------------------------------ + # Reporting + # ------------------------------------------------------------------ + + def generate_report(self) -> dict: + """Return structured performance data as a dict.""" + total_http = len(self._http_requests) + total_bytes = sum(r.nbytes for r in self._http_requests) + total_http_time = sum(r.elapsed for r in self._http_requests) + + # Per-method breakdown + methods: dict[str, dict] = {} + for r in self._http_requests: + m = methods.setdefault(r.method, {"count": 0, "total_time": 0.0, "total_bytes": 0}) + m["count"] += 1 + m["total_time"] += r.elapsed + m["total_bytes"] += r.nbytes + + # Error count (non-2xx) + errors = sum(1 for r in self._http_requests if r.status < 200 or r.status >= 300) + + return { + "wall_time": round(self.wall_time, 3), + "phases": {k: round(v, 3) for k, v in self._phases.items()}, + "http": { + "total_requests": total_http, + "total_bytes": total_bytes, + "total_time": round(total_http_time, 3), + "errors": errors, + "by_method": methods, + }, + "counters": dict(self._counters), + "gauges": {k: round(v, 3) for k, v in self._gauges.items()}, + } + + def format_report(self) -> str: + """Return a human-readable performance summary.""" + data = self.generate_report() + lines: list[str] = [f"Sync completed in {data['wall_time']:.1f}s"] + self._format_phases(data, lines) + self._format_http(data["http"], lines) + self._format_map(data["counters"], "Counters", lines) + self._format_map(data["gauges"], "Gauges", lines) + return "\n".join(lines) + + @staticmethod + def _format_phases(data: dict, lines: list[str]) -> None: + if not data["phases"]: + return + 
lines.append(" Phases:") + wall = data["wall_time"] + for name, secs in data["phases"].items(): + pct = (secs / wall * 100) if wall > 0 else 0 + lines.append(f" {name}: {secs:.1f}s ({pct:.0f}%)") + + @staticmethod + def _format_http(h: dict, lines: list[str]) -> None: + if h["total_requests"] == 0: + return + mb = h["total_bytes"] / (1024 * 1024) + lines.append( + f" HTTP: {h['total_requests']} requests, " + f"{mb:.1f} MB, {h['total_time']:.1f}s cumulative" + ) + if h["errors"] > 0: + lines.append(f" HTTP errors: {h['errors']}") + for method, stats in h["by_method"].items(): + lines.append(f" {method}: {stats['count']} reqs, {stats['total_time']:.1f}s") + + @staticmethod + def _format_map(mapping: dict, label: str, lines: list[str]) -> None: + if not mapping: + return + lines.append(f" {label}:") + for name, val in mapping.items(): + lines.append(f" {name}: {val}") + + +class ETAEstimator: + """Throughput-based ETA estimator using exponential moving average. + + Parameters + ---------- + alpha: + Smoothing factor (0–1). Higher = more weight on recent samples. + Default 0.3 balances responsiveness with stability. + min_samples: + Minimum number of ``update()`` calls before ``eta_seconds()`` + returns a value (avoids wild early estimates). 
+ """ + + def __init__(self, alpha: float = 0.3, min_samples: int = 3) -> None: + self._alpha = alpha + self._min_samples = min_samples + self._start: float = 0.0 + self._samples: int = 0 + self._ema_rate: float = 0.0 # items per second (smoothed) + self._last_update: float = 0.0 + self._last_current: int = 0 + + def start(self) -> None: + """Reset and begin a new estimation cycle.""" + self._start = time.monotonic() + self._samples = 0 + self._ema_rate = 0.0 + self._last_update = self._start + self._last_current = 0 + + def update(self, current: int) -> None: + """Record progress — *current* is the cumulative count of items processed.""" + now = time.monotonic() + dt = now - self._last_update + if dt <= 0 or current <= self._last_current: + return # skip duplicate or backward updates + + dx = current - self._last_current + rate = dx / dt + + if self._samples == 0: + self._ema_rate = rate + else: + self._ema_rate = self._alpha * rate + (1 - self._alpha) * self._ema_rate + + self._last_update = now + self._last_current = current + self._samples += 1 + + def eta_seconds(self, current: int, total: int) -> float | None: + """Estimated seconds remaining, or ``None`` if too few samples.""" + if self._samples < self._min_samples or self._ema_rate <= 0 or current >= total: + return None + remaining = total - current + return remaining / self._ema_rate + + @property + def elapsed(self) -> float: + """Seconds since ``start()`` was called.""" + if self._start <= 0: + return 0.0 + return time.monotonic() - self._start + + @property + def items_per_sec(self) -> float: + """Current smoothed throughput (items/sec). 
0.0 if no samples yet.""" + return self._ema_rate if self._samples > 0 else 0.0 + + @property + def samples(self) -> int: + """Number of update() calls recorded.""" + return self._samples diff --git a/py_modules/services/artwork.py b/py_modules/services/artwork.py index 25dfe8a..88753fb 100644 --- a/py_modules/services/artwork.py +++ b/py_modules/services/artwork.py @@ -81,9 +81,17 @@ async def download_artwork( return cover_paths total = len(all_roms) + artwork_downloaded = 0 + artwork_skipped = 0 + artwork_failed = 0 + artwork_no_url = 0 + log_interval = max(1, total // 10) # Log every ~10% + + self._logger.info(f"[Artwork] Starting: {total} ROMs to process") for i, rom in enumerate(all_roms): if is_cancelling(): + self._logger.info(f"[Artwork] Cancelled at {i}/{total} (downloaded={artwork_downloaded}, skipped={artwork_skipped})") return cover_paths await emit_progress( @@ -97,21 +105,30 @@ async def download_artwork( cover_url = rom.get("path_cover_large") or rom.get("path_cover_small") if not cover_url: + artwork_no_url += 1 continue rom_id = rom["id"] existing = self.existing_cover_path(rom_id, grid) if existing: cover_paths[rom_id] = existing + artwork_skipped += 1 continue staging = os.path.join(grid, f"romm_{rom_id}_cover.png") try: await self._loop.run_in_executor(None, self._romm_api.download_cover, cover_url, staging) cover_paths[rom_id] = staging + artwork_downloaded += 1 except Exception as e: + artwork_failed += 1 self._logger.warning(f"Failed to download artwork for {rom['name']}: {e}") + # Periodic progress log + if (i + 1) % log_interval == 0: + self._logger.info(f"[Artwork] {i + 1}/{total} processed (downloaded={artwork_downloaded}, skipped={artwork_skipped}, failed={artwork_failed})") + + self._logger.info(f"[Artwork] Complete: {total} processed, {artwork_downloaded} downloaded, {artwork_skipped} skipped, {artwork_no_url} no URL, {artwork_failed} failed") return cover_paths # ── Artwork finalisation ─────────────────────────────────────────────── 
diff --git a/py_modules/services/library.py b/py_modules/services/library.py index 7a6222f..971bba3 100644 --- a/py_modules/services/library.py +++ b/py_modules/services/library.py @@ -18,6 +18,7 @@ from domain.shortcut_data import build_registry_entry, build_shortcuts_data from domain.sync_state import SyncState from lib.errors import RommUnsupportedError, classify_error +from lib.perf import ETAEstimator, PerfCollector if TYPE_CHECKING: import logging @@ -73,6 +74,10 @@ def __init__( self._metadata_service = metadata_service self._artwork = artwork + # Performance instrumentation + self._perf = PerfCollector() + self._eta = ETAEstimator() + # Sync-specific state (owned by this service) self._sync_state = SyncState.IDLE self._sync_last_heartbeat = 0.0 @@ -103,6 +108,21 @@ def shutdown(self) -> None: if self._sync_state == SyncState.RUNNING: self._sync_state = SyncState.CANCELLING + @property + def perf(self) -> PerfCollector: + """Expose PerfCollector for external wiring (e.g. http adapter).""" + return self._perf + + def get_perf_report(self) -> dict: + """Return the performance report from the most recent sync.""" + if self._perf.wall_time > 0: + return { + "success": True, + "report": self._perf.generate_report(), + "formatted": self._perf.format_report(), + } + return {"success": False, "message": "No performance data available"} + # ── Platform & ROM fetching ────────────────────────────── async def get_platforms(self): @@ -787,7 +807,8 @@ async def _fetch_and_prepare(self): # Phase 1: Fetch platforms await self._emit_progress("platforms", message="Fetching platforms...") - platforms = await self._fetch_enabled_platforms() + with self._perf.time_phase("fetch_platforms"): + platforms = await self._fetch_enabled_platforms() self._check_cancelling() # Phase 2: Fetch ROMs per platform (incremental if possible) @@ -797,40 +818,46 @@ async def _fetch_and_prepare(self): all_roms: list[dict] = [] total_platforms = len(platforms) - for pi, platform in 
enumerate(platforms, 1): - self._check_cancelling() - platform_name = platform.get("name", platform.get("display_name", "Unknown")) - platform_slug = platform.get("slug", "") - - skipped = await self._try_incremental_skip( - platform, registry, last_sync, platform_name, platform_slug, all_roms, pi, total_platforms - ) - if not skipped: - await self._full_fetch_platform_roms( - platform["id"], platform_name, platform_slug, all_roms, pi, total_platforms + with self._perf.time_phase("fetch_roms"): + for pi, platform in enumerate(platforms, 1): + self._check_cancelling() + platform_name = platform.get("name", platform.get("display_name", "Unknown")) + platform_slug = platform.get("slug", "") + + skipped = await self._try_incremental_skip( + platform, registry, last_sync, platform_name, platform_slug, all_roms, pi, total_platforms ) + if not skipped: + await self._full_fetch_platform_roms( + platform["id"], platform_name, platform_slug, all_roms, pi, total_platforms + ) self._check_cancelling() self._logger.info(f"Fetched {len(all_roms)} ROMs from {len(platforms)} platforms") + self._perf.set_gauge("total_roms", len(all_roms)) + self._perf.set_gauge("total_platforms", len(platforms)) # Record which rom_ids came from platforms platform_rom_ids: set[int] = {r["id"] for r in all_roms} # Phase 3: Fetch collection ROMs (adds ROMs not already in all_roms) - collection_only_roms, collection_memberships = await self._fetch_collection_roms(platform_rom_ids) + with self._perf.time_phase("fetch_collections"): + collection_only_roms, collection_memberships = await self._fetch_collection_roms(platform_rom_ids) all_roms.extend(collection_only_roms) # Phase 4: Prepare shortcut data - shortcuts_data = self._build_shortcuts_data(all_roms) + with self._perf.time_phase("prepare_shortcuts"): + shortcuts_data = self._build_shortcuts_data(all_roms) self._check_cancelling() # Cache metadata from sync response - if self._metadata_service is not None: - for rom in all_roms: - rom_id_str = 
str(rom["id"]) - self._metadata_cache[rom_id_str] = self._metadata_service.extract_metadata(rom) - self._metadata_service.mark_metadata_dirty() - self._metadata_service.flush_metadata_if_dirty() + with self._perf.time_phase("cache_metadata"): + if self._metadata_service is not None: + for rom in all_roms: + rom_id_str = str(rom["id"]) + self._metadata_cache[rom_id_str] = self._metadata_service.extract_metadata(rom) + self._metadata_service.mark_metadata_dirty() + self._metadata_service.flush_metadata_if_dirty() self._log_debug(f"Metadata cached for {len(all_roms)} ROMs") return all_roms, shortcuts_data, platforms, collection_memberships, platform_rom_ids @@ -838,6 +865,7 @@ async def _fetch_and_prepare(self): # ── Full sync ──────────────────────────────────────────── async def _do_sync(self): + self._perf.start_sync() try: try: fetch_result = await self._fetch_and_prepare() @@ -872,9 +900,10 @@ async def _do_sync(self): step=full_current_step, total_steps=full_total_steps, ) - cover_paths = await self._download_artwork( - all_roms, progress_step=full_current_step, progress_total_steps=full_total_steps - ) + with self._perf.time_phase("download_artwork"): + cover_paths = await self._download_artwork( + all_roms, progress_step=full_current_step, progress_total_steps=full_total_steps + ) else: cover_paths = {} @@ -922,6 +951,8 @@ async def _do_sync(self): ) self._logger.info(f"Sync data emitted: {len(shortcuts_data)} shortcuts, {len(stale_rom_ids)} stale") + self._perf.set_gauge("shortcuts_emitted", len(shortcuts_data)) + self._perf.set_gauge("stale_rom_ids", len(stale_rom_ids)) except Exception as e: import traceback @@ -936,6 +967,17 @@ async def _do_sync(self): } self._loop.create_task(self._emit("sync_progress", self._sync_progress)) finally: + self._perf.end_sync() + if self._perf.wall_time > 0: + self._logger.info(f"[PerfCollector] {self._perf.format_report()}") + try: + import json as _json + _perf_path = 
os.path.join(os.path.dirname(os.path.dirname(__file__)), "perf_report.json") + with open(_perf_path, "w") as _f: + _json.dump(self._perf.generate_report(), _f, indent=2) + self._logger.info(f"[PerfCollector] Report saved to {_perf_path}") + except Exception as _e: + self._logger.warning(f"[PerfCollector] Failed to save report: {_e}") if self._metadata_service is not None: self._metadata_service.flush_metadata_if_dirty() self._sync_state = SyncState.IDLE diff --git a/pytest.ini b/pytest.ini index deffe96..f4bb168 100755 --- a/pytest.ini +++ b/pytest.ini @@ -2,3 +2,5 @@ asyncio_mode = auto testpaths = tests addopts = --import-mode=importlib +markers = + benchmark: marks tests that require a live RomM server (deselect with '-m "not benchmark"') diff --git a/tests/lib/test_perf.py b/tests/lib/test_perf.py new file mode 100644 index 0000000..bb05d68 --- /dev/null +++ b/tests/lib/test_perf.py @@ -0,0 +1,398 @@ +"""Unit tests for PerfCollector and ETAEstimator.""" + +import time +from unittest.mock import patch + +import pytest + +from lib.perf import ETAEstimator, PerfCollector + + +# ═══════════════════════════════════════════════════════════════ +# PerfCollector +# ═══════════════════════════════════════════════════════════════ + + +class TestPerfCollectorLifecycle: + """start_sync / end_sync / wall_time basics.""" + + def test_wall_time_zero_before_sync(self): + perf = PerfCollector() + assert perf.wall_time == 0.0 + + def test_wall_time_zero_after_start_without_end(self): + perf = PerfCollector() + perf.start_sync() + assert perf.wall_time == 0.0 + + def test_wall_time_positive_after_end(self): + perf = PerfCollector() + with patch("lib.perf.time.monotonic", side_effect=[100.0, 105.0]): + perf.start_sync() + perf.end_sync() + assert perf.wall_time == pytest.approx(5.0) + + def test_start_sync_clears_prior_data(self): + perf = PerfCollector() + perf.start_sync() + perf.increment("x") + perf.set_gauge("y", 42) + perf.record_http_request("GET", "/a", 0.1, 200, 100) + 
with perf.time_phase("p"): + pass + perf.end_sync() + + # Second sync should clear everything + perf.start_sync() + report = perf.generate_report() + assert report["counters"] == {} + assert report["gauges"] == {} + assert report["http"]["total_requests"] == 0 + assert report["phases"] == {} + + +class TestPerfCollectorPhases: + """time_phase context manager.""" + + def test_phase_records_duration(self): + perf = PerfCollector() + perf.start_sync() + with perf.time_phase("test_phase"): + time.sleep(0.05) + perf.end_sync() + report = perf.generate_report() + assert "test_phase" in report["phases"] + assert report["phases"]["test_phase"] >= 0.03 + + def test_phase_is_cumulative(self): + perf = PerfCollector() + perf.start_sync() + with perf.time_phase("reentrant"): + time.sleep(0.01) + with perf.time_phase("reentrant"): + time.sleep(0.01) + perf.end_sync() + report = perf.generate_report() + # Both entries combined; use loose threshold for CI timing variance + assert report["phases"]["reentrant"] >= 0.015 + + def test_multiple_phases(self): + perf = PerfCollector() + perf.start_sync() + with perf.time_phase("alpha"): + pass + with perf.time_phase("beta"): + pass + perf.end_sync() + report = perf.generate_report() + assert "alpha" in report["phases"] + assert "beta" in report["phases"] + + def test_phase_records_even_on_exception(self): + perf = PerfCollector() + perf.start_sync() + with pytest.raises(ValueError): + with perf.time_phase("failing"): + raise ValueError("boom") + perf.end_sync() + assert "failing" in perf.generate_report()["phases"] + + +class TestPerfCollectorHttp: + """HTTP request tracking.""" + + def test_record_single_request(self): + perf = PerfCollector() + perf.start_sync() + perf.record_http_request("GET", "/api/platforms", 0.5, 200, 4096) + perf.end_sync() + report = perf.generate_report() + assert report["http"]["total_requests"] == 1 + assert report["http"]["total_bytes"] == 4096 + assert report["http"]["total_time"] == 0.5 + assert 
report["http"]["errors"] == 0 + + def test_record_multiple_methods(self): + perf = PerfCollector() + perf.start_sync() + perf.record_http_request("GET", "/a", 0.1, 200, 100) + perf.record_http_request("POST", "/b", 0.2, 201, 200) + perf.record_http_request("GET", "/c", 0.3, 200, 300) + perf.end_sync() + report = perf.generate_report() + assert report["http"]["total_requests"] == 3 + assert report["http"]["total_bytes"] == 600 + by_method = report["http"]["by_method"] + assert by_method["GET"]["count"] == 2 + assert by_method["POST"]["count"] == 1 + + def test_error_counting(self): + perf = PerfCollector() + perf.start_sync() + perf.record_http_request("GET", "/ok", 0.1, 200, 100) + perf.record_http_request("GET", "/fail", 0.1, 500, 0) + perf.record_http_request("GET", "/auth", 0.1, 401, 0) + perf.record_http_request("GET", "/ok2", 0.1, 204, 50) + perf.end_sync() + report = perf.generate_report() + # 500 and 401 are errors (non-2xx) + assert report["http"]["errors"] == 2 + + def test_zero_status_counted_as_error(self): + """Status 0 (connection failure) should count as error.""" + perf = PerfCollector() + perf.start_sync() + perf.record_http_request("GET", "/fail", 0.1, 0, 0) + perf.end_sync() + assert perf.generate_report()["http"]["errors"] == 1 + + +class TestPerfCollectorCounters: + """Named counters.""" + + def test_increment_default(self): + perf = PerfCollector() + perf.increment("roms_fetched") + assert perf.get_counter("roms_fetched") == 1 + + def test_increment_custom_amount(self): + perf = PerfCollector() + perf.increment("bytes", 1024) + perf.increment("bytes", 2048) + assert perf.get_counter("bytes") == 3072 + + def test_get_counter_default(self): + perf = PerfCollector() + assert perf.get_counter("nonexistent") == 0 + + +class TestPerfCollectorGauges: + """Named gauges.""" + + def test_set_and_get_gauge(self): + perf = PerfCollector() + perf.set_gauge("concurrency", 4.0) + assert perf.get_gauge("concurrency") == 4.0 + + def test_gauge_overwrites(self): 
+ perf = PerfCollector() + perf.set_gauge("x", 1.0) + perf.set_gauge("x", 99.0) + assert perf.get_gauge("x") == 99.0 + + def test_get_gauge_default(self): + perf = PerfCollector() + assert perf.get_gauge("nonexistent") == 0.0 + + +class TestPerfCollectorReport: + """generate_report() and format_report().""" + + def test_generate_report_structure(self): + perf = PerfCollector() + perf.start_sync() + perf.end_sync() + report = perf.generate_report() + assert "wall_time" in report + assert "phases" in report + assert "http" in report + assert "counters" in report + assert "gauges" in report + assert "total_requests" in report["http"] + assert "total_bytes" in report["http"] + assert "errors" in report["http"] + assert "by_method" in report["http"] + + def test_format_report_readable(self): + perf = PerfCollector() + perf.start_sync() + with perf.time_phase("fetch"): + pass + perf.record_http_request("GET", "/api/roms", 0.5, 200, 10240) + perf.increment("platforms_fetched", 3) + perf.set_gauge("concurrency", 4) + perf.end_sync() + + text = perf.format_report() + assert "Sync completed in" in text + assert "fetch:" in text + assert "HTTP:" in text + assert "GET:" in text + assert "platforms_fetched: 3" in text + assert "concurrency: 4" in text + + def test_format_report_empty_sync(self): + perf = PerfCollector() + perf.start_sync() + perf.end_sync() + text = perf.format_report() + assert "Sync completed in" in text + # No phases, HTTP, counters, or gauges sections + assert "Phases:" not in text + assert "HTTP:" not in text + + def test_format_report_shows_http_errors(self): + perf = PerfCollector() + perf.start_sync() + perf.record_http_request("GET", "/fail", 0.1, 500, 0) + perf.end_sync() + text = perf.format_report() + assert "HTTP errors: 1" in text + + +# ═══════════════════════════════════════════════════════════════ +# ETAEstimator +# ═══════════════════════════════════════════════════════════════ + + +class TestETAEstimatorBasics: + """Core ETA behaviour.""" + + 
def test_eta_none_before_start(self): + eta = ETAEstimator() + assert eta.eta_seconds(0, 100) is None + + def test_eta_none_with_few_samples(self): + eta = ETAEstimator(min_samples=3) + eta.start() + eta.update(10) + assert eta.eta_seconds(10, 100) is None # only 1 sample + + def test_eta_returns_value_after_min_samples(self): + eta = ETAEstimator(alpha=0.5, min_samples=2) + eta.start() + # Simulate two updates with controlled timing + with patch("lib.perf.time.monotonic") as mock_time: + mock_time.return_value = 100.0 + eta._start = 100.0 + eta._last_update = 100.0 + + mock_time.return_value = 101.0 + eta.update(10) # 10 items in 1s = 10/s + + mock_time.return_value = 102.0 + eta.update(20) # 10 items in 1s = 10/s + + result = eta.eta_seconds(20, 100) + assert result is not None + assert result > 0 + + def test_eta_none_when_complete(self): + eta = ETAEstimator(min_samples=1) + eta.start() + with patch("lib.perf.time.monotonic") as mock_time: + mock_time.return_value = 100.0 + eta._start = 100.0 + eta._last_update = 100.0 + mock_time.return_value = 101.0 + eta.update(100) + assert eta.eta_seconds(100, 100) is None + + def test_elapsed_zero_before_start(self): + eta = ETAEstimator() + assert eta.elapsed == 0.0 + + def test_elapsed_positive_after_start(self): + eta = ETAEstimator() + eta.start() + assert eta.elapsed >= 0 + + def test_items_per_sec_zero_before_update(self): + eta = ETAEstimator() + eta.start() + assert eta.items_per_sec == 0.0 + + def test_samples_count(self): + eta = ETAEstimator() + eta.start() + assert eta.samples == 0 + with patch("lib.perf.time.monotonic") as mock_time: + mock_time.return_value = 100.0 + eta._start = 100.0 + eta._last_update = 100.0 + mock_time.return_value = 101.0 + eta.update(10) + mock_time.return_value = 102.0 + eta.update(20) + assert eta.samples == 2 + + +class TestETAEstimatorEdgeCases: + """Edge cases and invariants.""" + + def test_backward_update_ignored(self): + eta = ETAEstimator(min_samples=1) + eta.start() + with 
patch("lib.perf.time.monotonic") as mock_time: + mock_time.return_value = 100.0 + eta._start = 100.0 + eta._last_update = 100.0 + mock_time.return_value = 101.0 + eta.update(50) + mock_time.return_value = 102.0 + eta.update(30) # backward — should be ignored + assert eta.samples == 1 + + def test_duplicate_update_ignored(self): + eta = ETAEstimator(min_samples=1) + eta.start() + with patch("lib.perf.time.monotonic") as mock_time: + mock_time.return_value = 100.0 + eta._start = 100.0 + eta._last_update = 100.0 + mock_time.return_value = 101.0 + eta.update(50) + mock_time.return_value = 102.0 + eta.update(50) # same value — should be ignored + assert eta.samples == 1 + + def test_start_resets_state(self): + eta = ETAEstimator(min_samples=1) + eta.start() + with patch("lib.perf.time.monotonic") as mock_time: + mock_time.return_value = 100.0 + eta._start = 100.0 + eta._last_update = 100.0 + mock_time.return_value = 101.0 + eta.update(50) + assert eta.samples == 1 + + eta.start() + assert eta.samples == 0 + assert eta.items_per_sec == 0.0 + + def test_ema_smoothing(self): + """With alpha=1.0, the EMA should equal the latest rate exactly.""" + eta = ETAEstimator(alpha=1.0, min_samples=1) + eta.start() + with patch("lib.perf.time.monotonic") as mock_time: + mock_time.return_value = 100.0 + eta._start = 100.0 + eta._last_update = 100.0 + + mock_time.return_value = 101.0 + eta.update(10) # 10 items/sec + + mock_time.return_value = 102.0 + eta.update(30) # 20 items in 1s = 20 items/sec + + # alpha=1.0 means newest rate dominates + assert eta.items_per_sec == pytest.approx(20.0) + + def test_eta_calculation_accuracy(self): + """With constant rate of 10 items/sec, ETA for 80 remaining should be ~8s.""" + eta = ETAEstimator(alpha=1.0, min_samples=1) + eta.start() + with patch("lib.perf.time.monotonic") as mock_time: + mock_time.return_value = 100.0 + eta._start = 100.0 + eta._last_update = 100.0 + + mock_time.return_value = 101.0 + eta.update(10) # 10 items/sec + + 
mock_time.return_value = 102.0 + eta.update(20) # 10 items/sec + + result = eta.eta_seconds(20, 100) + assert result == pytest.approx(8.0) From 67b47a6220653698efe74e807dbddd2947168f24 Mon Sep 17 00:00:00 2001 From: Christopher Blaisdell Date: Mon, 6 Apr 2026 00:00:35 -0400 Subject: [PATCH 3/3] feat: per-unit sync pipeline with incremental shortcut delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the monolithic sync_apply event with a per-unit pipeline that processes each platform/collection independently and registers shortcuts in Steam as each unit completes — not all at the end. Backend changes (library.py): - _build_work_queue(): Phase 0 builds sync plan from enabled platforms/ collections without fetching ROM lists (fast startup) - _sync_one_platform() / _sync_one_collection(): fetch ROMs, diff against state, emit sync_apply_unit event, wait for frontend callback - _wait_for_unit_results(): 60s heartbeat-based timeout with cancellation - Incremental skip: unchanged units emit shortcuts from cache (no re-fetch) Frontend changes (syncManager.ts): - processUnit(): handles sync_apply_unit events, creates/updates shortcuts with 50ms delay (safe for CEF), fetches artwork, reports back to backend - processStale(): handles stale shortcut cleanup - processLegacySyncApply(): preserves backward compat with old sync path UI changes (MainPage.tsx): - Skip Preview toggle now triggers per-unit pipeline (startSync) directly - Preview/Apply path unchanged when Skip Preview is off Tested on Steam Deck: 3 units (Dreamcast 362, N64 329, Metroid 11), 702 total shortcuts, incremental delivery confirmed, no CEF crash. 
--- docs/per-unit-pipeline.md | 403 ++++++++++++++++ main.py | 3 + py_modules/services/library.py | 455 +++++++++++++++--- src/api/backend.ts | 1 + src/components/MainPage.tsx | 39 +- src/index.tsx | 6 +- src/types/index.ts | 33 ++ src/utils/syncManager.ts | 561 +++++++++++++++-------- tests/services/test_per_unit_pipeline.py | 308 +++++++++++++ 9 files changed, 1523 insertions(+), 286 deletions(-) create mode 100644 docs/per-unit-pipeline.md create mode 100644 tests/services/test_per_unit_pipeline.py diff --git a/docs/per-unit-pipeline.md b/docs/per-unit-pipeline.md new file mode 100644 index 0000000..12130f6 --- /dev/null +++ b/docs/per-unit-pipeline.md @@ -0,0 +1,403 @@ +# Feature: Per-Unit Sync Pipeline + +> **Status:** Implemented — awaiting Deck testing +> **Created:** April 5, 2026 +> **Branch:** feat/per-unit-pipeline-v2 +> **Replaces:** The current all-at-once sync architecture + +--- + +## Problem Statement + +The current sync pipeline processes **all platforms and collections through each phase before advancing to the next phase**. This means: + +1. **All** ROMs are fetched before **any** artwork is downloaded. +2. **All** artwork is downloaded before **any** shortcuts are created. +3. **All** shortcuts are created before **any** become visible in Steam. +4. The user waits 40+ minutes seeing only a spinner, then everything appears simultaneously. +5. If sync is cancelled or crashes mid-way, **zero** value is preserved — no shortcuts, no artwork, nothing. 
+ +### Observed Timing (3,241 ROMs across 3 platforms + 4 collections) + +| Phase | Time | % of total | +|---|---|---| +| Metadata fetch (all platforms + collections) | ~53s | ~3% | +| Artwork download (all ROMs, sequential) | ~6 min | ~15% | +| Shortcut application (all ROMs, 50ms delay each) | ~30+ min | ~80% | +| Registry update + collection creation | ~1 min | ~2% | +| **Total** | **~40 min** | | + +### Additional Waste + +- **Double artwork download:** Python downloads covers to `romm_{rom_id}_cover.png` staging files (step 6). Then after JS creates each shortcut and obtains the `appId`, JS calls `getArtworkBase64(romId)` which makes Python re-read the same file, base64-encode it, and send it back. JS then calls `SetCustomArtworkForApp()`. The cover is transferred twice. +- **Pure sleep overhead:** Each shortcut has a 50ms `await delay(50)` between creations = 3,241 × 50ms = **2.7 minutes of doing nothing**. +- **Giant single event:** `sync_apply` sends all 3,241 shortcuts in one event payload, which must be fully received and parsed before any processing begins. + +--- + +## Solution: Per-Unit Pipeline + +### Core Architecture Change + +Instead of running each phase across all data, run **all phases for one unit** before moving to the next unit. A "unit" is one platform or one collection. + +``` +BEFORE (all-at-once): + fetch ALL ROMs → download ALL artwork → create ALL shortcuts → done + +AFTER (per-unit): + for each platform: + fetch ROMs → download artwork → create shortcuts → VISIBLE + for each collection: + fetch ROMs → download artwork → create shortcuts → VISIBLE + cleanup stale → done +``` + +### Why This Works + +The user sees games appear platform-by-platform. Dreamcast (362 ROMs) completes in ~4 minutes and is fully playable with artwork while PlayStation (1,978 ROMs) is still syncing. A cancelled sync preserves all completed platforms. 
+
+---
+
+## Detailed Design
+
+### Phase 0: Build the Work Queue
+
+**What:** Fetch the list of enabled platforms and enabled collections. Do NOT fetch any ROMs yet.
+
+**Why fetch only the list:**
+- It's fast (~1 API call for platforms, ~2 for collections).
+- It gives us the complete work queue with ROM counts per unit, enabling accurate progress reporting ("Platform 1 of 5, ~362 ROMs").
+- It avoids the current approach of fetching all ROM data upfront, which delays the start of useful work.
+- It decouples "what do we need to sync?" from "sync this specific thing" — the outer loop is just a dispatcher.
+
+**Output:** An ordered list of work units:
+
+```python
+work_queue = [
+    {"type": "platform", "id": 4, "name": "Dreamcast", "rom_count": 362},
+    {"type": "platform", "id": 19, "name": "PlayStation", "rom_count": 1978},
+    {"type": "platform", "id": 11, "name": "SNES", "rom_count": 830},
+    {"type": "collection", "id": 94, "name": "Castlevania", "rom_count": 23},
+    {"type": "collection", "id": 90, "name": "Metroid", "rom_count": 11},
+]
+```
+
+**Emit:** `sync_plan` event to frontend with the full work queue. This enables the frontend to render a per-platform progress view (future UX feature).
+
+### Phase 1–N: Per-Unit Sync (one iteration per unit)
+
+For each unit in the work queue, execute all steps to completion:
+
+#### Step 1: Fetch ROMs
+
+- **Platform unit:** Call existing `_full_fetch_platform_roms()` (or `_try_incremental_skip()` for unchanged platforms). Already works for a single platform — no change needed.
+- **Collection unit:** Call existing `_fetch_single_collection_roms()`. Already works for a single collection — no change needed.
+- **Deduplication:** Maintain a running `synced_rom_ids: set[int]` across units. Collection units skip ROMs already synced via a platform unit. This is the same dedup logic that exists today in `_fetch_collection_roms()`, just applied per-collection instead of in a batch.
+ +#### Step 2: Build Shortcut Data + +- Call existing `_build_shortcuts_data(unit_roms)` with only this unit's ROMs. +- No change to the function itself — it already accepts any ROM list. + +#### Step 3: Cache Metadata + +- Extract and cache metadata for this unit's ROMs only. +- Same existing logic, smaller input. + +#### Step 4: Download Artwork + +- Call existing `artwork.download_artwork(unit_roms, ...)` with only this unit's ROMs. +- No change to the function — it already accepts any ROM list and handles skip-if-exists. + +#### Step 5: Emit `sync_apply_unit` + +New event (replaces the monolithic `sync_apply`): + +```python +await self._emit("sync_apply_unit", { + "unit_type": "platform", # or "collection" + "unit_name": "Dreamcast", + "unit_index": 0, # 0-based index in work queue + "total_units": 5, + "shortcuts": shortcuts_data, # only this unit's shortcuts + "remove_rom_ids": [], # empty — stale cleanup deferred to end +}) +``` + +The payload is much smaller (362 shortcuts vs 3,241), parsed faster, and processed immediately. + +#### Step 6: Frontend Applies Shortcuts + +`syncManager.ts` processes `sync_apply_unit`: + +1. For each shortcut in the unit: + - `AddShortcut()` or update existing → get `appId` + - Read artwork from staging path, `SetCustomArtworkForApp()` — **no base64 round-trip** + - Record `rom_id → appId` mapping +2. 
Report unit results back to backend: `reportUnitResults(rom_id_to_app_id)` + +**At this point, this platform's games are visible, have artwork, and are playable.** + +#### Step 7: Backend Updates Registry + +- `report_unit_results()` (new, simpler version of `report_sync_results`): + - Renames staging artwork to `{appId}p.png` + - Updates `shortcut_registry` for this unit's ROMs + - Saves state to disk — **crash-safe checkpoint** + - Does NOT build collections yet (deferred) + +#### Step 8: Collection Grouping (collection units only) + +- After a collection unit's shortcuts are applied, create/update the Steam collection for that collection. +- Platform collections are created after all platform units complete (needs the full set of platform appIds). + +### Final Phase: Stale Cleanup + +After all units are processed: + +1. **Build the complete set of synced ROM IDs** from the accumulated results across all units. +2. **Compare against the shortcut registry** to find stale entries (ROMs in registry but not in any synced unit). +3. **Emit stale removal** to frontend. +4. **Build platform collections** from the registry (needs all platform appIds). +5. **Save final state** with `last_sync` timestamp and synced platform/collection lists. + +Stale cleanup MUST happen at the end because we can't know what's stale until we've seen every unit's ROM IDs. 
+ +--- + +## Artwork Unification + +### Current Flow (double-download) + +``` +Python: download cover → save to romm_{rom_id}_cover.png (disk) +Python: emit sync_apply with cover_path +JS: AddShortcut() → get appId +JS: getArtworkBase64(romId) → Python reads file, base64-encodes → JS receives +JS: SetCustomArtworkForApp(appId, base64, "png", 0) +Python (later): rename romm_{rom_id}_cover.png → {appId}p.png +``` + +The cover bytes travel: RomM server → Python → disk → Python → base64 → JS → Steam + +### New Flow (single pass) + +``` +Python: download cover → save to romm_{rom_id}_cover.png (disk) +Python: emit sync_apply_unit with cover_path per shortcut +JS: AddShortcut() → get appId +JS: reportUnitResults({rom_id: appId, ...}) +Python: rename romm_{rom_id}_cover.png → {appId}p.png +Python: SetCustomArtworkForApp via SteamClient? +``` + +**Open question:** Can Python set Steam artwork directly, or must JS do it? If JS must call `SetCustomArtworkForApp`, we still need to transfer the image. Options: + +- **Option A (file path):** JS reads the staging file directly from the grid directory using Node.js `fs` (if accessible in Decky's CEF context). Avoids base64 encoding/decoding overhead. +- **Option B (keep base64 but per-unit):** Still do the base64 round-trip, but only for one unit's worth of ROMs at a time. Less impactful because the unit is small. +- **Option C (defer artwork to post-shortcut):** Create the shortcut, get the appId, report back to Python, Python renames the file to `{appId}p.png`. Steam may pick it up automatically from the grid directory without needing `SetCustomArtworkForApp` at all — the file naming convention `{appId}p.png` is what Steam's grid system looks for. + +**Recommended: Option C.** Steam's grid system reads artwork from `{appId}p.png` in the grid directory. If we rename the staging file to the correct name, Steam should display it without any explicit API call. This eliminates the base64 transfer entirely. 
Needs verification on Deck.
+
+---
+
+## Inter-Unit State
+
+State that accumulates across units:
+
+| State | Purpose | When used |
+|---|---|---|
+| `synced_rom_ids: set[int]` | Deduplication — collections skip ROMs already synced via platforms | Collection unit fetch step |
+| `all_rom_id_to_app_id: dict[str, int]` | Complete mapping for stale detection and platform collections | Final phase |
+| `platform_app_ids: dict[str, list[int]]` | Platform name → appIds for Steam platform collections | Final phase |
+| `collection_memberships: dict[str, list[int]]` | Collection name → ROM IDs for Steam collection grouping | Final phase (or per-collection-unit) |
+
+All of these are simple accumulators that grow as each unit completes. No complex merging required.
+
+---
+
+## Event Protocol Changes
+
+### New Events
+
+| Event | Direction | When | Payload |
+|---|---|---|---|
+| `sync_plan` | Python → JS | After Phase 0 | `{units: [{type, name, rom_count}], total_roms: int}` |
+| `sync_apply_unit` | Python → JS | Per unit | `{unit_type, unit_name, unit_index, total_units, shortcuts: [...]}` |
+| `report_unit_results` | JS → Python (RPC) | Per unit | `{rom_id_to_app_id: {...}}` |
+| `sync_stale` | Python → JS | Final phase | `{remove_rom_ids: [...]}` |
+| `sync_collections` | Python → JS | Final phase | `{platform_app_ids: {...}, romm_collection_app_ids: {...}}` |
+
+### Removed Events
+
+| Event | Why |
+|---|---|
+| `sync_apply` (monolithic) | Replaced by per-unit `sync_apply_unit` |
+
+### Unchanged Events
+
+| Event | Why unchanged |
+|---|---|
+| `sync_progress` | Still emitted throughout, now with unit-level context |
+| `sync_complete` | Still emitted once at the very end |
+
+---
+
+## 50ms Delay Reduction
+
+The current 50ms `await delay(50)` between each shortcut creation exists to "yield to the event loop." With per-unit processing:
+
+- Units are smaller (362 shortcuts for Dreamcast vs 3,241 total).
+- We can **reduce the delay to 5–10ms** or switch to yielding every N shortcuts instead of every single one. +- For 362 shortcuts at 10ms: 3.6s of delay vs 18s at 50ms. For 1,978 (PlayStation): 20s vs 99s. +- **Conservative approach:** Start at 20ms, measure, reduce further if no crashes. + +This alone saves ~2 minutes on the current 3,241-shortcut sync. + +--- + +## Crash Safety + +### Current State + +If the plugin crashes or the user kills it mid-sync, `report_sync_results()` never runs. The shortcut registry is not updated. All newly created shortcuts become orphans — present in Steam but unknown to the plugin. + +### Per-Unit Improvement + +After each unit completes, `report_unit_results()` persists the registry. If the plugin crashes between unit 3 and unit 4: + +- Units 1–3 are fully persisted in the registry. +- Unit 4's shortcuts may be orphaned (the usual risk), but it's at most one platform's worth. +- On next sync, incremental skip can detect that units 1–3 are unchanged and skip them. + +This is a significant durability improvement with no additional code — it's a natural consequence of per-unit state saves. 
+ +--- + +## Changes Required by File + +### Python (backend) + +| File | Change | Scope | +|---|---|---| +| `library.py` | Restructure `_do_sync()` to iterate per-unit | Major — this is the main change | +| `library.py` | New `_sync_one_platform()` method | Extract from existing code | +| `library.py` | New `_sync_one_collection()` method | Extract from existing code | +| `library.py` | New `report_unit_results()` RPC method | Small — subset of `report_sync_results` | +| `library.py` | Stale detection moved to final phase | Move, not rewrite | +| `main.py` | Expose `report_unit_results` callable | 1 line | +| `artwork.py` | No changes | ✅ | +| `http.py` | No changes | ✅ | +| `perf.py` | No changes (phases still timed) | ✅ | + +### TypeScript (frontend) + +| File | Change | Scope | +|---|---|---| +| `syncManager.ts` | Handle `sync_apply_unit` instead of `sync_apply` | Moderate — same processing logic, different event shape | +| `syncManager.ts` | Call `reportUnitResults()` after each unit | Small | +| `syncManager.ts` | Handle `sync_stale` event for removals | Small | +| `backend.ts` | New `reportUnitResults()` API call | 1 function | +| `index.tsx` | Register new event listeners | Small | +| `types/index.ts` | New event types | Small | + +### Files NOT changed + +- `artwork.py` — already accepts a ROM list of any size +- `http.py` — HTTP adapter is unit-agnostic +- `perf.py` — phase timing still works (phases are just smaller/repeated) +- `steamShortcuts.ts` — `addShortcut()`/`removeShortcut()` unchanged +- `collections.ts` — collection creation logic unchanged +- `domain/shortcut_data.py` — unchanged +- `domain/sync_state.py` — unchanged + +--- + +## Testing Strategy + +### Unit Tests + +- `_sync_one_platform()` with mocked API returns correct shortcuts data +- `_sync_one_collection()` deduplicates against `synced_rom_ids` +- `report_unit_results()` updates registry and saves state +- Stale detection with partial sync (some units complete, some not) +- Work queue 
ordering (platforms before collections) + +### Integration Tests (on Deck) + +1. **Small sync (1 platform, 0 collections):** Verify single-unit pipeline works end to end. +2. **Multi-platform sync:** Verify platforms appear one at a time in Steam Library. +3. **Platform + collection sync:** Verify collection-only ROMs are handled after platforms. +4. **Cancel mid-unit:** Verify completed units are preserved, in-progress unit is partially applied. +5. **Cancel between units:** Verify all completed units are fully persisted. + +### Performance Measurement + +Compare against the Feature 1 baseline: + +| Metric | Baseline (all-at-once) | Target (per-unit) | +|---|---|---| +| Time to first visible game | ~40 min | ~2–4 min | +| Total sync time | ~40 min | ~35 min (reduced delay + no double-artwork) | +| Data transferred (artwork) | 2× (disk + base64) | 1× (disk only, if Option C works) | +| Crash recovery | 0 games preserved | All completed units preserved | + +--- + +## Open Questions + +1. **Does Steam auto-detect `{appId}p.png` in the grid directory?** If yes, Option C for artwork eliminates base64 entirely. If no, we fall back to Option B (per-unit base64, smaller batches). **Must verify on Deck before implementing.** + +2. **Should we parallelize artwork within a unit?** The current sequential download takes ~1s per cover. For a 362-ROM platform, that's ~6 minutes of artwork alone. Adding 4–8 concurrent downloads per unit would reduce this to ~1 min per platform. This is an orthogonal optimization that can be added later without changing the per-unit architecture. + +3. **How should `sync_preview` (delta mode) work?** Delta preview currently fetches everything to compute the diff. Per-unit pipeline could either: (a) keep preview as-is (fetch-all for diff summary), or (b) skip preview entirely in full-sync mode and only use it for delta syncs. **Recommend deferring this decision.** + +4. 
**Platform collection timing:** Platform Steam collections need all appIds for a platform. Since we process one platform at a time, we can create each platform's collection immediately after that unit completes. This is simpler than the current approach of building all collections at the end. + +--- + +## Deck Test Plan — April 5, 2026 + +### Test Configuration + +| Setting | Value | RomM ID | +|---|---|---| +| Platform: Dreamcast | Enabled | 4 | +| Platform: Nintendo 64 | Enabled | 8 | +| Collection: Metroid | Enabled | 90 | +| `collection_create_platform_groups` | `true` | — | +| `log_level` | `debug` | — | + +### Pre-Test State + +| Item | Count | Notes | +|---|---|---| +| Existing registry entries | 373 | 362 Dreamcast + 11 Metroid (from prior syncs) | +| Existing grid artwork files | 382 | | +| N64 entries in registry | 0 | Never synced before | + +### Expected Behavior (Incremental Sync) + +1. **Phase 0 — Build Work Queue:** 3 units (Dreamcast, N64, Metroid collection) +2. **Unit 1 — Dreamcast (362 ROMs):** Already in registry → should be fast. Shortcuts already exist, minimal changes. `sync_apply_unit` emitted, frontend processes, reports back. +3. **Unit 2 — N64 (329 ROMs):** Fresh sync. All 329 shortcuts created with artwork. This is the main test of new-platform add within the per-unit pipeline. +4. **Unit 3 — Metroid collection (11 ROMs):** Deduplication test. Metroid ROMs span multiple platforms (NGC, NDS, GBA, SNES, GB, Wii). ROMs already synced via platforms should be skipped; only collection membership matters. +5. **Stale cleanup:** Should find nothing to remove (only additions, no removals). +6. **Collections update:** Steam collections created/updated for Dreamcast, N64, and Metroid. 
+ +### Success Criteria + +- [ ] All 3 units process sequentially (visible in logs as separate `sync_apply_unit` events) +- [ ] Dreamcast unit completes without re-downloading all artwork +- [ ] N64 unit creates ~329 new shortcuts +- [ ] Metroid collection deduplicates correctly (no duplicate shortcuts) +- [ ] Registry grows from 373 → ~702 entries (373 + 329 N64) +- [ ] Grid artwork grows by ~329 files (N64 covers) +- [ ] Steam collections exist for Dreamcast, N64, and Metroid +- [ ] No crashes, no orphaned shortcuts +- [ ] `report_unit_results` called successfully 3 times (once per unit + once for stale) + +### Test Results + +_(To be filled in after sync)_ diff --git a/main.py b/main.py index 6ed3d83..c76cb3c 100755 --- a/main.py +++ b/main.py @@ -498,6 +498,9 @@ async def sync_cancel_preview(self): async def report_sync_results(self, rom_id_to_app_id, removed_rom_ids, cancelled=False): return await self._sync_service.report_sync_results(rom_id_to_app_id, removed_rom_ids, cancelled) + async def report_unit_results(self, rom_id_to_app_id): + return self._sync_service.report_unit_results(rom_id_to_app_id) + async def get_registry_platforms(self): return self._sync_service.get_registry_platforms() diff --git a/py_modules/services/library.py b/py_modules/services/library.py index 971bba3..a58dec8 100644 --- a/py_modules/services/library.py +++ b/py_modules/services/library.py @@ -10,6 +10,7 @@ from __future__ import annotations import asyncio +import os import time import uuid from datetime import UTC, datetime @@ -862,97 +863,412 @@ async def _fetch_and_prepare(self): return all_roms, shortcuts_data, platforms, collection_memberships, platform_rom_ids - # ── Full sync ──────────────────────────────────────────── + # ── Per-unit sync helpers ──────────────────────────────── - async def _do_sync(self): - self._perf.start_sync() - try: + async def _build_work_queue(self): + """Phase 0: Build the ordered list of work units (platforms then collections). 
+ + Returns (work_queue, platforms_list, enabled_collections_metadata). + """ + platforms = await self._fetch_enabled_platforms() + self._check_cancelling() + + work_queue = [] + for p in platforms: + work_queue.append({ + "type": "platform", + "id": p["id"], + "name": p.get("name", p.get("display_name", "Unknown")), + "slug": p.get("slug", ""), + "rom_count": p.get("rom_count", 0), + "_platform": p, + }) + + # Fetch collection metadata (list only, not ROMs) + enabled_collections = self._settings.get("enabled_collections", {}) + enabled_ids = {k for k, v in enabled_collections.items() if v} + collections_meta = [] + if enabled_ids: try: - fetch_result = await self._fetch_and_prepare() - all_roms, shortcuts_data, platforms, collection_memberships, platform_rom_ids = fetch_result - except asyncio.CancelledError: - await self._finish_sync(_SYNC_CANCELLED) - raise + user_collections = await self._loop.run_in_executor(None, self._romm_api.list_collections) + franchise_collections = [] + try: + franchise_collections = await self._loop.run_in_executor( + None, self._romm_api.list_virtual_collections, "franchise" + ) + except Exception as e: + self._logger.warning(f"Failed to fetch franchise collections: {e}") + + for c in user_collections + franchise_collections: + cid = str(c.get("id", "")) + if cid in enabled_ids: + collections_meta.append(c) + work_queue.append({ + "type": "collection", + "id": cid, + "name": c.get("name", cid), + "rom_count": c.get("rom_count", len(c.get("rom_ids", []))), + "_collection": c, + }) + except RommUnsupportedError: + self._logger.info("Collections not supported on this RomM version") except Exception as e: - self._logger.error(f"Failed to fetch platforms: {e}") - _code, _msg = classify_error(e) - await self._emit_progress("error", message=_msg, running=False) - self._sync_state = SyncState.IDLE - return + self._logger.warning(f"Failed to fetch collection list: {e}") - # Calculate step plan for full sync - has_artwork = len(all_roms) > 0 - 
has_shortcuts = len(shortcuts_data) > 0 - full_steps = [] - if has_artwork: - full_steps.append("artwork") - if has_shortcuts: - full_steps.append("shortcuts") - full_total_steps = len(full_steps) - full_current_step = 0 - - if has_artwork: - full_current_step += 1 - await self._emit_progress( - "applying", - total=len(all_roms), - message=f"Downloading artwork 0/{len(all_roms)}", - step=full_current_step, - total_steps=full_total_steps, - ) - with self._perf.time_phase("download_artwork"): - cover_paths = await self._download_artwork( - all_roms, progress_step=full_current_step, progress_total_steps=full_total_steps - ) - else: - cover_paths = {} + return work_queue, platforms, collections_meta - if self._sync_state == SyncState.CANCELLING: - await self._finish_sync(_SYNC_CANCELLED) - return + async def _sync_one_platform(self, unit, synced_rom_ids, unit_index, total_units): + """Sync a single platform unit: fetch ROMs → build shortcuts → artwork → emit. + + Returns (unit_roms, shortcuts_data) for the platform. 
+ """ + platform = unit["_platform"] + platform_name = unit["name"] + platform_slug = unit["slug"] + platform_id = unit["id"] + + # Fetch ROMs (incremental skip or full fetch) + unit_roms = [] + last_sync = self._state.get("last_sync") + registry = self._state.get("shortcut_registry", {}) + + skipped = await self._try_incremental_skip( + platform, registry, last_sync, platform_name, platform_slug, + unit_roms, unit_index + 1, total_units, + ) + if not skipped: + await self._full_fetch_platform_roms( + platform_id, platform_name, platform_slug, + unit_roms, unit_index + 1, total_units, + ) + self._check_cancelling() + + # Track which ROMs came from this platform + for r in unit_roms: + synced_rom_ids.add(r["id"]) + + # Build shortcut data + shortcuts_data = self._build_shortcuts_data(unit_roms) + + # Cache metadata + if self._metadata_service is not None: + for rom in unit_roms: + rom_id_str = str(rom["id"]) + self._metadata_cache[rom_id_str] = self._metadata_service.extract_metadata(rom) + self._metadata_service.mark_metadata_dirty() + + # Download artwork + await self._emit_progress( + "applying", + total=len(unit_roms), + message=f"Downloading artwork for {platform_name} 0/{len(unit_roms)}", + ) + cover_paths = await self._download_artwork(unit_roms) + for sd in shortcuts_data: + sd["cover_path"] = cover_paths.get(sd["rom_id"], "") + + self._check_cancelling() + + # Store pending data for this unit + for sd in shortcuts_data: + self._pending_sync[sd["rom_id"]] = sd + + # Emit per-unit event + await self._emit_progress( + "applying", + total=len(shortcuts_data), + message=f"Applying {platform_name} shortcuts 0/{len(shortcuts_data)}", + ) + await self._emit("sync_apply_unit", { + "unit_type": "platform", + "unit_name": platform_name, + "unit_index": unit_index, + "total_units": total_units, + "shortcuts": shortcuts_data, + }) + + self._logger.info( + f"Unit {unit_index + 1}/{total_units}: {platform_name} — " + f"{len(unit_roms)} ROMs, {len(shortcuts_data)} 
shortcuts emitted" + ) + + return unit_roms, shortcuts_data + + async def _sync_one_collection(self, unit, synced_rom_ids, unit_index, total_units): + """Sync a single collection unit: fetch ROMs → build shortcuts → artwork → emit. + + Returns (collection_roms, shortcuts_data, collection_name, coll_rom_ids). + """ + collection = unit["_collection"] + coll_name = unit["name"] + + # Fetch collection ROMs (dedup against already-synced ROMs) + collection_only_roms = [] + all_seen = set(synced_rom_ids) + coll_rom_ids = await self._fetch_single_collection_roms( + collection, all_seen, collection_only_roms, + ) + + self._check_cancelling() + + # Track newly seen ROMs + synced_rom_ids.update(r["id"] for r in collection_only_roms) + + # Build shortcut data for collection-only ROMs (not already synced) + shortcuts_data = self._build_shortcuts_data(collection_only_roms) + + # Cache metadata + if self._metadata_service is not None: + for rom in collection_only_roms: + rom_id_str = str(rom["id"]) + self._metadata_cache[rom_id_str] = self._metadata_service.extract_metadata(rom) + self._metadata_service.mark_metadata_dirty() + + # Download artwork for collection-only ROMs + if collection_only_roms: + await self._emit_progress( + "applying", + total=len(collection_only_roms), + message=f"Downloading artwork for {coll_name} 0/{len(collection_only_roms)}", + ) + cover_paths = await self._download_artwork(collection_only_roms) for sd in shortcuts_data: sd["cover_path"] = cover_paths.get(sd["rom_id"], "") - # Determine stale rom_ids by comparing current sync with registry - current_rom_ids = {r["id"] for r in all_roms} - stale_rom_ids = [int(rid) for rid in self._state["shortcut_registry"] if int(rid) not in current_rom_ids] + self._check_cancelling() + + # Store pending data for this unit + for sd in shortcuts_data: + self._pending_sync[sd["rom_id"]] = sd - # Emit sync_apply for frontend to process via SteamClient - next_step = full_current_step + 1 + # Emit per-unit event (only 
collection-only ROMs need shortcuts) + if shortcuts_data: await self._emit_progress( "applying", total=len(shortcuts_data), - message=f"Applying shortcuts 0/{len(shortcuts_data)}", - step=next_step, - total_steps=full_total_steps, + message=f"Applying {coll_name} shortcuts 0/{len(shortcuts_data)}", + ) + await self._emit("sync_apply_unit", { + "unit_type": "collection", + "unit_name": coll_name, + "unit_index": unit_index, + "total_units": total_units, + "shortcuts": shortcuts_data, + }) + + self._logger.info( + f"Unit {unit_index + 1}/{total_units}: {coll_name} — " + f"{len(coll_rom_ids)} ROMs ({len(collection_only_roms)} new), " + f"{len(shortcuts_data)} shortcuts emitted" + ) + + return collection_only_roms, shortcuts_data, coll_name, coll_rom_ids + + async def _wait_for_unit_results(self, timeout_sec=60): + """Wait for the frontend to call report_unit_results for the current unit. + + Returns the rom_id_to_app_id dict, or None on timeout. + """ + self._unit_result_event.clear() + self._sync_last_heartbeat = time.monotonic() + + while not self._unit_result_event.is_set(): + try: + await asyncio.wait_for(self._unit_result_event.wait(), timeout=10) + except asyncio.TimeoutError: + elapsed = time.monotonic() - self._sync_last_heartbeat + if elapsed > timeout_sec: + self._logger.warning(f"Unit result timeout: no response for {elapsed:.0f}s") + return None + # Check for cancellation while waiting + if self._sync_state == SyncState.CANCELLING: + return None + + result = self._unit_result_data + self._unit_result_data = None + return result + + def report_unit_results(self, rom_id_to_app_id): + """Called by frontend after processing a sync_apply_unit event. + + Updates the shortcut registry for this unit's ROMs and signals + the sync loop to continue with the next unit. 
+ """ + grid = self._steam_config.grid_dir() + + for rom_id_str, app_id in rom_id_to_app_id.items(): + pending = self._pending_sync.get(int(rom_id_str), {}) + cover_path = self._finalize_cover_path(grid, pending.get("cover_path", ""), app_id, rom_id_str) + self._state["shortcut_registry"][rom_id_str] = self._build_registry_entry(pending, app_id, cover_path) + + # Apply Steam Input mode for new shortcuts + steam_input_mode = self._settings.get("steam_input_mode", "default") + if steam_input_mode != "default" and rom_id_to_app_id: + try: + self._steam_config.set_steam_input_config( + [int(aid) for aid in rom_id_to_app_id.values()], mode=steam_input_mode + ) + except Exception as e: + self._logger.error(f"Failed to set Steam Input config: {e}") + + # Persist state (crash checkpoint) + self._save_state() + + self._logger.info(f"Unit results: {len(rom_id_to_app_id)} shortcuts registered") + + # Signal the sync loop to continue + self._unit_result_data = rom_id_to_app_id + self._unit_result_event.set() + + # Send heartbeat to keep safety timeout alive + self._sync_last_heartbeat = time.monotonic() + + return {"success": True} + + # ── Full sync ──────────────────────────────────────────── + + async def _do_sync(self): + self._perf.start_sync() + self._pending_sync = {} + self._unit_result_event = asyncio.Event() + self._unit_result_data = None + + try: + # Phase 0: Build work queue + await self._emit_progress("platforms", message="Building sync plan...") + with self._perf.time_phase("build_work_queue"): + work_queue, platforms, _ = await self._build_work_queue() + + self._check_cancelling() + + if not work_queue: + await self._emit_progress("done", message="Nothing to sync", running=False) + self._sync_state = SyncState.IDLE + return + + total_units = len(work_queue) + total_roms_estimate = sum(u.get("rom_count", 0) for u in work_queue) + + # Emit sync_plan so frontend knows the work queue + await self._emit("sync_plan", { + "units": [ + {"type": u["type"], "name": 
u["name"], "rom_count": u.get("rom_count", 0)} + for u in work_queue + ], + "total_roms": total_roms_estimate, + }) + + self._logger.info( + f"Sync plan: {total_units} units, ~{total_roms_estimate} ROMs — " + f"{[u['name'] for u in work_queue]}" + ) + + # Per-unit processing + synced_rom_ids: set[int] = set() + platform_rom_ids: set[int] = set() + collection_memberships: dict[str, list[int]] = {} + all_rom_id_to_app_id: dict[str, int] = {} + + for unit_index, unit in enumerate(work_queue): + self._check_cancelling() + unit_type = unit["type"] + + if unit_type == "platform": + with self._perf.time_phase(f"unit_platform_{unit['name']}"): + unit_roms, shortcuts_data = await self._sync_one_platform( + unit, synced_rom_ids, unit_index, total_units, + ) + platform_rom_ids.update(r["id"] for r in unit_roms) + + elif unit_type == "collection": + with self._perf.time_phase(f"unit_collection_{unit['name']}"): + _, shortcuts_data, coll_name, coll_rom_ids = await self._sync_one_collection( + unit, synced_rom_ids, unit_index, total_units, + ) + if coll_rom_ids: + collection_memberships[coll_name] = coll_rom_ids + + # Wait for frontend to process this unit's shortcuts + if shortcuts_data: + unit_result = await self._wait_for_unit_results() + if unit_result is None: + if self._sync_state == SyncState.CANCELLING: + await self._finish_sync(_SYNC_CANCELLED) + return + self._logger.warning(f"Unit {unit_index + 1} timed out, continuing...") + else: + all_rom_id_to_app_id.update(unit_result) + + self._perf.set_gauge("units_completed", unit_index + 1) + + # Flush metadata + if self._metadata_service is not None: + self._metadata_service.flush_metadata_if_dirty() + + # Final phase: Stale cleanup + self._check_cancelling() + stale_rom_ids = [ + int(rid) for rid in self._state["shortcut_registry"] + if int(rid) not in synced_rom_ids + ] + + if stale_rom_ids: + await self._emit("sync_stale", {"remove_rom_ids": stale_rom_ids}) + self._logger.info(f"Emitted {len(stale_rom_ids)} stale 
removals") + # Wait for frontend to process removals + stale_result = await self._wait_for_unit_results(timeout_sec=60) + if stale_result is not None: + # stale_result contains removed rom_ids as keys with app_id=0 + for rom_id_str in stale_result: + self._state["shortcut_registry"].pop(rom_id_str, None) + self._save_state() + + # Final phase: Build and emit collections + self._pending_collection_memberships = collection_memberships + self._pending_platform_rom_ids = platform_rom_ids + + platform_app_ids, romm_collection_app_ids = self._build_collection_app_ids( + self._state["shortcut_registry"], + platform_rom_ids, + collection_memberships, ) - # Save sync stats (registry updated by report_sync_results) + # Save final sync state self._state["sync_stats"] = { "platforms": len(platforms), - "roms": len(all_roms), + "roms": len(synced_rom_ids), } + self._state["last_sync"] = datetime.now(UTC).isoformat() + self._state["last_synced_collections"] = list(collection_memberships.keys()) + self._state["last_synced_platforms"] = list(platform_app_ids.keys()) self._save_state() - # Store pending data for report_sync_results to reference - self._pending_sync = {sd["rom_id"]: sd for sd in shortcuts_data} - self._pending_collection_memberships = collection_memberships - self._pending_platform_rom_ids = platform_rom_ids + # Emit sync_complete with collection data + total = len(self._state["shortcut_registry"]) + await self._emit("sync_complete", { + "platform_app_ids": platform_app_ids, + "romm_collection_app_ids": romm_collection_app_ids, + "total_games": total, + }) - await self._emit( - "sync_apply", - { - "shortcuts": shortcuts_data, - "remove_rom_ids": stale_rom_ids, - "next_step": next_step, - "total_steps": full_total_steps, - }, + await self._emit_progress( + "done", + current=total, + total=total, + message=f"Sync complete: {total} games from {len(platform_app_ids)} platforms", + running=False, ) - self._logger.info(f"Sync data emitted: {len(shortcuts_data)} shortcuts, 
{len(stale_rom_ids)} stale") - self._perf.set_gauge("shortcuts_emitted", len(shortcuts_data)) + self._logger.info(f"Sync complete: {total} games, {len(platform_app_ids)} platforms") + self._perf.set_gauge("total_roms", len(synced_rom_ids)) + self._perf.set_gauge("total_platforms", len(platforms)) + self._perf.set_gauge("shortcuts_emitted", len(all_rom_id_to_app_id)) self._perf.set_gauge("stale_rom_ids", len(stale_rom_ids)) + + except asyncio.CancelledError: + await self._finish_sync(_SYNC_CANCELLED) + raise except Exception as e: import traceback @@ -980,9 +1296,10 @@ async def _do_sync(self): self._logger.warning(f"[PerfCollector] Failed to save report: {_e}") if self._metadata_service is not None: self._metadata_service.flush_metadata_if_dirty() + self._pending_sync = {} + self._pending_collection_memberships = {} + self._pending_platform_rom_ids = None self._sync_state = SyncState.IDLE - if self._sync_progress.get("phase") != "error" and self._sync_progress.get("running"): - self._start_safety_timeout() async def _finish_sync(self, message): self._sync_progress = { diff --git a/src/api/backend.ts b/src/api/backend.ts index 1c719db..0e92e70 100755 --- a/src/api/backend.ts +++ b/src/api/backend.ts @@ -86,6 +86,7 @@ export const removeAllShortcuts = callable<[], { success: boolean; message: stri export const getArtworkBase64 = callable<[number], { base64: string | null }>("get_artwork_base64"); export const getSgdbArtworkBase64 = callable<[number, number], { base64: string | null; no_api_key?: boolean }>("get_sgdb_artwork_base64"); export const reportSyncResults = callable<[Record, number[], boolean], { success: boolean }>("report_sync_results"); +export const reportUnitResults = callable<[Record], { success: boolean }>("report_unit_results"); export const reportRemovalResults = callable<[(string | number)[]], { success: boolean; message: string }>("report_removal_results"); export const uninstallAllRoms = callable<[], { success: boolean; message: string; 
removed_count: number }>("uninstall_all_roms"); export const saveSgdbApiKey = callable<[string], { success: boolean; message: string }>("save_sgdb_api_key"); diff --git a/src/components/MainPage.tsx b/src/components/MainPage.tsx index e80bcfb..712910d 100755 --- a/src/components/MainPage.tsx +++ b/src/components/MainPage.tsx @@ -14,6 +14,7 @@ import { import { FaCheckCircle, FaTimesCircle } from "react-icons/fa"; import { testConnection, + startSync, cancelSync, getSyncStats, getSettings, @@ -150,32 +151,26 @@ export const MainPage: FC = ({ onNavigate }) => { setSyncProgress({ running: true, phase: "fetching", message: "Fetching library..." }); startPolling(true); try { - const result = await syncPreview(); - stopPolling(); - if (result.success) { - const hasChanges = result.summary.new_count + result.summary.changed_count + result.summary.remove_count > 0 || !!result.summary.collection_diff?.has_changes || !!result.summary.platform_collection_diff?.has_changes; - if (skipPreview && hasChanges) { - // Auto-apply: skip preview UI - setSyncProgress({ running: true, phase: "applying", message: "Applying changes..." 
}); - const applyResult = await syncApplyDelta(result.preview_id); - if (applyResult.success) { - startPolling(); - } else { - setStatus(applyResult.message); - setSyncing(false); - setLoading(false); - } - } else if (skipPreview) { - // Nothing to sync — show brief status - await syncCancelPreview(); - setStatus("Everything is up to date"); - setSyncing(false); - setLoading(false); + if (skipPreview) { + // Per-unit pipeline: skip preview, go straight to full sync + const result = await startSync(); + if (result.success) { + // Polling loop will detect completion via sync_progress + startPolling(); } else { - setPreview(result); + stopPolling(); + setStatus(result.message || "Sync failed to start"); setSyncing(false); setLoading(false); } + return; + } + const result = await syncPreview(); + stopPolling(); + if (result.success) { + setPreview(result); + setSyncing(false); + setLoading(false); } else { setStatus(result.message || "Preview failed"); setSyncing(false); diff --git a/src/index.tsx b/src/index.tsx index 0e0e659..614c871 100755 --- a/src/index.tsx +++ b/src/index.tsx @@ -253,7 +253,7 @@ export default definePlugin(() => { [{ platform_app_ids: Record; romm_collection_app_ids?: Record; total_games: number }] >("sync_complete", onSyncComplete); - const syncApplyListener = initSyncManager(); + const syncApplyListeners = initSyncManager(); // Backend emits sync_progress events throughout _do_sync — update the module-level store const syncProgressListener = addEventListener<[SyncProgress]>( @@ -352,7 +352,9 @@ export default definePlugin(() => { unregisterGameDetailPatch(); unregisterMetadataPatches(); removeEventListener("sync_complete", syncCompleteListener); - removeEventListener("sync_apply", syncApplyListener); + removeEventListener("sync_apply_unit", syncApplyListeners.unitListener); + removeEventListener("sync_stale", syncApplyListeners.staleListener); + removeEventListener("sync_apply", syncApplyListeners.legacyListener); 
removeEventListener("sync_progress", syncProgressListener); removeEventListener("download_progress", downloadProgressListener); removeEventListener("download_complete", downloadCompleteListener); diff --git a/src/types/index.ts b/src/types/index.ts index d2ffb1a..25ceb6c 100755 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -164,6 +164,39 @@ export interface SyncApplyData { total_steps?: number; } +/** Per-unit pipeline: work queue emitted at sync start */ +export interface SyncPlanUnit { + type: "platform" | "collection"; + id: number | string; + name: string; + rom_count: number; +} + +export interface SyncPlanData { + units: SyncPlanUnit[]; + total_roms: number; +} + +/** Per-unit pipeline: one unit's shortcuts emitted for processing */ +export interface SyncApplyUnitData { + unit_type: "platform" | "collection"; + unit_name: string; + unit_index: number; + total_units: number; + shortcuts: SyncAddItem[]; +} + +/** Per-unit pipeline: stale removals emitted after all units */ +export interface SyncStaleData { + remove_rom_ids: number[]; +} + +/** Per-unit pipeline: final collections emitted after stale cleanup */ +export interface SyncCollectionsData { + platform_app_ids: Record; + romm_collection_app_ids: Record; +} + export interface FirmwareFile { id: number; file_name: string; diff --git a/src/utils/syncManager.ts b/src/utils/syncManager.ts index 062dea9..973f894 100755 --- a/src/utils/syncManager.ts +++ b/src/utils/syncManager.ts @@ -1,11 +1,14 @@ import { addEventListener } from "@decky/api"; -import type { SyncApplyData, SyncChangedItem } from "../types"; -import { getArtworkBase64, reportSyncResults, syncHeartbeat, logInfo, logError } from "../api/backend"; +import type { SyncApplyData, SyncApplyUnitData, SyncStaleData, SyncChangedItem } from "../types"; +import { getArtworkBase64, reportSyncResults, reportUnitResults, syncHeartbeat, logInfo, logError } from "../api/backend"; import { getExistingRomMShortcuts, addShortcut, removeShortcut } from 
"./steamShortcuts"; import { updateSyncProgress } from "./syncProgress"; const delay = (ms: number) => new Promise((r) => setTimeout(r, ms)); +/** Delay between shortcut operations. 50ms is safe for CEF; 20ms caused crashes with large batches. */ +const SHORTCUT_DELAY_MS = 50; + let _cancelRequested = false; let _isSyncRunning = false; @@ -14,220 +17,392 @@ export function requestSyncCancel(): void { _cancelRequested = true; } +// ── Per-Unit Pipeline ────────────────────────────────────── + +/** + * Process a single unit's shortcuts: create/update shortcuts, fetch artwork, + * then report results back to the backend. + */ +async function processUnit(data: SyncApplyUnitData): Promise { + const { unit_type, unit_name, unit_index, total_units, shortcuts } = data; + logInfo(`sync_apply_unit: ${unit_type} "${unit_name}" (${unit_index + 1}/${total_units}), ${shortcuts.length} shortcuts`); + + if (!Array.isArray(shortcuts) || shortcuts.length === 0) { + logInfo(`sync_apply_unit: no shortcuts for "${unit_name}", reporting empty results`); + try { + await reportUnitResults({}); + } catch (e) { + logError(`Failed to report empty unit results for "${unit_name}": ${e}`); + } + return; + } + + _cancelRequested = false; + let lastHeartbeat = Date.now(); + const HEARTBEAT_INTERVAL_MS = 10_000; + + const existing = await getExistingRomMShortcuts(); + const romIdToAppId: Record = {}; + const artworkTargets: Array<{ appId: number; romId: number; name: string }> = []; + + updateSyncProgress({ + running: true, phase: "applying", + current: 0, total: shortcuts.length, + message: `${unit_name}: Applying shortcuts 0/${shortcuts.length}`, + }); + + for (let i = 0; i < shortcuts.length; i++) { + const item = shortcuts[i]; + try { + updateSyncProgress({ + current: i + 1, + message: `${unit_name}: Applying shortcuts ${i + 1}/${shortcuts.length}`, + }); + + const existingAppId = existing.get(item.rom_id); + let appId: number | undefined; + + if (existingAppId) { + 
SteamClient.Apps.SetShortcutName(existingAppId, item.name); + SteamClient.Apps.SetShortcutExe(existingAppId, item.exe); + SteamClient.Apps.SetShortcutStartDir(existingAppId, item.start_dir); + SteamClient.Apps.SetAppLaunchOptions(existingAppId, item.launch_options); + appId = existingAppId; + } else { + appId = await addShortcut(item) ?? undefined; + } + + if (appId) { + romIdToAppId[String(item.rom_id)] = appId; + artworkTargets.push({ appId, romId: item.rom_id, name: item.name }); + } + } catch (e) { + logError(`Failed to process shortcut for rom ${item.rom_id}: ${e}`); + } + await delay(SHORTCUT_DELAY_MS); + + if (Date.now() - lastHeartbeat > HEARTBEAT_INTERVAL_MS) { + syncHeartbeat().catch(() => {}); + lastHeartbeat = Date.now(); + } + + if (_cancelRequested) { + logInfo(`Cancel requested during "${unit_name}" after ${i + 1}/${shortcuts.length}`); + break; + } + } + + // Batch artwork fetch (parallel, up to 8 at a time) + if (!_cancelRequested && artworkTargets.length > 0) { + const ART_CONCURRENCY = 8; + for (let i = 0; i < artworkTargets.length; i += ART_CONCURRENCY) { + if (_cancelRequested) break; + const batch = artworkTargets.slice(i, i + ART_CONCURRENCY); + await Promise.all(batch.map(async ({ appId, romId, name }) => { + try { + const artResult = await getArtworkBase64(romId); + if (artResult.base64) { + await SteamClient.Apps.SetCustomArtworkForApp(appId, artResult.base64, "png", 0); + } + } catch (artErr) { + logError(`Failed to fetch/set artwork for ${name}: ${artErr}`); + } + })); + } + } + + // Report unit results to backend (triggers registry save + signals next unit) + try { + await reportUnitResults(romIdToAppId); + } catch (e) { + logError(`Failed to report unit results for "${unit_name}": ${e}`); + } + + logInfo(`sync_apply_unit complete: "${unit_name}" — ${Object.keys(romIdToAppId).length} shortcuts`); +} + /** - * Initialize the sync manager that listens for sync_apply events from the backend. 
- * Returns the event listener handle for cleanup. + * Process stale ROM removals emitted after all units complete. */ -export function initSyncManager(): ReturnType { - return addEventListener("sync_apply", async (data: SyncApplyData) => { +async function processStale(data: SyncStaleData): Promise { + const { remove_rom_ids } = data; + if (!Array.isArray(remove_rom_ids) || remove_rom_ids.length === 0) { + try { await reportUnitResults({}); } catch { /* empty */ } + return; + } + + logInfo(`sync_stale: removing ${remove_rom_ids.length} stale shortcuts`); + const existing = await getExistingRomMShortcuts(); + const removedMap: Record = {}; + + for (let i = 0; i < remove_rom_ids.length; i++) { + const romId = remove_rom_ids[i]; + const appId = existing.get(romId); + if (appId) { + removeShortcut(appId); + } + removedMap[String(romId)] = 0; // signal removal with appId=0 + updateSyncProgress({ + current: i + 1, + message: `Removing stale shortcuts ${i + 1}/${remove_rom_ids.length}`, + }); + await delay(SHORTCUT_DELAY_MS); + + if (_cancelRequested) { + logInfo("Cancel requested during stale removals"); + break; + } + } + + try { + await reportUnitResults(removedMap); + } catch (e) { + logError(`Failed to report stale removal results: ${e}`); + } + + logInfo(`sync_stale complete: ${Object.keys(removedMap).length} removed`); +} + +/** + * Initialize the per-unit sync manager. + * Listens for sync_apply_unit and sync_stale events. + * Also retains the legacy sync_apply handler for delta mode. + * Returns cleanup handles. 
+ */ +export function initSyncManager(): { unitListener: ReturnType; staleListener: ReturnType; legacyListener: ReturnType } { + const unitListener = addEventListener("sync_apply_unit", async (data: SyncApplyUnitData) => { + if (_isSyncRunning) { + logInfo("sync_apply_unit: already running, queuing will be handled by backend wait"); + } + _isSyncRunning = true; + try { + await processUnit(data); + } finally { + _isSyncRunning = false; + } + }); + + const staleListener = addEventListener("sync_stale", async (data: SyncStaleData) => { + _isSyncRunning = true; + try { + await processStale(data); + } finally { + _isSyncRunning = false; + } + }); + + // Legacy sync_apply handler for delta preview/apply mode + const legacyListener = addEventListener("sync_apply", async (data: SyncApplyData) => { if (_isSyncRunning) { logInfo("sync_apply: already running, ignoring duplicate event"); return; } _isSyncRunning = true; try { - // Defensive checks against malformed event data - if (!Array.isArray(data.shortcuts)) { - logError("sync_apply: data.shortcuts is not an array, aborting"); - return; - } - if (!Array.isArray(data.remove_rom_ids)) { - logError("sync_apply: data.remove_rom_ids is not an array, aborting"); - return; - } - const isDelta = Array.isArray(data.changed_shortcuts); - logInfo(`sync_apply received: ${data.shortcuts.length} new, ${isDelta ? data.changed_shortcuts!.length + " changed, " : ""}${data.remove_rom_ids.length} remove${isDelta ? " (delta)" : ""}`); - - _cancelRequested = false; - let cancelled = false; - let lastHeartbeat = Date.now(); - const HEARTBEAT_INTERVAL_MS = 10_000; - - const existing = await getExistingRomMShortcuts(); - const romIdToAppId: Record = {}; - const removedRomIds: number[] = []; - const artworkTargets: Array<{ appId: number; romId: number; name: string }> = []; - - // Step plan from backend - let currentStep = data.next_step ?? 1; - const totalSteps = data.total_steps ?? 
3; - - // --- Step: Apply shortcuts (new + changed) --- - const totalNew = data.shortcuts.length; - const totalChanged = data.changed_shortcuts?.length ?? 0; - const totalShortcuts = totalNew + totalChanged; - - if (totalShortcuts > 0) { + await processLegacySyncApply(data); + } finally { + _isSyncRunning = false; + } + }); + + return { unitListener, staleListener, legacyListener }; +} + +// ── Legacy sync_apply handler (delta preview/apply) ──────── + +async function processLegacySyncApply(data: SyncApplyData): Promise { + if (!Array.isArray(data.shortcuts)) { + logError("sync_apply: data.shortcuts is not an array, aborting"); + return; + } + if (!Array.isArray(data.remove_rom_ids)) { + logError("sync_apply: data.remove_rom_ids is not an array, aborting"); + return; + } + const isDelta = Array.isArray(data.changed_shortcuts); + logInfo(`sync_apply received: ${data.shortcuts.length} new, ${isDelta ? data.changed_shortcuts!.length + " changed, " : ""}${data.remove_rom_ids.length} remove${isDelta ? " (delta)" : ""}`); + + _cancelRequested = false; + let cancelled = false; + let lastHeartbeat = Date.now(); + const HEARTBEAT_INTERVAL_MS = 10_000; + + const existing = await getExistingRomMShortcuts(); + const romIdToAppId: Record = {}; + const removedRomIds: number[] = []; + const artworkTargets: Array<{ appId: number; romId: number; name: string }> = []; + + let currentStep = data.next_step ?? 1; + const totalSteps = data.total_steps ?? 3; + + const totalNew = data.shortcuts.length; + const totalChanged = data.changed_shortcuts?.length ?? 
0; + const totalShortcuts = totalNew + totalChanged; + + if (totalShortcuts > 0) { + updateSyncProgress({ + running: true, phase: "applying", + current: 0, total: totalShortcuts, + message: `Applying shortcuts 0/${totalShortcuts}`, + step: currentStep, totalSteps, + }); + + for (let i = 0; i < data.shortcuts.length; i++) { + const item = data.shortcuts[i]; + try { updateSyncProgress({ - running: true, phase: "applying", - current: 0, total: totalShortcuts, - message: `Applying shortcuts 0/${totalShortcuts}`, - step: currentStep, totalSteps, + current: i + 1, + message: `Applying shortcuts ${i + 1}/${totalShortcuts}`, }); - - for (let i = 0; i < data.shortcuts.length; i++) { - const item = data.shortcuts[i]; - try { - updateSyncProgress({ - current: i + 1, - message: `Applying shortcuts ${i + 1}/${totalShortcuts}`, - }); - let appId: number | undefined; - - if (isDelta) { - const newAppId = await addShortcut(item); - if (newAppId) { - appId = newAppId; - romIdToAppId[String(item.rom_id)] = newAppId; - } - } else { - const existingAppId = existing.get(item.rom_id); - if (existingAppId) { - SteamClient.Apps.SetShortcutName(existingAppId, item.name); - SteamClient.Apps.SetShortcutExe(existingAppId, item.exe); - SteamClient.Apps.SetShortcutStartDir(existingAppId, item.start_dir); - SteamClient.Apps.SetAppLaunchOptions(existingAppId, item.launch_options); - appId = existingAppId; - romIdToAppId[String(item.rom_id)] = existingAppId; - } else { - const newAppId = await addShortcut(item); - if (newAppId) { - appId = newAppId; - romIdToAppId[String(item.rom_id)] = newAppId; - } - } - } - - if (appId) { - artworkTargets.push({ appId, romId: item.rom_id, name: item.name }); - } - } catch (e) { - logError(`Failed to process shortcut for rom ${item.rom_id}: ${e}`); - } - await delay(50); - - if (Date.now() - lastHeartbeat > HEARTBEAT_INTERVAL_MS) { - syncHeartbeat().catch(() => {}); - lastHeartbeat = Date.now(); - } - - if (_cancelRequested) { - logInfo(`Cancel requested after 
processing ${i + 1}/${totalShortcuts} shortcuts`); - cancelled = true; - break; + let appId: number | undefined; + + if (isDelta) { + const newAppId = await addShortcut(item); + if (newAppId) { + appId = newAppId; + romIdToAppId[String(item.rom_id)] = newAppId; } - } - - // Process changed shortcuts (delta mode only) - if (!cancelled && isDelta && data.changed_shortcuts) { - for (let i = 0; i < data.changed_shortcuts.length; i++) { - const item: SyncChangedItem = data.changed_shortcuts[i]; - const idx = totalNew + i; - try { - updateSyncProgress({ - current: idx + 1, - message: `Updating shortcuts ${idx + 1}/${totalShortcuts}`, - }); - const appId = item.existing_app_id; - - SteamClient.Apps.SetShortcutName(appId, item.name); - SteamClient.Apps.SetShortcutExe(appId, item.exe); - SteamClient.Apps.SetShortcutStartDir(appId, item.start_dir); - SteamClient.Apps.SetAppLaunchOptions(appId, item.launch_options); - romIdToAppId[String(item.rom_id)] = appId; - - artworkTargets.push({ appId, romId: item.rom_id, name: item.name }); - } catch (e) { - logError(`Failed to update shortcut for rom ${item.rom_id}: ${e}`); - } - await delay(50); - - if (Date.now() - lastHeartbeat > HEARTBEAT_INTERVAL_MS) { - syncHeartbeat().catch(() => {}); - lastHeartbeat = Date.now(); - } - - if (_cancelRequested) { - logInfo(`Cancel requested during changed shortcuts processing`); - cancelled = true; - break; + } else { + const existingAppId = existing.get(item.rom_id); + if (existingAppId) { + SteamClient.Apps.SetShortcutName(existingAppId, item.name); + SteamClient.Apps.SetShortcutExe(existingAppId, item.exe); + SteamClient.Apps.SetShortcutStartDir(existingAppId, item.start_dir); + SteamClient.Apps.SetAppLaunchOptions(existingAppId, item.launch_options); + appId = existingAppId; + romIdToAppId[String(item.rom_id)] = existingAppId; + } else { + const newAppId = await addShortcut(item); + if (newAppId) { + appId = newAppId; + romIdToAppId[String(item.rom_id)] = newAppId; } } } - - currentStep++; 
- } - // --- Batch artwork fetch (parallel, up to 8 at a time) --- - if (!cancelled && artworkTargets.length > 0) { - const ART_CONCURRENCY = 8; - for (let i = 0; i < artworkTargets.length; i += ART_CONCURRENCY) { - if (_cancelRequested) { - logInfo("Cancel requested during artwork fetching"); - cancelled = true; - break; - } - const batch = artworkTargets.slice(i, i + ART_CONCURRENCY); - await Promise.all(batch.map(async ({ appId, romId, name }) => { - try { - const artResult = await getArtworkBase64(romId); - if (artResult.base64) { - await SteamClient.Apps.SetCustomArtworkForApp(appId, artResult.base64, "png", 0); - logInfo(`Set cover artwork for ${name} (appId=${appId})`); - } - } catch (artErr) { - logError(`Failed to fetch/set artwork for ${name}: ${artErr}`); - } - })); + if (appId) { + artworkTargets.push({ appId, romId: item.rom_id, name: item.name }); } + } catch (e) { + logError(`Failed to process shortcut for rom ${item.rom_id}: ${e}`); } + await delay(50); - // --- Step: Remove shortcuts --- - if (!cancelled && data.remove_rom_ids.length > 0) { - const totalRemovals = data.remove_rom_ids.length; - updateSyncProgress({ - phase: "applying", current: 0, total: totalRemovals, - message: `Removing shortcuts 0/${totalRemovals}`, - step: currentStep, totalSteps, - }); - - for (let i = 0; i < data.remove_rom_ids.length; i++) { - const romId = data.remove_rom_ids[i]; - const appId = existing.get(romId); - if (appId) { - removeShortcut(appId); - } - removedRomIds.push(romId); + if (Date.now() - lastHeartbeat > HEARTBEAT_INTERVAL_MS) { + syncHeartbeat().catch(() => {}); + lastHeartbeat = Date.now(); + } + + if (_cancelRequested) { + logInfo(`Cancel requested after processing ${i + 1}/${totalShortcuts} shortcuts`); + cancelled = true; + break; + } + } + + if (!cancelled && isDelta && data.changed_shortcuts) { + for (let i = 0; i < data.changed_shortcuts.length; i++) { + const item: SyncChangedItem = data.changed_shortcuts[i]; + const idx = totalNew + i; + try { 
updateSyncProgress({ - current: i + 1, - message: `Removing shortcuts ${i + 1}/${totalRemovals}`, + current: idx + 1, + message: `Updating shortcuts ${idx + 1}/${totalShortcuts}`, }); - await delay(50); - - if (_cancelRequested) { - logInfo("Cancel requested during removals"); - cancelled = true; - break; + const appId = item.existing_app_id; + + SteamClient.Apps.SetShortcutName(appId, item.name); + SteamClient.Apps.SetShortcutExe(appId, item.exe); + SteamClient.Apps.SetShortcutStartDir(appId, item.start_dir); + SteamClient.Apps.SetAppLaunchOptions(appId, item.launch_options); + romIdToAppId[String(item.rom_id)] = appId; + + artworkTargets.push({ appId, romId: item.rom_id, name: item.name }); + } catch (e) { + logError(`Failed to update shortcut for rom ${item.rom_id}: ${e}`); + } + await delay(50); + + if (Date.now() - lastHeartbeat > HEARTBEAT_INTERVAL_MS) { + syncHeartbeat().catch(() => {}); + lastHeartbeat = Date.now(); + } + + if (_cancelRequested) { + logInfo(`Cancel requested during changed shortcuts processing`); + cancelled = true; + break; + } + } + } + + currentStep++; + } + + // Batch artwork fetch + if (!cancelled && artworkTargets.length > 0) { + const ART_CONCURRENCY = 8; + for (let i = 0; i < artworkTargets.length; i += ART_CONCURRENCY) { + if (_cancelRequested) { + cancelled = true; + break; + } + const batch = artworkTargets.slice(i, i + ART_CONCURRENCY); + await Promise.all(batch.map(async ({ appId, romId, name }) => { + try { + const artResult = await getArtworkBase64(romId); + if (artResult.base64) { + await SteamClient.Apps.SetCustomArtworkForApp(appId, artResult.base64, "png", 0); + logInfo(`Set cover artwork for ${name} (appId=${appId})`); } + } catch (artErr) { + logError(`Failed to fetch/set artwork for ${name}: ${artErr}`); } - - currentStep++; + })); + } + } + + // Remove shortcuts + if (!cancelled && data.remove_rom_ids.length > 0) { + const totalRemovals = data.remove_rom_ids.length; + updateSyncProgress({ + phase: "applying", current: 
0, total: totalRemovals, + message: `Removing shortcuts 0/${totalRemovals}`, + step: currentStep, totalSteps, + }); + + for (let i = 0; i < data.remove_rom_ids.length; i++) { + const romId = data.remove_rom_ids[i]; + const appId = existing.get(romId); + if (appId) { + removeShortcut(appId); } - - // Report results to backend - try { - await reportSyncResults(romIdToAppId, removedRomIds, cancelled); - } catch (e) { - logError(`Failed to report sync results: ${e}`); + removedRomIds.push(romId); + updateSyncProgress({ + current: i + 1, + message: `Removing shortcuts ${i + 1}/${totalRemovals}`, + }); + await delay(50); + + if (_cancelRequested) { + logInfo("Cancel requested during removals"); + cancelled = true; + break; } - - const doneMsg = cancelled - ? `Sync cancelled (${Object.keys(romIdToAppId).length} processed)` - : "Sync complete"; - updateSyncProgress({ running: false, phase: "done", message: doneMsg }); - logInfo(`sync_apply ${cancelled ? "cancelled" : "complete"}: ${Object.keys(romIdToAppId).length} added/updated, ${removedRomIds.length} removed`); - } finally { - _isSyncRunning = false; } - }); + + currentStep++; + } + + try { + await reportSyncResults(romIdToAppId, removedRomIds, cancelled); + } catch (e) { + logError(`Failed to report sync results: ${e}`); + } + + const doneMsg = cancelled + ? `Sync cancelled (${Object.keys(romIdToAppId).length} processed)` + : "Sync complete"; + updateSyncProgress({ running: false, phase: "done", message: doneMsg }); + logInfo(`sync_apply ${cancelled ? "cancelled" : "complete"}: ${Object.keys(romIdToAppId).length} added/updated, ${removedRomIds.length} removed`); } diff --git a/tests/services/test_per_unit_pipeline.py b/tests/services/test_per_unit_pipeline.py new file mode 100644 index 0000000..e921fee --- /dev/null +++ b/tests/services/test_per_unit_pipeline.py @@ -0,0 +1,308 @@ +"""Tests for the per-unit sync pipeline (Feature 2). + +Tests the work queue builder, per-unit sync helpers, and report_unit_results. 
+""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from domain.sync_state import SyncState +from services.library import LibraryService + + +@pytest.fixture +def svc(tmp_path): + """Create a minimal LibraryService for unit tests.""" + import decky + + state = { + "shortcut_registry": {}, + "installed_roms": {}, + "last_sync": None, + "sync_stats": {}, + } + settings = { + "enabled_platforms": {}, + "enabled_collections": {}, + } + + romm_api = MagicMock() + steam_config = MagicMock() + steam_config.grid_dir.return_value = str(tmp_path / "grid") + + svc = LibraryService( + romm_api=romm_api, + steam_config=steam_config, + state=state, + settings=settings, + metadata_cache={}, + loop=asyncio.get_event_loop(), + logger=decky.logger, + plugin_dir=str(tmp_path), + emit=decky.emit, + save_state=MagicMock(), + save_settings_to_disk=MagicMock(), + log_debug=MagicMock(), + ) + return svc + + +@pytest.fixture(autouse=True) +async def _set_loop(svc): + svc._loop = asyncio.get_event_loop() + + +class TestBuildWorkQueue: + @pytest.mark.asyncio + async def test_platforms_only(self, svc): + svc._romm_api.list_platforms.return_value = [ + {"id": 4, "name": "Dreamcast", "slug": "dreamcast", "rom_count": 362}, + {"id": 19, "name": "PlayStation", "slug": "psx", "rom_count": 1978}, + ] + svc._settings["enabled_platforms"] = {"4": True, "19": True} + + work_queue, platforms, collections_meta = await svc._build_work_queue() + + assert len(work_queue) == 2 + assert work_queue[0]["type"] == "platform" + assert work_queue[0]["name"] == "Dreamcast" + assert work_queue[1]["name"] == "PlayStation" + assert len(platforms) == 2 + assert len(collections_meta) == 0 + + @pytest.mark.asyncio + async def test_platforms_and_collections(self, svc): + svc._romm_api.list_platforms.return_value = [ + {"id": 4, "name": "Dreamcast", "slug": "dreamcast", "rom_count": 362}, + ] + svc._settings["enabled_platforms"] = {"4": True} + svc._settings["enabled_collections"] = 
{"90": True} + + svc._romm_api.list_collections.return_value = [ + {"id": 90, "name": "Metroid", "rom_count": 11}, + ] + svc._romm_api.list_virtual_collections.return_value = [] + + work_queue, platforms, collections_meta = await svc._build_work_queue() + + assert len(work_queue) == 2 + assert work_queue[0]["type"] == "platform" + assert work_queue[1]["type"] == "collection" + assert work_queue[1]["name"] == "Metroid" + assert len(collections_meta) == 1 + + @pytest.mark.asyncio + async def test_empty_queue(self, svc): + svc._romm_api.list_platforms.return_value = [] + + work_queue, platforms, collections_meta = await svc._build_work_queue() + + assert len(work_queue) == 0 + assert len(platforms) == 0 + + @pytest.mark.asyncio + async def test_disabled_platforms_excluded(self, svc): + svc._romm_api.list_platforms.return_value = [ + {"id": 4, "name": "Dreamcast", "slug": "dreamcast", "rom_count": 362}, + {"id": 19, "name": "PlayStation", "slug": "psx", "rom_count": 1978}, + ] + svc._settings["enabled_platforms"] = {"4": True, "19": False} + + work_queue, platforms, _ = await svc._build_work_queue() + + assert len(work_queue) == 1 + assert work_queue[0]["name"] == "Dreamcast" + + +class TestReportUnitResults: + def test_updates_registry(self, svc, tmp_path): + svc._pending_sync = { + 1: {"name": "Game A", "platform_name": "DC", "platform_slug": "dreamcast", + "cover_path": "", "rom_id": 1, "fs_name": "game_a.zip"}, + 2: {"name": "Game B", "platform_name": "DC", "platform_slug": "dreamcast", + "cover_path": "", "rom_id": 2, "fs_name": "game_b.zip"}, + } + svc._unit_result_event = asyncio.Event() + svc._unit_result_data = None + + result = svc.report_unit_results({"1": 100001, "2": 100002}) + + assert result["success"] is True + assert "1" in svc._state["shortcut_registry"] + assert svc._state["shortcut_registry"]["1"]["app_id"] == 100001 + assert "2" in svc._state["shortcut_registry"] + assert svc._state["shortcut_registry"]["2"]["app_id"] == 100002 + + def 
test_saves_state(self, svc, tmp_path): + svc._pending_sync = { + 1: {"name": "Game A", "platform_name": "DC", "platform_slug": "dreamcast", + "cover_path": "", "rom_id": 1, "fs_name": "game_a.zip"}, + } + svc._unit_result_event = asyncio.Event() + svc._unit_result_data = None + + svc.report_unit_results({"1": 100001}) + + svc._save_state.assert_called_once() + + def test_signals_event(self, svc, tmp_path): + svc._pending_sync = {} + svc._unit_result_event = asyncio.Event() + svc._unit_result_data = None + + svc.report_unit_results({"1": 100001}) + + assert svc._unit_result_event.is_set() + assert svc._unit_result_data == {"1": 100001} + + def test_accumulates_across_units(self, svc, tmp_path): + """Calling report_unit_results multiple times accumulates registry entries.""" + svc._unit_result_event = asyncio.Event() + svc._unit_result_data = None + + svc._pending_sync = { + 1: {"name": "Game A", "platform_name": "DC", "platform_slug": "dreamcast", + "cover_path": "", "rom_id": 1, "fs_name": "a.zip"}, + } + svc.report_unit_results({"1": 100001}) + + # Reset event for next unit + svc._unit_result_event = asyncio.Event() + svc._unit_result_data = None + + svc._pending_sync[2] = { + "name": "Game B", "platform_name": "SNES", "platform_slug": "snes", + "cover_path": "", "rom_id": 2, "fs_name": "b.zip", + } + svc.report_unit_results({"2": 100002}) + + assert "1" in svc._state["shortcut_registry"] + assert "2" in svc._state["shortcut_registry"] + assert svc._state["shortcut_registry"]["1"]["platform_name"] == "DC" + assert svc._state["shortcut_registry"]["2"]["platform_name"] == "SNES" + + +class TestSyncOnePlatform: + @pytest.mark.asyncio + async def test_fetches_and_emits(self, svc): + """_sync_one_platform fetches ROMs, builds shortcuts, and emits sync_apply_unit.""" + import decky + + synced_rom_ids = set() + + # Mock the full fetch to return 2 ROMs + async def mock_full_fetch(pid, pname, pslug, roms_list, pi, total): + roms_list.extend([ + {"id": 1, "name": "Sonic 
Adventure", "platform_name": "Dreamcast", + "platform_slug": "dreamcast"}, + {"id": 2, "name": "Jet Set Radio", "platform_name": "Dreamcast", + "platform_slug": "dreamcast"}, + ]) + + svc._full_fetch_platform_roms = mock_full_fetch + svc._try_incremental_skip = AsyncMock(return_value=False) + svc._artwork = None + + unit = { + "type": "platform", + "id": 4, + "name": "Dreamcast", + "slug": "dreamcast", + "rom_count": 2, + "_platform": {"id": 4, "name": "Dreamcast", "slug": "dreamcast", "rom_count": 2}, + } + + unit_roms, shortcuts_data = await svc._sync_one_platform(unit, synced_rom_ids, 0, 1) + + assert len(unit_roms) == 2 + assert len(shortcuts_data) == 2 + assert {1, 2} == synced_rom_ids + + # Verify sync_apply_unit was emitted + emit_calls = [c for c in decky.emit.call_args_list if c[0][0] == "sync_apply_unit"] + assert len(emit_calls) >= 1 + payload = emit_calls[-1][0][1] + assert payload["unit_type"] == "platform" + assert payload["unit_name"] == "Dreamcast" + assert len(payload["shortcuts"]) == 2 + + +class TestSyncOneCollection: + @pytest.mark.asyncio + async def test_deduplicates_against_synced(self, svc): + """Collection sync skips ROMs already in synced_rom_ids.""" + import decky + + synced_rom_ids = {1, 2} # Already synced from platform + + # Collection has ROM 1 (already synced) and ROM 3 (new) + async def mock_fetch_single(coll, all_seen, coll_only_roms): + all_rom_ids = [1, 3] + for rid in all_rom_ids: + if rid not in all_seen: + all_seen.add(rid) + coll_only_roms.append({ + "id": rid, "name": f"ROM {rid}", + "platform_name": "GBA", "platform_slug": "gba", + }) + return all_rom_ids + + svc._fetch_single_collection_roms = mock_fetch_single + svc._artwork = None + + unit = { + "type": "collection", + "id": "90", + "name": "Metroid", + "rom_count": 2, + "_collection": {"id": 90, "name": "Metroid"}, + } + + coll_roms, shortcuts_data, coll_name, coll_rom_ids = await svc._sync_one_collection( + unit, synced_rom_ids, 0, 1, + ) + + # Only ROM 3 should be in 
collection_only_roms (ROM 1 was deduped) + assert len(coll_roms) == 1 + assert coll_roms[0]["id"] == 3 + assert len(shortcuts_data) == 1 + assert coll_rom_ids == [1, 3] + assert 3 in synced_rom_ids + + +class TestWaitForUnitResults: + @pytest.mark.asyncio + async def test_returns_data_when_signaled(self, svc): + svc._unit_result_event = asyncio.Event() + svc._unit_result_data = None + + # Signal from another task after a short delay + async def signal(): + await asyncio.sleep(0.05) + svc._unit_result_data = {"1": 100001} + svc._unit_result_event.set() + + asyncio.get_event_loop().create_task(signal()) + + result = await svc._wait_for_unit_results(timeout_sec=5) + assert result == {"1": 100001} + + @pytest.mark.asyncio + async def test_returns_none_on_cancellation(self, svc): + svc._unit_result_event = asyncio.Event() + svc._unit_result_data = None + svc._sync_state = SyncState.CANCELLING + + result = await svc._wait_for_unit_results(timeout_sec=1) + assert result is None + + @pytest.mark.asyncio + async def test_returns_none_on_timeout(self, svc): + svc._unit_result_event = asyncio.Event() + svc._unit_result_data = None + svc._sync_last_heartbeat = 0 # Way in the past + + result = await svc._wait_for_unit_results(timeout_sec=0) + assert result is None