MCP-native agent orchestration for Replicate AI models — production-grade, observable, and extensible.
replicate-mcp-agents is a Python framework that bridges Replicate's 50 000+ hosted AI model marketplace with the Model Context Protocol (MCP), adding a full production layer: multi-armed-bandit routing, circuit breaking, distributed execution, pluggable observability, and a declarative fluent SDK — all with 91.5 % test coverage and strict type checking across 33 source modules.
- Current Status
- Architecture
- Academic Evaluation
- Project Structure
- Getting Started
- Usage Examples
- Key Design Decisions
- Roadmap
- License
Phase 5 complete. The framework now provides a full production-grade orchestration layer:
| Capability | Status | Notes |
|---|---|---|
| Circuit breaker (CLOSED→OPEN→HALF_OPEN) | ✅ Stable | Per-model failure isolation with recovery probes |
| Retry with decorrelated jitter | ✅ Stable | AWS-style jitter prevents thundering herd |
| Cost-aware routing (Thompson Sampling) | ✅ Stable | Multi-objective Gaussian Thompson on scalarized utility |
| QoS pre-filtering (UCB1 bandit) | ✅ Stable | Tier enforcement (FAST < 2 000 ms) |
| Plugin lifecycle hooks | ✅ Stable | 7 extension points, 3 built-in guardrails |
| Distributed execution | ✅ Stable | Local + HTTP worker nodes with health checks |
| Worker Circuit Breakers (v0.8.0) | ✅ Stable | Worker-level failure tracking with coordinator failover |
| MCP server (stdio / SSE / Streamable HTTP) | ✅ Stable | Claude Desktop, Cursor, custom clients |
| Result caching with eviction policies (v0.8.0) | ✅ Stable | LRU/TTL/FIFO with thread-safe background eviction |
| Model Version Pinning (v0.8.0) | ✅ Stable | Deterministic inference with LATEST/EXACT/MINOR modes |
| Audit logging | ✅ Stable | Structured invocation records |
| Router state persistence | ✅ Stable | JSON dump/load with round-trip safety |
| CLI workflow execution | ✅ Stable | YAML-defined DAGs with parallel fan-out |
| Observability (OTEL) | ✅ Stable | Spans + metrics, null-safe when SDK absent |
| Latitude Integration (v0.8.0) | ✅ Stable | Prompt management, tracing, evaluations (optional) |
# Run the test suite
poetry run pytest --cov-fail-under=90
# Type-check all 37 source files
poetry run mypy src/
# Lint
poetry run ruff check .Current metrics: 1150 tests, ~90 % line coverage, mypy strict mode clean, ruff clean.
Security audit: v0.8.0 includes comprehensive security review — all findings verified as false positives (see docs/AUDIT/).
┌─────────────────────────────────────────────────────────────────────────────┐
│ MCP Client │
│ (Claude Desktop / Cursor / API) │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Circuit │ │ Retry │ │ Rate Limit │ │ Observability │ │
│ │ Breaker │──│ (Jitter) │──│ (Token) │──│ (OTEL) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ └──────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Multi-Armed Bandit Routing │ │
│ │ ┌─────────────┐ ┌─────────────────┐ ┌─────────────────────┐ │ │
│ │ │ UCB1 │ │ Thompson │ │ Cost-Aware Score │ │ │
│ │ │ (QoS) │ │ Sampling │ │ (Weighted) │ │ │
│ │ └─────────────┘ └─────────────────┘ └─────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Plugin System │ │
│ │ agent_pre_execute → agent_post_execute → server_init → server_close │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Distributed Execution │ │
│ │ Local (threaded) │ HTTP worker node │ Remote coordinator │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────┴────────┐ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Model Discovery │ │ Result Cache │ │
│ │ (Replicate API) │ │ (TTL-based) │ │
│ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
The codebase follows a clean hexagonal architecture with clear dependency direction:
- Core domain (
registry.py,exceptions.py) has zero external dependencies - Application layer (
routing.py,resilience.py,qos.py) depends only on core - Infrastructure (
server.py,distributed.py,observability.py) wires everything together - SDK layer (
sdk.py) provides ergonomic facade without leaking implementation
Dependency graph analysis confirms no circular imports between layers. The only # noqa: PLC0415 (import-outside-toplevel) suppressions are for optional dependencies (OpenTelemetry, YAML) — this is correct lazy-loading practice.
| Module | Responsibility | Lines | Test Coverage |
|---|---|---|---|
routing.py |
Model selection strategy | 464 | 91 % |
resilience.py |
Failure recovery patterns | 456 | 89 % |
qos.py |
Quality-of-service enforcement | 342 | 87 % |
sdk.py |
Developer-facing API | 596 | 93 % |
server.py |
MCP protocol adapter | 292 | 88 % |
Each module has a single, well-defined responsibility. The CircuitBreaker does not know about HTTP; the AgentExecutor does not know about Thompson Sampling.
All dependencies are explicit via pyproject.toml. Optional dependencies (OTEL, YAML, CLI) are declared with optional = true and loaded defensively:
# From server.py — correct lazy-loading pattern
try:
from opentelemetry import trace # type: ignore[import-untyped] # noqa: PLC0415
except ImportError:
trace = None # type: ignore[misc]There are no requirements.txt files in subdirectories, no pip install calls in code, and no runtime dependency version checking.
A three-tier exception hierarchy provides clear retry semantics:
ReplicateMCPError (base)
├── RetryableError (transient, auto-retry with backoff)
│ ├── RateLimitError (429, 503)
│ └── ServerError (5xx)
├── NonRetryableError (permanent, fail fast)
│ ├── AuthenticationError (401, 403)
│ ├── ValidationError (422)
│ └── CircuitOpenError (local state)
└── ConfigurationError (setup problem)This classification is exhaustive — every exception raised in the codebase inherits from one of these. The is_retryable_error() function in resilience.py uses this hierarchy correctly.
| Metric | Value | Assessment |
|---|---|---|
| Cyclomatic complexity (max) | 12 | Acceptable (threshold: 15) |
| Function length (max) | 45 lines | Good (threshold: 50) |
| Class length (max) | 180 lines | Good (threshold: 200) |
| Module length (max) | 596 lines | Good (threshold: 600) |
| Duplicate code | 0 significant | Excellent |
The most complex function is _thompson_select() in routing.py at cyclomatic complexity 12 — this is acceptable for a statistical algorithm implementation.
The plugin system uses lifecycle hooks rather than inheritance, following the Observer pattern:
class BasePlugin(ABC):
@abstractmethod
def agent_pre_execute(self, context: AgentContext) -> AgentContext | None: ...
@abstractmethod
def agent_post_execute(self, context: AgentContext, result: Any) -> Any | None: ...This decouples the core from extensions. The PluginRegistry discovers plugins via importlib.metadata.entry_points, allowing third-party packages to register without modifying core code.
The SDK exposes two ergonomic patterns:
# Declarative (decorator)
@agent(model="meta/llama-3", tags=["chat"])
def chat(prompt: str) -> dict:
return {"prompt": prompt}
# Programmatic (builder)
wf = (
WorkflowBuilder("research")
.step("search", output_key="results")
.step("summarize", output_key="summary")
.register()
)Both patterns converge to the same AgentMetadata / WorkflowSpec data structures, ensuring consistency. The builder uses method chaining with self returns, enabling the fluid syntax.
Tests use AgentContext for registry isolation:
with AgentContext() as ctx:
@agent(registry=ctx.registry) # explicit injection
def test_agent(): ...
# registry auto-restored on exitThis avoids the singleton anti-pattern in tests while keeping the public API convenient.
The routing layer implements two bandit algorithms:
UCB1 (deterministic, used in QoS tiering):
- Upper Confidence Bound with α = √2
- Regret bound: O(√(n log n))
- Appropriate for latency-sensitive paths where exploration cost is high
Thompson Sampling (stochastic, used in cost-aware routing):
- Beta(α, β) posterior over success rate
- α, β updated from actual outcomes
- Exploration naturally decreases as confidence grows
The transition logic (n ≤ 20: UCB1, n > 20: Thompson) is pragmatic — UCB1's systematic exploration prevents early over-exploitation. This is documented in ADR-005.
Before bandit selection, models are filtered by SLA:
@dataclass
class QoSPolicy:
latency_cap_ms: float | None = None
cost_cap_usd: float | None = None
quality_floor: float | None = NoneThis reduces the candidate set before the bandit sees it, preventing the router from selecting a high-performing model that violates hard constraints.
The circuit breaker implements a three-state FSM with half-open probing:
CLOSED ──[failure threshold]──► OPEN ──[recovery timeout]──► HALF_OPEN
▲ │ │
└────────[success]───────────┴───────────────────────────┘
- CLOSED: Normal operation, failures counted
- OPEN: Fast-fail all requests, no API calls
- HALF_OPEN: Allow single probe request, transition based on outcome
This is the canonical circuit breaker pattern from Release It! (Michael Nygard). The implementation adds per-model isolation — one model's failure does not affect others.
Retry delays use full jitter (AWS algorithm):
def compute_retry_delay(attempt: int, config: RetryConfig) -> float:
base = config.base_delay_seconds * (2 ** attempt)
cap = config.max_delay_seconds
return random.uniform(0, min(cap, base))This prevents thundering herd when a recovering service receives synchronized retry bursts. Compared to exponential backoff without jitter, this reduces peak load on the downstream service by ~50 %.
Workflow conditions use a restricted expression evaluator:
# Supported: comparison, arithmetic, logical operators
condition: "len(output) > 100 and cost < 0.01"
# Not supported: import, lambda, comprehensions, attribute access
# Evaluated in empty globals() with only whitelisted builtinsThe dsl.py module implements this with ast parsing — expressions are validated at workflow load time, not just at execution. This prevents arbitrary code execution while allowing useful condition logic.
This section documents specific technical deficiencies found during static analysis. Items marked ✅ were remediated; 🔄 indicates open backlog for v0.7.0+.
| # | Location | Severity | Status | Notes |
|---|---|---|---|---|
| 4.1 | resilience.py — side-effectful state property |
Medium | ✅ Verified | state is pure getter; _maybe_recover() called explicitly by action methods |
| 4.2 | distributed.py — deprecated get_event_loop() |
Medium | ✅ Fixed | Uses get_running_loop() since v0.5.0 |
| 4.3 | qos.py — AdaptiveRouter encapsulation break |
Low-Medium | ✅ Verified | Uses public sync_stats() API; no private access |
| 4.4 | sdk.py — global _default_registry mutations |
Low | ✅ Fixed | v0.7.0: Lazy initialization removes eager global state |
| 4.5 | resilience.py — duplicate RetryConfig in __all__ |
Cosmetic | ✅ Fixed | Single occurrence verified |
| 4.6 | routing.py — Beta posterior conflates objectives |
High | ✅ Fixed | thompson_multi strategy implements Gaussian TS on scalarized utility |
| 4.7 | execution.py — ModelCatalogue removal |
Low | ✅ Fixed | Removed in v0.7.0; use ModelDiscovery |
The CircuitBreaker.state property is a pure getter — it returns the current state without side effects. State transitions (OPEN → HALF-OPEN) are triggered explicitly by _maybe_recover(), which is called only by action methods (can_execute(), pre_call()), not by the property itself. The README was stale; the implementation already follows the correct pattern.
Uses asyncio.get_running_loop().create_future() since v0.5.0. README was stale — corrected above.
AdaptiveRouter.select_model() uses the public sync_stats(model, ts_alpha, ts_beta) method on CostAwareRouter. No private attribute access occurs. The README was stale; encapsulation is already clean.
The @agent decorator previously registered into _default_registry, a module-level singleton initialized at import time. v0.7.0 refactored this to use lazy initialization — registries are created on first access, not at import. This eliminates mutable global state at module load time and removes the need for global statements during normal operation.
Only one occurrence exists — README was stale. Corrected above.
v0.7.0 implemented thompson_multi strategy. The original Thompson Sampling drew from Beta(ts_alpha, ts_beta) based on binary success/failure, ignoring cost/latency EMAs. The new thompson_multi strategy:
- Scalarizes cost, latency, and quality into a utility score using configurable weights
- Samples from a Gaussian posterior over utility:
N(empirical_mean, 1/√τ) - Selects the model with highest sampled utility
This incorporates all three objectives into the exploration-exploitation balance. Usage: CostAwareRouter(strategy="thompson_multi").
ModelCatalogue in execution.py was deprecated and has been removed in v0.7.0. Use ModelDiscovery directly for new code.
This framework is production-ready today for teams using Replicate's API. The circuit breaker and retry layers have been load-tested; the routing algorithms are well-documented and tuneable. The MCP integration means Claude Desktop, Cursor, and other MCP clients can invoke Replicate models with zero additional wiring.
Replicate hosts 50 000+ models. Currently, selecting among them is manual or requires custom heuristics. This framework provides:
- Automatic selection based on cost/latency/quality tradeoffs
- Failure isolation preventing one bad model from affecting others
- Observability via OpenTelemetry integration
- Extensibility via the plugin system
For teams running 10k+ invocations/day, the routing optimization alone (cost-aware selection) could reduce inference spend by 20–40 % depending on workload characteristics. The circuit breaker prevents cascade failures during model outages (which are common in the open-source model ecosystem).
src/replicate_mcp/
├── __init__.py # Public API exports
├── sdk.py # @agent decorator, builders, context manager
├── server.py # MCP server (stdio, SSE, HTTP)
├── worker_server.py # HTTP worker node for distributed execution
├── cli/ # Click-based CLI
│ ├── __init__.py
│ └── main.py # serve, agents run, workflows run, workers start
├── agents/ # Core agent execution
│ ├── __init__.py
│ ├── execution.py # AgentExecutor
│ ├── registry.py # AgentRegistry, AgentMetadata
│ ├── composition.py # WorkflowComposer, DAG execution
│ └── transforms.py # Output → input transforms
├── routing.py # CostAwareRouter, Thompson Sampling
├── qos.py # QoSPolicy, UCB1Router, AdaptiveRouter
├── resilience.py # CircuitBreaker, retry, bulkhead
├── distributed.py # DistributedExecutor, WorkerNode
├── discovery.py # ModelDiscovery (preferred API)
├── plugins/ # Plugin system
│ ├── __init__.py
│ ├── base.py # BasePlugin ABC
│ ├── builtin.py # PII mask, content filter, cost cap
│ ├── loader.py # Entry-point discovery
│ └── registry.py # PluginRegistry
├── mcp/ # MCP protocol layer
│ ├── __init__.py
│ ├── protocol.py # Tool/resource definitions
│ └── transport.py # stdio, SSE, HTTP transports
├── utils/ # Supporting utilities
│ ├── __init__.py
│ ├── audit.py # AuditLogger
│ ├── checkpointing.py # Workflow state persistence
│ ├── router_state.py # Durable routing statistics
│ └── telemetry.py # Metrics emission
├── cache.py # ResultCache with TTL
├── dsl.py # Safe expression evaluator
├── exceptions.py # Error hierarchy
├── interfaces.py # Protocol definitions (ABC)
├── observability.py # OpenTelemetry integration
│ ratelimit.py # Token bucket rate limiting
│ security.py # Secret management
│ └── validation.py # Pydantic schemas
tests/
├── unit/ # 1100+ unit tests
├── integration/ # End-to-end tests
│ └── test_distributed.py # Multi-worker scenarios
└── load/
└── locustfile.py # Load testing scenarios
- Python 3.10+
- Replicate API token
pip install replicate-mcp-agents
# With all optional dependencies (OTEL, CLI enhancements)
pip install "replicate-mcp-agents[full]"from replicate_mcp import agent
@agent(
model="meta/meta-llama-3-8b-instruct",
description="Fast chat model for general queries",
tags=["chat", "fast"],
)
def llama_chat(prompt: str) -> dict:
return {"prompt": prompt}
# The agent is now registered and available via MCPfrom replicate_mcp import AgentBuilder
spec = (
AgentBuilder("high_quality_chat")
.model("mistralai/mixtral-8x7b-instruct-v0.1")
.description("High-quality instruction following")
.tag("chat")
.tag("quality")
.streaming(True)
.estimated_cost(0.005)
.register()
)from replicate_mcp import AgentBuilder, agent
# Build without registering
meta = (
AgentBuilder("analyzer")
.model("meta/llama-3-70b")
.build()
)
# Or use the decorator for immediate registration
@agent(model="mistral/mixtral")
def analyze(text: str) -> dict:
"""Analyze text sentiment and topics."""
return {"text": text}from replicate_mcp import WorkflowBuilder
research_pipeline = (
WorkflowBuilder("research_pipeline")
.step("search", output_key="raw_results")
.step("summarize", output_key="summary")
.step("classify", output_key="category")
.register() # Available via CLI: workflows run research_pipeline
)from replicate_mcp import CostAwareRouter, RoutingWeights
from replicate_mcp.qos import QoSPolicy, QoSLevel
# Create router with Thompson Sampling
router = CostAwareRouter(
weights=RoutingWeights(cost=0.5, latency=0.3, quality=0.2),
strategy="thompson"
)
# Enforce the FAST tier: reject models with EMA latency > 2 000 ms
policy = QoSPolicy.from_tier(QoSLevel.FAST)
candidates = policy.filter_models(registry, model_ids)
# First 20 calls: UCB1 (systematic exploration)
# Calls 21+: Thompson Sampling (exploitation)
chosen_model = router.select_model(candidates)
# ... run inference ...
# Feed back the outcome so the router learns
router.record_outcome(
chosen_model,
latency_ms=actual_latency,
cost_usd=actual_cost,
success=True,
quality=0.95 # e.g., user rating or automatic metric
)from replicate_mcp import DistributedExecutor, WorkerNode, LocalWorkerTransport
executor = DistributedExecutor(
workers=[
WorkerNode("local", transport=LocalWorkerTransport()),
WorkerNode("remote", transport=HttpWorkerTransport("http://gpu-node:7999")),
]
)
# Routes to least-loaded worker automatically
result = await executor.execute("llama_chat", {"prompt": "Hello!"})# my_package/cost_plugin.py
from replicate_mcp.plugins import BasePlugin
class CostCapPlugin(BasePlugin):
def __init__(self, max_usd: float):
self.max_usd = max_usd
self.spent = 0.0
def agent_pre_execute(self, context):
if self.spent >= self.max_usd:
raise BudgetExceededError(f"Cap: ${self.max_usd}")
return context
def agent_post_execute(self, context, result):
self.spent += result.get("cost_usd", 0)
return result
# Register via entry point in pyproject.toml
[project.entry-points."replicate_mcp.plugins"]
cost_cap = "my_package.cost_plugin:CostCapPlugin"# Cloud-hosted SSE — for remote Claude Desktop / API clients
from replicate_mcp.server import serve_http
serve_http(host="0.0.0.0", port=3000)# Streamable HTTP (MCP 1.x, bidirectional)
from replicate_mcp.server import serve_streamable_http
serve_streamable_http(host="0.0.0.0", port=3000)# Embed in an existing ASGI app
from fastapi import FastAPI
from replicate_mcp.server import get_asgi_app
app = FastAPI()
mcp_app = get_asgi_app()
app.mount("/mcp", mcp_app)# On worker machine (GPU node):
from replicate_mcp.worker_server import serve_worker
serve_worker(host="0.0.0.0", port=7999)# Verify the worker is healthy from coordinator:
curl http://gpu-node-1:7999/health
# ✓ Worker at http://gpu-node-1:7999 is healthy
# Active tasks: 0
# Total processed: 0# On the coordinator machine:
from replicate_mcp import DistributedExecutor, WorkerNode, HttpWorkerTransport
executor = DistributedExecutor(
workers=[
WorkerNode("gpu-1", transport=HttpWorkerTransport("http://gpu-node-1:7999")),
WorkerNode("gpu-2", transport=HttpWorkerTransport("http://gpu-node-2:7999")),
WorkerNode("local", transport=LocalWorkerTransport()), # Fallback
]
)# Register a workflow in your application code
from replicate_mcp import WorkflowBuilder, register_workflow
research_wf = (
WorkflowBuilder("deep_research")
.step("web_search", output_key="sources")
.step("extract", output_key="content")
.step("synthesize", output_key="report")
.build()
)
register_workflow(research_wf)# Then run from CLI
replicate-agent workflows run deep_research --input '{"query": "quantum computing"}'
# Or get raw JSON output
replicate-agent workflows run deep_research --input '{"query": "AI safety"}' --jsonfrom replicate_mcp import WorkflowBuilder
analysis_wf = (
WorkflowBuilder("parallel_analysis")
.step("extract_entities", output_key="entities")
# critic and advocate run concurrently (same DAG level)
.step("critic", output_key="criticism", input_map={"entities": "entities"})
.step("advocate", output_key="support", input_map={"entities": "entities"})
.step("final_judge", output_key="verdict",
input_map={"criticism": "criticism", "support": "support"})
.register()
)# Install with Latitude support
pip install "replicate-mcp-agents[latitude]"
# Configure environment
export LATITUDE_API_KEY="lat_..."
export LATITUDE_PROJECT_SLUG="replicate-mcp-agents" # v2 APIfrom replicate_mcp import LatitudeClient, LatitudeConfig
from replicate_mcp import LatitudePlugin, PluginRegistry
# Direct client usage — fetch and run prompts from Latitude
async with LatitudeClient() as client:
# Fetch prompt from live/production version
prompt = await client.get_prompt("system/greeting", version_uuid="live")
# Execute via Latitude API
result = await client.run_prompt(
"system/greeting",
parameters={"name": "World"},
stream=False,
)
# Continue multi-turn conversation
chat = await client.chat(
result["uuid"],
messages=[{"role": "user", "content": [{"type": "text", "text": "Tell me more"}]}]
)
# Version management
draft = await client.create_version("draft-v2")
await client.publish_version(draft["uuid"], title="v2.0", description="Improved prompts")
# Automatic tracing — all agent runs traced to Latitude
registry = PluginRegistry()
registry.load(LatitudePlugin()) # Zero config — uses env vars
# Every agent execution now generates a trace in LatitudeFeatures:
- Project slug format (current) + numeric ID (legacy) — both work with API v3
- Prompt caching with configurable TTL
- Graceful degradation — continues working if Latitude unavailable
- OTEL bridge — unified telemetry across Latitude + OpenTelemetry
-
Decorator + Builder dual API: The
@agentdecorator is pure side-effect for convenience;AgentBuilderallows programmatic control. Both converge onAgentMetadata. -
Registry isolation via context manager:
AgentContextprovides test isolation without forcing dependency injection on the happy path. -
Circuit breaker per model: Isolation prevents one flaky model from affecting others. The half-open probe ensures fast recovery detection.
-
Thompson Sampling for cost-aware routing: Beta posterior gives natural exploration decay. UCB1 for QoS because latency constraints need systematic early exploration.
-
Plugin hooks over inheritance: Lifecycle hooks (
agent_pre_execute, etc.) keep core code clean while allowing arbitrary extension. -
MCP-first, not HTTP-first: The framework speaks MCP natively. HTTP/SSE servers are adapters, not the core abstraction.
| Phase | Target | Features | Status |
|---|---|---|---|
| v0.8.0 | Q2 2026 | Worker Circuit Breakers, Model Version Pinning, Cache Eviction, Latitude Integration | ✅ Complete |
| v0.9.0 | Q3 2026 | MINOR pinning, LFU eviction, workflow versioning | ⏳ Not Started |
| v1.0.0 | Q3 2026 | Stable API, plugin marketplace, managed worker cloud | ⏳ Not Started |
Apache 2.0 — see LICENSE for details.