From 48d6d2e9509c5bc49d045ec43d2e308981e7480d Mon Sep 17 00:00:00 2001 From: skaar Date: Mon, 10 Nov 2025 12:50:08 -0500 Subject: [PATCH 1/3] Examples using DurableMCP/Reboot builtins and stdlib --- examples/README.md | 259 +++++++++++++++++++ examples/audit/README.md | 225 +++++++++++++++++ examples/audit/audit.py | 168 +++++++++++++ examples/audit/client.py | 108 ++++++++ examples/audit/example.py | 213 ++++++++++++++++ examples/define/README.md | 448 +++++++++++++++++++++++++++++++++ examples/define/client.py | 177 +++++++++++++ examples/define/example.py | 371 +++++++++++++++++++++++++++ examples/document/README.md | 302 ++++++++++++++++++++++ examples/document/client.py | 124 +++++++++ examples/document/example.py | 356 ++++++++++++++++++++++++++ examples/processing/README.md | 198 +++++++++++++++ examples/processing/client.py | 125 +++++++++ examples/processing/example.py | 284 +++++++++++++++++++++ examples/run.py | 235 +++++++++++++++++ examples/steps/README.md | 189 ++++++++++++++ examples/steps/client.py | 92 +++++++ examples/steps/example.py | 171 +++++++++++++ 18 files changed, 4045 insertions(+) create mode 100644 examples/README.md create mode 100644 examples/audit/README.md create mode 100644 examples/audit/audit.py create mode 100644 examples/audit/client.py create mode 100644 examples/audit/example.py create mode 100644 examples/define/README.md create mode 100644 examples/define/client.py create mode 100644 examples/define/example.py create mode 100644 examples/document/README.md create mode 100644 examples/document/client.py create mode 100644 examples/document/example.py create mode 100644 examples/processing/README.md create mode 100644 examples/processing/client.py create mode 100644 examples/processing/example.py create mode 100755 examples/run.py create mode 100644 examples/steps/README.md create mode 100644 examples/steps/client.py create mode 100644 examples/steps/example.py diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..c0ed7b9 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,259 @@ +# DurableMCP Examples + +Examples demonstrating idempotency patterns and durable storage +primitives for Model Context Protocol servers. + +## Concepts + +### Idempotency Guards + +**`at_least_once(alias, context, callable, type)`** + +Operation completes at least once. Caches result on success. Retries +on all failures. + +```python +user_id = await at_least_once( + "create_user", + context, + create_user, + type=str, +) +``` + +**`at_most_once(alias, context, callable, type, retryable_exceptions)`** + +Operation executes at most once. Only retries on specified exceptions. +Raises `AtMostOnceFailedBeforeCompleting` on subsequent calls after +non-retryable failure. + +```python +result = await at_most_once( + "payment", + context, + make_payment, + type=dict, + retryable_exceptions=[NetworkError], +) +``` + +### Storage Primitives + +**SortedMap** + +Larger-than-memory key-value store with lexicographic ordering. +Supports batch operations and range queries. 
+ +```python +map = SortedMap.ref("name") +await map.insert(context, entries={"key": b"value"}) +response = await map.get(context, key="key") +response = await map.range(context, start_key="a", limit=100) +response = await map.reverse_range(context, limit=100) +await map.remove(context, keys=["key"]) +``` + +When calling methods on the same named SortedMap multiple times within +the same context, use `.idempotently()` with unique aliases: + +```python +map = SortedMap.ref("results") +await map.idempotently("store_step1").insert(context, entries={...}) +await map.idempotently("store_step2").insert(context, entries={...}) +``` + +Different named maps don't require idempotency guards. + +**UUIDv7** + +Time-ordered UUID with embedded timestamp. Sorts chronologically in +SortedMap. + +```python +from uuid7 import create as uuid7 + +key = str(uuid7()) # Embeds current timestamp. +await map.insert(context, entries={key: data}) +response = await map.reverse_range(context, limit=10) # Most recent. +``` + +### Tool Lifecycle + +Each `@mcp.tool()` invocation has its own idempotency manager. Guards +only deduplicate within a single tool call, not across multiple calls. + +## Examples + +### audit + +Audit logging with `@audit()` decorator. Stores tool invocations in +SortedMap with UUIDv7 keys for chronological access. + +**Demonstrates**: Decorator pattern, time-range queries, `reverse_range` +for recent entries. + +### steps + +Multi-step operations where each step is independently idempotent. If +tool is retried after step 1 succeeds but before step 2 completes, +step 1 returns cached result. + +**Demonstrates**: Multiple `at_least_once` guards with separate aliases, +sequential dependencies. + +### processing + +Payment processing with `at_most_once` to prevent duplicate charges. +Distinguishes retryable (network errors) from non-retryable (payment +rejected) failures. + +**Demonstrates**: `retryable_exceptions` parameter, +`AtMostOnceFailedBeforeCompleting` exception, error classification. + +### document + +Document processing pipeline combining `at_least_once` (idempotent +reads/writes) and `at_most_once` (external API calls) in a single +workflow. + +**Demonstrates**: Mixed patterns, OCR and translation APIs, multi-step +error handling. + +### define + +Technical glossary demonstrating all SortedMap CRUD operations. +Maintains dual indexes: alphabetical (by term) and chronological +(by UUIDv7). + +**Demonstrates**: `insert`, `get`, `range`, `reverse_range`, `remove`, +prefix search, dual indexing. + +## Running Examples + +### Interactive Harness (Recommended) + +The interactive harness runs examples end-to-end with client +demonstrations: + +```bash +cd examples +python run.py +``` + +**What it does:** + +1. Shows menu of available examples +2. Starts selected server on port 9991 +3. Waits for server to be ready +4. Runs corresponding client script +5. Shows full client output with examples +6. Cleans up server process on exit + +**Exit:** Press `q` at the menu or `Ctrl-C` to exit. + +### Client Pattern + +All example clients follow this pattern: + +```python +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + +async def main(): + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + # List tools. + tools = await session.list_tools() + + # Call tools. 
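        # `call_tool` takes the tool name and an arguments dict; text
        # output is available via `result.content[0].text`.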
+ result = await session.call_tool("tool_name", {"arg": "value"}) +``` + +### Running Servers Directly + +To run servers standalone without the harness: + +```bash +cd examples/ +uv run python example.py +``` + +Each example is a standalone MCP server exposing tools via the Model +Context Protocol on `http://localhost:9991/mcp`. + +## Patterns + +### Idempotent Multi-Step Operations + +```python +# Step 1: Cached on success. +step1_result = await at_least_once( + "step1", + context, + do_step1, + type=dict, +) + +# Step 2: Uses result from step 1. +step2_result = await at_least_once( + "step2", + context, + do_step2, + type=dict, +) +``` + +### External API with Retry Policy + +```python +try: + result = await at_most_once( + "api_call", + context, + call_api, + type=dict, + retryable_exceptions=[NetworkError], + ) +except NetworkError: + # Retries exhausted. + return {"error": "service unavailable"} +except AtMostOnceFailedBeforeCompleting: + # Previous attempt failed with non-retryable error. + return {"error": "operation failed previously"} +``` + +### Recent Items with UUIDv7 + +```python +# Store with time-ordered keys. +key = str(uuid7()) +await map.insert(context, entries={key: data}) + +# Query most recent. +response = await map.reverse_range(context, limit=20) +``` + +### Prefix Search + +```python +# Find all keys starting with "api". +start_key = "api" +end_key = "apj" # Increment last character. +response = await map.range( + context, + start_key=start_key, + end_key=end_key, + limit=100, +) +``` + +## Notes + +- Idempotency guards are per-tool-invocation, not per-server. +- SortedMap operations are not atomic across multiple maps. +- UUIDv7 provides millisecond precision for time ordering. +- All storage is persistent and survives server restarts. diff --git a/examples/audit/README.md b/examples/audit/README.md new file mode 100644 index 0000000..f0b1d0e --- /dev/null +++ b/examples/audit/README.md @@ -0,0 +1,225 @@ +# Audit Logging + +Durable audit trails for MCP tool invocations using SortedMap and +UUIDv7. + +## Overview + +Store audit entries with time-ordered UUIDv7 keys for chronological +access. Provides both decorator and explicit logging patterns. + +## Features + +- Decorator pattern for automatic tool auditing +- Explicit logging with custom data +- Time-range queries via UUIDv7 boundaries +- Non-blocking (audit failures don't break tools) + +## Usage + +### Decorator Pattern + +Automatically log tool invocations: + +```python +@mcp.tool() +@audit("user_operations") +async def create_user( + name: str, + email: str, + context: DurableContext = None, +) -> dict: + """Create a new user.""" + user_id = f"user_{hash(name) % 10000}" + return {"status": "success", "user_id": user_id} +``` + +Logged data: + +```json +{ + "timestamp": 1699123456789, + "tool": "create_user", + "inputs": {"name": "Alice", "email": "alice@example.com"}, + "outputs": {"status": "success", "user_id": "1234"}, + "success": true, + "duration_seconds": 0.123 +} +``` + +### Explicit Logging + +Add custom audit entries: + +```python +@mcp.tool() +async def delete_user( + user_id: str, + reason: str = None, + context: DurableContext = None, +) -> dict: + """Delete a user.""" + # Perform deletion. + # ... + + # Log with custom fields. 
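    # Explicit form: audit(log_name, context, data) appends one entry
    # to SortedMap "audit:{log_name}" under a fresh UUIDv7 key.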
+ await audit("user_operations", context, { + "action": "delete_user", + "user_id": user_id, + "reason": reason or "no reason provided", + "severity": "high", + }) + + return {"status": "success"} +``` + +### Querying Audit Logs + +```python +@mcp.tool() +async def get_audit_log( + log_name: str, + begin: int = None, + end: int = None, + limit: int = 100, + context: DurableContext = None, +) -> dict: + """Query audit logs by time range.""" + audit_map = SortedMap.ref(f"audit:{log_name}") + + if begin and end: + # Range query with UUIDv7 boundaries. + response = await audit_map.range( + context, + start_key=str(timestamp_to_uuidv7(begin)), + end_key=str(timestamp_to_uuidv7(end)), + limit=limit, + ) + else: + # Get most recent entries. + response = await audit_map.reverse_range(context, limit=limit) + + # Parse and return entries. + # ... +``` + +## How It Works + +### UUIDv7 Keys + +UUIDv7 embeds timestamp in first 48 bits, providing natural +chronological sorting: + +```python +from uuid7 import create as uuid7 + +key = str(uuid7()) # "018b8c5a-3f7e-7abc-9012-3456789abcdef" +``` + +Later keys sort after earlier keys lexicographically. + +### Storage Structure + +Audit entries stored in SortedMap `audit:{log_name}`: + +``` +audit:user_operations +├─ 018b8c5a-3f7e-7abc-9012-... → {"tool": "create_user", ...} +├─ 018b8c5b-1234-7abc-9012-... → {"tool": "delete_user", ...} +└─ 018b8c5c-5678-7abc-9012-... → {"tool": "update_user", ...} +``` + +### Timestamp to UUIDv7 Conversion + +Convert timestamps to UUIDv7 for range boundaries: + +```python +begin_uuid = timestamp_to_uuidv7(1699000000000) +end_uuid = timestamp_to_uuidv7(1699100000000) + +response = await audit_map.range( + context, + start_key=str(begin_uuid), + end_key=str(end_uuid), + limit=100, +) +``` + +## Examples + +```python +# Get last 50 entries. +get_audit_log("user_operations", limit=50) + +# Get entries from last hour. +import time +one_hour_ago = int((time.time() - 3600) * 1000) +get_audit_log("user_operations", begin=one_hour_ago) + +# Get entries in specific time range. +get_audit_log( + "user_operations", + begin=1699000000000, + end=1699100000000, +) +``` + +## API Reference + +### `audit(log_name, context=None, data=None)` + +Dual-purpose function for audit logging. + +**As decorator:** + +```python +@audit("log_name") +async def my_tool(arg: str, context: DurableContext = None): + ... +``` + +**As function:** + +```python +await audit("log_name", context, { + "action": "example", + "status": "success", +}) +``` + +### `timestamp_to_uuidv7(timestamp_ms)` + +Convert Unix timestamp (milliseconds) to UUIDv7 for range queries. + +## Best Practices + +Choose meaningful log names: + +```python +await audit("user_operations", ...) +await audit("security_events", ...) +await audit("api_calls", ...) +``` + +Use decorator for standard logging, explicit for custom context: + +```python +@mcp.tool() +@audit("user_operations") +async def promote_user(user_id: str, context: DurableContext = None): + # Decorator logs invocation. + + # Also log security event. + await audit("security_events", context, { + "action": "privilege_escalation", + "user_id": user_id, + "severity": "critical", + }) +``` + +## Running + +```bash +cd examples/audit +uv run python example.py +``` diff --git a/examples/audit/audit.py b/examples/audit/audit.py new file mode 100644 index 0000000..4a8023a --- /dev/null +++ b/examples/audit/audit.py @@ -0,0 +1,168 @@ +""" +Audit logging to SortedMap for DurableMCP tools. 
+ +Provides both decorator and explicit logging for storing audit data in a +durable, chronologically-ordered audit trail using UUIDv7. +""" + +import functools +import json +import time +from typing import Any, Callable, Dict, Optional, Union +from uuid import UUID + +from reboot.mcp.server import DurableContext +from reboot.std.collections.v1.sorted_map import SortedMap +from uuid7 import create as uuid7 # type: ignore[import-untyped] + + +def timestamp_to_uuidv7(timestamp_ms: int) -> UUID: + """ + Create a UUIDv7 from a Unix timestamp in milliseconds. + + This is useful for creating range boundaries when querying audit logs. + The UUIDv7 will have the timestamp embedded and minimal random bits. + + Args: + timestamp_ms: Unix timestamp in milliseconds. + + Returns: + UUIDv7 with the given timestamp. + """ + # UUIDv7 format (128 bits): + # - 48 bits: Unix timestamp in milliseconds + # - 4 bits: version (0111 = 7) + # - 12 bits: random + # - 2 bits: variant (10) + # - 62 bits: random + + # Create minimal UUIDv7 with timestamp and zeros for random bits. + timestamp_48 = timestamp_ms & 0xFFFFFFFFFFFF # 48 bits + + # Build the 128-bit UUID. + uuid_int = (timestamp_48 << 80) | (0x7 << 76) # timestamp + version + uuid_int |= (0x2 << 62) # variant bits + + return UUID(int=uuid_int) + + +async def _write_audit( + log_name: str, + context: DurableContext, + data: Dict[str, Any], +) -> None: + """ + Internal function to write audit entry to SortedMap. + + Args: + log_name: Name of the audit log. + context: The durable context. + data: Dictionary of audit data to store. + """ + timestamp = int(time.time() * 1000) + # Use UUIDv7 for time-ordered, unique keys. + key = str(uuid7()) + + # Add timestamp to data if not present. + audit_data = {"timestamp": timestamp, **data} + + try: + audit_map = SortedMap.ref(f"audit:{log_name}") + await audit_map.insert( + context, + entries={key: json.dumps(audit_data).encode("utf-8")}, + ) + except Exception: + # Don't fail the original operation if audit fails. + pass + + +def audit( + log_name: str, + context: Optional[DurableContext] = None, + data: Optional[Dict[str, Any]] = None, +) -> Union[Callable, None]: + """ + Audit logging - works as both decorator and explicit function. + + As a decorator: + @mcp.tool() + @audit("user_operations") + async def my_tool(name: str, context: DurableContext = None): + return {"status": "success"} + + As an explicit function: + await audit("user_operations", context, { + "action": "custom_event", + "user": "alice", + "result": "success", + }) + + Args: + log_name: Name of the audit log (creates SortedMap "audit:{log_name}"). + context: The durable context (required for explicit logging). + data: Freeform dictionary (required for explicit logging). + + Returns: + Decorator function if used as decorator, coroutine if used explicitly. + + Storage: + Audit entries are stored in SortedMap with UUIDv7 keys for + time-ordered, unique identification. Query with reverse_range() + for chronological order (newest first). 
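
        For example, to read the ten newest entries of a log:

            audit_map = SortedMap.ref("audit:user_operations")
            response = await audit_map.reverse_range(context, limit=10)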
+ + Decorator behavior: + - Captures all function arguments (except context) + - Records function return value or error + - Measures execution duration + - Automatically adds: tool, inputs, outputs, success, duration_seconds + """ + # Explicit logging: audit(log_name, context, data) + if context is not None and data is not None: + return _write_audit(log_name, context, data) + + # Decorator mode: @audit(log_name) + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def wrapper(*args, **kwargs) -> Any: + # Assume context is in kwargs. + ctx = kwargs.get("context") + + # Capture inputs (exclude context). + inputs = {k: v for k, v in kwargs.items() if k != "context"} + + # Call function and capture timing. + start_time = time.time() + success = False + result = None + error = None + + try: + result = await func(*args, **kwargs) + success = True + except Exception as e: + error = f"{type(e).__name__}: {str(e)}" + raise + finally: + # Log if we have context. + if ctx: + duration = time.time() - start_time + audit_data = { + "tool": func.__name__, + "inputs": inputs, + "success": success, + "duration_seconds": round(duration, 3), + } + + if success: + audit_data["outputs"] = result + else: + audit_data["error"] = error + + await _write_audit(log_name, ctx, audit_data) + + return result + + return wrapper + + return decorator diff --git a/examples/audit/client.py b/examples/audit/client.py new file mode 100644 index 0000000..27ed198 --- /dev/null +++ b/examples/audit/client.py @@ -0,0 +1,108 @@ +""" +Example client for audit logging demonstration. +""" + +import asyncio +import time +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run audit logging example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to audit example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. + if tool.description: + for line in tool.description.split("\n"): + print(f" {line}") + print() + + # Example 1: Create users (decorator auditing). + print("=" * 60) + print("Example 1: Creating users (decorator auditing)") + print("=" * 60) + + users = [ + ("alice", "alice@example.com"), + ("bob", "bob@example.com"), + ("carol", "carol@example.com"), + ] + + for name, email in users: + result = await session.call_tool( + "create_user", + {"name": name, "email": email}, + ) + print(f"Created: {result.content[0].text}") + + print() + + # Example 2: Delete user (explicit auditing). + print("=" * 60) + print("Example 2: Deleting user (explicit auditing)") + print("=" * 60) + + result = await session.call_tool( + "delete_user", + { + "user_id": "user_1234", + "reason": "Account inactive for 2 years", + }, + ) + print(f"Deleted: {result.content[0].text}") + print() + + # Example 3: Query recent audit logs. + print("=" * 60) + print("Example 3: Query recent audit logs") + print("=" * 60) + + result = await session.call_tool( + "get_audit_log", + {"log_name": "user_operations", "limit": 10}, + ) + print(f"Audit log: {result.content[0].text}") + print() + + # Example 4: Update user (mixed pattern). 
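        # update_user is decorated with @audit("user_operations") and
        # also writes an explicit entry to "security_events" on role
        # changes, so one call produces entries in two audit logs.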
+ print("=" * 60) + print("Example 4: Update user role (mixed pattern)") + print("=" * 60) + + result = await session.call_tool( + "update_user", + { + "user_id": "user_5678", + "updates": {"role": "admin", "verified": True}, + }, + ) + print(f"Updated: {result.content[0].text}") + print() + + # Query security events log. + result = await session.call_tool( + "get_audit_log", + {"log_name": "security_events", "limit": 5}, + ) + print(f"Security events: {result.content[0].text}") + print() + + print("Audit example completed successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/audit/example.py b/examples/audit/example.py new file mode 100644 index 0000000..200b101 --- /dev/null +++ b/examples/audit/example.py @@ -0,0 +1,213 @@ +""" +Example DurableMCP server with audit logging. + +Demonstrates both decorator and explicit audit logging patterns. +""" + +import asyncio +import json +import sys +from pathlib import Path +from typing import Any, Dict, Optional + +# Add api/ to Python path for generated proto code. +api_path = Path(__file__).parent.parent.parent / "api" +if api_path.exists(): + sys.path.insert(0, str(api_path)) + +from audit import audit, timestamp_to_uuidv7 +from reboot.mcp.server import DurableMCP, DurableContext +from reboot.std.collections.v1.sorted_map import SortedMap + +# Initialize MCP server. +mcp = DurableMCP(path="/mcp") + + +# Example 1: Using @audit decorator +@mcp.tool() +@audit("user_operations") +async def create_user( + name: str, + email: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Create a new user. + + This tool is decorated with @audit, so all invocations are automatically + logged with inputs, outputs, duration, and success/failure. + """ + # Simulate user creation. + user_id = f"user_{hash(name) % 10000}" + + return { + "status": "success", + "user_id": user_id, + "name": name, + "email": email, + } + + +# Example 2: Using explicit audit logging +@mcp.tool() +async def delete_user( + user_id: str, + reason: Optional[str] = None, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Delete a user. + + This tool uses explicit audit logging to record additional context + beyond what the decorator would capture. + """ + # Perform deletion logic here. + # ... + + # Explicit audit with custom fields. + await audit( + "user_operations", + context, + { + "action": "delete_user", + "user_id": user_id, + "reason": reason or "no reason provided", + "severity": "high", + "status": "success", + }, + ) + + return {"status": "success", "user_id": user_id} + + +# Example 3: Tool to query audit logs +@mcp.tool() +async def get_audit_log( + log_name: str, + begin: Optional[int] = None, + end: Optional[int] = None, + limit: int = 100, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Query audit log entries by time range. + + Args: + log_name: Name of the audit log to query. + begin: Start timestamp in milliseconds (Unix epoch). Optional. + end: End timestamp in milliseconds (Unix epoch). Optional. + limit: Maximum number of entries to return (default 100). + context: The durable context. + + Returns: + Dictionary with count and entries list. + + Examples: + # Get last 50 entries. + get_audit_log("user_operations", limit=50) + + # Get entries from last hour. + get_audit_log("user_operations", begin=time.time()*1000 - 3600000) + + # Get entries in specific range. 
+ get_audit_log("user_operations", begin=1699000000000, end=1699100000000) + """ + audit_map = SortedMap.ref(f"audit:{log_name}") + + # Build range query using UUIDv7 boundaries. + if begin is not None and end is not None: + # Range query: begin to end. + begin_key = str(timestamp_to_uuidv7(begin)) + end_key = str(timestamp_to_uuidv7(end)) + + response = await audit_map.range( + context, + start_key=begin_key, + end_key=end_key, + limit=limit, + ) + elif begin is not None: + # Query from begin onwards. + begin_key = str(timestamp_to_uuidv7(begin)) + + response = await audit_map.range( + context, + start_key=begin_key, + limit=limit, + ) + elif end is not None: + # Query up to end (newest first, then filter). + end_key = str(timestamp_to_uuidv7(end)) + + response = await audit_map.reverse_range( + context, + end_key=end_key, + limit=limit, + ) + else: + # No time bounds - get most recent entries. + response = await audit_map.reverse_range( + context, + limit=limit, + ) + + # Parse entries. + entries = [] + for entry in response.entries: + data = json.loads(entry.value.decode("utf-8")) + entries.append(data) + + return { + "log_name": log_name, + "count": len(entries), + "entries": entries, + } + + +# Example 4: Mixed pattern - decorator + explicit logging +@mcp.tool() +@audit("user_operations") +async def update_user( + user_id: str, + updates: Dict[str, Any], + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Update user information. + + Uses both decorator (for automatic capture) and explicit logging + (for additional context). + """ + # Perform update. + # ... + + # The decorator will log this automatically, but we can add + # additional entries for specific events. + if "role" in updates: + await audit( + "security_events", + context, + { + "action": "role_change", + "user_id": user_id, + "old_role": "user", + "new_role": updates["role"], + "severity": "medium", + }, + ) + + return { + "status": "success", + "user_id": user_id, + "updated_fields": list(updates.keys()), + } + + +async def main(): + """Start the example audit server.""" + # Reboot application that runs everything necessary for `DurableMCP`. + await mcp.application().run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/define/README.md b/examples/define/README.md new file mode 100644 index 0000000..b212527 --- /dev/null +++ b/examples/define/README.md @@ -0,0 +1,448 @@ +# Technical Glossary + +Complete SortedMap CRUD reference using a technical glossary. + +## Overview + +Demonstrates all SortedMap operations through a practical use case: +maintaining a technical glossary with both alphabetical and +chronological indexes. + +## SortedMap Operations + +| Operation | Method | Use Case | +|-----------|--------|----------| +| Insert | `insert(context, entries={...})` | Add terms | +| Get | `get(context, key="...")` | Look up term | +| Range | `range(context, start_key=..., limit=...)` | Browse alphabetically | +| Range (bounded) | `range(context, start_key=..., end_key=..., limit=...)` | Prefix search | +| Reverse Range | `reverse_range(context, limit=...)` | Recent additions | +| Remove | `remove(context, keys=[...])` | Delete terms | + +## Architecture + +Two SortedMaps for different access patterns: + +```python +# Map 1: Alphabetical index (keyed by term name). +terms_map = SortedMap.ref("terms") +# Key: "api" -> Value: {"term": "API", "definition": "...", ...} + +# Map 2: Chronological index (keyed by UUIDv7). +recent_map = SortedMap.ref("recent") +# Key: "018c1234-..." 
-> Value: {"term": "API", "definition": "...", ...} +``` + +## Usage + +### Insert + +Add terms to both indexes: + +```python +@mcp.tool() +async def add_term( + term: str, + definition: str, + category: str = "general", + context: DurableContext = None, +) -> dict: + """Add a technical term to the glossary.""" + term_data = { + "term": term, + "definition": definition, + "category": category, + "timestamp": int(time.time() * 1000), + } + + # Insert into alphabetical map (keyed by term). + await terms_map.insert( + context, + entries={term.lower(): json.dumps(term_data).encode("utf-8")}, + ) + + # Insert into chronological map (keyed by UUIDv7). + recent_key = str(uuid7()) + await recent_map.insert( + context, + entries={recent_key: json.dumps(term_data).encode("utf-8")}, + ) + + return {"status": "success", "term": term} +``` + +### Get + +Point lookup for term definition: + +```python +@mcp.tool() +async def define( + term: str, + context: DurableContext = None, +) -> dict: + """Look up a term's definition.""" + response = await terms_map.get(context, key=term.lower()) + + if not response.HasField("value"): + return {"status": "error", "message": "Term not found"} + + term_data = json.loads(response.value.decode("utf-8")) + return {"status": "success", "term": term_data} +``` + +### Range + +Browse terms alphabetically: + +```python +@mcp.tool() +async def list_terms( + start_with: str = "", + limit: int = 50, + context: DurableContext = None, +) -> dict: + """List terms alphabetically.""" + if start_with: + # Range starting from prefix. + response = await terms_map.range( + context, + start_key=start_with.lower(), + limit=limit, + ) + else: + # Range from beginning. + response = await terms_map.range( + context, + limit=limit, + ) + + # Parse and return entries. + # ... +``` + +### Range (Bounded) + +Prefix search with start and end boundaries: + +```python +@mcp.tool() +async def search_terms( + prefix: str, + limit: int = 20, + context: DurableContext = None, +) -> dict: + """Search for terms by prefix.""" + # Calculate end key for prefix range. + start_key = prefix.lower() + # Increment last character for upper bound. + end_key = prefix[:-1] + chr(ord(prefix[-1]) + 1) + + response = await terms_map.range( + context, + start_key=start_key, + end_key=end_key.lower(), + limit=limit, + ) + + # Parse and return entries. + # ... +``` + +### Reverse Range + +Get recently added terms (UUIDv7 keys): + +```python +@mcp.tool() +async def recent_terms( + limit: int = 20, + context: DurableContext = None, +) -> dict: + """Get recently added terms.""" + response = await recent_map.reverse_range( + context, + limit=limit, + ) + + # Parse and return entries (newest first). + # ... +``` + +### Remove + +Delete terms from glossary: + +```python +@mcp.tool() +async def remove_term( + term: str, + context: DurableContext = None, +) -> dict: + """Remove a term from the glossary.""" + # Check if term exists first. + response = await terms_map.get(context, key=term.lower()) + + if not response.HasField("value"): + return {"status": "error", "message": "Term not found"} + + # Remove from alphabetical map. + await terms_map.remove( + context, + keys=[term.lower()], + ) + + return {"status": "success", "message": f"Removed '{term}'"} +``` + +## Key Concepts + +### Point Lookup with get() + +Single key retrieval: + +```python +response = await terms_map.get(context, key="api") + +if not response.HasField("value"): + # Key not found. + return {"error": "Not found"} + +# Key found. 
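# `value` is raw bytes; decode and parse the stored JSON.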
+data = json.loads(response.value.decode("utf-8")) +``` + +Returns `GetResponse` with optional `value` field. + +### Range Queries + +Ascending order traversal: + +```python +# All keys from "api" onwards. +response = await terms_map.range( + context, + start_key="api", + limit=50, +) + +# Keys from "api" to "apz" (exclusive). +response = await terms_map.range( + context, + start_key="api", + end_key="apz", + limit=50, +) + +# First 50 keys in map. +response = await terms_map.range( + context, + limit=50, +) +``` + +Parameters: + +- `start_key`: Inclusive lower bound (optional) +- `end_key`: Exclusive upper bound (optional) +- `limit`: Maximum entries to return (required) + +Returns `RangeResponse` with `entries` list. + +### Reverse Range + +Descending order traversal (largest to smallest keys): + +```python +# Get 20 most recent entries (UUIDv7 keys are time-ordered). +response = await recent_map.reverse_range( + context, + limit=20, +) + +# Keys from "z" down to "m" (exclusive). +response = await terms_map.reverse_range( + context, + start_key="z", + end_key="m", + limit=50, +) +``` + +Use cases: Recent items, reverse alphabetical browsing. + +### UUIDv7 for Time Ordering + +UUIDv7 embeds timestamp in first 48 bits: + +```python +from uuid7 import create as uuid7 + +# Generate time-ordered key. +key = str(uuid7()) # "018c1234-5678-7abc-9012-3456789abcdef" + +# Later keys sort after earlier keys. +key1 = str(uuid7()) # At time T1. +time.sleep(0.1) +key2 = str(uuid7()) # At time T2. +# key1 < key2 (lexicographically) +``` + +Benefits: Natural chronological sorting, no collisions, works with +`reverse_range()` for recent items. + +### Prefix Search Pattern + +Find all keys starting with prefix "api": + +```python +prefix = "api" +start_key = prefix.lower() +# Increment last character to get exclusive upper bound. +end_key = prefix[:-1] + chr(ord(prefix[-1]) + 1) # "api" -> "apj" + +response = await terms_map.range( + context, + start_key=start_key, + end_key=end_key, + limit=100, +) +``` + +Works because SortedMap uses lexicographic ordering. + +## Best Practices + +Always check `HasField("value")` for `get()`: + +```python +# Correct. +response = await map.get(context, key="term") +if not response.HasField("value"): + return {"error": "Not found"} + +# Wrong (will raise AttributeError if value is unset). +if not response.value: + pass +``` + +Use `limit` parameter for `range()` queries: + +```python +# Required - limit prevents unbounded results. +response = await map.range(context, limit=100) + +# Error - limit is required. +response = await map.range(context) +``` + +Lowercase keys for case-insensitive lookup while preserving original +capitalization: + +```python +# Store original term in data, use lowercase for key. +term_data = { + "term": term, # Preserves "gRPC", "Kubernetes", etc. + "definition": definition, + # ... +} + +await terms_map.insert( + context, + entries={term.lower(): json.dumps(term_data).encode("utf-8")}, +) + +# Lookup with lowercase (case-insensitive). +response = await terms_map.get(context, key=term.lower()) +# Returns: {"term": "gRPC", ...} regardless of input case +``` + +This allows lookups like `define("grpc")`, `define("GRPC")`, and +`define("gRPC")` to all return the same term with its original +capitalization. + +## Common Patterns + +### Recent Items with UUIDv7 + +```python +# Store with UUIDv7 keys for time ordering. +recent_map = SortedMap.ref("recent") +await recent_map.insert( + context, + entries={str(uuid7()): data}, +) + +# Get N most recent. 
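# reverse_range walks keys in descending order, so time-ordered
# UUIDv7 keys come back newest first.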
+response = await recent_map.reverse_range(context, limit=10) +``` + +### Dual Indexing + +```python +# Primary index: optimized for lookups. +terms_map = SortedMap.ref("terms") +await terms_map.insert(context, entries={term.lower(): data}) + +# Secondary index: optimized for chronological access. +recent_map = SortedMap.ref("recent") +await recent_map.insert(context, entries={str(uuid7()): data}) +``` + +### Batch Operations + +```python +# Insert multiple entries at once (single call). +await terms_map.insert( + context, + entries={ + "api": json.dumps({...}).encode("utf-8"), + "rest": json.dumps({...}).encode("utf-8"), + "grpc": json.dumps({...}).encode("utf-8"), + }, +) + +# Remove multiple keys at once (single call). +await terms_map.remove( + context, + keys=["api", "rest", "grpc"], +) +``` + +### Multiple Operations on Same Map + +When calling methods on the same named SortedMap multiple times within +the same context, use `.idempotently()` with unique aliases: + +```python +# Multiple inserts on same map require idempotency guards. +terms_map = SortedMap.ref("terms") + +await terms_map.idempotently("insert_api").insert( + context, + entries={"api": data1}, +) + +await terms_map.idempotently("insert_rest").insert( + context, + entries={"rest": data2}, +) +``` + +Different named maps don't require guards: + +```python +# These are different maps - no conflict. +terms_map = SortedMap.ref("terms") +recent_map = SortedMap.ref("recent") + +await terms_map.insert(context, entries={...}) # Fine +await recent_map.insert(context, entries={...}) # Also fine +``` + +## Running + +```bash +cd examples/define +uv run python example.py +``` diff --git a/examples/define/client.py b/examples/define/client.py new file mode 100644 index 0000000..7233d02 --- /dev/null +++ b/examples/define/client.py @@ -0,0 +1,177 @@ +""" +Example client for technical glossary demonstration. +""" + +import asyncio +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run technical glossary example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to define example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. + if tool.description: + for line in tool.description.split("\n"): + print(f" {line}") + print() + + # Example 1: Add technical terms. 
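        # Each add_term call writes to both the alphabetical "terms"
        # map and the UUIDv7-keyed "recent" map (dual indexing).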
+ print("=" * 60) + print("Example 1: Add technical terms to glossary") + print("=" * 60) + + terms = [ + ( + "API", + "Application Programming Interface - A set of protocols " + "for building software", + "architecture", + ["REST API", "GraphQL API"], + ), + ( + "REST", + "Representational State Transfer - Architectural style " + "for distributed systems", + "architecture", + ["RESTful services", "HTTP methods"], + ), + ( + "gRPC", + "Google Remote Procedure Call - High-performance RPC " + "framework", + "networking", + ["Protocol Buffers", "HTTP/2"], + ), + ( + "Docker", + "Platform for developing and running containerized " + "applications", + "devops", + ["containers", "images", "Dockerfile"], + ), + ( + "Kubernetes", + "Container orchestration platform for automating " + "deployment and scaling", + "devops", + ["K8s", "pods", "deployments"], + ), + ] + + for term, definition, category, examples in terms: + result = await session.call_tool( + "add_term", + { + "term": term, + "definition": definition, + "category": category, + "examples": examples, + }, + ) + print(f"Added: {result.content[0].text}") + + print() + + # Example 2: Look up term definitions. + print("=" * 60) + print("Example 2: Look up term definitions") + print("=" * 60) + + for term in ["API", "gRPC", "Docker"]: + result = await session.call_tool( + "define", + {"term": term}, + ) + print(f"Definition of {term}: {result.content[0].text}") + print() + + # Example 3: Case-insensitive lookup. + print("=" * 60) + print("Example 3: Case-insensitive lookup") + print("=" * 60) + + # Demonstrate that lookups work regardless of case. + for query in ["grpc", "GRPC", "GrPc"]: + result = await session.call_tool( + "define", + {"term": query}, + ) + print(f"Lookup '{query}': {result.content[0].text}") + print() + + # Example 4: List terms alphabetically. + print("=" * 60) + print("Example 4: List terms alphabetically") + print("=" * 60) + + result = await session.call_tool( + "list_terms", + {"start_with": "", "limit": 10}, + ) + print(f"Terms (all): {result.content[0].text}") + print() + + # Example 5: Search by prefix. + print("=" * 60) + print("Example 5: Search terms by prefix") + print("=" * 60) + + result = await session.call_tool( + "search_terms", + {"prefix": "gR", "limit": 5}, + ) + print(f"Terms starting with 'gR': {result.content[0].text}") + print() + + # Example 6: Get recently added terms. + print("=" * 60) + print("Example 6: Get recently added terms") + print("=" * 60) + + result = await session.call_tool( + "recent_terms", + {"limit": 3}, + ) + print(f"Recent terms: {result.content[0].text}") + print() + + # Example 7: Remove a term. + print("=" * 60) + print("Example 7: Remove a term") + print("=" * 60) + + result = await session.call_tool( + "remove_term", + {"term": "Docker"}, + ) + print(f"Removed: {result.content[0].text}") + print() + + # Verify removal. + result = await session.call_tool( + "define", + {"term": "Docker"}, + ) + print(f"Lookup after removal: {result.content[0].text}") + print() + + print("Define example completed successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/define/example.py b/examples/define/example.py new file mode 100644 index 0000000..4ed0032 --- /dev/null +++ b/examples/define/example.py @@ -0,0 +1,371 @@ +""" +Technical Glossary with SortedMap CRUD Operations. + +Demonstrates all SortedMap operations: insert, get, range, reverse_range, +and remove, using a technical terms glossary as an example. 
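
Two maps are maintained: "terms" (keyed by lowercased term, for lookup
and alphabetical browsing) and "recent" (keyed by UUIDv7, for
chronological access).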
+""" + +import asyncio +import json +import sys +import time +from pathlib import Path +from typing import Any, Dict, List, Optional + +# Add api/ to Python path for generated proto code. +api_path = Path(__file__).parent.parent.parent / "api" +if api_path.exists(): + sys.path.insert(0, str(api_path)) + +from reboot.mcp.server import DurableMCP, DurableContext +from reboot.std.collections.v1.sorted_map import SortedMap +from uuid7 import create as uuid7 # type: ignore[import-untyped] + +# Initialize MCP server. +mcp = DurableMCP(path="/mcp") + + +@mcp.tool() +async def add_term( + term: str, + definition: str, + category: str = "general", + examples: List[str] = None, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Add a technical term to the glossary. + + Stores the term in two SortedMaps: + - Alphabetically by term name (for lookup and browsing). + - Chronologically by UUIDv7 (for recent additions). + + Args: + term: The technical term to define. + definition: Definition of the term. + category: Category (e.g., "programming", "architecture"). + examples: Optional list of usage examples. + context: The durable context. + + Returns: + Confirmation with the term and timestamp. + """ + terms_map = SortedMap.ref("terms") + recent_map = SortedMap.ref("recent") + + timestamp = int(time.time() * 1000) + + term_data = { + "term": term, + "definition": definition, + "category": category, + "examples": examples or [], + "timestamp": timestamp, + } + + # Insert into alphabetical map (keyed by term). + await terms_map.insert( + context, + entries={term.lower(): json.dumps(term_data).encode("utf-8")}, + ) + + # Insert into chronological map (keyed by UUIDv7). + recent_key = str(uuid7()) + await recent_map.insert( + context, + entries={recent_key: json.dumps(term_data).encode("utf-8")}, + ) + + return { + "status": "success", + "term": term, + "timestamp": timestamp, + } + + +@mcp.tool() +async def define( + term: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Look up a term's definition. + + Uses the `get` method for point lookup. + + Args: + term: The term to look up. + context: The durable context. + + Returns: + Term definition or error if not found. + """ + terms_map = SortedMap.ref("terms") + + response = await terms_map.get(context, key=term.lower()) + + if not response.HasField("value"): + return { + "status": "error", + "message": f"Term '{term}' not found in glossary", + } + + term_data = json.loads(response.value.decode("utf-8")) + + return { + "status": "success", + "term": term_data, + } + + +@mcp.tool() +async def remove_term( + term: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Remove a term from the glossary. + + Demonstrates the `remove` method. Note: Only removes from alphabetical + map. The recent map entry remains (showing historical additions). + + Args: + term: The term to remove. + context: The durable context. + + Returns: + Confirmation of removal. + """ + terms_map = SortedMap.ref("terms") + + # Check if term exists first. + response = await terms_map.get(context, key=term.lower()) + + if not response.HasField("value"): + return { + "status": "error", + "message": f"Term '{term}' not found", + } + + # Remove from alphabetical map. 
+ await terms_map.remove( + context, + keys=[term.lower()], + ) + + return { + "status": "success", + "message": f"Removed '{term}' from glossary", + } + + +@mcp.tool() +async def list_terms( + start_with: str = "", + limit: int = 50, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + List terms alphabetically. + + Demonstrates the `range` method with optional start key. + + Args: + start_with: Optional prefix to start listing from. + limit: Maximum number of terms to return. + context: The durable context. + + Returns: + List of terms in alphabetical order. + """ + terms_map = SortedMap.ref("terms") + + if start_with: + # Range starting from prefix. + response = await terms_map.range( + context, + start_key=start_with.lower(), + limit=limit, + ) + else: + # Range from beginning. + response = await terms_map.range( + context, + limit=limit, + ) + + terms = [] + for entry in response.entries: + term_data = json.loads(entry.value.decode("utf-8")) + terms.append({ + "term": term_data["term"], + "definition": term_data["definition"][:100] + "..." + if len(term_data["definition"]) > 100 + else term_data["definition"], + "category": term_data["category"], + }) + + return { + "status": "success", + "count": len(terms), + "terms": terms, + } + + +@mcp.tool() +async def browse_category( + category: str, + limit: int = 50, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Browse terms by category. + + Uses prefix-based range query on category-prefixed keys. + + Args: + category: Category to browse. + limit: Maximum number of terms. + context: The durable context. + + Returns: + Terms in the specified category. + """ + terms_map = SortedMap.ref("terms") + + # Get all terms and filter by category. + # Note: In production, you'd use a separate category-indexed map. + response = await terms_map.range( + context, + limit=1000, # Fetch more to filter. + ) + + terms = [] + for entry in response.entries: + term_data = json.loads(entry.value.decode("utf-8")) + if term_data["category"] == category: + terms.append({ + "term": term_data["term"], + "definition": term_data["definition"], + "examples": term_data["examples"], + }) + if len(terms) >= limit: + break + + return { + "status": "success", + "category": category, + "count": len(terms), + "terms": terms, + } + + +@mcp.tool() +async def recent_terms( + limit: int = 20, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Get recently added terms. + + Demonstrates `reverse_range` on UUIDv7-keyed map to get chronological + order (newest first). + + Args: + limit: Maximum number of recent terms. + context: The durable context. + + Returns: + Recently added terms in reverse chronological order. + """ + recent_map = SortedMap.ref("recent") + + response = await recent_map.reverse_range( + context, + limit=limit, + ) + + terms = [] + for entry in response.entries: + term_data = json.loads(entry.value.decode("utf-8")) + terms.append({ + "term": term_data["term"], + "definition": term_data["definition"][:100] + "..." + if len(term_data["definition"]) > 100 + else term_data["definition"], + "category": term_data["category"], + "added_at": term_data["timestamp"], + }) + + return { + "status": "success", + "count": len(terms), + "recent_terms": terms, + } + + +@mcp.tool() +async def search_terms( + prefix: str, + limit: int = 20, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Search for terms by prefix. + + Demonstrates range query with start and end boundaries. + + Args: + prefix: Search prefix. 
+ limit: Maximum results. + context: The durable context. + + Returns: + Terms matching the prefix. + """ + terms_map = SortedMap.ref("terms") + + # Calculate end key for prefix range. + # For prefix "api", we want keys >= "api" and < "apj". + start_key = prefix.lower() + # Increment last character for upper bound. + end_key = prefix[:-1] + chr(ord(prefix[-1]) + 1) if prefix else None + + if end_key: + response = await terms_map.range( + context, + start_key=start_key, + end_key=end_key.lower(), + limit=limit, + ) + else: + response = await terms_map.range( + context, + start_key=start_key, + limit=limit, + ) + + terms = [] + for entry in response.entries: + term_data = json.loads(entry.value.decode("utf-8")) + terms.append({ + "term": term_data["term"], + "definition": term_data["definition"], + "category": term_data["category"], + }) + + return { + "status": "success", + "prefix": prefix, + "count": len(terms), + "terms": terms, + } + + +async def main(): + """Start the technical glossary server.""" + await mcp.application().run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/document/README.md b/examples/document/README.md new file mode 100644 index 0000000..98cf80c --- /dev/null +++ b/examples/document/README.md @@ -0,0 +1,302 @@ +# Document Processing + +Document processing pipeline combining `at_least_once` and +`at_most_once` patterns. + +## Overview + +Real-world workflows often need both idempotency patterns. Use +`at_least_once` for idempotent operations (reads, storage) and +`at_most_once` for operations with side effects (external APIs). + +## Workflow + +``` +Upload File -> Process Document + |- Step 1: Get file metadata (at_least_once) + |- Step 2: OCR extraction (at_most_once) + |- Step 3: Translation (at_most_once) + |- Step 4: Store result (at_least_once) +``` + +## Pattern + +```python +@mcp.tool() +async def process_document( + file_id: str, + target_language: str = "en", + context: DurableContext = None, +) -> dict: + """Process document through OCR and translation pipeline.""" + + # Step 1: Idempotent file lookup. + async def get_file_metadata(): + response = await files_map.get(context, key=file_id) + if not response.HasField("value"): + raise ValueError(f"File {file_id} not found") + return json.loads(response.value.decode("utf-8")) + + file_metadata = await at_least_once( + f"get_file_{file_id}", + context, + get_file_metadata, + type=dict, + ) + + # Step 2: OCR (external API, at most once). + async def perform_ocr(): + extracted_text = await simulate_ocr_api(file_metadata["content"]) + # Store intermediate result with idempotency guard. + await results_map.idempotently(f"store_ocr_{job_id}").insert( + context, + entries={...}, + ) + return extracted_text + + try: + ocr_text = await at_most_once( + f"ocr_{job_id}", + context, + perform_ocr, + type=str, + retryable_exceptions=[NetworkError], + ) + except NetworkError: + return {"status": "error", "step": "ocr", "error": "..."} + except AtMostOnceFailedBeforeCompleting: + return {"status": "error", "step": "ocr", "error": "..."} + except InvalidDocumentError as e: + return {"status": "error", "step": "ocr", "error": str(e)} + + # Step 3: Translation (external API, at most once). + async def perform_translation(): + translated_text = await simulate_translation_api( + ocr_text, + target_language, + ) + # Store intermediate result with idempotency guard. 
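        # This is a second insert on the same "results" map within one
        # context, hence the distinct alias (see "Multiple Operations
        # on Same SortedMap" below).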
+ await results_map.idempotently(f"store_translation_{job_id}").insert( + context, + entries={...}, + ) + return translated_text + + try: + translated_text = await at_most_once( + f"translate_{job_id}", + context, + perform_translation, + type=str, + retryable_exceptions=[NetworkError], + ) + except NetworkError: + return {"status": "error", "step": "translation", "error": "..."} + except AtMostOnceFailedBeforeCompleting: + return {"status": "error", "step": "translation", "error": "..."} + except QuotaExceededError as e: + return {"status": "error", "step": "translation", "error": str(e)} + + # Step 4: Idempotent final storage. + async def store_job_result(): + await jobs_map.insert(context, entries={job_id: ...}) + return job_id + + final_job_id = await at_least_once( + f"store_job_{job_id}", + context, + store_job_result, + type=str, + ) + + return {"status": "success", "job_id": final_job_id} +``` + +## Pattern Selection + +| Operation | Pattern | Reason | +|-----------|---------|--------| +| File lookup | `at_least_once` | Idempotent read | +| OCR API call | `at_most_once` | External API side effects | +| Translation API | `at_most_once` | External API quota | +| Store result | `at_least_once` | Idempotent write | + +## Error Handling + +### For at_most_once Steps + +Each external API call needs three exception handlers: + +```python +try: + result = await at_most_once( + "operation", + context, + operation_func, + type=str, + retryable_exceptions=[NetworkError], + ) +except NetworkError: + # Retryable error after retries exhausted. + return {"status": "error", "retryable": True} +except AtMostOnceFailedBeforeCompleting: + # Previous attempt failed with non-retryable error. + return {"status": "error", "retryable": False} +except (InvalidDocumentError, QuotaExceededError) as e: + # First attempt with non-retryable error. + return {"status": "error", "message": str(e)} +``` + +### For at_least_once Steps + +Let exceptions propagate (they'll cause workflow retry): + +```python +# No try/except needed - idempotent operations can safely retry. +result = await at_least_once( + "operation", + context, + operation_func, + type=dict, +) +``` + +## Multiple Operations on Same SortedMap + +When calling a method on the **same named SortedMap** multiple times +within the same context, use `.idempotently()` with a unique alias for +each call: + +```python +# Inside an `at_most_once` or `at_least_once` callable: +async def perform_operation(): + results_map = SortedMap.ref("results") + + # First insert on "results" map. + await results_map.idempotently("store_step1").insert( + context, + entries={key1: value1}, + ) + + # Second insert on same "results" map - needs different alias. + await results_map.idempotently("store_step2").insert( + context, + entries={key2: value2}, + ) + + return result +``` + +Without `.idempotently()`, the second call raises: + +``` +ValueError: To call 'rbt.std.collections.v1.SortedMapMethods.Insert' +of 'results' more than once using the same context an idempotency +alias or key must be specified +``` + +**Different maps don't need idempotency guards:** + +```python +# These are different named maps - no conflict. 
+results_map = SortedMap.ref("results") +jobs_map = SortedMap.ref("jobs") + +await results_map.insert(context, entries={...}) # Fine +await jobs_map.insert(context, entries={...}) # Also fine +``` + +**For loop operations:** + +Use dynamic aliases when calling the same map in loops: + +```python +items_map = SortedMap.ref("items") +for i in range(5): + await items_map.idempotently(f"insert_item_{i}").insert( + context, + entries={f"item_{i}": data}, + ) +``` + +This pattern applies to all SortedMap methods (`insert`, `get`, +`range`, `remove`) when called multiple times on the same named map +within the same context. + +## Retry Scenarios + +### Network Error During OCR + +1. Step 1: File lookup succeeds +2. Step 2: OCR API raises `NetworkError` +3. `at_most_once` retries OCR +4. OCR succeeds on retry +5. Steps 3-4 proceed normally + +### Invalid Document Error + +1. Step 1: File lookup succeeds +2. Step 2: OCR API raises `InvalidDocumentError` +3. Exception propagates (not in `retryable_exceptions`) +4. Tool returns error response + +### Tool Retry After OCR Success + +1. Initial call: Steps 1-2 succeed, network issue prevents response +2. Tool retried by MCP framework +3. Step 1: `at_least_once` returns cached file metadata +4. Step 2: `at_most_once` returns cached OCR text +5. Step 3: Translation proceeds + +## Best Practices + +Choose the right pattern for each step: + +```python +# Good: Idempotent read uses `at_least_once`. +data = await at_least_once("read", context, read_func, type=dict) + +# Good: External API uses `at_most_once`. +result = await at_most_once( + "api", + context, + api_func, + type=str, + retryable_exceptions=[...], +) + +# Bad: Using `at_most_once` for idempotent read (unnecessary). +data = await at_most_once("read", context, read_func, type=dict) +``` + +Store intermediate results: + +```python +async def expensive_operation(): + result = await external_api() + # Store immediately so we don't lose it. + # Use `.idempotently()` if multiple SortedMap operations occur + # in the same context. + await results_map.idempotently("store_result").insert( + context, + entries={...}, + ) + return result +``` + +Use distinct aliases: + +```python +# Each step has unique alias. +await at_least_once(f"get_file_{file_id}", ...) +await at_most_once(f"ocr_{job_id}", ...) +await at_most_once(f"translate_{job_id}", ...) +await at_least_once(f"store_job_{job_id}", ...) +``` + +## Running + +```bash +cd examples/document +uv run python example.py +``` diff --git a/examples/document/client.py b/examples/document/client.py new file mode 100644 index 0000000..dc8f354 --- /dev/null +++ b/examples/document/client.py @@ -0,0 +1,124 @@ +""" +Example client for document processing demonstration. +""" + +import asyncio +import json +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run document processing example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to document example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. + if tool.description: + for line in tool.description.split("\n"): + print(f" {line}") + print() + + # Example 1: Upload files. 
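        # upload_file stores content plus metadata in the "files" map;
        # process_document reads it back in Step 1.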
+ print("=" * 60) + print("Example 1: Upload files for processing") + print("=" * 60) + + files = [ + ("file_001", "invoice.pdf", "application/pdf"), + ("file_002", "contract.pdf", "application/pdf"), + ("file_003", "report.docx", "application/vnd.openxmlformats"), + ] + + file_ids = [] + for file_id, filename, content_type in files: + result = await session.call_tool( + "upload_file", + { + "file_id": file_id, + "content": f"Binary content of {filename}", + "metadata": { + "filename": filename, + "content_type": content_type, + }, + }, + ) + print(f"Uploaded: {result.content[0].text}") + + # Extract file_id from result for later use. + try: + data = json.loads(result.content[0].text) + if data.get("status") == "success": + file_ids.append(data["file_id"]) + except json.JSONDecodeError: + print(f" Error parsing response: {result.content[0].text}") + + print() + + # Example 2: Process documents. + print("=" * 60) + print("Example 2: Process documents (OCR + Translation)") + print("=" * 60) + + job_ids = [] + for file_id in file_ids: + result = await session.call_tool( + "process_document", + {"file_id": file_id, "target_language": "es"}, + ) + print(f"Processing result: {result.content[0].text}") + + # Extract job_id if successful. + try: + data = json.loads(result.content[0].text) + if data.get("status") == "success": + job_ids.append(data.get("job_id")) + except json.JSONDecodeError: + pass + + print() + + # Example 3: Check job status. + print("=" * 60) + print("Example 3: Check processing job status") + print("=" * 60) + + # Use the job_id from first successful processing. + if job_ids and job_ids[0]: + result = await session.call_tool( + "get_job_status", + {"job_id": job_ids[0]}, + ) + print(f"Job status: {result.content[0].text}") + print() + + # Example 4: Process with different language. + print("=" * 60) + print("Example 4: Process document to French") + print("=" * 60) + + if file_ids: + result = await session.call_tool( + "process_document", + {"file_id": file_ids[0], "target_language": "fr"}, + ) + print(f"Processing result: {result.content[0].text}") + print() + + print("Document example completed successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/document/example.py b/examples/document/example.py new file mode 100644 index 0000000..0b9c668 --- /dev/null +++ b/examples/document/example.py @@ -0,0 +1,356 @@ +""" +Document Processing with Mixed Idempotency Patterns. + +Demonstrates combining `at_least_once` and `at_most_once` in a single workflow +for document processing with external API calls and multi-step operations. +""" + +import asyncio +import hashlib +import json +import random +import sys +from pathlib import Path +from typing import Any, Dict + +# Add api/ to Python path for generated proto code. +api_path = Path(__file__).parent.parent.parent / "api" +if api_path.exists(): + sys.path.insert(0, str(api_path)) + +from reboot.aio.workflows import ( + at_least_once, + at_most_once, + AtMostOnceFailedBeforeCompleting, +) +from reboot.mcp.server import DurableMCP, DurableContext +from reboot.std.collections.v1.sorted_map import SortedMap + +# Initialize MCP server. 
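# Tools registered with @mcp.tool() take a DurableContext and are
# served over the Model Context Protocol at the given path.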
+mcp = DurableMCP(path="/mcp") + + +class NetworkError(Exception): + """Temporary network error (retryable).""" + + pass + + +class InvalidDocumentError(Exception): + """Document format is invalid (not retryable).""" + + pass + + +class QuotaExceededError(Exception): + """API quota exceeded (not retryable).""" + + pass + + +async def simulate_ocr_api(content: str) -> str: + """ + Simulate external OCR API call. + + May raise `NetworkError` (retryable) or `InvalidDocumentError` (not + retryable). + """ + # Simulate network issues. + if random.random() < 0.15: + raise NetworkError("OCR service timeout") + + # Simulate invalid documents. + if random.random() < 0.05: + raise InvalidDocumentError("Unsupported document format") + + # Return simulated OCR text. + return f"Extracted text from document: {content[:50]}..." + + +async def simulate_translation_api(text: str, target_lang: str) -> str: + """ + Simulate external translation API call. + + May raise `NetworkError` (retryable) or `QuotaExceededError` (not + retryable). + """ + # Simulate network issues. + if random.random() < 0.1: + raise NetworkError("Translation service timeout") + + # Simulate quota exceeded. + if random.random() < 0.03: + raise QuotaExceededError("Daily translation quota exceeded") + + # Return simulated translation. + return f"[{target_lang}] {text}" + + +@mcp.tool() +async def process_document( + file_id: str, + target_language: str = "en", + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Process a document through OCR and translation pipeline. + + This demonstrates a complex workflow combining: + - `at_least_once` for idempotent steps (file lookup, result storage) + - `at_most_once` for external API calls with side effects + + Args: + file_id: The file ID to process. + target_language: Target language code for translation. + context: The durable context. + + Returns: + Processing result with job_id and status. + """ + files_map = SortedMap.ref("files") + results_map = SortedMap.ref("results") + jobs_map = SortedMap.ref("jobs") + + # Generate job ID. + job_id = f"job_{hashlib.md5(f'{file_id}_{target_language}'.encode()).hexdigest()[:12]}" + + # Step 1: Retrieve file metadata (idempotent read). + async def get_file_metadata(): + response = await files_map.get(context, key=file_id) + + if not response.HasField("value"): + raise ValueError(f"File {file_id} not found") + + file_data = json.loads(response.value.decode("utf-8")) + return file_data + + # Use `at_least_once` for idempotent file lookup. + file_metadata = await at_least_once( + f"get_file_{file_id}", + context, + get_file_metadata, + type=dict, + ) + + # Step 2: Perform OCR (external API, at most once). + async def perform_ocr(): + # Call OCR API. + extracted_text = await simulate_ocr_api(file_metadata["content"]) + + # Store OCR result. + ocr_result_key = f"{job_id}_ocr" + await results_map.idempotently(f"store_ocr_{job_id}").insert( + context, + entries={ + ocr_result_key: json.dumps( + {"job_id": job_id, "step": "ocr", "text": extracted_text} + ).encode("utf-8") + }, + ) + + return extracted_text + + try: + # Use `at_most_once` to ensure OCR is called at most once. + # Retry only on network errors. 
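+        # If a previous attempt already failed with a non-retryable
+        # error, this raises `AtMostOnceFailedBeforeCompleting`
+        # (handled below).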
+ ocr_text = await at_most_once( + f"ocr_{job_id}", + context, + perform_ocr, + type=str, + retryable_exceptions=[NetworkError], + ) + + except NetworkError: + return { + "status": "error", + "job_id": job_id, + "step": "ocr", + "error": "OCR service unavailable", + "retryable": True, + } + + except AtMostOnceFailedBeforeCompleting: + return { + "status": "error", + "job_id": job_id, + "step": "ocr", + "error": "OCR failed on previous attempt", + "retryable": False, + } + + except InvalidDocumentError as e: + return { + "status": "error", + "job_id": job_id, + "step": "ocr", + "error": str(e), + "retryable": False, + } + + # Step 3: Translate text (external API, at most once). + async def perform_translation(): + # Call translation API. + translated_text = await simulate_translation_api( + ocr_text, target_language + ) + + # Store translation result. + translation_result_key = f"{job_id}_translation" + await results_map.idempotently(f"store_translation_{job_id}").insert( + context, + entries={ + translation_result_key: json.dumps( + { + "job_id": job_id, + "step": "translation", + "text": translated_text, + "language": target_language, + } + ).encode("utf-8") + }, + ) + + return translated_text + + try: + # Use `at_most_once` for translation. + translated_text = await at_most_once( + f"translate_{job_id}", + context, + perform_translation, + type=str, + retryable_exceptions=[NetworkError], + ) + + except NetworkError: + return { + "status": "error", + "job_id": job_id, + "step": "translation", + "error": "Translation service unavailable", + "retryable": True, + } + + except AtMostOnceFailedBeforeCompleting: + return { + "status": "error", + "job_id": job_id, + "step": "translation", + "error": "Translation failed on previous attempt", + "retryable": False, + } + + except QuotaExceededError as e: + return { + "status": "error", + "job_id": job_id, + "step": "translation", + "error": str(e), + "retryable": False, + } + + # Step 4: Store final job result (idempotent write). + async def store_job_result(): + await jobs_map.insert( + context, + entries={ + job_id: json.dumps( + { + "job_id": job_id, + "file_id": file_id, + "target_language": target_language, + "status": "completed", + "result": translated_text, + } + ).encode("utf-8") + }, + ) + return job_id + + # Use `at_least_once` for final storage. + final_job_id = await at_least_once( + f"store_job_{job_id}", + context, + store_job_result, + type=str, + ) + + return { + "status": "success", + "job_id": final_job_id, + "result": translated_text, + } + + +@mcp.tool() +async def upload_file( + file_id: str, + content: str, + metadata: Dict[str, str] = None, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Upload a file for processing. + + Args: + file_id: Unique file identifier. + content: File content. + metadata: Optional metadata dictionary. + context: The durable context. + + Returns: + Upload confirmation. + """ + files_map = SortedMap.ref("files") + + await files_map.insert( + context, + entries={ + file_id: json.dumps( + { + "file_id": file_id, + "content": content, + "metadata": metadata or {}, + } + ).encode("utf-8") + }, + ) + + return {"status": "success", "file_id": file_id} + + +@mcp.tool() +async def get_job_status( + job_id: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Get job processing status and result. + + Args: + job_id: The job ID to query. + context: The durable context. + + Returns: + Job status and result if completed. 
+ """ + jobs_map = SortedMap.ref("jobs") + + response = await jobs_map.get(context, key=job_id) + + if not response.HasField("value"): + return {"status": "error", "message": "Job not found"} + + job_data = json.loads(response.value.decode("utf-8")) + + return {"status": "success", "job": job_data} + + +async def main(): + """Start the document processing example server.""" + await mcp.application().run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/processing/README.md b/examples/processing/README.md new file mode 100644 index 0000000..fe20e41 --- /dev/null +++ b/examples/processing/README.md @@ -0,0 +1,198 @@ +# Payment Processing + +Payment processing with `at_most_once` to prevent duplicate charges. + +## Overview + +Use `at_most_once` with `retryable_exceptions` to distinguish between +temporary failures (network errors) and permanent failures (payment +rejected). This prevents duplicate payments while allowing retries on +transient errors. + +## Pattern + +```python +@mcp.tool() +async def process_payment( + amount: float, + currency: str = "USD", + context: DurableContext = None, +) -> dict: + """Process payment via external API.""" + + async def make_payment(): + # Call external payment API. + result = await simulate_payment_api(amount, currency) + + # Store payment record. + await payments_map.insert(context, entries={...}) + + return result + + try: + # Retry only on network errors. + result = await at_most_once( + f"payment_{amount}_{currency}", + context, + make_payment, + type=dict, + retryable_exceptions=[NetworkError], + ) + + return {"status": "success", "payment": result} + + except NetworkError: + # Network error after retries exhausted. + return {"status": "error", "message": "Service unavailable"} + + except AtMostOnceFailedBeforeCompleting: + # Previous attempt failed with non-retryable error. + return {"status": "error", "message": "Payment failed previously"} + + except (PaymentRejectedError, InsufficientFundsError) as e: + # First attempt with non-retryable error. + return {"status": "error", "message": str(e)} +``` + +## How It Works + +### Retryable vs Non-Retryable + +Define clear exception classes: + +```python +# Retryable: Temporary failures. +class NetworkError(Exception): + pass + +# Non-retryable: Permanent failures. +class PaymentRejectedError(Exception): + pass + +class InsufficientFundsError(Exception): + pass +``` + +Specify which exceptions should trigger retry: + +```python +result = await at_most_once( + "operation", + context, + operation_func, + type=dict, + retryable_exceptions=[NetworkError], # Only retry these. +) +``` + +### Error Scenarios + +**Network Error (Retryable)** + +1. Initial attempt: `NetworkError` raised +2. `at_most_once` retries the operation +3. Second attempt succeeds +4. Result cached and returned + +**Payment Rejected (Non-Retryable)** + +1. Initial attempt: `PaymentRejectedError` raised +2. Exception propagates (not in `retryable_exceptions`) +3. Tool returns error response + +**Tool Retry After Success** + +1. Initial call: Steps succeed, network issue prevents response +2. Tool called again by MCP framework +3. `at_most_once` returns cached result + +**Tool Retry After Rejection** + +1. Initial call: `PaymentRejectedError` raised +2. Tool returns error response +3. Tool called again +4. `at_most_once` raises `AtMostOnceFailedBeforeCompleting` + +### Three Exception Handlers + +```python +try: + result = await at_most_once(...) 
+ return {"status": "success", ...} + +except NetworkError: + # Retryable error after exhausting retries. + return {"status": "error", "retryable": True} + +except AtMostOnceFailedBeforeCompleting: + # Previous attempt failed with non-retryable error. + return {"status": "error", "retryable": False} + +except (PaymentRejectedError, InsufficientFundsError) as e: + # First attempt with non-retryable error. + return {"status": "error", "message": str(e)} +``` + +## AtMostOnceFailedBeforeCompleting + +Raised when: + +1. Previous attempt failed with non-retryable exception +2. Tool is called again (retry by MCP framework) +3. `at_most_once` detects the previous failure + +Purpose: Prevent re-executing operations that already failed +permanently. + +## Best Practices + +Be specific with `retryable_exceptions`: + +```python +# Good: Explicit list of retryable exceptions. +retryable_exceptions=[NetworkError, TimeoutError] + +# Bad: Too broad (might retry unintended exceptions). +retryable_exceptions=[Exception] +``` + +Handle all exception cases: + +```python +try: + result = await at_most_once(...) +except RetryableError: + pass # Handle exhausted retries. +except AtMostOnceFailedBeforeCompleting: + pass # Handle previous failure. +except PermanentError: + pass # Handle first-time permanent failure. +``` + +Use descriptive aliases: + +```python +# Include identifying information in alias. +await at_most_once( + f"payment_{user_id}_{amount}_{timestamp}", + context, + make_payment, + type=dict, +) +``` + +## Comparison: at_least_once vs at_most_once + +| Feature | at_least_once | at_most_once | +|---------|---------------|--------------| +| Guarantee | Completes at least once | Executes at most once | +| Retry | Always retries on failure | Only on `retryable_exceptions` | +| Use case | Idempotent operations | Operations with side effects | +| Exception | None | `AtMostOnceFailedBeforeCompleting` | + +## Running + +```bash +cd examples/processing +uv run python example.py +``` diff --git a/examples/processing/client.py b/examples/processing/client.py new file mode 100644 index 0000000..9b90ec6 --- /dev/null +++ b/examples/processing/client.py @@ -0,0 +1,125 @@ +""" +Example client for payment processing demonstration. +""" + +import asyncio +import json +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run payment processing example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to processing example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. + if tool.description: + for line in tool.description.split("\n"): + print(f" {line}") + print() + + # Example 1: Successful payment. + print("=" * 60) + print("Example 1: Successful payment") + print("=" * 60) + + result = await session.call_tool( + "process_payment", + {"amount": 99.99, "currency": "USD"}, + ) + print(f"Payment result: {result.content[0].text}") + + # Extract transaction ID from result. + payment1_data = json.loads(result.content[0].text) + txn1_id = payment1_data.get("payment", {}).get("transaction_id") + print() + + # Example 2: Another successful payment. 
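+        print("=" * 60)
+        print("Example 2: Another successful payment")
+        print("=" * 60)
+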
+ result = await session.call_tool( + "process_payment", + {"amount": 49.99, "currency": "USD"}, + ) + print(f"Payment result: {result.content[0].text}") + + # Extract transaction ID from result. + payment2_data = json.loads(result.content[0].text) + txn2_id = payment2_data.get("payment", {}).get("transaction_id") + print() + + # Example 3: Retriable network error (will retry and succeed). + print("=" * 60) + print("Example 3: Retriable network error (retries and succeeds)") + print("=" * 60) + + result = await session.call_tool( + "process_payment", + {"amount": 75.01, "currency": "USD"}, + ) + print(f"Payment result: {result.content[0].text}") + print() + + # Example 4: Retrieve payment records. + print("=" * 60) + print("Example 4: Retrieve payment records") + print("=" * 60) + + # Try first payment. + if txn1_id: + result = await session.call_tool( + "get_payment", + {"transaction_id": txn1_id}, + ) + print(f"Payment record 1: {result.content[0].text}") + print() + + # Try second payment. + if txn2_id: + result = await session.call_tool( + "get_payment", + {"transaction_id": txn2_id}, + ) + print(f"Payment record 2: {result.content[0].text}") + print() + + # Example 5: Payment that will fail (insufficient funds). + print("=" * 60) + print("Example 5: Failed payment (insufficient funds)") + print("=" * 60) + + result = await session.call_tool( + "process_payment", + {"amount": 999999.99, "currency": "USD"}, + ) + print(f"Payment result: {result.content[0].text}") + print() + + # Example 6: Payment that will fail (invalid currency). + print("=" * 60) + print("Example 6: Failed payment (invalid currency)") + print("=" * 60) + + result = await session.call_tool( + "process_payment", + {"amount": 50.00, "currency": "INVALID"}, + ) + print(f"Payment result: {result.content[0].text}") + print() + + print("Processing example completed successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/processing/example.py b/examples/processing/example.py new file mode 100644 index 0000000..42626e8 --- /dev/null +++ b/examples/processing/example.py @@ -0,0 +1,284 @@ +""" +Payment Processing with at_most_once. + +Demonstrates using at_most_once for operations that call external APIs where +retrying after certain errors could cause unintended side effects. +""" + +import asyncio +import json +import random +import sys +from pathlib import Path +from typing import Any, Dict + +# Add api/ to Python path for generated proto code. +api_path = Path(__file__).parent.parent.parent / "api" +if api_path.exists(): + sys.path.insert(0, str(api_path)) + +from reboot.aio.workflows import at_most_once, AtMostOnceFailedBeforeCompleting +from reboot.mcp.server import DurableMCP, DurableContext +from reboot.std.collections.v1.sorted_map import SortedMap + +# Initialize MCP server. +mcp = DurableMCP(path="/mcp") + + +class NetworkError(Exception): + """Temporary network error (retryable).""" + + pass + + +class PaymentRejectedError(Exception): + """Payment was rejected by payment processor (not retryable).""" + + pass + + +class InsufficientFundsError(Exception): + """Insufficient funds (not retryable).""" + + pass + + +async def simulate_payment_api( + amount: float, + currency: str, + context: DurableContext, +) -> Dict[str, Any]: + """ + Simulate external payment API call. + + Raises different exception types to demonstrate retry behavior. + """ + # Check for invalid currency (non-retryable error). 
+    valid_currencies = ["USD", "EUR", "GBP", "JPY"]
+    if currency not in valid_currencies:
+        raise PaymentRejectedError(f"Invalid currency: {currency}")
+
+    # Check for insufficient funds (non-retryable error).
+    if amount > 100000:
+        raise InsufficientFundsError(
+            f"Amount {amount} exceeds available balance"
+        )
+
+    # For amounts ending in .01, simulate retriable network errors using
+    # a SortedMap to track attempts. Fail first attempt, succeed on retry.
+    # Compare whole cents to avoid float equality pitfalls
+    # (e.g. `75.01 % 1 != 0.01`).
+    if round(amount * 100) % 100 == 1:
+        retry_map = SortedMap.ref("retry_attempts")
+        attempt_key = f"payment_{amount}_{currency}"
+
+        # Get current attempt count.
+        response = await retry_map.get(context, key=attempt_key)
+
+        if response.HasField("value"):
+            attempts = int(response.value.decode("utf-8"))
+        else:
+            attempts = 0
+
+        # Increment attempt count.
+        await retry_map.insert(
+            context,
+            entries={attempt_key: str(attempts + 1).encode("utf-8")},
+        )
+
+        # Fail first attempt to demonstrate retry.
+        if attempts == 0:
+            raise NetworkError("Simulated network timeout for demo (will retry)")
+
+    # Success: Return payment confirmation.
+    return {
+        "transaction_id": f"txn_{random.randint(100000, 999999)}",
+        "amount": amount,
+        "currency": currency,
+        "status": "completed",
+    }
+
+
+@mcp.tool()
+async def process_payment(
+    amount: float,
+    currency: str = "USD",
+    description: str = "",
+    context: DurableContext = None,
+) -> Dict[str, Any]:
+    """
+    Process a payment via external payment API.
+
+    Uses at_most_once to ensure the payment is only attempted once, even if
+    the tool is retried. Network errors are retryable, but payment rejections
+    are not.
+
+    Args:
+        amount: Payment amount.
+        currency: Currency code (default: USD).
+        description: Payment description.
+        context: The durable context.
+
+    Returns:
+        Payment result or error information.
+    """
+    payments_map = SortedMap.ref("payments")
+
+    async def make_payment():
+        # Call external payment API.
+        result = await simulate_payment_api(amount, currency, context)
+
+        # Store payment record.
+        payment_id = result["transaction_id"]
+        await payments_map.insert(
+            context,
+            entries={
+                payment_id: json.dumps(
+                    {
+                        "transaction_id": payment_id,
+                        "amount": amount,
+                        "currency": currency,
+                        "description": description,
+                        "status": result["status"],
+                    }
+                ).encode("utf-8")
+            },
+        )
+
+        return result
+
+    try:
+        # Use at_most_once to ensure payment is attempted at most once.
+        # Only retry on network errors - payment rejections are final.
+        # The alias must be deterministic across retries, so use the
+        # description directly instead of the builtin `hash()`, which is
+        # salted per process.
+        result = await at_most_once(
+            f"payment_{amount}_{currency}_{description}",
+            context,
+            make_payment,
+            type=dict,
+            retryable_exceptions=[NetworkError],
+        )
+
+        return {
+            "status": "success",
+            "payment": result,
+        }
+
+    except NetworkError:
+        # Network error after retries exhausted.
+        return {
+            "status": "error",
+            "error_type": "network_error",
+            "message": "Payment service temporarily unavailable",
+            "retryable": True,
+        }
+
+    except AtMostOnceFailedBeforeCompleting:
+        # Previous attempt failed with non-retryable error.
+        # This means payment was rejected or funds were insufficient.
+        return {
+            "status": "error",
+            "error_type": "payment_failed",
+            "message": "Payment failed on previous attempt (not retryable)",
+            "retryable": False,
+        }
+
+    except (PaymentRejectedError, InsufficientFundsError) as e:
+        # First attempt with non-retryable error.
+ return { + "status": "error", + "error_type": type(e).__name__, + "message": str(e), + "retryable": False, + } + + +@mcp.tool() +async def get_payment( + transaction_id: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Retrieve payment record. + + Args: + transaction_id: The transaction ID to retrieve. + context: The durable context. + + Returns: + Payment data or error if not found. + """ + payments_map = SortedMap.ref("payments") + + response = await payments_map.get(context, key=transaction_id) + + if not response.HasField("value"): + return {"status": "error", "message": "Payment not found"} + + payment_data = json.loads(response.value.decode("utf-8")) + + return {"status": "success", "payment": payment_data} + + +@mcp.tool() +async def fetch_exchange_rate( + from_currency: str, + to_currency: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Fetch exchange rate from external API. + + Demonstrates at_most_once for read-only API calls where retries are safe + but we want to avoid redundant network calls. + + Args: + from_currency: Source currency code. + to_currency: Target currency code. + context: The durable context. + + Returns: + Exchange rate or error. + """ + + async def fetch_rate(): + # Simulate API call with occasional network errors. + if random.random() < 0.1: + raise NetworkError("API timeout") + + # Return simulated exchange rate. + return { + "from": from_currency, + "to": to_currency, + "rate": round(random.uniform(0.5, 2.0), 4), + } + + try: + # Retry on network errors only. + result = await at_most_once( + f"exchange_rate_{from_currency}_{to_currency}", + context, + fetch_rate, + type=dict, + retryable_exceptions=[NetworkError], + ) + + return {"status": "success", "data": result} + + except NetworkError: + return { + "status": "error", + "message": "Exchange rate service unavailable", + } + + except AtMostOnceFailedBeforeCompleting: + return { + "status": "error", + "message": "Previous fetch attempt failed", + } + + +async def main(): + """Start the payment processing example server.""" + await mcp.application().run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/run.py b/examples/run.py new file mode 100755 index 0000000..1a5f44e --- /dev/null +++ b/examples/run.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +""" +Harness for running DurableMCP examples. + +Lets user select an example, starts the server, and runs the client. +""" + +import asyncio +import os +import signal +import socket +import subprocess +import sys +import time +from pathlib import Path + +import aiohttp + + +EXAMPLES = { + "1": { + "name": "audit", + "description": "Audit logging with decorator and explicit patterns", + }, + "2": { + "name": "steps", + "description": "Multi-step operations with independent idempotency", + }, + "3": { + "name": "processing", + "description": "Payment processing with at_most_once", + }, + "4": { + "name": "document", + "description": "Document pipeline combining both patterns", + }, + "5": { + "name": "define", + "description": "Technical glossary with SortedMap CRUD", + }, +} + + +def print_menu(): + """Print the example selection menu.""" + print("\nDurableMCP Examples") + print("=" * 60) + for key, example in sorted(EXAMPLES.items()): + print(f"{key}. 
{example['name']:12} - {example['description']}") + print("=" * 60) + + +def get_selection(): + """Get user's example selection.""" + while True: + choice = input("\nSelect example (1-5, or 'q' to quit): ").strip() + if choice.lower() == "q": + return None + if choice in EXAMPLES: + return EXAMPLES[choice]["name"] + print(f"Invalid selection: {choice}") + + +def check_port_in_use(port: int) -> bool: + """Check if a port is already in use.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + return s.connect_ex(("localhost", port)) == 0 + + +async def wait_for_server(url: str, timeout: int = 30): + """Wait for server to be ready.""" + start = time.time() + port = 9991 + + # First, wait for port to be open. + while time.time() - start < timeout: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("localhost", port)) == 0: + print(f"Port {port} is open, checking MCP endpoint...") + break + await asyncio.sleep(0.5) + else: + print(f"Timeout: Port {port} never opened") + return False + + # Port is open, now wait for MCP to respond. + # Give it a moment to fully initialize. + await asyncio.sleep(2) + + while time.time() - start < timeout: + try: + async with aiohttp.ClientSession() as session: + # Try to list tools via MCP protocol. + async with session.post( + url, + json={ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/list", + }, + ) as resp: + # Any response means server is up. + print(f"MCP responded with status {resp.status}") + if resp.status in (200, 400, 405, 406): + return True + except (aiohttp.ClientError, ConnectionError) as e: + print(f"MCP check failed: {e.__class__.__name__}") + await asyncio.sleep(0.5) + except Exception as e: + print(f"Unexpected error: {e}") + await asyncio.sleep(0.5) + + print("Timeout: MCP endpoint never responded") + return False + + +async def run_example(example_name: str): + """Run the selected example.""" + example_dir = Path(__file__).parent / example_name + server_path = example_dir / "example.py" + client_path = example_dir / "client.py" + + if not server_path.exists(): + print(f"Error: Server not found at {server_path}") + return + + if not client_path.exists(): + print(f"Error: Client not found at {client_path}") + return + + print(f"\nStarting {example_name} example...") + print(f"Server: {server_path}") + print(f"Client: {client_path}") + + # Check if port 9991 is already in use. + if check_port_in_use(9991): + print("\nWarning: Port 9991 is already in use!") + print("Please stop the existing server before continuing.") + return + + # Start the server with rbt dev run in a new process group. + print("\nStarting server (output below)...") + print("-" * 60) + server_process = subprocess.Popen( + [ + "uv", + "run", + "rbt", + "dev", + "run", + "--python", + f"--application={server_path.name}", + "--working-directory=.", + "--no-generate-watch", + ], + cwd=example_dir, + stdin=subprocess.PIPE, # Provide a pipe for stdin. + preexec_fn=os.setsid, # Create new process group. + ) + + try: + # Wait for server to be ready. + print("\nWaiting for server to be ready on port 9991...") + if not await wait_for_server("http://localhost:9991/mcp"): + print("Error: Server did not start in time") + print("Check server output above for errors") + return + + print("Server ready!") + print("\n" + "=" * 60) + + # Run the client. 
+ client_process = await asyncio.create_subprocess_exec( + sys.executable, + str(client_path), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + stdout, stderr = await client_process.communicate() + + print(stdout.decode()) + if stderr: + print("Errors:", stderr.decode(), file=sys.stderr) + + print("=" * 60) + + finally: + # Clean up - send SIGINT (Ctrl-C) to allow rbt to cleanup properly. + print("\nShutting down server...") + try: + # Send SIGINT (like Ctrl-C) to the entire process group. + # This gives rbt a chance to cleanup docker containers. + os.killpg(os.getpgid(server_process.pid), signal.SIGINT) + # Wait longer for graceful shutdown (docker cleanup takes time). + try: + server_process.wait(timeout=10) + print("Server stopped cleanly") + except subprocess.TimeoutExpired: + print("Server didn't stop in time, forcing shutdown...") + # Force kill if necessary. + os.killpg(os.getpgid(server_process.pid), signal.SIGKILL) + server_process.wait() + except ProcessLookupError: + # Process already terminated. + pass + + +async def main(): + """Main harness loop.""" + while True: + print_menu() + example = get_selection() + + if example is None: + print("\nExiting...") + break + + try: + await run_example(example) + except KeyboardInterrupt: + print("\n\nInterrupted by user") + break + except Exception as e: + print(f"\nError running example: {e}", file=sys.stderr) + + input("\nPress Enter to continue...") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n\nExiting...") + sys.exit(0) diff --git a/examples/steps/README.md b/examples/steps/README.md new file mode 100644 index 0000000..b88eab4 --- /dev/null +++ b/examples/steps/README.md @@ -0,0 +1,189 @@ +# Multi-Step Operations + +Multi-step operations with independent idempotency guards using +`at_least_once`. + +## Overview + +When a tool performs multiple sequential operations, each step should +be independently idempotent. If the tool is retried after step 1 +succeeds but before step 2 completes, step 1 returns its cached result +and only step 2 retries. + +## Pattern + +```python +@mcp.tool() +async def create_user_with_profile( + username: str, + email: str, + bio: str = "", + context: DurableContext = None, +) -> dict: + """Create user and profile in separate steps.""" + + # Step 1: Create user (cached on success). + async def create_user(): + user_id = f"user_{hash(username) % 100000}" + await users_map.insert( + context, + entries={user_id: json.dumps({...}).encode("utf-8")}, + ) + return user_id + + user_id = await at_least_once( + f"create_user_{username}", + context, + create_user, + type=str, + ) + + # Step 2: Create profile (cached independently). + async def create_profile(): + profile_id = f"profile_{user_id}" + await profiles_map.insert( + context, + entries={profile_id: json.dumps({...}).encode("utf-8")}, + ) + return profile_id + + profile_id = await at_least_once( + f"create_profile_{user_id}", + context, + create_profile, + type=str, + ) + + return {"user_id": user_id, "profile_id": profile_id} +``` + +## How It Works + +Each step has its own `at_least_once` guard with a unique alias. + +### Retry After Step 1 Success + +1. Initial call: Step 1 creates user, succeeds +2. Crash before step 2 +3. Retry: Step 1 returns cached `user_id`, step 2 executes + +Result: User created once, profile created on retry. + +### Retry After Both Steps + +1. Initial call: Step 1 succeeds, step 2 succeeds +2. Network error prevents response delivery +3. 
Retry: Both steps return cached results + +Result: Both operations return immediately without re-execution. + +### Retry After Step 1 Failure + +1. Initial call: Step 1 raises exception +2. Retry: Step 1 executes again + +Result: Step 1 retries until success, then step 2 executes. + +## Key Concepts + +### Independent Guards + +Each step has its own guard with distinct alias: + +```python +# Step 1 alias. +await at_least_once(f"create_user_{username}", ...) + +# Step 2 alias. +await at_least_once(f"create_profile_{user_id}", ...) +``` + +This ensures each step caches independently. + +### Sequential Dependencies + +Later steps can depend on earlier step results: + +```python +# Step 1: Get data. +user_id = await at_least_once("create_user", context, ...) + +# Step 2: Use result from step 1. +profile_id = await at_least_once( + f"create_profile_{user_id}", # Uses `user_id`. + context, + ..., +) +``` + +### Function References + +Pass function references directly (no lambda): + +```python +# Correct. +async def create_user(): + return await create_user_record(...) + +user_id = await at_least_once( + "create_user", + context, + create_user, # Function reference. + type=str, +) + +# Wrong (don't use lambda unless needed for arguments). +user_id = await at_least_once( + "create_user", + context, + lambda: create_user(), + type=str, +) +``` + +## Best Practices + +Use distinct aliases: + +```python +# Good: Different aliases. +await at_least_once(f"create_user_{username}", ...) +await at_least_once(f"create_profile_{user_id}", ...) + +# Bad: Same alias. +await at_least_once("create", ...) +await at_least_once("create", ...) +``` + +Make each step atomic: + +```python +# Good: Each step is complete operation. +user_id = await at_least_once("create_user", ...) +profile_id = await at_least_once("create_profile", ...) + +# Bad: Steps too granular. +await at_least_once("validate_username", ...) +await at_least_once("hash_password", ...) +await at_least_once("insert_database", ...) +``` + +Let exceptions propagate: + +```python +# Good: `at_least_once` handles retries. +user_id = await at_least_once("create_user", context, create_user, ...) + +# Bad: Catching exceptions defeats retry. +try: + user_id = await at_least_once("create_user", ...) +except Exception: + return {"error": "failed"} # Operation won't retry. +``` + +## Running + +```bash +cd examples/steps +uv run python example.py +``` diff --git a/examples/steps/client.py b/examples/steps/client.py new file mode 100644 index 0000000..b91fef8 --- /dev/null +++ b/examples/steps/client.py @@ -0,0 +1,92 @@ +""" +Example client for multi-step operations demonstration. +""" + +import asyncio +import json +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run multi-step operations example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to steps example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. + if tool.description: + for line in tool.description.split("\n"): + print(f" {line}") + print() + + # Example 1: Create user with profile. 
+        print("=" * 60)
+        print("Example 1: Create user with profile (two-step)")
+        print("=" * 60)
+
+        result = await session.call_tool(
+            "create_user_with_profile",
+            {
+                "username": "alice",
+                "email": "alice@example.com",
+                "bio": "Software engineer interested in distributed systems",
+            },
+        )
+        print(f"Created: {result.content[0].text}")
+
+        # Parse the result to get actual user_id and profile_id.
+        alice_data = json.loads(result.content[0].text)
+        alice_user_id = alice_data.get("user_id")
+        alice_profile_id = alice_data.get("profile_id")
+        print()
+
+        # Example 2: Create another user.
+        print("=" * 60)
+        print("Example 2: Create another user")
+        print("=" * 60)
+
+        result = await session.call_tool(
+            "create_user_with_profile",
+            {
+                "username": "bob",
+                "email": "bob@example.com",
+                "bio": "Data scientist working on ML infrastructure",
+            },
+        )
+        print(f"Created: {result.content[0].text}")
+        print()
+
+        # Example 3: Retrieve user data.
+        print("=" * 60)
+        print("Example 3: Retrieve user and profile data")
+        print("=" * 60)
+
+        if alice_user_id:
+            result = await session.call_tool(
+                "get_user",
+                {"user_id": alice_user_id},
+            )
+            print(f"User data: {result.content[0].text}")
+            print()
+
+        if alice_profile_id:
+            result = await session.call_tool(
+                "get_profile",
+                {"profile_id": alice_profile_id},
+            )
+            print(f"Profile data: {result.content[0].text}")
+            print()
+
+        print("Steps example completed successfully!")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/examples/steps/example.py b/examples/steps/example.py
new file mode 100644
index 0000000..a196533
--- /dev/null
+++ b/examples/steps/example.py
@@ -0,0 +1,171 @@
+"""
+Multi-Step Operations with Partial Failure Recovery.
+
+Demonstrates using at_least_once for operations with multiple steps where
+each step should be idempotent and cached independently.
+"""
+
+import asyncio
+import json
+import sys
+from pathlib import Path
+from typing import Any, Dict
+
+# Add api/ to Python path for generated proto code.
+api_path = Path(__file__).parent.parent.parent / "api"
+if api_path.exists():
+    sys.path.insert(0, str(api_path))
+
+from reboot.aio.workflows import at_least_once
+from reboot.mcp.server import DurableMCP, DurableContext
+from reboot.std.collections.v1.sorted_map import SortedMap
+
+# Initialize MCP server.
+mcp = DurableMCP(path="/mcp")
+
+
+@mcp.tool()
+async def create_user_with_profile(
+    username: str,
+    email: str,
+    bio: str = "",
+    avatar_url: str = "",
+    context: DurableContext = None,
+) -> Dict[str, Any]:
+    """
+    Create a user and their profile in separate steps.
+
+    This demonstrates multi-step operations where each step is independently
+    idempotent. If the tool is retried after the user is created but before
+    the profile is created, the user creation will return the cached result
+    and only the profile creation will retry.
+
+    Args:
+        username: Unique username for the user.
+        email: User's email address.
+        bio: Optional user bio.
+        avatar_url: Optional avatar URL.
+        context: The durable context.
+
+    Returns:
+        Dictionary with user_id and profile_id.
+    """
+
+    # Step 1: Create user (idempotent).
+    async def create_user():
+        users_map = SortedMap.ref("users")
+        user_id = f"user_{hash(username) % 100000}"
+
+        # Store user data.
+        await users_map.insert(
+            context,
+            entries={
+                user_id: json.dumps(
+                    {"username": username, "email": email}
+                ).encode("utf-8")
+            },
+        )
+
+        return user_id
+
+    # If this tool is retried after user creation succeeds, this will
+    # return the cached user_id without re-creating the user.
+ user_id = await at_least_once( + f"create_user_{username}", + context, + create_user, + type=str, + ) + + # Step 2: Create profile (idempotent, separate guard). + async def create_profile(): + profiles_map = SortedMap.ref("profiles") + profile_id = f"profile_{user_id}" + + # Store profile data. + await profiles_map.insert( + context, + entries={ + profile_id: json.dumps( + {"user_id": user_id, "bio": bio, "avatar_url": avatar_url} + ).encode("utf-8") + }, + ) + + return profile_id + + # If this tool is retried after step 1 but before step 2 completes, + # only this step will execute (step 1 returns cached result). + profile_id = await at_least_once( + f"create_profile_{user_id}", + context, + create_profile, + type=str, + ) + + return { + "status": "success", + "user_id": user_id, + "profile_id": profile_id, + } + + +@mcp.tool() +async def get_user( + user_id: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Retrieve user data. + + Args: + user_id: The user ID to retrieve. + context: The durable context. + + Returns: + User data or error if not found. + """ + users_map = SortedMap.ref("users") + response = await users_map.get(context, key=user_id) + + if not response.HasField("value"): + return {"status": "error", "message": "User not found"} + + user_data = json.loads(response.value.decode("utf-8")) + + return {"status": "success", "user": user_data} + + +@mcp.tool() +async def get_profile( + profile_id: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Retrieve profile data. + + Args: + profile_id: The profile ID to retrieve. + context: The durable context. + + Returns: + Profile data or error if not found. + """ + profiles_map = SortedMap.ref("profiles") + response = await profiles_map.get(context, key=profile_id) + + if not response.HasField("value"): + return {"status": "error", "message": "Profile not found"} + + profile_data = json.loads(response.value.decode("utf-8")) + + return {"status": "success", "profile": profile_data} + + +async def main(): + """Start the multi-step example server.""" + await mcp.application().run() + + +if __name__ == "__main__": + asyncio.run(main()) From af9f922c4e50bb8e779ee3e9ecbe69275201e395 Mon Sep 17 00:00:00 2001 From: skaar Date: Wed, 19 Nov 2025 09:42:40 -0500 Subject: [PATCH 2/3] Convert define example to use Pydantic --- examples/define/example.py | 161 ++++++++++++++++++++----------------- uv.lock | 12 +-- 2 files changed, 94 insertions(+), 79 deletions(-) diff --git a/examples/define/example.py b/examples/define/example.py index 4ed0032..60293e0 100644 --- a/examples/define/example.py +++ b/examples/define/example.py @@ -1,16 +1,18 @@ """ -Technical Glossary with SortedMap CRUD Operations. +Technical Glossary with OrderedMap CRUD Operations. -Demonstrates all SortedMap operations: insert, get, range, reverse_range, -and remove, using a technical terms glossary as an example. +Demonstrates OrderedMap operations using Pydantic models and +from_model/as_model helpers: Insert, Search, Range, ReverseRange, and +Remove, using a technical terms glossary as an example. """ import asyncio -import json import sys import time from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List + +from pydantic import BaseModel # Add api/ to Python path for generated proto code. 
api_path = Path(__file__).parent.parent.parent / "api" @@ -18,13 +20,26 @@ sys.path.insert(0, str(api_path)) from reboot.mcp.server import DurableMCP, DurableContext -from reboot.std.collections.v1.sorted_map import SortedMap +from reboot.std.collections.ordered_map.v1.ordered_map import ( + OrderedMap, + servicers as ordered_map_servicers, +) +from rebootdev.protobuf import from_model, as_model from uuid7 import create as uuid7 # type: ignore[import-untyped] # Initialize MCP server. mcp = DurableMCP(path="/mcp") +# Pydantic model for term entries. +class TermEntry(BaseModel): + term: str + definition: str + category: str = "general" + examples: List[str] = [] + timestamp: int + + @mcp.tool() async def add_term( term: str, @@ -36,7 +51,7 @@ async def add_term( """ Add a technical term to the glossary. - Stores the term in two SortedMaps: + Stores the term in two OrderedMaps: - Alphabetically by term name (for lookup and browsing). - Chronologically by UUIDv7 (for recent additions). @@ -50,30 +65,33 @@ async def add_term( Returns: Confirmation with the term and timestamp. """ - terms_map = SortedMap.ref("terms") - recent_map = SortedMap.ref("recent") + terms_map = OrderedMap.ref("terms") + recent_map = OrderedMap.ref("recent") timestamp = int(time.time() * 1000) - term_data = { - "term": term, - "definition": definition, - "category": category, - "examples": examples or [], - "timestamp": timestamp, - } + # Create Pydantic model instance. + term_entry = TermEntry( + term=term, + definition=definition, + category=category, + examples=examples or [], + timestamp=timestamp, + ) # Insert into alphabetical map (keyed by term). await terms_map.insert( context, - entries={term.lower(): json.dumps(term_data).encode("utf-8")}, + key=term.lower(), + value=from_model(term_entry), ) # Insert into chronological map (keyed by UUIDv7). recent_key = str(uuid7()) await recent_map.insert( context, - entries={recent_key: json.dumps(term_data).encode("utf-8")}, + key=recent_key, + value=from_model(term_entry), ) return { @@ -91,7 +109,7 @@ async def define( """ Look up a term's definition. - Uses the `get` method for point lookup. + Uses the `search` method for point lookup. Args: term: The term to look up. @@ -100,21 +118,21 @@ async def define( Returns: Term definition or error if not found. """ - terms_map = SortedMap.ref("terms") + terms_map = OrderedMap.ref("terms") - response = await terms_map.get(context, key=term.lower()) + response = await terms_map.search(context, key=term.lower()) - if not response.HasField("value"): + if not response.found: return { "status": "error", "message": f"Term '{term}' not found in glossary", } - term_data = json.loads(response.value.decode("utf-8")) + term_entry = as_model(response.value, TermEntry) return { "status": "success", - "term": term_data, + "term": term_entry.model_dump(), } @@ -136,12 +154,12 @@ async def remove_term( Returns: Confirmation of removal. """ - terms_map = SortedMap.ref("terms") + terms_map = OrderedMap.ref("terms") # Check if term exists first. - response = await terms_map.get(context, key=term.lower()) + response = await terms_map.search(context, key=term.lower()) - if not response.HasField("value"): + if not response.found: return { "status": "error", "message": f"Term '{term}' not found", @@ -150,7 +168,7 @@ async def remove_term( # Remove from alphabetical map. await terms_map.remove( context, - keys=[term.lower()], + key=term.lower(), ) return { @@ -178,7 +196,7 @@ async def list_terms( Returns: List of terms in alphabetical order. 
""" - terms_map = SortedMap.ref("terms") + terms_map = OrderedMap.ref("terms") if start_with: # Range starting from prefix. @@ -196,13 +214,13 @@ async def list_terms( terms = [] for entry in response.entries: - term_data = json.loads(entry.value.decode("utf-8")) + term_entry = as_model(entry.value, TermEntry) terms.append({ - "term": term_data["term"], - "definition": term_data["definition"][:100] + "..." - if len(term_data["definition"]) > 100 - else term_data["definition"], - "category": term_data["category"], + "term": term_entry.term, + "definition": term_entry.definition[:100] + "..." + if len(term_entry.definition) > 100 + else term_entry.definition, + "category": term_entry.category, }) return { @@ -231,7 +249,7 @@ async def browse_category( Returns: Terms in the specified category. """ - terms_map = SortedMap.ref("terms") + terms_map = OrderedMap.ref("terms") # Get all terms and filter by category. # Note: In production, you'd use a separate category-indexed map. @@ -242,12 +260,12 @@ async def browse_category( terms = [] for entry in response.entries: - term_data = json.loads(entry.value.decode("utf-8")) - if term_data["category"] == category: + term_entry = as_model(entry.value, TermEntry) + if term_entry.category == category: terms.append({ - "term": term_data["term"], - "definition": term_data["definition"], - "examples": term_data["examples"], + "term": term_entry.term, + "definition": term_entry.definition, + "examples": term_entry.examples, }) if len(terms) >= limit: break @@ -278,7 +296,7 @@ async def recent_terms( Returns: Recently added terms in reverse chronological order. """ - recent_map = SortedMap.ref("recent") + recent_map = OrderedMap.ref("recent") response = await recent_map.reverse_range( context, @@ -287,14 +305,14 @@ async def recent_terms( terms = [] for entry in response.entries: - term_data = json.loads(entry.value.decode("utf-8")) + term_entry = as_model(entry.value, TermEntry) terms.append({ - "term": term_data["term"], - "definition": term_data["definition"][:100] + "..." - if len(term_data["definition"]) > 100 - else term_data["definition"], - "category": term_data["category"], - "added_at": term_data["timestamp"], + "term": term_entry.term, + "definition": term_entry.definition[:100] + "..." + if len(term_entry.definition) > 100 + else term_entry.definition, + "category": term_entry.category, + "added_at": term_entry.timestamp, }) return { @@ -313,7 +331,8 @@ async def search_terms( """ Search for terms by prefix. - Demonstrates range query with start and end boundaries. + Demonstrates range query with client-side prefix filtering. + Note: OrderedMap.range() only supports start_key, not end_key. Args: prefix: Search prefix. @@ -323,37 +342,33 @@ async def search_terms( Returns: Terms matching the prefix. """ - terms_map = SortedMap.ref("terms") + terms_map = OrderedMap.ref("terms") - # Calculate end key for prefix range. - # For prefix "api", we want keys >= "api" and < "apj". start_key = prefix.lower() - # Increment last character for upper bound. - end_key = prefix[:-1] + chr(ord(prefix[-1]) + 1) if prefix else None - if end_key: - response = await terms_map.range( - context, - start_key=start_key, - end_key=end_key.lower(), - limit=limit, - ) - else: - response = await terms_map.range( - context, - start_key=start_key, - limit=limit, - ) + # Fetch more than limit to account for client-side filtering. 
+ response = await terms_map.range( + context, + start_key=start_key, + limit=limit * 2, + ) terms = [] for entry in response.entries: - term_data = json.loads(entry.value.decode("utf-8")) + # Check if key still matches prefix. + if not entry.key.startswith(start_key): + break + + term_entry = as_model(entry.value, TermEntry) terms.append({ - "term": term_data["term"], - "definition": term_data["definition"], - "category": term_data["category"], + "term": term_entry.term, + "definition": term_entry.definition, + "category": term_entry.category, }) + if len(terms) >= limit: + break + return { "status": "success", "prefix": prefix, @@ -364,7 +379,7 @@ async def search_terms( async def main(): """Start the technical glossary server.""" - await mcp.application().run() + await mcp.application(servicers=ordered_map_servicers()).run() if __name__ == "__main__": diff --git a/uv.lock b/uv.lock index cedb75c..0ab49f8 100644 --- a/uv.lock +++ b/uv.lock @@ -284,7 +284,7 @@ wheels = [ [[package]] name = "durable-mcp" -version = "0.5.0" +version = "0.6.0" source = { editable = "." } dependencies = [ { name = "mcp" }, @@ -307,7 +307,7 @@ requires-dist = [ { name = "mcp", specifier = "==1.21.0" }, { name = "mypy", marker = "extra == 'dev'", specifier = "==1.2.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = "==8.4.1" }, - { name = "reboot", specifier = "==0.39.3" }, + { name = "reboot", specifier = "==0.40.1" }, { name = "twine", marker = "extra == 'dev'" }, { name = "types-protobuf", marker = "extra == 'dev'", specifier = ">=4.24.0.20240129" }, { name = "uuid7-standard", specifier = ">=1.1.0" }, @@ -1418,7 +1418,7 @@ wheels = [ [[package]] name = "reboot" -version = "0.39.3" +version = "0.40.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiofiles" }, @@ -1462,9 +1462,9 @@ dependencies = [ { name = "websockets" }, ] wheels = [ - { url = "https://files.pythonhosted.org/packages/a5/89/f1e03159918f72388176d00af2ee21d06f10655d2f1ecf73ffa126c3049d/reboot-0.39.3-py3-none-macosx_13_0_arm64.whl", hash = "sha256:fcacb37fc7a43e0444120180ffb0b68f44e985cab5214913e141b9b9bdb943a5", size = 18290389, upload-time = "2025-11-08T21:31:45.597Z" }, - { url = "https://files.pythonhosted.org/packages/fd/c4/7234ea8ac6fb80f1dd6f18020ed376bd5ce45578541f8ff557a7b10a210e/reboot-0.39.3-py3-none-manylinux_2_35_aarch64.whl", hash = "sha256:862528e8e4073844a30b6bce9a472bbfd132ddf9bbfe545efcb3978a39ff8936", size = 22041443, upload-time = "2025-11-08T21:19:12.786Z" }, - { url = "https://files.pythonhosted.org/packages/54/e7/3f73e7d60c1e6c375a1c8c207ec672e99d9203dab7595c94863d6bc4df72/reboot-0.39.3-py3-none-manylinux_2_35_x86_64.whl", hash = "sha256:85a029aa7545d0634737a97d473194567190122f0d2e0ffddd3dbaa2c0bcf481", size = 22159062, upload-time = "2025-11-08T21:18:30.383Z" }, + { url = "https://files.pythonhosted.org/packages/0b/b8/41dac97ad72cbe3051429f13b8ad5e56599f2abad2126796ad178966b94f/reboot-0.40.1-py3-none-macosx_13_0_arm64.whl", hash = "sha256:d6a5f85ddac802ffb902e7e64a8f7151186fc044e6efcac0392ef8174ab99710", size = 18318771, upload-time = "2025-11-13T18:14:14.174Z" }, + { url = "https://files.pythonhosted.org/packages/cc/3e/3043d1b69f8a1dd677c52aebe75541c381c9ab76a5529054c5c3ad588b1b/reboot-0.40.1-py3-none-manylinux_2_35_aarch64.whl", hash = "sha256:e4277bed694aba535cdb771d572ab3a9c882a37693c54f86165e5b8c3b9f7ac4", size = 22069832, upload-time = "2025-11-13T18:01:46.694Z" }, + { url = 
"https://files.pythonhosted.org/packages/39/ac/2541d1143a98a0390bef57a9407b64778224d506713bb917c927fe429300/reboot-0.40.1-py3-none-manylinux_2_35_x86_64.whl", hash = "sha256:11b0360d597974a41d390f6566bb2080ff6267afa2564cc1c36f057088178d6d", size = 22187636, upload-time = "2025-11-13T18:01:42.228Z" }, ] [[package]] From 4ea94404665d353b5aee2afc73895c33937504a4 Mon Sep 17 00:00:00 2001 From: skaar Date: Fri, 21 Nov 2025 19:48:48 +0000 Subject: [PATCH 3/3] Use Pydantic with both SortedMap (output json) and OrderedMap --- examples/audit/audit.py | 47 ++-- examples/audit/example.py | 11 +- examples/define/README.md | 458 +++++++++------------------------ examples/define/example.py | 4 +- examples/document/README.md | 309 +++++++++------------- examples/document/example.py | 143 ++++++---- examples/processing/example.py | 37 ++- examples/run.py | 4 +- examples/steps/example.py | 49 ++-- 9 files changed, 438 insertions(+), 624 deletions(-) diff --git a/examples/audit/audit.py b/examples/audit/audit.py index 4a8023a..78e6033 100644 --- a/examples/audit/audit.py +++ b/examples/audit/audit.py @@ -6,16 +6,33 @@ """ import functools -import json import time from typing import Any, Callable, Dict, Optional, Union from uuid import UUID +from pydantic import BaseModel, Field + from reboot.mcp.server import DurableContext from reboot.std.collections.v1.sorted_map import SortedMap from uuid7 import create as uuid7 # type: ignore[import-untyped] +# Pydantic model for audit log entries. +class AuditEntry(BaseModel): + """Audit log entry with structured data.""" + + timestamp: int + tool: Optional[str] = None + inputs: Optional[Dict[str, Any]] = None + outputs: Optional[Any] = None + success: Optional[bool] = None + duration_seconds: Optional[float] = None + error: Optional[str] = None + + # Allow extra fields for custom audit data. + model_config = {"extra": "allow"} + + def timestamp_to_uuidv7(timestamp_ms: int) -> UUID: """ Create a UUIDv7 from a Unix timestamp in milliseconds. @@ -36,12 +53,12 @@ def timestamp_to_uuidv7(timestamp_ms: int) -> UUID: # - 2 bits: variant (10) # - 62 bits: random - # Create minimal UUIDv7 with timestamp and zeros for random bits. - timestamp_48 = timestamp_ms & 0xFFFFFFFFFFFF # 48 bits + # Create minimal `UUIDv7` with timestamp and zeros for random bits. + timestamp_48 = timestamp_ms & 0xFFFFFFFFFFFF # 48 bits. - # Build the 128-bit UUID. - uuid_int = (timestamp_48 << 80) | (0x7 << 76) # timestamp + version - uuid_int |= (0x2 << 62) # variant bits + # Build the 128-bit `UUID`. + uuid_int = (timestamp_48 << 80) | (0x7 << 76) # Timestamp + version. + uuid_int |= (0x2 << 62) # Variant bits. return UUID(int=uuid_int) @@ -60,17 +77,17 @@ async def _write_audit( data: Dictionary of audit data to store. """ timestamp = int(time.time() * 1000) - # Use UUIDv7 for time-ordered, unique keys. + # Use `UUIDv7` for time-ordered, unique keys. key = str(uuid7()) - # Add timestamp to data if not present. - audit_data = {"timestamp": timestamp, **data} + # Create Pydantic model with timestamp and provided data. + audit_entry = AuditEntry(timestamp=timestamp, **data) try: audit_map = SortedMap.ref(f"audit:{log_name}") await audit_map.insert( context, - entries={key: json.dumps(audit_data).encode("utf-8")}, + entries={key: audit_entry.model_dump_json().encode("utf-8")}, ) except Exception: # Don't fail the original operation if audit fails. 
@@ -117,18 +134,18 @@ async def my_tool(name: str, context: DurableContext = None): - Measures execution duration - Automatically adds: tool, inputs, outputs, success, duration_seconds """ - # Explicit logging: audit(log_name, context, data) + # Explicit logging: `audit(log_name, context, data)`. if context is not None and data is not None: return _write_audit(log_name, context, data) - # Decorator mode: @audit(log_name) + # Decorator mode: `@audit(log_name)`. def decorator(func: Callable) -> Callable: @functools.wraps(func) async def wrapper(*args, **kwargs) -> Any: - # Assume context is in kwargs. + # Assume `context` is in `kwargs`. ctx = kwargs.get("context") - # Capture inputs (exclude context). + # Capture inputs (exclude `context`). inputs = {k: v for k, v in kwargs.items() if k != "context"} # Call function and capture timing. @@ -144,7 +161,7 @@ async def wrapper(*args, **kwargs) -> Any: error = f"{type(e).__name__}: {str(e)}" raise finally: - # Log if we have context. + # Log if we have `context`. if ctx: duration = time.time() - start_time audit_data = { diff --git a/examples/audit/example.py b/examples/audit/example.py index 200b101..96d7f10 100644 --- a/examples/audit/example.py +++ b/examples/audit/example.py @@ -5,7 +5,6 @@ """ import asyncio -import json import sys from pathlib import Path from typing import Any, Dict, Optional @@ -15,7 +14,7 @@ if api_path.exists(): sys.path.insert(0, str(api_path)) -from audit import audit, timestamp_to_uuidv7 +from audit import AuditEntry, audit, timestamp_to_uuidv7 from reboot.mcp.server import DurableMCP, DurableContext from reboot.std.collections.v1.sorted_map import SortedMap @@ -114,7 +113,7 @@ async def get_audit_log( """ audit_map = SortedMap.ref(f"audit:{log_name}") - # Build range query using UUIDv7 boundaries. + # Build range query using `UUIDv7` boundaries. if begin is not None and end is not None: # Range query: begin to end. begin_key = str(timestamp_to_uuidv7(begin)) @@ -151,11 +150,11 @@ async def get_audit_log( limit=limit, ) - # Parse entries. + # Parse entries using Pydantic. entries = [] for entry in response.entries: - data = json.loads(entry.value.decode("utf-8")) - entries.append(data) + audit_entry = AuditEntry.model_validate_json(entry.value) + entries.append(audit_entry.model_dump()) return { "log_name": log_name, diff --git a/examples/define/README.md b/examples/define/README.md index b212527..0ceef10 100644 --- a/examples/define/README.md +++ b/examples/define/README.md @@ -1,97 +1,128 @@ # Technical Glossary -Complete SortedMap CRUD reference using a technical glossary. +Complete OrderedMap CRUD reference using a technical glossary with Pydantic models. ## Overview -Demonstrates all SortedMap operations through a practical use case: -maintaining a technical glossary with both alphabetical and -chronological indexes. +Demonstrates all OrderedMap operations with type-safe Pydantic models through a practical use case: maintaining a technical glossary with both alphabetical and chronological indexes. 
-## SortedMap Operations
+## OrderedMap + Pydantic Pattern
+
+This example shows the **OrderedMap with protobuf Values** pattern:
+- Uses `OrderedMap` for durable key-value storage
+- Pydantic models for type safety and validation
+- `from_model()` / `as_model()` helpers for serialization
+
+## OrderedMap Operations
 
 | Operation | Method | Use Case |
 |-----------|--------|----------|
-| Insert | `insert(context, entries={...})` | Add terms |
-| Get | `get(context, key="...")` | Look up term |
+| Insert | `insert(context, key="...", value=...)` | Add terms |
+| Search | `search(context, key="...")` | Look up term |
 | Range | `range(context, start_key=..., limit=...)` | Browse alphabetically |
-| Range (bounded) | `range(context, start_key=..., end_key=..., limit=...)` | Prefix search |
 | Reverse Range | `reverse_range(context, limit=...)` | Recent additions |
-| Remove | `remove(context, keys=[...])` | Delete terms |
+| Remove | `remove(context, key="...")` | Delete terms |
 
 ## Architecture
 
-Two SortedMaps for different access patterns:
+### Pydantic Model
+
+```python
+from typing import List
+
+from pydantic import BaseModel
+
+class TermEntry(BaseModel):
+    """Type-safe term entry."""
+    term: str
+    definition: str
+    category: str = "general"
+    examples: List[str] = []
+    timestamp: int
+```
+
+### Two OrderedMaps for Different Access Patterns
 
 ```python
 # Map 1: Alphabetical index (keyed by term name).
-terms_map = SortedMap.ref("terms")
-# Key: "api" -> Value: {"term": "API", "definition": "...", ...}
+terms_map = OrderedMap.ref("terms")
+# Key: "api" -> Value: `protobuf.Value(TermEntry)`.
 
-# Map 2: Chronological index (keyed by UUIDv7).
-recent_map = SortedMap.ref("recent")
-# Key: "018c1234-..." -> Value: {"term": "API", "definition": "...", ...}
+# Map 2: Chronological index (keyed by `UUIDv7`).
+recent_map = OrderedMap.ref("recent")
+# Key: "018c1234-..." -> Value: `protobuf.Value(TermEntry)`.
 ```
 
-## Usage
+## Usage Examples
 
-### Insert
+### Insert with Pydantic
 
-Add terms to both indexes:
+Add terms with type validation:
 
 ```python
+from typing import Any, Dict, List, Optional
+
+from rebootdev.protobuf import from_model
+
 @mcp.tool()
 async def add_term(
     term: str,
     definition: str,
     category: str = "general",
+    examples: Optional[List[str]] = None,
     context: DurableContext = None,
-) -> dict:
+) -> Dict[str, Any]:
     """Add a technical term to the glossary."""
-    term_data = {
-        "term": term,
-        "definition": definition,
-        "category": category,
-        "timestamp": int(time.time() * 1000),
-    }
-
-    # Insert into alphabetical map (keyed by term).
+    timestamp = int(time.time() * 1000)
+
+    # Create Pydantic model instance.
+    term_entry = TermEntry(
+        term=term,
+        definition=definition,
+        category=category,
+        examples=examples or [],
+        timestamp=timestamp,
+    )
+
+    # Insert into alphabetical map using `from_model()`.
     await terms_map.insert(
         context,
-        entries={term.lower(): json.dumps(term_data).encode("utf-8")},
+        key=term.lower(),
+        value=from_model(term_entry),
     )
 
-    # Insert into chronological map (keyed by UUIDv7).
+    # Insert into chronological map.
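+    # (`uuid7()` keys embed the creation timestamp in their high bits,
+    # so this map stays ordered by insertion time and pairs naturally
+    # with `reverse_range` for "most recent" queries.)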
recent_key = str(uuid7()) await recent_map.insert( context, - entries={recent_key: json.dumps(term_data).encode("utf-8")}, + key=recent_key, + value=from_model(term_entry), ) return {"status": "success", "term": term} ``` -### Get +### Search with Pydantic -Point lookup for term definition: +Point lookup with type-safe deserialization: ```python +from rebootdev.protobuf import as_model + @mcp.tool() async def define( term: str, context: DurableContext = None, -) -> dict: +) -> Dict[str, Any]: """Look up a term's definition.""" - response = await terms_map.get(context, key=term.lower()) + response = await terms_map.search(context, key=term.lower()) - if not response.HasField("value"): + if not response.found: return {"status": "error", "message": "Term not found"} - term_data = json.loads(response.value.decode("utf-8")) - return {"status": "success", "term": term_data} + # Convert protobuf `Value` to Pydantic model. + term_entry = as_model(response.value, TermEntry) + + return {"status": "success", "term": term_entry.model_dump()} ``` -### Range +### Range Query Browse terms alphabetically: @@ -101,348 +132,113 @@ async def list_terms( start_with: str = "", limit: int = 50, context: DurableContext = None, -) -> dict: +) -> Dict[str, Any]: """List terms alphabetically.""" - if start_with: - # Range starting from prefix. - response = await terms_map.range( - context, - start_key=start_with.lower(), - limit=limit, - ) - else: - # Range from beginning. - response = await terms_map.range( - context, - limit=limit, - ) - - # Parse and return entries. - # ... -``` - -### Range (Bounded) - -Prefix search with start and end boundaries: - -```python -@mcp.tool() -async def search_terms( - prefix: str, - limit: int = 20, - context: DurableContext = None, -) -> dict: - """Search for terms by prefix.""" - # Calculate end key for prefix range. - start_key = prefix.lower() - # Increment last character for upper bound. - end_key = prefix[:-1] + chr(ord(prefix[-1]) + 1) - response = await terms_map.range( context, - start_key=start_key, - end_key=end_key.lower(), + start_key=start_with.lower() if start_with else None, limit=limit, ) - # Parse and return entries. - # ... + terms = [] + for entry in response.entries: + # Deserialize each entry using `as_model()`. + term_entry = as_model(entry.value, TermEntry) + terms.append({ + "term": term_entry.term, + "definition": term_entry.definition[:100] + "..." + if len(term_entry.definition) > 100 + else term_entry.definition, + "category": term_entry.category, + }) + + return {"status": "success", "count": len(terms), "terms": terms} ``` ### Reverse Range -Get recently added terms (UUIDv7 keys): +Get recently added terms: ```python @mcp.tool() async def recent_terms( limit: int = 20, context: DurableContext = None, -) -> dict: - """Get recently added terms.""" +) -> Dict[str, Any]: + """Get recently added terms (newest first).""" response = await recent_map.reverse_range( context, limit=limit, ) - # Parse and return entries (newest first). - # ... 
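+    # `reverse_range` iterates keys in descending order; with
+    # time-ordered `UUIDv7` keys, descending order is newest-first.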
+    terms = []
+    for entry in response.entries:
+        term_entry = as_model(entry.value, TermEntry)
+        terms.append({
+            "term": term_entry.term,
+            "definition": term_entry.definition[:100] + "..."
+            if len(term_entry.definition) > 100
+            else term_entry.definition,
+            "category": term_entry.category,
+            "added_at": term_entry.timestamp,
+        })
+
+    return {"status": "success", "count": len(terms), "recent_terms": terms}
 ```
 
 ### Remove
 
-Delete terms from glossary:
+Delete a term:
 
 ```python
 @mcp.tool()
 async def remove_term(
     term: str,
     context: DurableContext = None,
-) -> dict:
+) -> Dict[str, Any]:
     """Remove a term from the glossary."""
-    # Check if term exists first.
-    response = await terms_map.get(context, key=term.lower())
+    # Check if term exists.
+    response = await terms_map.search(context, key=term.lower())
 
-    if not response.HasField("value"):
+    if not response.found:
         return {"status": "error", "message": "Term not found"}
 
     # Remove from alphabetical map.
-    await terms_map.remove(
-        context,
-        keys=[term.lower()],
-    )
+    await terms_map.remove(context, key=term.lower())
 
     return {"status": "success", "message": f"Removed '{term}'"}
 ```
 
-## Key Concepts
-
-### Point Lookup with get()
-
-Single key retrieval:
-
-```python
-response = await terms_map.get(context, key="api")
-
-if not response.HasField("value"):
-    # Key not found.
-    return {"error": "Not found"}
-
-# Key found.
-data = json.loads(response.value.decode("utf-8"))
-```
-
-Returns `GetResponse` with optional `value` field.
-
-### Range Queries
-
-Ascending order traversal:
-
-```python
-# All keys from "api" onwards.
-response = await terms_map.range(
-    context,
-    start_key="api",
-    limit=50,
-)
-
-# Keys from "api" to "apz" (exclusive).
-response = await terms_map.range(
-    context,
-    start_key="api",
-    end_key="apz",
-    limit=50,
-)
-
-# First 50 keys in map.
-response = await terms_map.range(
-    context,
-    limit=50,
-)
-```
-
-Parameters:
-
-- `start_key`: Inclusive lower bound (optional)
-- `end_key`: Exclusive upper bound (optional)
-- `limit`: Maximum entries to return (required)
-
-Returns `RangeResponse` with `entries` list.
-
-### Reverse Range
-
-Descending order traversal (largest to smallest keys):
-
-```python
-# Get 20 most recent entries (UUIDv7 keys are time-ordered).
-response = await recent_map.reverse_range(
-    context,
-    limit=20,
-)
-
-# Keys from "z" down to "m" (exclusive).
-response = await terms_map.reverse_range(
-    context,
-    start_key="z",
-    end_key="m",
-    limit=50,
-)
-```
-
-Use cases: Recent items, reverse alphabetical browsing.
+## Registering OrderedMap Servicers
 
-### UUIDv7 for Time Ordering
-
-UUIDv7 embeds timestamp in first 48 bits:
-
-```python
-from uuid7 import create as uuid7
-
-# Generate time-ordered key.
-key = str(uuid7())  # "018c1234-5678-7abc-9012-3456789abcdef"
-
-# Later keys sort after earlier keys.
-key1 = str(uuid7())  # At time T1.
-time.sleep(0.1)
-key2 = str(uuid7())  # At time T2.
-# key1 < key2 (lexicographically)
-```
-
-Benefits: Natural chronological sorting, no collisions, works with
-`reverse_range()` for recent items.
-
-### Prefix Search Pattern
-
-Find all keys starting with prefix "api":
-
-```python
-prefix = "api"
-start_key = prefix.lower()
-# Increment last character to get exclusive upper bound.
-end_key = prefix[:-1] + chr(ord(prefix[-1]) + 1)  # "api" -> "apj"
-
-response = await terms_map.range(
-    context,
-    start_key=start_key,
-    end_key=end_key,
-    limit=100,
-)
-```
-
-Works because SortedMap uses lexicographic ordering.
-
-## Best Practices
-
-Always check `HasField("value")` for `get()`:
-
-```python
-# Correct.
-response = await map.get(context, key="term") -if not response.HasField("value"): - return {"error": "Not found"} - -# Wrong (will raise AttributeError if value is unset). -if not response.value: - pass -``` - -Use `limit` parameter for `range()` queries: - -```python -# Required - limit prevents unbounded results. -response = await map.range(context, limit=100) - -# Error - limit is required. -response = await map.range(context) -``` - -Lowercase keys for case-insensitive lookup while preserving original -capitalization: - -```python -# Store original term in data, use lowercase for key. -term_data = { - "term": term, # Preserves "gRPC", "Kubernetes", etc. - "definition": definition, - # ... -} - -await terms_map.insert( - context, - entries={term.lower(): json.dumps(term_data).encode("utf-8")}, -) - -# Lookup with lowercase (case-insensitive). -response = await terms_map.get(context, key=term.lower()) -# Returns: {"term": "gRPC", ...} regardless of input case -``` - -This allows lookups like `define("grpc")`, `define("GRPC")`, and -`define("gRPC")` to all return the same term with its original -capitalization. - -## Common Patterns - -### Recent Items with UUIDv7 - -```python -# Store with UUIDv7 keys for time ordering. -recent_map = SortedMap.ref("recent") -await recent_map.insert( - context, - entries={str(uuid7()): data}, -) - -# Get N most recent. -response = await recent_map.reverse_range(context, limit=10) -``` - -### Dual Indexing - -```python -# Primary index: optimized for lookups. -terms_map = SortedMap.ref("terms") -await terms_map.insert(context, entries={term.lower(): data}) - -# Secondary index: optimized for chronological access. -recent_map = SortedMap.ref("recent") -await recent_map.insert(context, entries={str(uuid7()): data}) -``` - -### Batch Operations +**Important**: OrderedMap requires servicer registration: ```python -# Insert multiple entries at once (single call). -await terms_map.insert( - context, - entries={ - "api": json.dumps({...}).encode("utf-8"), - "rest": json.dumps({...}).encode("utf-8"), - "grpc": json.dumps({...}).encode("utf-8"), - }, +from reboot.std.collections.ordered_map.v1.ordered_map import ( + OrderedMap, + servicers as ordered_map_servicers, ) -# Remove multiple keys at once (single call). -await terms_map.remove( - context, - keys=["api", "rest", "grpc"], -) +async def main(): + await mcp.application(servicers=ordered_map_servicers()).run() ``` -### Multiple Operations on Same Map - -When calling methods on the same named SortedMap multiple times within -the same context, use `.idempotently()` with unique aliases: - -```python -# Multiple inserts on same map require idempotency guards. -terms_map = SortedMap.ref("terms") - -await terms_map.idempotently("insert_api").insert( - context, - entries={"api": data1}, -) +## Benefits of OrderedMap + Pydantic -await terms_map.idempotently("insert_rest").insert( - context, - entries={"rest": data2}, -) -``` +- Type Safety: Pydantic validates all data structures +- Clean API: `from_model()` / `as_model()` are explicit and readable +- IDE Support: Full autocomplete with `term_entry.field` +- Protobuf Integration: Works seamlessly with protobuf Values +- Validation: Catch errors at serialization boundaries -Different named maps don't require guards: +## When to Use OrderedMap vs SortedMap -```python -# These are different maps - no conflict. 
-terms_map = SortedMap.ref("terms")
-recent_map = SortedMap.ref("recent")
+**Use OrderedMap when:**
+- You want protobuf Value integration
+- Type safety with Pydantic is important
+- You need the `from_model` / `as_model` pattern
 
-await terms_map.insert(context, entries={...})  # Fine
-await recent_map.insert(context, entries={...})  # Also fine
-```
-
-## Running
+**Use SortedMap when:**
+- You prefer working with raw bytes
+- You need batch operations (`entries={...}`)
+- Simplicity is preferred over type safety
 
-```bash
-cd examples/define
-uv run python example.py
-```
+See the other examples for the SortedMap + Pydantic pattern (audit, steps, processing).
diff --git a/examples/define/example.py b/examples/define/example.py
index 60293e0..cbf5668 100644
--- a/examples/define/example.py
+++ b/examples/define/example.py
@@ -79,14 +79,14 @@ async def add_term(
         timestamp=timestamp,
     )
 
-    # Insert into alphabetical map (keyed by term).
+    # Insert into alphabetical map (keyed by `term`).
     await terms_map.insert(
         context,
         key=term.lower(),
         value=from_model(term_entry),
     )
 
-    # Insert into chronological map (keyed by UUIDv7).
+    # Insert into chronological map (keyed by `UUIDv7`).
     recent_key = str(uuid7())
     await recent_map.insert(
         context,
diff --git a/examples/document/README.md b/examples/document/README.md
index 98cf80c..269a780 100644
--- a/examples/document/README.md
+++ b/examples/document/README.md
@@ -1,13 +1,16 @@
 # Document Processing
 
-Document processing pipeline combining `at_least_once` and
-`at_most_once` patterns.
+Document processing pipeline combining `at_least_once` and `at_most_once` patterns with OrderedMap and Pydantic models.
 
 ## Overview
 
-Real-world workflows often need both idempotency patterns. Use
-`at_least_once` for idempotent operations (reads, storage) and
-`at_most_once` for operations with side effects (external APIs).
+Real-world workflows often need both idempotency patterns. Use `at_least_once` for idempotent operations (reads, storage) and `at_most_once` for operations with side effects (external APIs).
+
+This example demonstrates:
+- OrderedMap for durable storage
+- Pydantic models for type-safe data structures
+- `from_model()` / `as_model()` for serialization
+- Mixed idempotency patterns in a single workflow
 
 ## Workflow
 
@@ -19,9 +22,45 @@ Upload File -> Process Document
     |- Step 4: Store result (at_least_once)
 ```
 
-## Pattern
+## Pydantic Models
+
+```python
+from typing import Dict
+
+from pydantic import BaseModel
+
+class FileData(BaseModel):
+    """File data model."""
+    file_id: str
+    content: str
+    metadata: Dict[str, str] = {}
+
+class OCRResult(BaseModel):
+    """OCR processing result."""
+    job_id: str
+    step: str
+    text: str
+
+class TranslationResult(BaseModel):
+    """Translation processing result."""
+    job_id: str
+    step: str
+    text: str
+    language: str
+
+class JobResult(BaseModel):
+    """Final job result."""
+    job_id: str
+    file_id: str
+    target_language: str
+    status: str
+    result: str
+```
+
+## Pattern with OrderedMap + Pydantic
 
 ```python
+from rebootdev.protobuf import from_model, as_model
+from reboot.std.collections.ordered_map.v1.ordered_map import OrderedMap
+
 @mcp.tool()
 async def process_document(
     file_id: str,
@@ -30,12 +69,19 @@ async def process_document(
 ) -> dict:
     """Process document through OCR and translation pipeline."""
 
+    files_map = OrderedMap.ref("files")
+    results_map = OrderedMap.ref("results")
+    jobs_map = OrderedMap.ref("jobs")
+
     # Step 1: Idempotent file lookup.
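+    # (A lookup has no side effects, so `at_least_once` below can
+    # safely re-run it on retry and caches the value it returns.)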
async def get_file_metadata(): - response = await files_map.get(context, key=file_id) - if not response.HasField("value"): + response = await files_map.search(context, key=file_id) + if not response.found: raise ValueError(f"File {file_id} not found") - return json.loads(response.value.decode("utf-8")) + + # Deserialize with Pydantic. + file_data = as_model(response.value, FileData) + return file_data.model_dump() file_metadata = await at_least_once( f"get_file_{file_id}", @@ -47,10 +93,17 @@ async def process_document( # Step 2: OCR (external API, at most once). async def perform_ocr(): extracted_text = await simulate_ocr_api(file_metadata["content"]) - # Store intermediate result with idempotency guard. + + # Create Pydantic model and store with from_model(). + ocr_result = OCRResult( + job_id=job_id, + step="ocr", + text=extracted_text, + ) await results_map.idempotently(f"store_ocr_{job_id}").insert( context, - entries={...}, + key=f"{job_id}_ocr", + value=from_model(ocr_result), ) return extracted_text @@ -75,10 +128,18 @@ async def process_document( ocr_text, target_language, ) - # Store intermediate result with idempotency guard. + + # Create Pydantic model and store. + translation_result = TranslationResult( + job_id=job_id, + step="translation", + text=translated_text, + language=target_language, + ) await results_map.idempotently(f"store_translation_{job_id}").insert( context, - entries={...}, + key=f"{job_id}_translation", + value=from_model(translation_result), ) return translated_text @@ -97,9 +158,20 @@ async def process_document( except QuotaExceededError as e: return {"status": "error", "step": "translation", "error": str(e)} - # Step 4: Idempotent final storage. + # Step 4: Store final result (idempotent write). async def store_job_result(): - await jobs_map.insert(context, entries={job_id: ...}) + job_result = JobResult( + job_id=job_id, + file_id=file_id, + target_language=target_language, + status="completed", + result=translated_text, + ) + await jobs_map.insert( + context, + key=job_id, + value=from_model(job_result), + ) return job_id final_job_id = await at_least_once( @@ -112,191 +184,58 @@ async def process_document( return {"status": "success", "job_id": final_job_id} ``` -## Pattern Selection - -| Operation | Pattern | Reason | -|-----------|---------|--------| -| File lookup | `at_least_once` | Idempotent read | -| OCR API call | `at_most_once` | External API side effects | -| Translation API | `at_most_once` | External API quota | -| Store result | `at_least_once` | Idempotent write | - ## Error Handling -### For at_most_once Steps - -Each external API call needs three exception handlers: - -```python -try: - result = await at_most_once( - "operation", - context, - operation_func, - type=str, - retryable_exceptions=[NetworkError], - ) -except NetworkError: - # Retryable error after retries exhausted. - return {"status": "error", "retryable": True} -except AtMostOnceFailedBeforeCompleting: - # Previous attempt failed with non-retryable error. - return {"status": "error", "retryable": False} -except (InvalidDocumentError, QuotaExceededError) as e: - # First attempt with non-retryable error. - return {"status": "error", "message": str(e)} -``` - -### For at_least_once Steps - -Let exceptions propagate (they'll cause workflow retry): - -```python -# No try/except needed - idempotent operations can safely retry. 
-result = await at_least_once( - "operation", - context, - operation_func, - type=dict, -) -``` - -## Multiple Operations on Same SortedMap - -When calling a method on the **same named SortedMap** multiple times -within the same context, use `.idempotently()` with a unique alias for -each call: - -```python -# Inside an `at_most_once` or `at_least_once` callable: -async def perform_operation(): - results_map = SortedMap.ref("results") - - # First insert on "results" map. - await results_map.idempotently("store_step1").insert( - context, - entries={key1: value1}, - ) +### Retryable Errors +- `NetworkError`: Temporary network issues (API timeouts) +- Handled by `at_most_once` with automatic retry - # Second insert on same "results" map - needs different alias. - await results_map.idempotently("store_step2").insert( - context, - entries={key2: value2}, - ) +### Non-Retryable Errors +- `InvalidDocumentError`: Document format not supported +- `QuotaExceededError`: API quota exceeded +- Caught and returned as error responses - return result -``` - -Without `.idempotently()`, the second call raises: - -``` -ValueError: To call 'rbt.std.collections.v1.SortedMapMethods.Insert' -of 'results' more than once using the same context an idempotency -alias or key must be specified -``` +## Idempotency Guards -**Different maps don't need idempotency guards:** +### at_least_once +- Used for: File reads, result storage +- Behavior: Caches return value, retries until success +- Ideal for: Idempotent operations that should eventually complete -```python -# These are different named maps - no conflict. -results_map = SortedMap.ref("results") -jobs_map = SortedMap.ref("jobs") +### at_most_once +- Used for: External API calls (OCR, translation) +- Behavior: Executes at most once, even if retried +- Ideal for: Operations with side effects (charges, state changes) -await results_map.insert(context, entries={...}) # Fine -await jobs_map.insert(context, entries={...}) # Also fine -``` +### Combined Pattern +The `.idempotently()` modifier on `insert()` ensures intermediate results are stored exactly once, even if the enclosing `at_most_once` block retries. -**For loop operations:** +## Registering Servicers -Use dynamic aliases when calling the same map in loops: +OrderedMap requires servicer registration: ```python -items_map = SortedMap.ref("items") -for i in range(5): - await items_map.idempotently(f"insert_item_{i}").insert( - context, - entries={f"item_{i}": data}, - ) -``` - -This pattern applies to all SortedMap methods (`insert`, `get`, -`range`, `remove`) when called multiple times on the same named map -within the same context. - -## Retry Scenarios - -### Network Error During OCR - -1. Step 1: File lookup succeeds -2. Step 2: OCR API raises `NetworkError` -3. `at_most_once` retries OCR -4. OCR succeeds on retry -5. Steps 3-4 proceed normally - -### Invalid Document Error - -1. Step 1: File lookup succeeds -2. Step 2: OCR API raises `InvalidDocumentError` -3. Exception propagates (not in `retryable_exceptions`) -4. Tool returns error response - -### Tool Retry After OCR Success - -1. Initial call: Steps 1-2 succeed, network issue prevents response -2. Tool retried by MCP framework -3. Step 1: `at_least_once` returns cached file metadata -4. Step 2: `at_most_once` returns cached OCR text -5. Step 3: Translation proceeds - -## Best Practices - -Choose the right pattern for each step: - -```python -# Good: Idempotent read uses `at_least_once`. 
-data = await at_least_once("read", context, read_func, type=dict) - -# Good: External API uses `at_most_once`. -result = await at_most_once( - "api", - context, - api_func, - type=str, - retryable_exceptions=[...], +from reboot.std.collections.ordered_map.v1.ordered_map import ( + OrderedMap, + servicers as ordered_map_servicers, ) -# Bad: Using `at_most_once` for idempotent read (unnecessary). -data = await at_most_once("read", context, read_func, type=dict) +async def main(): + await mcp.application(servicers=ordered_map_servicers()).run() ``` -Store intermediate results: +## Benefits -```python -async def expensive_operation(): - result = await external_api() - # Store immediately so we don't lose it. - # Use `.idempotently()` if multiple SortedMap operations occur - # in the same context. - await results_map.idempotently("store_result").insert( - context, - entries={...}, - ) - return result -``` - -Use distinct aliases: +- Type Safety: Pydantic validates all intermediate results +- Clear Errors: Each step has specific error types +- Resumable: If translation fails, OCR doesn't re-run +- Protobuf Integration: OrderedMap with `from_model` / `as_model` +- Audit Trail: Intermediate results stored for debugging -```python -# Each step has unique alias. -await at_least_once(f"get_file_{file_id}", ...) -await at_most_once(f"ocr_{job_id}", ...) -await at_most_once(f"translate_{job_id}", ...) -await at_least_once(f"store_job_{job_id}", ...) -``` +## Use Case -## Running - -```bash -cd examples/document -uv run python example.py -``` +Perfect for workflows that: +- Call external APIs with charges or side effects +- Need to resume after partial failures +- Require validation of intermediate results +- Want type-safe data structures throughout diff --git a/examples/document/example.py b/examples/document/example.py index 0b9c668..d688cd8 100644 --- a/examples/document/example.py +++ b/examples/document/example.py @@ -7,12 +7,13 @@ import asyncio import hashlib -import json import random import sys from pathlib import Path from typing import Any, Dict +from pydantic import BaseModel + # Add api/ to Python path for generated proto code. api_path = Path(__file__).parent.parent.parent / "api" if api_path.exists(): @@ -24,12 +25,52 @@ AtMostOnceFailedBeforeCompleting, ) from reboot.mcp.server import DurableMCP, DurableContext -from reboot.std.collections.v1.sorted_map import SortedMap +from reboot.std.collections.ordered_map.v1.ordered_map import ( + OrderedMap, + servicers as ordered_map_servicers, +) +from rebootdev.protobuf import from_model, as_model # Initialize MCP server. mcp = DurableMCP(path="/mcp") +# Pydantic models for document processing. +class FileData(BaseModel): + """File data model.""" + + file_id: str + content: str + metadata: Dict[str, str] = {} + + +class OCRResult(BaseModel): + """OCR processing result.""" + + job_id: str + step: str + text: str + + +class TranslationResult(BaseModel): + """Translation processing result.""" + + job_id: str + step: str + text: str + language: str + + +class JobResult(BaseModel): + """Final job result.""" + + job_id: str + file_id: str + target_language: str + status: str + result: str + + class NetworkError(Exception): """Temporary network error (retryable).""" @@ -107,22 +148,22 @@ async def process_document( Returns: Processing result with job_id and status. 
""" - files_map = SortedMap.ref("files") - results_map = SortedMap.ref("results") - jobs_map = SortedMap.ref("jobs") + files_map = OrderedMap.ref("files") + results_map = OrderedMap.ref("results") + jobs_map = OrderedMap.ref("jobs") # Generate job ID. job_id = f"job_{hashlib.md5(f'{file_id}_{target_language}'.encode()).hexdigest()[:12]}" # Step 1: Retrieve file metadata (idempotent read). async def get_file_metadata(): - response = await files_map.get(context, key=file_id) + response = await files_map.search(context, key=file_id) - if not response.HasField("value"): + if not response.found: raise ValueError(f"File {file_id} not found") - file_data = json.loads(response.value.decode("utf-8")) - return file_data + file_data = as_model(response.value, FileData) + return file_data.model_dump() # Use `at_least_once` for idempotent file lookup. file_metadata = await at_least_once( @@ -137,15 +178,17 @@ async def perform_ocr(): # Call OCR API. extracted_text = await simulate_ocr_api(file_metadata["content"]) - # Store OCR result. + # Create Pydantic model and store OCR result. ocr_result_key = f"{job_id}_ocr" + ocr_result = OCRResult( + job_id=job_id, + step="ocr", + text=extracted_text, + ) await results_map.idempotently(f"store_ocr_{job_id}").insert( context, - entries={ - ocr_result_key: json.dumps( - {"job_id": job_id, "step": "ocr", "text": extracted_text} - ).encode("utf-8") - }, + key=ocr_result_key, + value=from_model(ocr_result), ) return extracted_text @@ -195,20 +238,18 @@ async def perform_translation(): ocr_text, target_language ) - # Store translation result. + # Create Pydantic model and store translation result. translation_result_key = f"{job_id}_translation" + translation_result = TranslationResult( + job_id=job_id, + step="translation", + text=translated_text, + language=target_language, + ) await results_map.idempotently(f"store_translation_{job_id}").insert( context, - entries={ - translation_result_key: json.dumps( - { - "job_id": job_id, - "step": "translation", - "text": translated_text, - "language": target_language, - } - ).encode("utf-8") - }, + key=translation_result_key, + value=from_model(translation_result), ) return translated_text @@ -252,19 +293,17 @@ async def perform_translation(): # Step 4: Store final job result (idempotent write). async def store_job_result(): + job_result = JobResult( + job_id=job_id, + file_id=file_id, + target_language=target_language, + status="completed", + result=translated_text, + ) await jobs_map.insert( context, - entries={ - job_id: json.dumps( - { - "job_id": job_id, - "file_id": file_id, - "target_language": target_language, - "status": "completed", - "result": translated_text, - } - ).encode("utf-8") - }, + key=job_id, + value=from_model(job_result), ) return job_id @@ -302,19 +341,17 @@ async def upload_file( Returns: Upload confirmation. """ - files_map = SortedMap.ref("files") + files_map = OrderedMap.ref("files") + file_data = FileData( + file_id=file_id, + content=content, + metadata=metadata or {}, + ) await files_map.insert( context, - entries={ - file_id: json.dumps( - { - "file_id": file_id, - "content": content, - "metadata": metadata or {}, - } - ).encode("utf-8") - }, + key=file_id, + value=from_model(file_data), ) return {"status": "success", "file_id": file_id} @@ -335,21 +372,21 @@ async def get_job_status( Returns: Job status and result if completed. 
""" - jobs_map = SortedMap.ref("jobs") + jobs_map = OrderedMap.ref("jobs") - response = await jobs_map.get(context, key=job_id) + response = await jobs_map.search(context, key=job_id) - if not response.HasField("value"): + if not response.found: return {"status": "error", "message": "Job not found"} - job_data = json.loads(response.value.decode("utf-8")) + job_result = as_model(response.value, JobResult) - return {"status": "success", "job": job_data} + return {"status": "success", "job": job_result.model_dump()} async def main(): """Start the document processing example server.""" - await mcp.application().run() + await mcp.application(servicers=ordered_map_servicers()).run() if __name__ == "__main__": diff --git a/examples/processing/example.py b/examples/processing/example.py index 42626e8..dc4279e 100644 --- a/examples/processing/example.py +++ b/examples/processing/example.py @@ -6,12 +6,13 @@ """ import asyncio -import json import random import sys from pathlib import Path from typing import Any, Dict +from pydantic import BaseModel + # Add api/ to Python path for generated proto code. api_path = Path(__file__).parent.parent.parent / "api" if api_path.exists(): @@ -25,6 +26,17 @@ mcp = DurableMCP(path="/mcp") +# Pydantic model for payment records. +class PaymentRecord(BaseModel): + """Payment record model.""" + + transaction_id: str + amount: float + currency: str + description: str = "" + status: str + + class NetworkError(Exception): """Temporary network error (retryable).""" @@ -126,20 +138,19 @@ async def make_payment(): # Call external payment API. result = await simulate_payment_api(amount, currency, context) - # Store payment record. + # Create Pydantic model and store payment record. payment_id = result["transaction_id"] + payment_record = PaymentRecord( + transaction_id=payment_id, + amount=amount, + currency=currency, + description=description, + status=result["status"], + ) await payments_map.insert( context, entries={ - payment_id: json.dumps( - { - "transaction_id": payment_id, - "amount": amount, - "currency": currency, - "description": description, - "status": result["status"], - } - ).encode("utf-8") + payment_id: payment_record.model_dump_json().encode("utf-8") }, ) @@ -212,9 +223,9 @@ async def get_payment( if not response.HasField("value"): return {"status": "error", "message": "Payment not found"} - payment_data = json.loads(response.value.decode("utf-8")) + payment_record = PaymentRecord.model_validate_json(response.value) - return {"status": "success", "payment": payment_data} + return {"status": "success", "payment": payment_record.model_dump()} @mcp.tool() diff --git a/examples/run.py b/examples/run.py index 1a5f44e..a2d85aa 100755 --- a/examples/run.py +++ b/examples/run.py @@ -32,11 +32,11 @@ }, "4": { "name": "document", - "description": "Document pipeline combining both patterns", + "description": "Document pipeline with OrderedMap + Pydantic", }, "5": { "name": "define", - "description": "Technical glossary with SortedMap CRUD", + "description": "Technical glossary with OrderedMap CRUD", }, } diff --git a/examples/steps/example.py b/examples/steps/example.py index a196533..dd6cdcd 100644 --- a/examples/steps/example.py +++ b/examples/steps/example.py @@ -6,11 +6,12 @@ """ import asyncio -import json import sys from pathlib import Path from typing import Any, Dict +from pydantic import BaseModel + # Add api/ to Python path for generated proto code. 
api_path = Path(__file__).parent.parent.parent / "api" if api_path.exists(): @@ -24,6 +25,22 @@ mcp = DurableMCP(path="/mcp") +# Pydantic models for user and profile data. +class UserData(BaseModel): + """User data model.""" + + username: str + email: str + + +class ProfileData(BaseModel): + """Profile data model.""" + + user_id: str + bio: str = "" + avatar_url: str = "" + + @mcp.tool() async def create_user_with_profile( username: str, @@ -56,14 +73,11 @@ async def create_user(): users_map = SortedMap.ref("users") user_id = f"user_{hash(username) % 100000}" - # Store user data. + # Create Pydantic model and store user data. + user_data = UserData(username=username, email=email) await users_map.insert( context, - entries={ - user_id: json.dumps( - {"username": username, "email": email} - ).encode("utf-8") - }, + entries={user_id: user_data.model_dump_json().encode("utf-8")}, ) return user_id @@ -82,14 +96,15 @@ async def create_profile(): profiles_map = SortedMap.ref("profiles") profile_id = f"profile_{user_id}" - # Store profile data. + # Create Pydantic model and store profile data. + profile_data = ProfileData( + user_id=user_id, + bio=bio, + avatar_url=avatar_url, + ) await profiles_map.insert( context, - entries={ - profile_id: json.dumps( - {"user_id": user_id, "bio": bio, "avatar_url": avatar_url} - ).encode("utf-8") - }, + entries={profile_id: profile_data.model_dump_json().encode("utf-8")}, ) return profile_id @@ -131,9 +146,9 @@ async def get_user( if not response.HasField("value"): return {"status": "error", "message": "User not found"} - user_data = json.loads(response.value.decode("utf-8")) + user_data = UserData.model_validate_json(response.value) - return {"status": "success", "user": user_data} + return {"status": "success", "user": user_data.model_dump()} @mcp.tool() @@ -157,9 +172,9 @@ async def get_profile( if not response.HasField("value"): return {"status": "error", "message": "Profile not found"} - profile_data = json.loads(response.value.decode("utf-8")) + profile_data = ProfileData.model_validate_json(response.value) - return {"status": "success", "profile": profile_data} + return {"status": "success", "profile": profile_data.model_dump()} async def main():