diff --git a/docs/dependencies.md b/docs/dependencies.md index 33cdb4de..9c79b7cf 100644 --- a/docs/dependencies.md +++ b/docs/dependencies.md @@ -528,10 +528,15 @@ dependencies: version: "1.2.0" depth: 2 resolved_by: "microsoft/apm-sample-package" +mcp_servers: + - acme-kb + - github ``` The `deployed_files` field tracks exactly which files APM placed in your project. This enables safe cleanup on `apm uninstall` and `apm prune` — only tracked files are removed. +The `mcp_servers` field records the MCP dependency references (e.g. `io.github.github/github-mcp-server`) for servers currently managed by APM. It is used to detect and clean up stale servers when dependencies change. + ### How It Works 1. **First install**: APM resolves dependencies, downloads packages, and writes `apm.lock` diff --git a/docs/manifest-schema.md b/docs/manifest-schema.md index a13ff9eb..23feee4d 100644 --- a/docs/manifest-schema.md +++ b/docs/manifest-schema.md @@ -360,6 +360,7 @@ dependencies: # YAML list (not a map) depth: # 1 = direct, 2+ = transitive resolved_by: # Parent dependency (transitive only) deployed_files: > # Workspace-relative paths of installed files +mcp_servers: > # MCP dependency references managed by APM (OPTIONAL, e.g. "io.github.github/github-mcp-server") ``` ### 6.2. Resolver Behaviour diff --git a/src/apm_cli/cli.py b/src/apm_cli/cli.py index 2a363ec9..50f47e92 100644 --- a/src/apm_cli/cli.py +++ b/src/apm_cli/cli.py @@ -754,6 +754,17 @@ def install(ctx, packages, runtime, exclude, only, update, dry_run, force, verbo apm_count = 0 prompt_count = 0 agent_count = 0 + + # Capture old MCP servers from lockfile BEFORE _install_apm_dependencies + # regenerates it (which drops the mcp_servers field). + # We always read this — even when --only=apm — so we can restore the + # field after the lockfile is regenerated by the APM install step. + old_mcp_servers: builtins.set = builtins.set() + _lock_path = Path.cwd() / "apm.lock" + _existing_lock = LockFile.read(_lock_path) + if _existing_lock: + old_mcp_servers = builtins.set(_existing_lock.mcp_servers) + if should_install_apm and apm_deps: if not APM_DEPS_AVAILABLE: _rich_error("APM dependency system not available") @@ -774,6 +785,12 @@ def install(ctx, packages, runtime, exclude, only, update, dry_run, force, verbo elif should_install_apm and not apm_deps: _rich_info("No APM dependencies found in apm.yml") + # When --update is used, package files on disk may have changed. + # Clear the parse cache so transitive MCP collection reads fresh data. + if update: + from apm_cli.models.apm_package import clear_apm_yml_cache + clear_apm_yml_cache() + # Collect transitive MCP dependencies from resolved APM packages apm_modules_path = Path.cwd() / "apm_modules" if should_install_mcp and apm_modules_path.exists(): @@ -785,10 +802,28 @@ def install(ctx, packages, runtime, exclude, only, update, dry_run, force, verbo # Continue with MCP installation (existing logic) mcp_count = 0 + new_mcp_servers: builtins.set = builtins.set() if should_install_mcp and mcp_deps: mcp_count = _install_mcp_dependencies(mcp_deps, runtime, exclude, verbose) + new_mcp_servers = _get_mcp_dep_names(mcp_deps) + + # Remove stale MCP servers that are no longer needed + stale_servers = old_mcp_servers - new_mcp_servers + if stale_servers: + _remove_stale_mcp_servers(stale_servers, runtime, exclude) + + # Persist the new MCP server set in the lockfile + _update_lockfile_mcp_servers(new_mcp_servers) elif should_install_mcp and not mcp_deps: + # No MCP deps at all — remove any old APM-managed servers + if old_mcp_servers: + _remove_stale_mcp_servers(old_mcp_servers, runtime, exclude) + _update_lockfile_mcp_servers(builtins.set()) _rich_warning("No MCP dependencies found in apm.yml") + elif not should_install_mcp and old_mcp_servers: + # --only=apm: APM install regenerated the lockfile and dropped + # mcp_servers. Restore the previous set so it is not lost. + _update_lockfile_mcp_servers(old_mcp_servers) # Show beautiful post-install summary _rich_blank_line() @@ -1236,6 +1271,10 @@ def uninstall(ctx, packages, dry_run): lockfile_path = get_lockfile_path(Path(".")) lockfile = LockFile.read(lockfile_path) + # Capture MCP servers from lockfile *before* it is mutated/deleted so + # that stale-MCP cleanup can compute the diff even when all deps are removed. + _pre_uninstall_mcp_servers = builtins.set(lockfile.mcp_servers) if lockfile else builtins.set() + if apm_modules_dir.exists(): deleted_pkg_paths: list = [] for package in packages_to_remove: @@ -1476,6 +1515,19 @@ def _find_transitive_orphans(lockfile, removed_urls): instructions_cleaned = result.get("files_removed", 0) # Phase 2: Re-integrate from remaining installed packages in apm_modules/ + # Detect target so we only re-create Claude artefacts when appropriate + from apm_cli.core.target_detection import ( + detect_target, + should_integrate_claude, + ) + config_target = apm_package.target + detected_target, _ = detect_target( + project_root=project_root, + explicit_target=None, + config_target=config_target, + ) + integrate_claude = should_integrate_claude(detected_target) + prompt_integrator = PromptIntegrator() agent_integrator = AgentIntegrator() skill_integrator = SkillIntegrator() @@ -1511,13 +1563,14 @@ def _find_transitive_orphans(lockfile, removed_urls): prompt_integrator.integrate_package_prompts(pkg_info, project_root) if agent_integrator.should_integrate(project_root): agent_integrator.integrate_package_agents(pkg_info, project_root) - if Path(".claude").exists(): + if integrate_claude: agent_integrator.integrate_package_agents_claude(pkg_info, project_root) skill_integrator.integrate_package_skill(pkg_info, project_root) - if command_integrator.should_integrate(project_root): + if integrate_claude: command_integrator.integrate_package_commands(pkg_info, project_root) hook_integrator_reint.integrate_package_hooks(pkg_info, project_root) - hook_integrator_reint.integrate_package_hooks_claude(pkg_info, project_root) + if integrate_claude: + hook_integrator_reint.integrate_package_hooks_claude(pkg_info, project_root) instruction_integrator.integrate_package_instructions(pkg_info, project_root) except Exception: pass # Best effort re-integration @@ -1539,6 +1592,27 @@ def _find_transitive_orphans(lockfile, removed_urls): if instructions_cleaned > 0: _rich_info(f"✓ Cleaned up {instructions_cleaned} instruction(s)") + # Clean up stale MCP servers after uninstall + try: + old_mcp_servers = _pre_uninstall_mcp_servers + if old_mcp_servers: + # Recompute MCP deps from remaining packages + apm_modules_path = Path.cwd() / "apm_modules" + remaining_mcp = _collect_transitive_mcp_deps(apm_modules_path, lockfile_path, trust_private=True) + # Also include root-level MCP deps from apm.yml + try: + remaining_root_mcp = apm_package.get_mcp_dependencies() + except Exception: + remaining_root_mcp = [] + all_remaining_mcp = _deduplicate_mcp_deps(remaining_root_mcp + remaining_mcp) + new_mcp_servers = _get_mcp_dep_names(all_remaining_mcp) + stale_servers = old_mcp_servers - new_mcp_servers + if stale_servers: + _remove_stale_mcp_servers(stale_servers) + _update_lockfile_mcp_servers(new_mcp_servers) + except Exception: + pass # best-effort MCP cleanup + # Final summary summary_lines = [] summary_lines.append( @@ -1676,6 +1750,9 @@ def download_callback(dep_ref, modules_dir): deps_to_install = flat_deps.get_installation_list() # If specific packages were requested, filter to only those + # **and their full transitive dependency subtrees** so that + # sub-deps (and their MCP servers) are installed and recorded + # in the lockfile. if only_packages: # Build identity set from user-supplied package specs. # Accepts any input form: git URLs, FQDN, shorthand. @@ -1687,6 +1764,21 @@ def download_callback(dep_ref, modules_dir): except Exception: only_identities.add(p) + # Expand the set to include transitive descendants of the + # requested packages so their MCP servers, primitives, etc. + # are correctly installed and written to the lockfile. + tree = dependency_graph.dependency_tree + + def _collect_descendants(node): + """Walk the tree and add every child identity.""" + for child in node.children: + only_identities.add(child.dependency_ref.get_identity()) + _collect_descendants(child) + + for node in tree.nodes.values(): + if node.dependency_ref.get_identity() in only_identities: + _collect_descendants(node) + deps_to_install = [ dep for dep in deps_to_install if dep.get_identity() in only_identities @@ -2411,7 +2503,19 @@ def download_callback(dep_ref, modules_dir): for dep_key, dep_files in package_deployed_files.items(): if dep_key in lockfile.dependencies: lockfile.dependencies[dep_key].deployed_files = dep_files + lockfile_path = get_lockfile_path(project_root) + + # When installing a subset of packages (apm install ), + # merge new entries into the existing lockfile instead of + # overwriting it — otherwise the uninstalled packages disappear. + if only_packages: + existing = LockFile.read(lockfile_path) + if existing: + for key, dep in lockfile.dependencies.items(): + existing.add_dependency(dep) + lockfile = existing + lockfile.save(lockfile_path) _rich_info(f"Generated apm.lock with {len(lockfile.dependencies)} dependencies") except Exception as e: @@ -2666,6 +2770,145 @@ def _apply_mcp_overlay(server_info_cache: dict, dep) -> None: ) +def _get_mcp_dep_names(mcp_deps: list) -> builtins.set: + """Extract unique server names from a list of MCP dependencies. + + Args: + mcp_deps: List of MCP dependency entries (MCPDependency objects or strings). + + Returns: + Set of MCP server names. + """ + names: builtins.set = builtins.set() + for dep in mcp_deps: + if hasattr(dep, "name"): + names.add(dep.name) + elif isinstance(dep, str): + names.add(dep) + return names + + +def _remove_stale_mcp_servers( + stale_names: builtins.set, + runtime: str = None, + exclude: str = None, +) -> None: + """Remove MCP server entries that are no longer required by any dependency. + + Cleans up runtime configuration files only for the runtimes that were + actually targeted during installation. ``stale_names`` contains MCP + dependency references (e.g. ``"io.github.github/github-mcp-server"``). + For Copilot CLI and Codex, config keys are derived from the last path + segment, so we match against both the full reference and the short name. + + Args: + stale_names: Set of MCP dependency references to remove. + runtime: If set, only clean this specific runtime. + exclude: If set, skip this runtime during cleanup. + """ + if not stale_names: + return + + # Determine which runtimes to clean, mirroring install-time logic. + all_runtimes = {"vscode", "copilot", "codex"} + if runtime: + target_runtimes = {runtime} + else: + target_runtimes = builtins.set(all_runtimes) + if exclude: + target_runtimes.discard(exclude) + + # Build an expanded set that includes both the full reference and the + # last-segment short name so we match config keys in every runtime. + expanded_stale: builtins.set = builtins.set() + for n in stale_names: + expanded_stale.add(n) + if "/" in n: + expanded_stale.add(n.rsplit("/", 1)[-1]) + + # Clean .vscode/mcp.json + if "vscode" in target_runtimes: + vscode_mcp = Path.cwd() / ".vscode" / "mcp.json" + if vscode_mcp.exists(): + try: + import json as _json + + config = _json.loads(vscode_mcp.read_text(encoding="utf-8")) + servers = config.get("servers", {}) + removed = [n for n in expanded_stale if n in servers] + for name in removed: + del servers[name] + if removed: + vscode_mcp.write_text( + _json.dumps(config, indent=2), encoding="utf-8" + ) + for name in removed: + _rich_info(f"✓ Removed stale MCP server '{name}' from .vscode/mcp.json") + except Exception: + pass # best-effort cleanup + + # Clean ~/.copilot/mcp-config.json + if "copilot" in target_runtimes: + copilot_mcp = Path.home() / ".copilot" / "mcp-config.json" + if copilot_mcp.exists(): + try: + import json as _json + + config = _json.loads(copilot_mcp.read_text(encoding="utf-8")) + servers = config.get("mcpServers", {}) + removed = [n for n in expanded_stale if n in servers] + for name in removed: + del servers[name] + if removed: + copilot_mcp.write_text( + _json.dumps(config, indent=2), encoding="utf-8" + ) + for name in removed: + _rich_info(f"✓ Removed stale MCP server '{name}' from Copilot CLI config") + except Exception: + pass # best-effort cleanup + + # Clean ~/.codex/config.toml (mcp_servers section) + if "codex" in target_runtimes: + codex_cfg = Path.home() / ".codex" / "config.toml" + if codex_cfg.exists(): + try: + import toml as _toml + + config = _toml.loads(codex_cfg.read_text(encoding="utf-8")) + servers = config.get("mcp_servers", {}) + removed = [n for n in expanded_stale if n in servers] + for name in removed: + del servers[name] + if removed: + codex_cfg.write_text( + _toml.dumps(config), encoding="utf-8" + ) + for name in removed: + _rich_info(f"✓ Removed stale MCP server '{name}' from Codex CLI config") + except Exception: + pass # best-effort cleanup + + +def _update_lockfile_mcp_servers(mcp_server_names: builtins.set) -> None: + """Update the lockfile with the current set of APM-managed MCP server names. + + Args: + mcp_server_names: Set of MCP server names currently managed by APM. + """ + lock_path = Path.cwd() / "apm.lock" + if not lock_path.exists(): + return + try: + lockfile = LockFile.read(lock_path) + if lockfile is None: + return + lockfile.mcp_servers = sorted(mcp_server_names) + lockfile.save(lock_path) + except Exception: + pass # best-effort + + def _install_mcp_dependencies( mcp_deps: list, runtime: str = None, exclude: str = None, verbose: bool = False ): diff --git a/src/apm_cli/deps/lockfile.py b/src/apm_cli/deps/lockfile.py index a2fefbb6..e7cb370e 100644 --- a/src/apm_cli/deps/lockfile.py +++ b/src/apm_cli/deps/lockfile.py @@ -118,6 +118,7 @@ class LockFile: ) apm_version: Optional[str] = None dependencies: Dict[str, LockedDependency] = field(default_factory=dict) + mcp_servers: List[str] = field(default_factory=list) def add_dependency(self, dep: LockedDependency) -> None: """Add a dependency to the lock file.""" @@ -146,6 +147,8 @@ def to_yaml(self) -> str: if self.apm_version: data["apm_version"] = self.apm_version data["dependencies"] = [dep.to_dict() for dep in self.get_all_dependencies()] + if self.mcp_servers: + data["mcp_servers"] = sorted(self.mcp_servers) return yaml.dump( data, default_flow_style=False, sort_keys=False, allow_unicode=True ) @@ -165,6 +168,7 @@ def from_yaml(cls, yaml_str: str) -> "LockFile": ) for dep_data in data.get("dependencies", []): lock.add_dependency(LockedDependency.from_dict(dep_data)) + lock.mcp_servers = list(data.get("mcp_servers", [])) return lock def write(self, path: Path) -> None: diff --git a/tests/integration/test_selective_install_mcp.py b/tests/integration/test_selective_install_mcp.py new file mode 100644 index 00000000..a5758bb4 --- /dev/null +++ b/tests/integration/test_selective_install_mcp.py @@ -0,0 +1,439 @@ +"""Integration test: selective install collects transitive MCP dependencies. + +Exercises the full CLI install path — dependency resolution, lockfile generation, +transitive MCP collection, deduplication, and lockfile mcp_servers bookkeeping — +using a synthetic package tree with no network calls. + +This is the integration-level complement to the unit tests in +tests/unit/test_mcp_lifecycle_e2e.py, verifying the same flows through +the real CLI entry point instead of calling internal functions directly. +""" + +import json +import os +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +import yaml +from click.testing import CliRunner + +from apm_cli.deps.lockfile import LockedDependency, LockFile + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _write_apm_yml(path: Path, *, name: str = "test-project", deps: list = None, mcp: list = None): + """Write a minimal apm.yml.""" + data = {"name": name, "version": "1.0.0", "dependencies": {}} + if deps: + data["dependencies"]["apm"] = deps + if mcp: + data["dependencies"]["mcp"] = mcp + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(yaml.dump(data, default_flow_style=False), encoding="utf-8") + + +def _make_pkg(apm_modules: Path, repo_url: str, *, name: str = None, + mcp: list = None, apm_deps: list = None): + """Create a package directory with apm.yml under apm_modules.""" + pkg_dir = apm_modules / repo_url + pkg_dir.mkdir(parents=True, exist_ok=True) + pkg_name = name or repo_url.split("/")[-1] + _write_apm_yml( + pkg_dir / "apm.yml", + name=pkg_name, + deps=apm_deps, + mcp=mcp, + ) + + +def _seed_lockfile(path: Path, locked_deps: list, mcp_servers: list = None): + """Write a lockfile pre-populated with given dependencies.""" + lf = LockFile() + for dep in locked_deps: + lf.add_dependency(dep) + if mcp_servers: + lf.mcp_servers = mcp_servers + lf.write(path) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture() +def cli_env(tmp_path): + """Set up a synthetic project tree and return (tmp_path, runner). + + Layout:: + + apm.yml (root) — depends on acme/squad-alpha + apm_modules/ + acme/squad-alpha/apm.yml — depends on acme/infra-cloud + acme/infra-cloud/apm.yml — mcp: [ghcr.io/acme/mcp-alpha, ghcr.io/acme/mcp-beta] + apm.lock — pre-seeded so the install treats packages as cached + """ + orig_cwd = os.getcwd() + os.chdir(tmp_path) + + apm_modules = tmp_path / "apm_modules" + + # Root project declares squad-alpha as a dep + _write_apm_yml(tmp_path / "apm.yml", deps=["acme/squad-alpha"]) + + # squad-alpha has no MCP, depends on infra-cloud + _make_pkg(apm_modules, "acme/squad-alpha", apm_deps=["acme/infra-cloud"]) + + # infra-cloud declares two MCP servers + _make_pkg(apm_modules, "acme/infra-cloud", mcp=[ + "ghcr.io/acme/mcp-alpha", + "ghcr.io/acme/mcp-beta", + ]) + + # Pre-seed a lockfile so the install loop treats packages as cached + _seed_lockfile(tmp_path / "apm.lock", [ + LockedDependency(repo_url="acme/squad-alpha", depth=1, + resolved_by=None, resolved_commit="cached"), + LockedDependency(repo_url="acme/infra-cloud", depth=2, + resolved_by="acme/squad-alpha", resolved_commit="cached"), + ]) + + yield tmp_path, CliRunner() + + os.chdir(orig_cwd) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestSelectiveInstallTransitiveMCPIntegration: + """CLI-level integration: `apm install acme/squad-alpha` must collect + transitive MCP deps from acme/infra-cloud and persist them.""" + + @patch("apm_cli.cli.check_for_updates", return_value=None) + @patch("apm_cli.cli._validate_package_exists", return_value=True) + @patch("apm_cli.cli._install_mcp_dependencies", return_value=0) + @patch("apm_cli.cli.GitHubPackageDownloader") + def test_lockfile_records_transitive_mcp_servers( + self, mock_dl_cls, mock_mcp_install, mock_validate, mock_updates, cli_env + ): + tmp_path, runner = cli_env + from apm_cli.cli import cli + + result = runner.invoke(cli, [ + "install", "acme/squad-alpha", "--trust-transitive-mcp", + ]) + + # The command should succeed (exit 0) + assert result.exit_code == 0, f"CLI failed:\n{result.output}\n{getattr(result, 'stderr', '')}" + + # Lockfile must contain both packages + lockfile = LockFile.read(tmp_path / "apm.lock") + assert lockfile is not None + dep_keys = set(lockfile.dependencies.keys()) + assert "acme/squad-alpha" in dep_keys + assert "acme/infra-cloud" in dep_keys + + # Lockfile mcp_servers must list the transitive MCP deps + assert "ghcr.io/acme/mcp-alpha" in lockfile.mcp_servers + assert "ghcr.io/acme/mcp-beta" in lockfile.mcp_servers + + @patch("apm_cli.cli.check_for_updates", return_value=None) + @patch("apm_cli.cli._validate_package_exists", return_value=True) + @patch("apm_cli.cli._install_mcp_dependencies", return_value=0) + @patch("apm_cli.cli.GitHubPackageDownloader") + def test_install_mcp_receives_transitive_deps( + self, mock_dl_cls, mock_mcp_install, mock_validate, mock_updates, cli_env + ): + """_install_mcp_dependencies must be called with transitive deps.""" + tmp_path, runner = cli_env + from apm_cli.cli import cli + + runner.invoke(cli, [ + "install", "acme/squad-alpha", "--trust-transitive-mcp", + ]) + + # _install_mcp_dependencies should have been called with the MCP deps + mock_mcp_install.assert_called_once() + mcp_deps_arg = mock_mcp_install.call_args[0][0] + dep_names = {d.name for d in mcp_deps_arg} + assert "ghcr.io/acme/mcp-alpha" in dep_names + assert "ghcr.io/acme/mcp-beta" in dep_names + + +class TestDeepChainIntegration: + """CLI-level: A → B → C → D where only D declares MCP. + `apm install acme/pkg-a` must record D's MCP in the lockfile.""" + + @patch("apm_cli.cli.check_for_updates", return_value=None) + @patch("apm_cli.cli._validate_package_exists", return_value=True) + @patch("apm_cli.cli._install_mcp_dependencies", return_value=0) + @patch("apm_cli.cli.GitHubPackageDownloader") + def test_deep_chain_mcp_in_lockfile( + self, mock_dl_cls, mock_mcp_install, mock_validate, mock_updates, tmp_path + ): + orig_cwd = os.getcwd() + os.chdir(tmp_path) + try: + apm_modules = tmp_path / "apm_modules" + + _write_apm_yml(tmp_path / "apm.yml", deps=["acme/pkg-a"]) + _make_pkg(apm_modules, "acme/pkg-a", apm_deps=["acme/pkg-b"]) + _make_pkg(apm_modules, "acme/pkg-b", apm_deps=["acme/pkg-c"]) + _make_pkg(apm_modules, "acme/pkg-c", apm_deps=["acme/pkg-d"]) + _make_pkg(apm_modules, "acme/pkg-d", mcp=["ghcr.io/acme/mcp-deep"]) + + _seed_lockfile(tmp_path / "apm.lock", [ + LockedDependency(repo_url="acme/pkg-a", depth=1, + resolved_by=None, resolved_commit="cached"), + LockedDependency(repo_url="acme/pkg-b", depth=2, + resolved_by="acme/pkg-a", resolved_commit="cached"), + LockedDependency(repo_url="acme/pkg-c", depth=3, + resolved_by="acme/pkg-b", resolved_commit="cached"), + LockedDependency(repo_url="acme/pkg-d", depth=4, + resolved_by="acme/pkg-c", resolved_commit="cached"), + ]) + + from apm_cli.cli import cli + runner = CliRunner() + result = runner.invoke(cli, [ + "install", "acme/pkg-a", "--trust-transitive-mcp", + ]) + + assert result.exit_code == 0, ( + f"CLI failed:\n{result.output}\n{getattr(result, 'stderr', '')}" + ) + + lockfile = LockFile.read(tmp_path / "apm.lock") + assert "acme/pkg-d" in lockfile.dependencies + assert "ghcr.io/acme/mcp-deep" in lockfile.mcp_servers + finally: + os.chdir(orig_cwd) + + +class TestDiamondDependencyIntegration: + """CLI-level: A → B, A → C, B → D, C → D where D has MCP. + MCP from D must appear once in lockfile.""" + + @patch("apm_cli.cli.check_for_updates", return_value=None) + @patch("apm_cli.cli._validate_package_exists", return_value=True) + @patch("apm_cli.cli._install_mcp_dependencies", return_value=0) + @patch("apm_cli.cli.GitHubPackageDownloader") + def test_diamond_mcp_in_lockfile( + self, mock_dl_cls, mock_mcp_install, mock_validate, mock_updates, tmp_path + ): + orig_cwd = os.getcwd() + os.chdir(tmp_path) + try: + apm_modules = tmp_path / "apm_modules" + + _write_apm_yml(tmp_path / "apm.yml", deps=["acme/pkg-a"]) + _make_pkg(apm_modules, "acme/pkg-a", + apm_deps=["acme/pkg-b", "acme/pkg-c"]) + _make_pkg(apm_modules, "acme/pkg-b", apm_deps=["acme/pkg-d"]) + _make_pkg(apm_modules, "acme/pkg-c", apm_deps=["acme/pkg-d"]) + _make_pkg(apm_modules, "acme/pkg-d", mcp=[ + "ghcr.io/acme/mcp-shared", + ]) + + _seed_lockfile(tmp_path / "apm.lock", [ + LockedDependency(repo_url="acme/pkg-a", depth=1, + resolved_by=None, resolved_commit="cached"), + LockedDependency(repo_url="acme/pkg-b", depth=2, + resolved_by="acme/pkg-a", resolved_commit="cached"), + LockedDependency(repo_url="acme/pkg-c", depth=2, + resolved_by="acme/pkg-a", resolved_commit="cached"), + LockedDependency(repo_url="acme/pkg-d", depth=3, + resolved_by="acme/pkg-b", resolved_commit="cached"), + ]) + + from apm_cli.cli import cli + runner = CliRunner() + result = runner.invoke(cli, [ + "install", "acme/pkg-a", "--trust-transitive-mcp", + ]) + + assert result.exit_code == 0, ( + f"CLI failed:\n{result.output}\n{getattr(result, 'stderr', '')}" + ) + + lockfile = LockFile.read(tmp_path / "apm.lock") + assert "ghcr.io/acme/mcp-shared" in lockfile.mcp_servers + # No duplicates in lockfile + assert lockfile.mcp_servers.count("ghcr.io/acme/mcp-shared") == 1 + finally: + os.chdir(orig_cwd) + + +class TestMultiPackageSelectiveInstallIntegration: + """CLI-level: `apm install acme/pkg-x acme/pkg-y` — each package + brings its own transitive MCP deps, both must appear.""" + + @patch("apm_cli.cli.check_for_updates", return_value=None) + @patch("apm_cli.cli._validate_package_exists", return_value=True) + @patch("apm_cli.cli._install_mcp_dependencies", return_value=0) + @patch("apm_cli.cli.GitHubPackageDownloader") + def test_multiple_packages_mcp_merged( + self, mock_dl_cls, mock_mcp_install, mock_validate, mock_updates, tmp_path + ): + orig_cwd = os.getcwd() + os.chdir(tmp_path) + try: + apm_modules = tmp_path / "apm_modules" + + _write_apm_yml(tmp_path / "apm.yml", + deps=["acme/pkg-x", "acme/pkg-y"]) + + # pkg-x → dep-x (has mcp-x) + _make_pkg(apm_modules, "acme/pkg-x", apm_deps=["acme/dep-x"]) + _make_pkg(apm_modules, "acme/dep-x", mcp=["ghcr.io/acme/mcp-x"]) + + # pkg-y → dep-y (has mcp-y) + _make_pkg(apm_modules, "acme/pkg-y", apm_deps=["acme/dep-y"]) + _make_pkg(apm_modules, "acme/dep-y", mcp=["ghcr.io/acme/mcp-y"]) + + _seed_lockfile(tmp_path / "apm.lock", [ + LockedDependency(repo_url="acme/pkg-x", depth=1, + resolved_by=None, resolved_commit="cached"), + LockedDependency(repo_url="acme/dep-x", depth=2, + resolved_by="acme/pkg-x", resolved_commit="cached"), + LockedDependency(repo_url="acme/pkg-y", depth=1, + resolved_by=None, resolved_commit="cached"), + LockedDependency(repo_url="acme/dep-y", depth=2, + resolved_by="acme/pkg-y", resolved_commit="cached"), + ]) + + from apm_cli.cli import cli + runner = CliRunner() + result = runner.invoke(cli, [ + "install", "acme/pkg-x", "acme/pkg-y", + "--trust-transitive-mcp", + ]) + + assert result.exit_code == 0, ( + f"CLI failed:\n{result.output}\n{getattr(result, 'stderr', '')}" + ) + + lockfile = LockFile.read(tmp_path / "apm.lock") + assert "ghcr.io/acme/mcp-x" in lockfile.mcp_servers + assert "ghcr.io/acme/mcp-y" in lockfile.mcp_servers + finally: + os.chdir(orig_cwd) + + +class TestFullInstallTransitiveMCPIntegration: + """CLI-level integration: plain `apm install` (no specific packages) + must also collect transitive MCP deps.""" + + @patch("apm_cli.cli.check_for_updates", return_value=None) + @patch("apm_cli.cli._install_mcp_dependencies", return_value=0) + @patch("apm_cli.cli.GitHubPackageDownloader") + def test_full_install_collects_transitive_mcp( + self, mock_dl_cls, mock_mcp_install, mock_updates, cli_env + ): + tmp_path, runner = cli_env + from apm_cli.cli import cli + + result = runner.invoke(cli, ["install", "--trust-transitive-mcp"]) + + assert result.exit_code == 0, f"CLI failed:\n{result.output}\n{getattr(result, 'stderr', '')}" + + lockfile = LockFile.read(tmp_path / "apm.lock") + assert lockfile is not None + assert "ghcr.io/acme/mcp-alpha" in lockfile.mcp_servers + assert "ghcr.io/acme/mcp-beta" in lockfile.mcp_servers + + +class TestStaleRemovalAfterUpdate: + """When a package drops/renames an MCP server, stale entries must be + removed from .vscode/mcp.json during install --update.""" + + @patch("apm_cli.cli.check_for_updates", return_value=None) + @patch("apm_cli.cli._install_mcp_dependencies", return_value=0) + @patch("apm_cli.cli.GitHubPackageDownloader") + def test_stale_mcp_removed_on_update( + self, mock_dl_cls, mock_mcp_install, mock_updates, tmp_path + ): + orig_cwd = os.getcwd() + os.chdir(tmp_path) + try: + apm_modules = tmp_path / "apm_modules" + + # Root depends on infra-cloud + _write_apm_yml(tmp_path / "apm.yml", deps=["acme/infra-cloud"]) + + # infra-cloud NOW declares only mcp-beta (dropped mcp-alpha) + _make_pkg(apm_modules, "acme/infra-cloud", mcp=[ + "ghcr.io/acme/mcp-beta", + ]) + + # Pre-existing lockfile still references both servers + _seed_lockfile(tmp_path / "apm.lock", [ + LockedDependency(repo_url="acme/infra-cloud", depth=1, + resolved_by=None, resolved_commit="cached"), + ], mcp_servers=["ghcr.io/acme/mcp-alpha", "ghcr.io/acme/mcp-beta"]) + + # Pre-existing .vscode/mcp.json has both servers + mcp_json = tmp_path / ".vscode" / "mcp.json" + mcp_json.parent.mkdir(parents=True, exist_ok=True) + mcp_json.write_text(json.dumps({ + "servers": { + "ghcr.io/acme/mcp-alpha": {"command": "npx", "args": ["alpha"]}, + "ghcr.io/acme/mcp-beta": {"command": "npx", "args": ["beta"]}, + } + }, indent=2), encoding="utf-8") + + from apm_cli.cli import cli + runner = CliRunner() + result = runner.invoke(cli, [ + "install", "--trust-transitive-mcp", + ]) + + assert result.exit_code == 0, f"CLI failed:\n{result.output}\n{getattr(result, 'stderr', '')}" + + # Stale server must be removed from mcp.json + updated = json.loads(mcp_json.read_text(encoding="utf-8")) + assert "ghcr.io/acme/mcp-alpha" not in updated["servers"] + assert "ghcr.io/acme/mcp-beta" in updated["servers"] + + # Lockfile must only list the remaining server + lockfile = LockFile.read(tmp_path / "apm.lock") + assert "ghcr.io/acme/mcp-alpha" not in lockfile.mcp_servers + assert "ghcr.io/acme/mcp-beta" in lockfile.mcp_servers + + finally: + os.chdir(orig_cwd) + + +class TestNoMCPWhenOnlyAPM: + """With --only=apm, MCP collection must be skipped but existing + lockfile mcp_servers must be preserved.""" + + @patch("apm_cli.cli.check_for_updates", return_value=None) + @patch("apm_cli.cli.GitHubPackageDownloader") + def test_only_apm_preserves_mcp_servers( + self, mock_dl_cls, mock_updates, cli_env + ): + tmp_path, runner = cli_env + + # Seed lockfile with existing MCP servers + _seed_lockfile(tmp_path / "apm.lock", [ + LockedDependency(repo_url="acme/squad-alpha", depth=1, + resolved_by=None, resolved_commit="cached"), + LockedDependency(repo_url="acme/infra-cloud", depth=2, + resolved_by="acme/squad-alpha", resolved_commit="cached"), + ], mcp_servers=["ghcr.io/acme/mcp-alpha", "ghcr.io/acme/mcp-beta"]) + + from apm_cli.cli import cli + result = runner.invoke(cli, ["install", "--only=apm"]) + + assert result.exit_code == 0, f"CLI failed:\n{result.output}\n{getattr(result, 'stderr', '')}" + + # MCP servers must be preserved (not wiped) even with --only=apm + lockfile = LockFile.read(tmp_path / "apm.lock") + assert "ghcr.io/acme/mcp-alpha" in lockfile.mcp_servers + assert "ghcr.io/acme/mcp-beta" in lockfile.mcp_servers diff --git a/tests/test_lockfile.py b/tests/test_lockfile.py index d50cec37..9f77cf28 100644 --- a/tests/test_lockfile.py +++ b/tests/test_lockfile.py @@ -70,6 +70,35 @@ def test_write_and_read(self, tmp_path): assert loaded is not None assert loaded.has_dependency("owner/repo") + def test_mcp_servers_round_trip(self, tmp_path): + """mcp_servers must survive a write → read cycle.""" + lock = LockFile(apm_version="1.0.0") + lock.mcp_servers = ["github", "acme-kb", "atlassian"] + lock.add_dependency(LockedDependency(repo_url="owner/repo")) + lock_path = tmp_path / "apm.lock" + lock.write(lock_path) + + loaded = LockFile.read(lock_path) + assert loaded is not None + assert loaded.mcp_servers == ["acme-kb", "atlassian", "github"] # sorted + + def test_mcp_servers_empty_by_default(self): + lock = LockFile() + assert lock.mcp_servers == [] + yaml_str = lock.to_yaml() + assert "mcp_servers" not in yaml_str # omitted when empty + + def test_mcp_servers_from_yaml(self): + yaml_str = ( + 'lockfile_version: "1"\n' + 'dependencies: []\n' + 'mcp_servers:\n' + ' - github\n' + ' - acme-kb\n' + ) + lock = LockFile.from_yaml(yaml_str) + assert lock.mcp_servers == ["github", "acme-kb"] + def test_read_nonexistent(self, tmp_path): loaded = LockFile.read(tmp_path / "apm.lock") assert loaded is None diff --git a/tests/unit/test_mcp_lifecycle_e2e.py b/tests/unit/test_mcp_lifecycle_e2e.py new file mode 100644 index 00000000..6575f85e --- /dev/null +++ b/tests/unit/test_mcp_lifecycle_e2e.py @@ -0,0 +1,650 @@ +"""End-to-end integration tests for the MCP lifecycle across install/update/uninstall. + +Exercises the full chain: transitive MCP collection, deduplication, +stale-server removal, and lockfile MCP bookkeeping — using synthetic +package names only (no private/project-specific identifiers). +""" + +import json +import os +import tempfile +from pathlib import Path + +import pytest +import yaml + +from apm_cli.cli import ( + _collect_transitive_mcp_deps, + _deduplicate_mcp_deps, + _get_mcp_dep_names, + _remove_stale_mcp_servers, + _update_lockfile_mcp_servers, +) +from apm_cli.deps.lockfile import LockedDependency, LockFile + + +# --------------------------------------------------------------------------- +# Helpers — mirror the per-file convention used across the test suite. +# --------------------------------------------------------------------------- + +def _write_apm_yml(path: Path, deps: list = None, mcp: list = None, name: str = "test-project"): + """Write a minimal apm.yml at *path* with optional APM and MCP deps.""" + data = {"name": name, "version": "1.0.0", "dependencies": {}} + if deps: + data["dependencies"]["apm"] = deps + if mcp: + data["dependencies"]["mcp"] = mcp + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(yaml.dump(data, default_flow_style=False), encoding="utf-8") + + +def _make_pkg_dir(apm_modules: Path, repo_url: str, mcp: list = None, + apm_deps: list = None, name: str = None, virtual_path: str = None): + """Create a package directory under apm_modules with an apm.yml.""" + base = apm_modules / repo_url + if virtual_path: + base = base / virtual_path + base.mkdir(parents=True, exist_ok=True) + pkg_name = name or repo_url.split("/")[-1] + data = {"name": pkg_name, "version": "1.0.0", "dependencies": {}} + if mcp: + data["dependencies"]["mcp"] = mcp + if apm_deps: + data["dependencies"]["apm"] = apm_deps + (base / "apm.yml").write_text(yaml.dump(data, default_flow_style=False), encoding="utf-8") + + +def _write_lockfile(path: Path, locked_deps: list, mcp_servers: list = None): + """Write a lockfile from LockedDependency list and optional MCP server names.""" + lf = LockFile() + for dep in locked_deps: + lf.add_dependency(dep) + if mcp_servers: + lf.mcp_servers = mcp_servers + lf.write(path) + + +def _write_mcp_json(path: Path, servers: dict): + """Write a .vscode/mcp.json file.""" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps({"servers": servers}, indent=2), encoding="utf-8") + + +# --------------------------------------------------------------------------- +# Scenario 1 — Selective install with transitive MCP deps +# --------------------------------------------------------------------------- +class TestSelectiveInstallTransitiveMCP: + """When `apm install acme/squad-alpha` is requested, the lockfile-scoped + MCP collector should find MCP servers declared by transitive deps + of squad-alpha even though squad-alpha itself has no MCP section.""" + + def setup_method(self): + self._orig_cwd = os.getcwd() + + def teardown_method(self): + os.chdir(self._orig_cwd) + + def test_transitive_mcp_collected_through_lockfile(self, tmp_path): + os.chdir(tmp_path) + apm_modules = tmp_path / "apm_modules" + + # squad-alpha has no MCP, depends on infra-cloud + _make_pkg_dir(apm_modules, "acme/squad-alpha", apm_deps=["acme/infra-cloud"]) + + # infra-cloud declares two MCP servers + _make_pkg_dir(apm_modules, "acme/infra-cloud", mcp=[ + "ghcr.io/acme/mcp-server-alpha", + "ghcr.io/acme/mcp-server-beta", + ]) + + # Lockfile records both packages + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/squad-alpha", depth=1, resolved_by="root"), + LockedDependency(repo_url="acme/infra-cloud", depth=2, resolved_by="acme/squad-alpha"), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path) + names = [d.name for d in result] + assert "ghcr.io/acme/mcp-server-alpha" in names + assert "ghcr.io/acme/mcp-server-beta" in names + + def test_orphan_pkg_mcp_not_collected(self, tmp_path): + """A package in apm_modules but NOT in the lockfile should be ignored.""" + os.chdir(tmp_path) + apm_modules = tmp_path / "apm_modules" + + _make_pkg_dir(apm_modules, "acme/squad-alpha") + _make_pkg_dir(apm_modules, "acme/orphan-pkg", mcp=["ghcr.io/acme/orphan-server"]) + + # Only squad-alpha is locked + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/squad-alpha", depth=1, resolved_by="root"), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path) + names = [d.name for d in result] + assert "ghcr.io/acme/orphan-server" not in names + + +# --------------------------------------------------------------------------- +# Scenario 1b — Deep transitive chain (3+ levels) +# --------------------------------------------------------------------------- +class TestDeepTransitiveChainMCP: + """MCP servers declared at depth 4 (A → B → C → D) must be collected + when only A is locked at depth 1. Exercises the full recursive walk.""" + + def setup_method(self): + self._orig_cwd = os.getcwd() + + def teardown_method(self): + os.chdir(self._orig_cwd) + + def test_depth_four_mcp_collected(self, tmp_path): + os.chdir(tmp_path) + apm_modules = tmp_path / "apm_modules" + + # A → B → C → D (D has MCP) + _make_pkg_dir(apm_modules, "acme/pkg-a", apm_deps=["acme/pkg-b"]) + _make_pkg_dir(apm_modules, "acme/pkg-b", apm_deps=["acme/pkg-c"]) + _make_pkg_dir(apm_modules, "acme/pkg-c", apm_deps=["acme/pkg-d"]) + _make_pkg_dir(apm_modules, "acme/pkg-d", mcp=[ + "ghcr.io/acme/mcp-deep-server", + ]) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/pkg-a", depth=1, resolved_by="root"), + LockedDependency(repo_url="acme/pkg-b", depth=2, resolved_by="acme/pkg-a"), + LockedDependency(repo_url="acme/pkg-c", depth=3, resolved_by="acme/pkg-b"), + LockedDependency(repo_url="acme/pkg-d", depth=4, resolved_by="acme/pkg-c"), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path) + names = [d.name for d in result] + assert "ghcr.io/acme/mcp-deep-server" in names + + def test_mcp_at_every_level_collected(self, tmp_path): + """Each level in the chain has its own MCP — all must appear.""" + os.chdir(tmp_path) + apm_modules = tmp_path / "apm_modules" + + _make_pkg_dir(apm_modules, "acme/pkg-a", apm_deps=["acme/pkg-b"], + mcp=["ghcr.io/acme/mcp-level-1"]) + _make_pkg_dir(apm_modules, "acme/pkg-b", apm_deps=["acme/pkg-c"], + mcp=["ghcr.io/acme/mcp-level-2"]) + _make_pkg_dir(apm_modules, "acme/pkg-c", + mcp=["ghcr.io/acme/mcp-level-3"]) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/pkg-a", depth=1, resolved_by="root"), + LockedDependency(repo_url="acme/pkg-b", depth=2, resolved_by="acme/pkg-a"), + LockedDependency(repo_url="acme/pkg-c", depth=3, resolved_by="acme/pkg-b"), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path) + names = [d.name for d in result] + assert "ghcr.io/acme/mcp-level-1" in names + assert "ghcr.io/acme/mcp-level-2" in names + assert "ghcr.io/acme/mcp-level-3" in names + + +# --------------------------------------------------------------------------- +# Scenario 1c — Diamond dependency (A → B, A → C, B → D, C → D) +# --------------------------------------------------------------------------- +class TestDiamondDependencyMCP: + """When two branches of the tree converge on the same leaf (diamond), + MCP servers from the shared leaf must appear exactly once.""" + + def setup_method(self): + self._orig_cwd = os.getcwd() + + def teardown_method(self): + os.chdir(self._orig_cwd) + + def test_diamond_mcp_collected_once(self, tmp_path): + os.chdir(tmp_path) + apm_modules = tmp_path / "apm_modules" + + # A → B, A → C, B → D, C → D + _make_pkg_dir(apm_modules, "acme/pkg-a", apm_deps=["acme/pkg-b", "acme/pkg-c"]) + _make_pkg_dir(apm_modules, "acme/pkg-b", apm_deps=["acme/pkg-d"]) + _make_pkg_dir(apm_modules, "acme/pkg-c", apm_deps=["acme/pkg-d"]) + _make_pkg_dir(apm_modules, "acme/pkg-d", mcp=[ + "ghcr.io/acme/mcp-shared-server", + ]) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/pkg-a", depth=1, resolved_by="root"), + LockedDependency(repo_url="acme/pkg-b", depth=2, resolved_by="acme/pkg-a"), + LockedDependency(repo_url="acme/pkg-c", depth=2, resolved_by="acme/pkg-a"), + LockedDependency(repo_url="acme/pkg-d", depth=3, resolved_by="acme/pkg-b"), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path) + names = [d.name for d in result] + assert "ghcr.io/acme/mcp-shared-server" in names + + # After dedup, exactly one entry + merged = _deduplicate_mcp_deps(result) + merged_names = [d.name for d in merged] + assert merged_names.count("ghcr.io/acme/mcp-shared-server") == 1 + + def test_diamond_multiple_mcp_from_branches(self, tmp_path): + """Each branch also contributes its own MCP — all must be present.""" + os.chdir(tmp_path) + apm_modules = tmp_path / "apm_modules" + + _make_pkg_dir(apm_modules, "acme/pkg-a", apm_deps=["acme/pkg-b", "acme/pkg-c"]) + _make_pkg_dir(apm_modules, "acme/pkg-b", apm_deps=["acme/pkg-d"], + mcp=["ghcr.io/acme/mcp-branch-b"]) + _make_pkg_dir(apm_modules, "acme/pkg-c", apm_deps=["acme/pkg-d"], + mcp=["ghcr.io/acme/mcp-branch-c"]) + _make_pkg_dir(apm_modules, "acme/pkg-d", mcp=["ghcr.io/acme/mcp-leaf"]) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/pkg-a", depth=1, resolved_by="root"), + LockedDependency(repo_url="acme/pkg-b", depth=2, resolved_by="acme/pkg-a"), + LockedDependency(repo_url="acme/pkg-c", depth=2, resolved_by="acme/pkg-a"), + LockedDependency(repo_url="acme/pkg-d", depth=3, resolved_by="acme/pkg-b"), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path) + merged = _deduplicate_mcp_deps(result) + names = [d.name for d in merged] + assert "ghcr.io/acme/mcp-branch-b" in names + assert "ghcr.io/acme/mcp-branch-c" in names + assert "ghcr.io/acme/mcp-leaf" in names + assert len(names) == 3 + + +# --------------------------------------------------------------------------- +# Scenario 2 — Uninstall removes transitive MCP servers +# --------------------------------------------------------------------------- +class TestUninstallRemovesTransitiveMCP: + """After uninstalling a package, _remove_stale_mcp_servers should remove + MCP entries that are no longer referenced by any remaining package.""" + + def setup_method(self): + self._orig_cwd = os.getcwd() + + def teardown_method(self): + os.chdir(self._orig_cwd) + + def test_stale_servers_removed_from_mcp_json(self, tmp_path): + os.chdir(tmp_path) + + # Pre-existing .vscode/mcp.json with servers from two packages + mcp_json = tmp_path / ".vscode" / "mcp.json" + _write_mcp_json(mcp_json, { + "ghcr.io/acme/mcp-server-alpha": {"command": "npx", "args": ["alpha"]}, + "ghcr.io/acme/mcp-server-beta": {"command": "npx", "args": ["beta"]}, + "ghcr.io/acme/mcp-server-gamma": {"command": "npx", "args": ["gamma"]}, + }) + + # Suppose infra-cloud (alpha + beta) was uninstalled, gamma remains. + old_servers = {"ghcr.io/acme/mcp-server-alpha", "ghcr.io/acme/mcp-server-beta", "ghcr.io/acme/mcp-server-gamma"} + new_servers = {"ghcr.io/acme/mcp-server-gamma"} + stale = old_servers - new_servers + + _remove_stale_mcp_servers(stale, runtime="vscode") + + updated = json.loads(mcp_json.read_text(encoding="utf-8")) + assert "ghcr.io/acme/mcp-server-alpha" not in updated["servers"] + assert "ghcr.io/acme/mcp-server-beta" not in updated["servers"] + assert "ghcr.io/acme/mcp-server-gamma" in updated["servers"] + + def test_lockfile_mcp_list_updated_after_uninstall(self, tmp_path): + os.chdir(tmp_path) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/base-lib", depth=1, resolved_by="root"), + ], mcp_servers=["ghcr.io/acme/mcp-server-alpha", "ghcr.io/acme/mcp-server-beta"]) + + # After uninstall, only beta remains + _update_lockfile_mcp_servers({"ghcr.io/acme/mcp-server-beta"}) + + reloaded = LockFile.read(lock_path) + assert reloaded.mcp_servers == ["ghcr.io/acme/mcp-server-beta"] + + def test_lockfile_mcp_cleared_when_all_removed(self, tmp_path): + os.chdir(tmp_path) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/base-lib", depth=1, resolved_by="root"), + ], mcp_servers=["ghcr.io/acme/mcp-server-alpha"]) + + _update_lockfile_mcp_servers(set()) + + reloaded = LockFile.read(lock_path) + assert reloaded.mcp_servers == [] + + +# --------------------------------------------------------------------------- +# Scenario 3 — Update with MCP rename (stale removed, new present) +# --------------------------------------------------------------------------- +class TestUpdateMCPRename: + """When a dependency renames an MCP server between versions, the stale + name must be removed and the new name must be present.""" + + def setup_method(self): + self._orig_cwd = os.getcwd() + + def teardown_method(self): + os.chdir(self._orig_cwd) + + def test_rename_produces_correct_stale_set(self, tmp_path): + os.chdir(tmp_path) + + # Before update: lockfile knows about the old server name + old_mcp = {"ghcr.io/acme/mcp-server-old", "ghcr.io/acme/mcp-server-gamma"} + + # After update: the package now declares a renamed server + apm_modules = tmp_path / "apm_modules" + _make_pkg_dir(apm_modules, "acme/infra-cloud", mcp=[ + "ghcr.io/acme/mcp-server-new", # renamed from mcp-server-old + "ghcr.io/acme/mcp-server-gamma", + ]) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/infra-cloud", depth=1, resolved_by="root"), + ], mcp_servers=sorted(old_mcp)) + + transitive = _collect_transitive_mcp_deps(apm_modules, lock_path) + new_mcp = _get_mcp_dep_names(transitive) + stale = old_mcp - new_mcp + + assert "ghcr.io/acme/mcp-server-old" in stale + assert "ghcr.io/acme/mcp-server-new" not in stale + assert "ghcr.io/acme/mcp-server-new" in new_mcp + assert "ghcr.io/acme/mcp-server-gamma" in new_mcp + + def test_rename_removes_stale_from_mcp_json(self, tmp_path): + os.chdir(tmp_path) + + mcp_json = tmp_path / ".vscode" / "mcp.json" + _write_mcp_json(mcp_json, { + "ghcr.io/acme/mcp-server-old": {"command": "npx", "args": ["old"]}, + "ghcr.io/acme/mcp-server-gamma": {"command": "npx", "args": ["gamma"]}, + }) + + _remove_stale_mcp_servers({"ghcr.io/acme/mcp-server-old"}, runtime="vscode") + + updated = json.loads(mcp_json.read_text(encoding="utf-8")) + assert "ghcr.io/acme/mcp-server-old" not in updated["servers"] + assert "ghcr.io/acme/mcp-server-gamma" in updated["servers"] + + +# --------------------------------------------------------------------------- +# Scenario 4 — Update with MCP removal +# --------------------------------------------------------------------------- +class TestUpdateMCPRemoval: + """When a dependency drops an MCP server entirely, the server must be + removed from both .vscode/mcp.json and the lockfile.""" + + def setup_method(self): + self._orig_cwd = os.getcwd() + + def teardown_method(self): + os.chdir(self._orig_cwd) + + def test_removed_mcp_detected_as_stale(self, tmp_path): + os.chdir(tmp_path) + + old_mcp = {"ghcr.io/acme/mcp-server-alpha", "ghcr.io/acme/mcp-server-beta"} + + # After update, the package no longer declares any MCP servers + apm_modules = tmp_path / "apm_modules" + _make_pkg_dir(apm_modules, "acme/infra-cloud") # no mcp arg + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/infra-cloud", depth=1, resolved_by="root"), + ], mcp_servers=sorted(old_mcp)) + + transitive = _collect_transitive_mcp_deps(apm_modules, lock_path) + new_mcp = _get_mcp_dep_names(transitive) + stale = old_mcp - new_mcp + + assert stale == old_mcp # all old servers are stale + + def test_removal_cleans_mcp_json_and_lockfile(self, tmp_path): + os.chdir(tmp_path) + + mcp_json = tmp_path / ".vscode" / "mcp.json" + _write_mcp_json(mcp_json, { + "ghcr.io/acme/mcp-server-alpha": {"command": "npx", "args": ["alpha"]}, + }) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/infra-cloud", depth=1, resolved_by="root"), + ], mcp_servers=["ghcr.io/acme/mcp-server-alpha"]) + + _remove_stale_mcp_servers({"ghcr.io/acme/mcp-server-alpha"}, runtime="vscode") + _update_lockfile_mcp_servers(set()) + + updated = json.loads(mcp_json.read_text(encoding="utf-8")) + assert updated["servers"] == {} + + reloaded = LockFile.read(lock_path) + assert reloaded.mcp_servers == [] + + +# --------------------------------------------------------------------------- +# Scenario 5 — Deduplication across root and transitive MCP +# --------------------------------------------------------------------------- +class TestDeduplicationRootAndTransitive: + """Root-declared MCP deps take precedence over transitive ones. + Dedup must collapse duplicates while keeping root declarations first.""" + + def test_root_overrides_transitive_duplicate(self, tmp_path): + apm_modules = tmp_path / "apm_modules" + _make_pkg_dir(apm_modules, "acme/infra-cloud", mcp=[ + "ghcr.io/acme/mcp-server-alpha", + ]) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/infra-cloud", depth=1, resolved_by="root"), + ]) + + # Root declares alpha with extra config (dict form) + root_mcp = [{"name": "ghcr.io/acme/mcp-server-alpha", "type": "http", "url": "https://custom.example.com"}] + transitive_mcp = _collect_transitive_mcp_deps(apm_modules, lock_path) + + merged = _deduplicate_mcp_deps(root_mcp + transitive_mcp) + assert len(merged) == 1 + # Root's dict form should win (first occurrence) + assert isinstance(merged[0], dict) + assert merged[0]["url"] == "https://custom.example.com" + + def test_dedup_preserves_distinct_servers(self, tmp_path): + apm_modules = tmp_path / "apm_modules" + _make_pkg_dir(apm_modules, "acme/infra-cloud", mcp=["ghcr.io/acme/mcp-server-alpha"]) + _make_pkg_dir(apm_modules, "acme/base-lib", mcp=["ghcr.io/acme/mcp-server-beta"]) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/infra-cloud", depth=1, resolved_by="root"), + LockedDependency(repo_url="acme/base-lib", depth=2, resolved_by="acme/infra-cloud"), + ]) + + transitive_mcp = _collect_transitive_mcp_deps(apm_modules, lock_path) + merged = _deduplicate_mcp_deps(transitive_mcp) + names = [d.name for d in merged] + assert len(names) == 2 + assert "ghcr.io/acme/mcp-server-alpha" in names + assert "ghcr.io/acme/mcp-server-beta" in names + + +# --------------------------------------------------------------------------- +# Scenario 6 — Virtual-path packages in lockfile +# --------------------------------------------------------------------------- +class TestVirtualPathMCPCollection: + """Packages with virtual_path in the lockfile must be correctly resolved + to their subdirectory inside apm_modules.""" + + def test_virtual_path_mcp_collected(self, tmp_path): + apm_modules = tmp_path / "apm_modules" + + # Virtual package: acme/monorepo with virtual_path=packages/web-api + _make_pkg_dir( + apm_modules, "acme/monorepo", + virtual_path="packages/web-api", + name="web-api", + mcp=["ghcr.io/acme/mcp-server-web"], + ) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency( + repo_url="acme/monorepo", + virtual_path="packages/web-api", + is_virtual=True, + depth=1, + resolved_by="root", + ), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path) + names = [d.name for d in result] + assert "ghcr.io/acme/mcp-server-web" in names + + def test_virtual_and_non_virtual_together(self, tmp_path): + apm_modules = tmp_path / "apm_modules" + + _make_pkg_dir(apm_modules, "acme/base-lib", mcp=["ghcr.io/acme/mcp-base"]) + _make_pkg_dir( + apm_modules, "acme/monorepo", + virtual_path="packages/api", + name="api", + mcp=["ghcr.io/acme/mcp-api"], + ) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/base-lib", depth=1, resolved_by="root"), + LockedDependency( + repo_url="acme/monorepo", + virtual_path="packages/api", + is_virtual=True, + depth=1, + resolved_by="root", + ), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path) + names = [d.name for d in result] + assert len(names) == 2 + assert "ghcr.io/acme/mcp-base" in names + assert "ghcr.io/acme/mcp-api" in names + + +# --------------------------------------------------------------------------- +# Scenario 7 — Self-defined MCP trust_private gating +# --------------------------------------------------------------------------- +class TestSelfDefinedMCPTrustGating: + """Self-defined (non-registry) MCP servers from transitive packages are + gated behind trust_private.""" + + def test_self_defined_skipped_by_default(self, tmp_path): + apm_modules = tmp_path / "apm_modules" + _make_pkg_dir(apm_modules, "acme/infra-cloud", mcp=[ + "ghcr.io/acme/mcp-registry-server", + {"name": "private-srv", "registry": False, "transport": "http", "url": "https://private.example.com"}, + ]) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/infra-cloud", depth=1, resolved_by="root"), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path, trust_private=False) + names = [d.name for d in result] + assert "ghcr.io/acme/mcp-registry-server" in names + assert "private-srv" not in names + + def test_self_defined_included_when_trusted(self, tmp_path): + apm_modules = tmp_path / "apm_modules" + _make_pkg_dir(apm_modules, "acme/infra-cloud", mcp=[ + "ghcr.io/acme/mcp-registry-server", + {"name": "private-srv", "registry": False, "transport": "http", "url": "https://private.example.com"}, + ]) + + lock_path = tmp_path / "apm.lock" + _write_lockfile(lock_path, [ + LockedDependency(repo_url="acme/infra-cloud", depth=1, resolved_by="root"), + ]) + + result = _collect_transitive_mcp_deps(apm_modules, lock_path, trust_private=True) + names = [d.name for d in result] + assert "ghcr.io/acme/mcp-registry-server" in names + assert "private-srv" in names + + +class TestStaleCleanupKeyNormalization: + """_remove_stale_mcp_servers should match config keys that use only the + last path segment (Copilot CLI, Codex) even when stale_names contains + full registry references with '/'.""" + + def test_last_segment_removed_from_copilot_config(self, tmp_path, monkeypatch): + """Stale name 'io.github.github/github-mcp-server' should remove + config key 'github-mcp-server' from Copilot CLI config.""" + monkeypatch.setattr(Path, "cwd", lambda: tmp_path) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + + copilot_dir = tmp_path / ".copilot" + copilot_dir.mkdir() + copilot_config = copilot_dir / "mcp-config.json" + copilot_config.write_text(json.dumps({ + "mcpServers": {"github-mcp-server": {"command": "npx", "args": ["mcp-server"]}} + })) + + stale = {"io.github.github/github-mcp-server"} + _remove_stale_mcp_servers(stale, runtime="copilot") + + result = json.loads(copilot_config.read_text()) + assert "github-mcp-server" not in result["mcpServers"] + + def test_full_ref_removed_from_vscode_config(self, tmp_path, monkeypatch): + """VS Code uses the full reference as key — should still match.""" + monkeypatch.setattr(Path, "cwd", lambda: tmp_path) + + vscode_dir = tmp_path / ".vscode" + vscode_dir.mkdir() + mcp_json = vscode_dir / "mcp.json" + mcp_json.write_text(json.dumps({ + "servers": {"io.github.github/github-mcp-server": {"type": "stdio"}} + })) + + stale = {"io.github.github/github-mcp-server"} + _remove_stale_mcp_servers(stale, runtime="vscode") + + result = json.loads(mcp_json.read_text()) + assert "io.github.github/github-mcp-server" not in result["servers"] + + def test_short_name_without_slash_still_works(self, tmp_path, monkeypatch): + """A stale name without '/' (e.g. 'acme-kb') should still match directly.""" + monkeypatch.setattr(Path, "cwd", lambda: tmp_path) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + + copilot_dir = tmp_path / ".copilot" + copilot_dir.mkdir() + copilot_config = copilot_dir / "mcp-config.json" + copilot_config.write_text(json.dumps({ + "mcpServers": {"acme-kb": {"command": "npx", "args": ["acme-kb"]}} + })) + + stale = {"acme-kb"} + _remove_stale_mcp_servers(stale, runtime="copilot") + + result = json.loads(copilot_config.read_text()) + assert "acme-kb" not in result["mcpServers"]