Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 23 additions & 28 deletions src/py_code_mode/execution/container/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
This client connects to a running session server and provides
a Python API for code execution. Each client maintains its own
isolated session with separate Python namespace and artifacts.
The server allocates the session on the first execute call.

Usage:
async with SessionClient("http://localhost:8080") as client:
Expand All @@ -13,7 +14,6 @@

from __future__ import annotations

import uuid
from dataclasses import dataclass
from typing import Any

Expand Down Expand Up @@ -64,7 +64,7 @@ class ResetResult:
"""Reset result."""

status: str
session_id: str
session_id: str | None


class SessionClient:
Expand All @@ -74,24 +74,22 @@ class SessionClient:
- Separate Python namespace (variables don't leak between sessions)
- Separate artifact directory

Use the same client instance across requests to maintain state,
or create a new client for a fresh isolated session.
Use the same client instance across requests to maintain state.
The server issues the session ID on first execution and the client
reuses it for later session-scoped requests.
"""

def __init__(
self,
base_url: str = "http://localhost:8080",
timeout: float = 30.0,
session_id: str | None = None,
auth_token: str | None = None,
) -> None:
"""Initialize session client.

Args:
base_url: Base URL of session server.
timeout: Default timeout for HTTP requests.
session_id: Optional session ID. If not provided, a new
unique session is created on first request.
auth_token: Optional Bearer token for API authentication.
If provided, sent as Authorization header.
"""
Expand All @@ -101,7 +99,7 @@ def __init__(
# Strip trailing slash
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.session_id = session_id or str(uuid.uuid4())
self.session_id: str | None = None
self.auth_token = auth_token
self._client: httpx.AsyncClient | None = None

Expand All @@ -113,7 +111,9 @@ async def _get_client(self) -> httpx.AsyncClient:

def _headers(self) -> dict[str, str]:
"""Get headers with session ID and optional auth token."""
headers = {"X-Session-ID": self.session_id}
headers: dict[str, str] = {}
if self.session_id is not None:
headers["X-Session-ID"] = self.session_id
if self.auth_token:
headers["Authorization"] = f"Bearer {self.auth_token}"
return headers
Expand Down Expand Up @@ -145,16 +145,15 @@ async def execute(
response.raise_for_status()
data = response.json()

# Update session_id if server assigned one
if "session_id" in data:
self.session_id = data["session_id"]
session_id = data["session_id"]
self.session_id = session_id

return ExecuteResult(
value=data["value"],
stdout=data["stdout"],
error=data["error"],
execution_time_ms=data["execution_time_ms"],
session_id=data.get("session_id", self.session_id),
session_id=session_id,
)

async def health(self) -> HealthResult:
Expand Down Expand Up @@ -198,17 +197,21 @@ async def reset(self) -> ResetResult:
Returns:
ResetResult confirming reset.
"""
if self.session_id is None:
return ResetResult(status="reset", session_id=None)

client = await self._get_client()
response = await client.post(
f"{self.base_url}/reset",
headers=self._headers(),
)
response.raise_for_status()
data = response.json()
self.session_id = None

return ResetResult(
status=data["status"],
session_id=data.get("session_id", self.session_id),
session_id=data.get("session_id"),
)

async def install_deps(self, packages: list[str]) -> dict[str, Any]:
Expand All @@ -230,10 +233,8 @@ async def install_deps(self, packages: list[str]) -> dict[str, Any]:
headers=self._headers(),
timeout=300.0, # Long timeout for package installation
)
data = response.json()
if response.status_code != 200:
raise RuntimeError(data.get("error", "Install failed"))
return data
response.raise_for_status()
return response.json()

async def uninstall_deps(self, packages: list[str]) -> dict[str, Any]:
"""Uninstall packages from the container.
Expand All @@ -254,10 +255,8 @@ async def uninstall_deps(self, packages: list[str]) -> dict[str, Any]:
headers=self._headers(),
timeout=120.0, # Reasonable timeout for uninstall
)
data = response.json()
if response.status_code != 200:
raise RuntimeError(data.get("error", "Uninstall failed"))
return data
response.raise_for_status()
return response.json()

# ==========================================================================
# Tools API Methods
Expand Down Expand Up @@ -481,9 +480,7 @@ async def api_add_dep(self, package: str) -> dict[str, Any]:
headers=self._headers(),
timeout=300.0, # Long timeout for package installation
)
if response.status_code != 200:
data = response.json()
raise RuntimeError(data.get("detail", "Add dep failed"))
response.raise_for_status()
return response.json()

async def api_remove_dep(self, package: str) -> dict[str, Any]:
Expand All @@ -504,9 +501,7 @@ async def api_remove_dep(self, package: str) -> dict[str, Any]:
json={"package": package},
headers=self._headers(),
)
if response.status_code != 200:
data = response.json()
raise RuntimeError(data.get("detail", "Remove dep failed"))
response.raise_for_status()
return response.json()

async def api_sync_deps(self) -> dict[str, Any]:
Expand Down
41 changes: 37 additions & 4 deletions src/py_code_mode/execution/container/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,27 @@ async def __aexit__(
"""Stop container and cleanup."""
await self.close()

def _translate_client_http_error(self, error: Exception) -> RuntimeError:
"""Translate low-level client HTTP errors into executor-style RuntimeErrors."""
if HTTPX_AVAILABLE and isinstance(error, httpx.HTTPStatusError):
detail: str | None = None
try:
data = error.response.json()
except ValueError:
data = None

if isinstance(data, dict):
value = data.get("detail") or data.get("error")
if isinstance(value, str) and value:
detail = value

if detail is None:
detail = f"HTTP {error.response.status_code}: {error.response.text}"

return RuntimeError(detail)

return RuntimeError(str(error))

def _create_docker_client(self) -> Any:
"""Create Docker client, trying multiple socket locations if needed."""
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -541,7 +562,10 @@ async def install_deps(self, packages: list[str]) -> dict[str, Any]:
if self._client is None:
raise RuntimeError("Container not started")

return await self._client.install_deps(packages)
try:
return await self._client.install_deps(packages)
except Exception as e:
raise self._translate_client_http_error(e) from e

async def uninstall_deps(self, packages: list[str]) -> dict[str, Any]:
"""Uninstall packages from the container environment.
Expand All @@ -568,7 +592,10 @@ async def uninstall_deps(self, packages: list[str]) -> dict[str, Any]:
if self._client is None:
raise RuntimeError("Container not started")

return await self._client.uninstall_deps(packages)
try:
return await self._client.uninstall_deps(packages)
except Exception as e:
raise self._translate_client_http_error(e) from e

# ==========================================================================
# Tools API Methods
Expand Down Expand Up @@ -751,7 +778,10 @@ async def add_dep(self, package: str) -> dict[str, Any]:
if self._client is None:
raise RuntimeError("Container not started")

return await self._client.api_add_dep(package)
try:
return await self._client.api_add_dep(package)
except Exception as e:
raise self._translate_client_http_error(e) from e

async def remove_dep(self, package: str) -> dict[str, Any]:
"""Remove a package from configuration and uninstall it.
Expand All @@ -770,7 +800,10 @@ async def remove_dep(self, package: str) -> dict[str, Any]:
if self._client is None:
raise RuntimeError("Container not started")

return await self._client.api_remove_dep(package)
try:
return await self._client.api_remove_dep(package)
except Exception as e:
raise self._translate_client_http_error(e) from e

async def sync_deps(self) -> dict[str, Any]:
"""Install all configured packages.
Expand Down
40 changes: 22 additions & 18 deletions src/py_code_mode/execution/container/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,24 +313,22 @@ def create_session(session_id: str) -> Session:
)


def get_or_create_session(session_id: str | None) -> Session:
"""Get existing session or create a new one."""
# Generate session_id if not provided
if session_id is None:
session_id = str(uuid.uuid4())

# Return existing session
if session_id in _state.sessions:
session = _state.sessions[session_id]
session.last_used = time.time()
return session

# Create new session
def create_new_session() -> Session:
"""Create a new isolated session with a server-issued ID."""
session_id = str(uuid.uuid4())
session = create_session(session_id)
_state.sessions[session_id] = session
return session


def get_existing_session(session_id: str) -> Session | None:
"""Get an existing session if present."""
session = _state.sessions.get(session_id)
if session is not None:
session.last_used = time.time()
return session


def cleanup_expired_sessions() -> int:
"""Remove sessions that haven't been used recently."""
now = time.time()
Expand Down Expand Up @@ -603,8 +601,12 @@ async def execute(
# Cleanup expired sessions periodically
cleanup_expired_sessions()

# Get or create session
session = get_or_create_session(x_session_id)
if x_session_id is None:
session = create_new_session()
else:
session = get_existing_session(x_session_id)
if session is None:
raise HTTPException(status_code=400, detail="Invalid session ID")

start = time.time()
timeout = body.timeout or _state.config.default_timeout
Expand Down Expand Up @@ -667,12 +669,14 @@ async def reset(
x_session_id: str | None = Header(None, alias="X-Session-ID"),
) -> ResetResponseModel:
"""Reset a session (clears namespace, keeps artifacts)."""
if x_session_id and x_session_id in _state.sessions:
del _state.sessions[x_session_id]
if x_session_id is None or x_session_id not in _state.sessions:
raise HTTPException(status_code=400, detail="Invalid session ID")

del _state.sessions[x_session_id]

return ResetResponseModel(
status="reset",
session_id=x_session_id or "",
session_id=x_session_id,
)

# NOTE: /sessions endpoint removed - session enumeration is an information disclosure risk
Expand Down
21 changes: 10 additions & 11 deletions src/py_code_mode/integrations/autogen.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def create_run_code_tool(
executor: InProcessExecutor | None = None,
session_url: str | None = None,
timeout: float = 30.0,
session_id: str | None = None,
) -> Callable[[str], Any]:
"""Create a run_code tool for AutoGen agents.

Expand All @@ -45,9 +44,6 @@ def create_run_code_tool(
executor: CodeExecutor instance for in-process execution
session_url: URL of py-code-mode session server for remote execution
timeout: Execution timeout in seconds
session_id: Optional session ID for remote execution. If not provided,
a unique session is created. Use this to isolate different
agents using the same session server.

Returns:
A function that can be registered as an AutoGen tool
Expand All @@ -63,7 +59,7 @@ def create_run_code_tool(
if executor is not None:
return _create_local_tool(executor, timeout)
else:
return _create_remote_tool(session_url, timeout, session_id) # type: ignore
return _create_remote_tool(session_url, timeout) # type: ignore


def _create_local_tool(
Expand Down Expand Up @@ -109,17 +105,14 @@ async def run_code(code: str) -> str:
def _create_remote_tool(
session_url: str,
timeout: float,
session_id: str | None = None,
) -> Callable[[str], Any]:
"""Create tool using remote session server."""

# Lazy import to avoid requiring httpx for local-only usage
import uuid

import httpx

# Each tool instance gets its own session
_session_id = session_id or str(uuid.uuid4())
# Session is created lazily by the server on first execute.
_session_id: str | None = None

def run_code(code: str) -> str:
"""Execute Python code with access to tools.*, workflows.*, and artifacts.*.
Expand All @@ -137,15 +130,21 @@ def run_code(code: str) -> str:
Returns:
String representation of the result or error message
"""
nonlocal _session_id
try:
headers = {}
if _session_id is not None:
headers["X-Session-ID"] = _session_id

with httpx.Client(timeout=timeout + 5) as client:
response = client.post(
f"{session_url.rstrip('/')}/execute",
json={"code": code, "timeout": timeout},
headers={"X-Session-ID": _session_id},
headers=headers,
)
response.raise_for_status()
result = response.json()
_session_id = result["session_id"]

if result.get("error"):
return f"Error: {result['error']}"
Expand Down
Loading