Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/dependencies.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ mcp:
- `url` — required for `http`, `sse`, `streamable-http` transports
- `command` — required for `stdio` transport

⚠️ **Transitive trust rule:** Self-defined servers from transitive APM packages are skipped with a warning by default. You can either re-declare them in your own `apm.yml`, or use `--trust-transitive-mcp` to trust all self-defined servers from upstream packages:
⚠️ **Transitive trust rule:** Self-defined servers from direct dependencies (depth=1 in the lockfile) are auto-trusted. Self-defined servers from transitive dependencies (depth > 1) are skipped with a warning by default. You can either re-declare them in your own `apm.yml`, or use `--trust-transitive-mcp` to trust all self-defined servers from upstream packages:

```bash
apm install --trust-transitive-mcp
Expand Down
30 changes: 30 additions & 0 deletions docs/plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,36 @@ By default APM looks for `agents/`, `skills/`, `commands/`, and `hooks/` directo
- For **agents**, directory contents are flattened into `.apm/agents/` (agents are flat files, not named directories)
- `hooks` also accepts an inline object: `"hooks": {"hooks": {"PreToolUse": [...]}}`

#### MCP Server Definitions

Plugins can ship MCP servers that are automatically deployed through APM's MCP pipeline. Define servers using `mcpServers` in `plugin.json`:

```json
{
"name": "my-plugin",
"mcpServers": {
"my-server": {
"command": "npx",
"args": ["-y", "my-mcp-server"]
},
"my-api": {
"url": "https://api.example.com/mcp"
}
}
}
```

`mcpServers` supports three forms:
- **Object** — inline server definitions (as above)
- **String** — path to a JSON file containing `mcpServers`
- **Array** — list of JSON file paths (merged, last-wins on name conflicts)

When `mcpServers` is absent, APM auto-discovers `.mcp.json` at the plugin root (then `.github/.mcp.json` as fallback), matching Claude Code's auto-discovery behavior.

Servers with `command` are configured as `stdio` transport; servers with `url` use `http` (or the `type` field if it specifies `sse` or `streamable-http`). All plugin-defined MCP servers are treated as self-defined (`registry: false`).

**Trust model**: Self-defined MCP servers from direct dependencies (depth=1) are auto-trusted. Transitive dependencies require `--trust-transitive-mcp`. See [dependencies.md](./dependencies.md#self-defined-servers) for details.

## Examples

### Installing Plugins from GitHub
Expand Down
9 changes: 9 additions & 0 deletions src/apm_cli/adapters/client/codex.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ def _format_server_config(self, server_info, env_overrides=None, runtime_vars=No
"env": {},
"id": server_info.get("id", "") # Add registry UUID for conflict detection
}

# Self-defined stdio deps carry raw command/args — use directly
raw = server_info.get("_raw_stdio")
if raw:
config["command"] = raw["command"]
config["args"] = raw["args"]
if raw.get("env"):
config["env"] = raw["env"]
return config

# Note: Remote servers (SSE type) are handled in configure_mcp_server and rejected early
# This method only handles local servers with packages
Expand Down
13 changes: 13 additions & 0 deletions src/apm_cli/adapters/client/copilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ def _format_server_config(self, server_info, env_overrides=None, runtime_vars=No
"tools": ["*"], # Required by Copilot CLI specification - default to all tools
"id": server_info.get("id", "") # Add registry UUID for conflict detection
}

# Self-defined stdio deps carry raw command/args — use directly
raw = server_info.get("_raw_stdio")
if raw:
config["command"] = raw["command"]
config["args"] = raw["args"]
if raw.get("env"):
config["env"] = raw["env"]
# Apply tools override if present
tools_override = server_info.get("_apm_tools_override")
if tools_override:
config["tools"] = tools_override
return config

# Check for remote endpoints first (registry-defined priority)
remotes = server_info.get("remotes", [])
Expand Down
12 changes: 12 additions & 0 deletions src/apm_cli/adapters/client/vscode.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ def _format_server_config(self, server_info):
# Initialize the base config structure
server_config = {}
input_vars = []

# Self-defined stdio deps carry raw command/args — use directly
raw = server_info.get("_raw_stdio")
if raw:
server_config = {
"type": "stdio",
"command": raw["command"],
"args": raw["args"],
}
if raw.get("env"):
server_config["env"] = raw["env"]
return server_config, input_vars

# Check for packages information
if "packages" in server_info and server_info["packages"]:
Expand Down
190 changes: 189 additions & 1 deletion src/apm_cli/deps/plugin_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import logging
import shutil
from pathlib import Path
from typing import Dict, Any, Optional
from typing import Dict, Any, List, Optional
import yaml


Expand Down Expand Up @@ -108,6 +108,13 @@ def synthesize_apm_yml_from_plugin(plugin_path: Path, manifest: Dict[str, Any])
# Map plugin structure into .apm/ subdirectories
_map_plugin_artifacts(plugin_path, apm_dir, manifest)

# Extract MCP servers from plugin and convert to dependency format
mcp_servers = _extract_mcp_servers(plugin_path, manifest)
if mcp_servers:
mcp_deps = _mcp_servers_to_apm_deps(mcp_servers, plugin_path)
if mcp_deps:
manifest['_mcp_deps'] = mcp_deps

# Generate apm.yml from plugin metadata
apm_yml_content = _generate_apm_yml(manifest)
apm_yml_path = plugin_path / "apm.yml"
Expand All @@ -123,6 +130,182 @@ def _ignore_symlinks(directory, contents):
return [name for name in contents if (Path(directory) / name).is_symlink()]


def _extract_mcp_servers(plugin_path: Path, manifest: Dict[str, Any]) -> Dict[str, Any]:
"""Extract MCP server definitions from a plugin manifest.

Resolves ``mcpServers`` by type (per Claude Code spec):
- ``str`` → read that file path relative to plugin root, parse JSON,
extract ``mcpServers`` key.
- ``list`` → read each file path, merge (last-wins on name conflict).
- ``dict`` → use directly as inline server definitions.

When ``mcpServers`` is absent and ``.mcp.json`` (or ``.github/.mcp.json``)
exists at plugin root, read it as the default (matches Claude Code
auto-discovery).

Security: symlinks are skipped, JSON parse errors are logged as warnings.

``${CLAUDE_PLUGIN_ROOT}`` in string values is replaced with the absolute
plugin path.

Args:
plugin_path: Root of the plugin directory.
manifest: Parsed plugin.json dict.

Returns:
dict mapping server name → server config. Empty on failure.
"""
logger = logging.getLogger("apm")
mcp_value = manifest.get("mcpServers")

if mcp_value is not None:
# Manifest explicitly defines mcpServers
if isinstance(mcp_value, dict):
servers = dict(mcp_value)
elif isinstance(mcp_value, str):
servers = _read_mcp_file(plugin_path, mcp_value, logger)
elif isinstance(mcp_value, list):
servers = {}
for entry in mcp_value:
if isinstance(entry, str):
servers.update(_read_mcp_file(plugin_path, entry, logger))
else:
logger.warning("Ignoring non-string entry in mcpServers array: %s", entry)
else:
logger.warning("Unsupported mcpServers type %s; ignoring", type(mcp_value).__name__)
return {}
else:
# Fall back to auto-discovery: .mcp.json then .github/.mcp.json
servers = {}
for fallback in (".mcp.json", ".github/.mcp.json"):
candidate = plugin_path / fallback
if candidate.exists() and candidate.is_file() and not candidate.is_symlink():
servers = _read_mcp_json(candidate, logger)
if servers:
break

# Substitute ${CLAUDE_PLUGIN_ROOT} in all string values
if servers:
abs_root = str(plugin_path.resolve())
servers = _substitute_plugin_root(servers, abs_root, logger)

return servers


def _read_mcp_file(plugin_path: Path, rel_path: str, logger: logging.Logger) -> Dict[str, Any]:
"""Read a JSON file relative to *plugin_path* and return its ``mcpServers`` dict."""
target = (plugin_path / rel_path).resolve()
# Security: must stay inside plugin_path and not be a symlink
try:
target.relative_to(plugin_path.resolve())
except ValueError:
logger.warning("MCP file path escapes plugin root: %s", rel_path)
return {}
candidate = plugin_path / rel_path
if not candidate.exists() or not candidate.is_file():
logger.warning("MCP file not found: %s", candidate)
return {}
if candidate.is_symlink():
logger.warning("Skipping symlinked MCP file: %s", candidate)
return {}
return _read_mcp_json(candidate, logger)


def _read_mcp_json(path: Path, logger: logging.Logger) -> Dict[str, Any]:
"""Parse a JSON file and return the ``mcpServers`` mapping."""
try:
data = json.loads(path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError) as exc:
logger.warning("Failed to read MCP config %s: %s", path, exc)
return {}
if not isinstance(data, dict):
return {}
servers = data.get("mcpServers", {})
return dict(servers) if isinstance(servers, dict) else {}


def _substitute_plugin_root(
servers: Dict[str, Any], abs_root: str, logger: logging.Logger
) -> Dict[str, Any]:
"""Replace ``${CLAUDE_PLUGIN_ROOT}`` in server config string values."""
token = "${CLAUDE_PLUGIN_ROOT}"
substituted = False

def _walk(obj: Any) -> Any:
nonlocal substituted
if isinstance(obj, str) and token in obj:
substituted = True
return obj.replace(token, abs_root)
if isinstance(obj, dict):
return {k: _walk(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_walk(item) for item in obj]
return obj

result = {name: _walk(cfg) for name, cfg in servers.items()}
if substituted:
logger.info("Substituted ${CLAUDE_PLUGIN_ROOT} with %s", abs_root)
return result


def _mcp_servers_to_apm_deps(
servers: Dict[str, Any], plugin_path: Path
) -> List[Dict[str, Any]]:
"""Convert raw MCP server configs to ``dependencies.mcp`` dicts.

Transport inference:
- ``command`` present → stdio
- ``url`` present → http (or ``type`` if it's a valid transport)
- Neither → skipped with warning

Every entry gets ``registry: false`` (self-defined, not registry lookups).

Args:
servers: Mapping of server name → server config dict.
plugin_path: Plugin root (used for log context only).

Returns:
List of dicts consumable by ``MCPDependency.from_dict()``.
"""
logger = logging.getLogger("apm")
deps: List[Dict[str, Any]] = []

for name, cfg in servers.items():
if not isinstance(cfg, dict):
logger.warning("Skipping non-dict MCP server config '%s'", name)
continue

dep: Dict[str, Any] = {"name": name, "registry": False}

if "command" in cfg:
dep["transport"] = "stdio"
dep["command"] = cfg["command"]
if "args" in cfg:
dep["args"] = cfg["args"]
elif "url" in cfg:
raw_type = cfg.get("type", "http")
valid_transports = {"http", "sse", "streamable-http"}
dep["transport"] = raw_type if raw_type in valid_transports else "http"
dep["url"] = cfg["url"]
if "headers" in cfg:
dep["headers"] = cfg["headers"]
else:
logger.warning(
"Skipping MCP server '%s' from plugin '%s': no 'command' or 'url'",
name, plugin_path.name,
)
continue

if "env" in cfg:
dep["env"] = cfg["env"]
if "tools" in cfg:
dep["tools"] = cfg["tools"]

deps.append(dep)

return deps


def _map_plugin_artifacts(plugin_path: Path, apm_dir: Path, manifest: Optional[Dict[str, Any]] = None) -> None:
"""Map plugin artifacts to .apm/ subdirectories and copy pass-through files.

Expand Down Expand Up @@ -305,6 +488,11 @@ def _generate_apm_yml(manifest: Dict[str, Any]) -> str:
if manifest.get('dependencies'):
apm_package['dependencies'] = {'apm': manifest['dependencies']}

# Inject MCP deps extracted from plugin mcpServers / .mcp.json
mcp_deps = manifest.get('_mcp_deps')
if mcp_deps:
apm_package.setdefault('dependencies', {})['mcp'] = mcp_deps

# Install behavior is driven by file presence (SKILL.md, etc.), not this
# field. Default to hybrid so the standard pipeline handles all components.
apm_package['type'] = 'hybrid'
Expand Down
31 changes: 27 additions & 4 deletions src/apm_cli/integration/mcp_integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ def collect_transitive(
picking up stale/orphaned packages from previous installs.
Falls back to scanning all apm.yml files if no lock file is available.

Self-defined servers (registry: false) from transitive packages are
skipped with a warning unless *trust_private* is True.
Self-defined servers (registry: false) from direct dependencies
(depth == 1) are auto-trusted. Self-defined servers from transitive
dependencies (depth > 1) are skipped with a warning unless
*trust_private* is True.
"""
if not apm_modules_dir.exists():
return []
Expand All @@ -65,6 +67,8 @@ def collect_transitive(

# Build set of expected apm.yml paths from apm.lock
locked_paths = None
direct_paths: builtins.set = builtins.set()
lockfile = None
if lock_path and lock_path.exists():
lockfile = LockFile.read(lock_path)
if lockfile is not None:
Expand All @@ -77,6 +81,8 @@ def collect_transitive(
else apm_modules_dir / dep.repo_url / "apm.yml"
)
locked_paths.add(yml.resolve())
if dep.depth == 1:
direct_paths.add(yml.resolve())

# Prefer iterating lock-derived paths directly (existing files only).
# Fall back to full scan only when lock parsing is unavailable.
Expand All @@ -91,9 +97,15 @@ def collect_transitive(
pkg = APMPackage.from_apm_yml(apm_yml_path)
mcp = pkg.get_mcp_dependencies()
if mcp:
is_direct = apm_yml_path.resolve() in direct_paths
for dep in mcp:
if hasattr(dep, "is_self_defined") and dep.is_self_defined:
if trust_private:
if is_direct:
_rich_info(
f"Trusting direct dependency MCP '{dep.name}' "
f"from '{pkg.name}'"
)
elif trust_private:
_rich_info(
f"Trusting self-defined MCP server '{dep.name}' "
f"from transitive package '{pkg.name}' (--trust-transitive-mcp)"
Expand Down Expand Up @@ -157,6 +169,17 @@ def _build_self_defined_info(dep) -> dict:
"""
info: dict = {"name": dep.name}

# For stdio self-defined deps, store raw command/args so adapters
# can bypass registry-specific formatting (npm, docker, etc.).
if dep.transport == "stdio" or (
dep.transport not in ("http", "sse", "streamable-http") and dep.command
):
info["_raw_stdio"] = {
"command": dep.command or dep.name,
"args": list(dep.args) if dep.args else [],
"env": dict(dep.env) if dep.env else {},
}

if dep.transport in ("http", "sse", "streamable-http"):
# Build as a remote endpoint
remote = {
Expand Down Expand Up @@ -192,7 +215,7 @@ def _build_self_defined_info(dep) -> dict:
{
"runtime_hint": dep.command or dep.name,
"name": dep.name,
"registry_name": "",
"registry_name": "self-defined",
"runtime_arguments": runtime_args,
"package_arguments": [],
"environment_variables": env_vars,
Expand Down
Loading