Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions agent/src/attachments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Attachment download and integrity verification.

Downloads attachments from S3 using version-pinned reads and verifies
SHA-256 checksums against the orchestrator-provided values. Files are
placed in a workspace subdirectory for the agent to reference.
"""

from __future__ import annotations

import hashlib
import os
from pathlib import Path
from typing import Literal
from urllib.parse import urlparse

from pydantic import BaseModel, ConfigDict

from shell import log

ATTACHMENTS_DIR = ".attachments"


class PreparedAttachment(BaseModel):
"""An attachment downloaded to the local filesystem and verified."""

model_config = ConfigDict(frozen=True, extra="forbid")

attachment_id: str
type: Literal["image", "file", "url"]
content_type: str
filename: str
local_path: str
size_bytes: int
token_estimate: int | None = None


def download_attachments(
attachments: list,
workspace: str,
) -> list[PreparedAttachment]:
"""Download all attachments from S3 and verify integrity.

Args:
attachments: List of AttachmentConfig models from TaskConfig.
workspace: The agent workspace root (e.g., /workspace).

Returns:
List of PreparedAttachment with local file paths.

Raises:
RuntimeError: If any attachment fails download or integrity check.
"""
if not attachments:
return []

import boto3

attachments_dir = Path(workspace) / ATTACHMENTS_DIR
attachments_dir.mkdir(parents=True, exist_ok=True)

s3_client = boto3.client("s3")
prepared: list[PreparedAttachment] = []

try:
for att in attachments:
local_path = _download_single(att, attachments_dir, s3_client)
prepared.append(
PreparedAttachment(
attachment_id=att.attachment_id,
type=att.type,
content_type=att.content_type,
filename=att.filename,
local_path=str(local_path),
size_bytes=att.size_bytes,
token_estimate=att.token_estimate,
)
)
except Exception:
import shutil

shutil.rmtree(attachments_dir, ignore_errors=True)
raise

log("TASK", f"Downloaded {len(prepared)} attachment(s) to {attachments_dir}")
return prepared


def _download_single(att, attachments_dir: Path, s3_client) -> Path:
"""Download a single attachment and verify its SHA-256 checksum."""
# Parse s3_uri (s3://bucket/key)
parsed = urlparse(att.s3_uri)
bucket = parsed.netloc
key = parsed.path.lstrip("/")

# Unique subdirectory per attachment to avoid filename collisions
dest_dir = attachments_dir / att.attachment_id
dest_dir.mkdir(parents=True, exist_ok=True)
local_path = dest_dir / att.filename

log(
"TASK",
f"Downloading attachment '{att.filename}' "
f"(s3://{bucket}/{key}, version={att.s3_version_id})",
)

# Download with pinned VersionId to prevent TOCTOU
response = s3_client.get_object(
Bucket=bucket,
Key=key,
VersionId=att.s3_version_id,
)
content = response["Body"].read()

# Verify SHA-256 integrity
actual_checksum = hashlib.sha256(content).hexdigest()
if actual_checksum != att.checksum_sha256:
raise RuntimeError(
f"Attachment '{att.filename}' integrity check failed: "
f"expected SHA-256 {att.checksum_sha256}, got {actual_checksum}. "
f"The file may have been tampered with."
)

# Verify size matches
if len(content) != att.size_bytes:
raise RuntimeError(
f"Attachment '{att.filename}' size mismatch: "
f"expected {att.size_bytes} bytes, got {len(content)} bytes."
)

# Write to local filesystem
local_path.write_bytes(content)
os.chmod(str(local_path), 0o444) # Read-only

log("TASK", f" Verified: {att.filename} ({len(content)} bytes, SHA-256 OK)")
return local_path
15 changes: 14 additions & 1 deletion agent/src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
import uuid

from models import TaskConfig, TaskType
from models import AttachmentConfig, TaskConfig, TaskType
from shell import log

AGENT_WORKSPACE = os.environ.get("AGENT_WORKSPACE", "/workspace")
Expand Down Expand Up @@ -114,6 +114,7 @@ def build_config(
initial_approvals: list[str] | None = None,
initial_approval_gate_count: int = 0,
approval_gate_cap: int | None = None,
attachments: list[dict] | None = None,
) -> TaskConfig:
"""Build and validate configuration from explicit parameters.

Expand Down Expand Up @@ -149,6 +150,17 @@ def build_config(
if errors:
raise ValueError("; ".join(errors))

# Validate attachment descriptors into typed models (Pydantic validation
# surfaces schema mismatches between the orchestrator and agent early).
validated_attachments: list[AttachmentConfig] = []
if attachments:
for i, raw_att in enumerate(attachments):
try:
validated_attachments.append(AttachmentConfig.model_validate(raw_att))
except Exception as e:
log("ERROR", f"Attachment[{i}] validation failed: {e}")
raise ValueError(f"Attachment[{i}] validation failed: {e}") from e

return TaskConfig(
repo_url=resolved_repo_url,
issue_number=resolved_issue_number,
Expand All @@ -172,6 +184,7 @@ def build_config(
initial_approvals=initial_approvals or [],
initial_approval_gate_count=initial_approval_gate_count,
approval_gate_cap=approval_gate_cap,
attachments=validated_attachments,
)


Expand Down
36 changes: 36 additions & 0 deletions agent/src/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,39 @@ class MemoryContext(BaseModel):
# (see cdk/src/handlers/shared/context-hydration.ts).
SUPPORTED_HYDRATED_CONTEXT_VERSION = 1

# Attachment types — mirrors AttachmentType in cdk/src/handlers/shared/types.ts.
AttachmentType = Literal["image", "file", "url"]


class AttachmentConfig(BaseModel):
"""Attachment descriptor from the orchestrator — mirrors AgentAttachmentPayload in types.ts."""

model_config = ConfigDict(frozen=True, extra="forbid")

attachment_id: str
type: AttachmentType
content_type: str
filename: str
s3_uri: str
s3_version_id: str
size_bytes: int
source_url: str | None = None
token_estimate: int | None = None
checksum_sha256: str

@model_validator(mode="after")
def _validate_integrity_fields(self) -> Self:
if not self.s3_version_id:
raise ValueError("s3_version_id is required for integrity verification")
if not self.checksum_sha256:
raise ValueError("checksum_sha256 is required for integrity verification")
# checksum must be lowercase hex (SHA-256 = 64 hex chars)
if len(self.checksum_sha256) != 64 or not all(
c in "0123456789abcdef" for c in self.checksum_sha256
):
raise ValueError("checksum_sha256 must be a 64-character lowercase hex string")
return self


class HydratedContext(BaseModel):
"""Orchestrator context JSON — keep in sync with HydratedContext in context-hydration.ts."""
Expand Down Expand Up @@ -150,6 +183,9 @@ class TaskConfig(BaseModel):
approval_gate_cap: int | None = None
issue: GitHubIssue | None = None
base_branch: str | None = None
# Attachments from the orchestrator payload (Phase 3). Validated as
# AttachmentConfig models. Empty list for tasks without attachments.
attachments: list[AttachmentConfig] = Field(default_factory=list)

@model_validator(mode="after")
def _validate_trace_requires_user_id(self) -> Self:
Expand Down
62 changes: 62 additions & 0 deletions agent/src/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,32 @@ def _chain_prior_agent_error(agent_result: AgentResult | None, exc: BaseExceptio
return tail


def _inject_attachment_context(prompt: str, prepared_attachments: list) -> str:
"""Append attachment file references to the user prompt.

Images are referenced by absolute path so the agent can view them
with the Read tool (which supports multimodal image reading).
File attachments are similarly referenced by path.
"""
lines = ["\n\n---\n\n**Attachments provided with this task:**\n"]
for att in prepared_attachments:
size_kb = att.size_bytes / 1024
if att.type == "image":
lines.append(
f"- **Image:** `{att.filename}` ({size_kb:.1f} KB, {att.content_type}) "
f"— View with: `Read {att.local_path}`"
)
else:
lines.append(
f"- **File:** `{att.filename}` ({size_kb:.1f} KB, {att.content_type}) "
f"— Read with: `Read {att.local_path}`"
)
lines.append(
"\nUse the Read tool to view these files. Image files will be displayed visually when read."
)
return prompt + "\n".join(lines)


def _maybe_upload_trace(
config: TaskConfig,
trajectory,
Expand Down Expand Up @@ -252,6 +278,7 @@ def run_task(
channel_metadata: dict[str, str] | None = None,
trace: bool = False,
user_id: str = "",
attachments: list[dict] | None = None,
) -> dict:
"""Run the full agent pipeline and return a serialized result dict.

Expand Down Expand Up @@ -290,6 +317,7 @@ def run_task(
initial_approvals=initial_approvals,
initial_approval_gate_count=initial_approval_gate_count,
approval_gate_cap=approval_gate_cap,
attachments=attachments,
)

# Inject Cedar policies into config for the PolicyEngine in runner.py
Expand Down Expand Up @@ -440,6 +468,33 @@ def _on_trace_truncated(max_bytes: int, first_dropped: int) -> None:
config.channel_metadata,
)

# Download attachments from S3 (version-pinned, integrity-verified)
prepared_attachments: list = []
if config.attachments:
from attachments import download_attachments

try:
with task_span("task.attachment_download"):
prepared_attachments = download_attachments(
config.attachments, setup.repo_dir
)
progress.write_agent_milestone(
"attachments_downloaded",
f"count={len(prepared_attachments)}",
)
except RuntimeError as e:
log("ERROR", f"Attachment integrity check failed: {e}")
raise RuntimeError(
f"Attachment download/verification failed: {e}. "
"The task cannot proceed without valid attachments."
) from e
except Exception as e:
err_type = type(e).__name__
log("ERROR", f"Attachment download failed: {err_type}: {e}")
raise RuntimeError(
f"Failed to download task attachments from S3: {err_type}: {e}"
) from e

# Log discovered repo-level project configuration
# (all files loaded by setting_sources=["project"])
repo_dir = setup.repo_dir
Expand All @@ -449,6 +504,13 @@ def _on_trace_truncated(max_bytes: int, first_dropped: int) -> None:
else:
log("TASK", "No repo-level project configuration found")

# Inject attachment references into the prompt so the agent knows
# about available files. Images are read natively by the agent's
# Read tool (multimodal support). File attachments are referenced
# by path for the agent to read as needed.
if prepared_attachments:
prompt = _inject_attachment_context(prompt, prepared_attachments)

# Run agent
disk_before = get_disk_usage(AGENT_WORKSPACE)
start_time = time.time()
Expand Down
4 changes: 4 additions & 0 deletions agent/src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ def _run_task_background(
channel_metadata: dict[str, str] | None = None,
trace: bool = False,
user_id: str = "",
attachments: list[dict] | None = None,
) -> None:
"""Run the agent task in a background thread."""
global _background_pipeline_failed
Expand Down Expand Up @@ -405,6 +406,7 @@ def _run_task_background(
channel_metadata=channel_metadata,
trace=trace,
user_id=user_id,
attachments=attachments,
)
_background_pipeline_failed = False
except Exception as e:
Expand Down Expand Up @@ -492,6 +494,7 @@ def _extract_invocation_params(inp: dict, request: Request) -> dict:
approval_gate_cap = None
channel_source = inp.get("channel_source", "") or ""
channel_metadata = inp.get("channel_metadata") or {}
attachments = inp.get("attachments") or []
# ``trace`` is strictly opt-in (design §10.1). Accept only real
# booleans from the orchestrator — a string "false" would otherwise
# flip the flag on.
Expand Down Expand Up @@ -556,6 +559,7 @@ def _extract_invocation_params(inp: dict, request: Request) -> dict:
"channel_metadata": channel_metadata,
"trace": trace,
"user_id": user_id,
"attachments": attachments,
}


Expand Down
Loading