Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/apm_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
show_download_spinner,
)
from apm_cli.utils.github_host import is_valid_fqdn, default_host
from apm_cli.utils.mcp_lifecycle import collect_transitive_mcp_deps, remove_stale_mcp_servers
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These new helpers are imported but not used anywhere in cli.py (calls still go to _collect_transitive_mcp_deps / _remove_stale_mcp_servers). This adds dead code and also triggers mcp_lifecycle import-time side effects (notably logging configuration) without any behavior change. Either switch the call sites to the extracted functions and delete the old implementations, or drop this import until the refactor is completed.

Suggested change
from apm_cli.utils.mcp_lifecycle import collect_transitive_mcp_deps, remove_stale_mcp_servers

Copilot uses AI. Check for mistakes.

# APM imports - use absolute imports everywhere for consistency
from apm_cli.version import get_build_sha, get_version
Expand Down
138 changes: 138 additions & 0 deletions src/apm_cli/utils/mcp_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import os
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import os. Please remove it to keep the new module minimal and avoid confusing readers about side effects.

Suggested change
import os

Copilot uses AI. Check for mistakes.
import logging
from pathlib import Path
from typing import List, Set, Optional

# Logger for MCP lifecycle diagnostics.
# NOTE: logging configuration (level/format/handlers) is deliberately left to
# the CLI entrypoint. Calling logging.basicConfig() here would reconfigure the
# root logger for the entire process as an import-time side effect, changing
# log levels/format for unrelated code that merely imports this module.
logger = logging.getLogger(__name__)
Comment on lines +6 to +12
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logging.basicConfig(...) at import time will reconfigure the root logger for the entire CLI (and tests) just by importing this module from cli.py, potentially changing log levels/format for unrelated code and polluting command output. Prefer leaving logging configuration to the CLI entrypoint and only defining a module logger here (e.g., logging.getLogger(__name__)).

Suggested change
# Enhanced logging for lifecycle diagnostics
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger("mcp-lifecycle")
# Logger for MCP lifecycle diagnostics (configured by CLI entrypoint)
logger = logging.getLogger(__name__)

Copilot uses AI. Check for mistakes.

class MCPCycleGuard:
    """Detect circular transitive MCP dependencies.

    Each key passed to check() may be seen at most once; a repeat raises
    RuntimeError, signalling a dependency cycle.
    """

    def __init__(self):
        # Keys already observed during the current collection pass.
        self._visited: Set[str] = set()

    def check(self, repo_url: str):
        seen_before = repo_url in self._visited
        if seen_before:
            logging.getLogger("mcp-lifecycle").error(
                f"Circular MCP dependency detected: {repo_url}"
            )
            raise RuntimeError(f"Circular MCP dependency detected at {repo_url}")
        self._visited.add(repo_url)

def collect_transitive_mcp_deps(
    apm_modules_dir: Path,
    lock_path: Optional[Path] = None,
    trust_private: bool = False,
    guard: Optional["MCPCycleGuard"] = None,
) -> List:
    """Collect MCP dependencies declared by resolved APM packages.

    Prefers the set of packages pinned in ``apm.lock`` (via ``lock_path``);
    falls back to a full recursive scan of ``apm_modules_dir`` when the
    lockfile is missing or unreadable.

    Args:
        apm_modules_dir: Root directory containing installed APM packages.
        lock_path: Optional path to the apm.lock file.
        trust_private: When True, self-defined MCP servers declared by
            transitive packages are collected; when False they are skipped
            with a warning.
        guard: Optional duplicate guard; a fresh one is created if omitted.

    Returns:
        List of MCP dependency objects (project type from APMPackage).

    Raises:
        RuntimeError: If the same resolved package path appears twice in the
            lockfile (via the guard).
    """
    log = logging.getLogger(__name__)

    if not apm_modules_dir.exists():
        log.warning("apm_modules directory not found: %s", apm_modules_dir)
        return []

    # Deferred imports keep module import cheap and avoid import cycles.
    from apm_cli.models.apm_package import APMPackage
    from apm_cli.deps.lockfile import LockFile

    guard = guard or MCPCycleGuard()

    # Build the set of expected apm.yml paths from apm.lock, if available.
    locked_paths = None
    if lock_path and lock_path.exists():
        try:
            lockfile = LockFile.read(lock_path)
            if lockfile is not None:
                locked_paths = set()
                for dep in lockfile.get_all_dependencies():
                    if not dep.repo_url:
                        continue
                    pkg_dir = apm_modules_dir / dep.repo_url
                    if dep.virtual_path:
                        pkg_dir = pkg_dir / dep.virtual_path
                    yml = pkg_dir / "apm.yml"
                    # Guard on the full package path, not repo_url: several
                    # lockfile entries may legitimately share one repo_url
                    # (virtual packages differing only by virtual_path).
                    guard.check(str(yml))
                    locked_paths.add(yml.resolve())
                log.info("Scanning %d locked packages for MCP deps.", len(locked_paths))
        except Exception as e:
            log.error("Failed to read lockfile for MCP collection: %s", e)

    # Prefer iterating lock-derived paths directly (existing files only).
    if locked_paths is not None:
        apm_yml_paths = [p for p in sorted(locked_paths) if p.exists()]
    else:
        # Reached both when the lockfile is absent and when it exists but
        # could not be parsed above.
        log.info(
            "Lockfile missing or unreadable (%s); performing full scan of apm_modules.",
            lock_path,
        )
        apm_yml_paths = list(apm_modules_dir.rglob("apm.yml"))

    collected = []
    for apm_yml_path in apm_yml_paths:
        try:
            pkg = APMPackage.from_apm_yml(apm_yml_path)
            mcp = pkg.get_mcp_dependencies()
            if not mcp:
                continue
            log.info("Found %d MCP deps in package: %s", len(mcp), pkg.name)
            for dep in mcp:
                if getattr(dep, "is_self_defined", False):
                    if trust_private:
                        log.info(
                            "Trusting self-defined MCP server '%s' from package '%s'",
                            dep.name, pkg.name,
                        )
                    else:
                        log.warning(
                            "Transitive package '%s' declares self-defined MCP server "
                            "'%s'. Skip (trust_private=False).",
                            pkg.name, dep.name,
                        )
                        continue
                collected.append(dep)
        except Exception as e:
            # Malformed packages are skipped rather than aborting the scan.
            log.debug("Failed to parse %s: %s", apm_yml_path, e)
            continue
    return collected

def remove_stale_mcp_servers(
    stale_names: Set[str],
    runtime: Optional[str] = None,
    exclude: Optional[str] = None,
) -> None:
    """Remove MCP server entries that are no longer required.

    Cleans the per-runtime MCP configuration files:

    * VS Code: ``<cwd>/.vscode/mcp.json`` (``servers`` mapping)
    * Copilot: ``~/.copilot/mcp-config.json`` (``mcpServers``/``mcp_servers``/``servers``)
    * Codex:   ``~/.codex/config.toml`` (``mcp_servers`` tables)

    Args:
        stale_names: Server names to remove. Names containing ``/`` are also
            matched by their final path segment.
        runtime: Restrict cleanup to a single runtime ("vscode", "copilot",
            "codex"); all runtimes are cleaned when None.
        exclude: Runtime to skip when ``runtime`` is None.

    All failures are logged and swallowed: cleanup is best-effort and must
    never abort the calling command.
    """
    log = logging.getLogger(__name__)

    if not stale_names:
        return

    log.info(
        "Removing %d stale MCP servers: %s",
        len(stale_names), ", ".join(stale_names),
    )

    # Determine which runtimes to clean.
    all_runtimes = {"vscode", "copilot", "codex"}
    if runtime:
        target_runtimes = {runtime}
    else:
        target_runtimes = set(all_runtimes)
        if exclude:
            target_runtimes.discard(exclude)

    # Qualified names like "owner/server" should also match their bare name.
    expanded_stale = set(stale_names)
    for name in stale_names:
        if "/" in name:
            expanded_stale.add(name.rsplit("/", 1)[-1])

    if "vscode" in target_runtimes:
        _prune_json_servers(
            Path.cwd() / ".vscode" / "mcp.json",
            ("servers",), expanded_stale, log,
        )
    if "copilot" in target_runtimes:
        # Copilot configs keep MCP servers under a dedicated key; support a
        # few likely variants defensively.
        _prune_json_servers(
            Path.home() / ".copilot" / "mcp-config.json",
            ("mcpServers", "mcp_servers", "servers"), expanded_stale, log,
        )
    if "codex" in target_runtimes:
        _prune_codex_toml(Path.home() / ".codex" / "config.toml", expanded_stale, log)

    log.info("Stale MCP cleanup complete.")


def _prune_json_servers(
    config_path: Path,
    section_keys: tuple,
    stale: Set[str],
    log: logging.Logger,
) -> None:
    """Best-effort removal of stale server entries from a JSON MCP config.

    Looks for a dict under each key in ``section_keys`` and deletes any entry
    whose name is in ``stale``; rewrites the file only if something changed.
    """
    if not config_path.exists():
        return
    try:
        import json
        config = json.loads(config_path.read_text(encoding="utf-8"))
        removed_any = False
        for key in section_keys:
            servers = config.get(key)
            if not isinstance(servers, dict):
                continue
            for name in [n for n in stale if n in servers]:
                del servers[name]
                removed_any = True
                log.info("Removed '%s' from %s", name, config_path)
        if removed_any:
            config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")
    except Exception as e:
        log.error("Failed to clean %s: %s", config_path, e)


def _prune_codex_toml(
    config_path: Path,
    stale: Set[str],
    log: logging.Logger,
) -> None:
    """Best-effort removal of stale MCP server tables from Codex's TOML config.

    Reads via tomllib (Python 3.11+) or tomli; persists via tomli_w or toml.
    Missing parsers/writers are logged and treated as a no-op.
    """
    if not config_path.exists():
        return
    try:
        try:
            import tomllib as toml_reader  # Python 3.11+
        except ImportError:
            try:
                import tomli as toml_reader  # type: ignore[import]
            except ImportError:
                log.error(
                    "Failed to clean %s: no TOML parser available (tomllib/tomli).",
                    config_path,
                )
                return

        data = toml_reader.loads(config_path.read_text(encoding="utf-8"))

        # NOTE(review): Codex config layouts vary; support both a top-level
        # [mcp_servers.<name>] table and a nested [mcp.servers] mapping.
        # TODO confirm against the layout the rest of the CLI writes.
        sections = []
        if isinstance(data.get("mcp_servers"), dict):
            sections.append(data["mcp_servers"])
        mcp_section = data.get("mcp")
        if isinstance(mcp_section, dict) and isinstance(mcp_section.get("servers"), dict):
            sections.append(mcp_section["servers"])

        removed = []
        for servers in sections:
            for name in list(servers):
                if name in stale:
                    del servers[name]
                    removed.append(name)
                    log.info("Removed '%s' from %s", name, config_path)
        if not removed:
            return

        # Persisting needs a TOML writer, which is not in the stdlib.
        try:
            import tomli_w as toml_writer  # type: ignore[import]
        except ImportError:
            try:
                import toml as toml_writer  # type: ignore[import]
            except ImportError:
                log.error(
                    "Failed to persist cleanup of %s: no TOML writer available "
                    "(tomli_w/toml).",
                    config_path,
                )
                return
        config_path.write_text(toml_writer.dumps(data), encoding="utf-8")
    except Exception as e:
        log.error("Failed to clean %s: %s", config_path, e)