Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/agentevals/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from pathlib import Path
from typing import TYPE_CHECKING

from fastapi import FastAPI, Request
from fastapi import WebSocket
from fastapi import FastAPI, Request, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

Expand Down
10 changes: 10 additions & 0 deletions src/agentevals/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel

from ..config import EvalParams

T = TypeVar("T")


Expand Down Expand Up @@ -134,6 +136,14 @@ class ConvertTracesData(CamelModel):
traces: list[TraceConversionEntry]


class EvaluateJsonRequest(CamelModel):
    """Request body for JSON-based trace evaluation (``POST /evaluate/json``).

    Inherits from ``CamelModel``, so field names presumably accept camelCase
    aliases on input (e.g. ``evalSet`` for ``eval_set``) — confirm against
    the ``CamelModel`` definition.
    """

    # Raw OTLP JSON export; parsed downstream by OtlpJsonLoader.load_from_dict.
    traces: dict = Field(description="OTLP JSON export with resourceSpans structure.")
    # Evaluation knobs; an all-defaults EvalParams is used when omitted.
    config: EvalParams = Field(default_factory=EvalParams, description="Evaluation parameters.")
    # Optional golden eval set; validated server-side via load_eval_set_from_dict.
    eval_set: dict | None = Field(default=None, description="Optional ADK EvalSet JSON.")

Comment thread
krisztianfekete marked this conversation as resolved.

# ---------------------------------------------------------------------------
# SSE evaluation event models
# ---------------------------------------------------------------------------
Expand Down
160 changes: 158 additions & 2 deletions src/agentevals/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import tempfile
from typing import Any

from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import StreamingResponse
from pydantic.alias_generators import to_camel

Expand All @@ -27,13 +27,22 @@
)
from ..converter import convert_traces
from ..extraction import get_extractor
from ..runner import RunResult, get_loader, load_eval_set, run_evaluation
from ..loader.otlp import OtlpJsonLoader
from ..runner import (
RunResult,
get_loader,
load_eval_set,
load_eval_set_from_dict,
run_evaluation,
run_evaluation_from_traces,
)
from ..trace_metrics import extract_performance_metrics, extract_trace_metadata
from .models import (
ApiKeyStatus,
ConfigData,
ConvertTracesData,
EvalSetValidation,
EvaluateJsonRequest,
HealthData,
MetricInfo,
SSEDoneEvent,
Expand Down Expand Up @@ -61,6 +70,8 @@ def _camel_keys(obj: Any) -> Any:

router = APIRouter()

_MAX_JSON_BODY_BYTES = 50 * 1024 * 1024 # 50 MB (multipart endpoints allow 10 MB per file)

_TYPE_TO_MODEL = {
"builtin": BuiltinMetricDef,
"code": CodeEvaluatorDef,
Expand Down Expand Up @@ -729,3 +740,148 @@ async def run_with_progress():
"Connection": "keep-alive",
},
)


def _parse_json_request(request: EvaluateJsonRequest):
    """Extract traces and an optional eval set from an ``EvaluateJsonRequest``.

    Returns a ``(traces, eval_set)`` tuple. Raises ``HTTPException`` (400)
    when the OTLP payload is invalid, contains no traces, or the eval set
    fails to parse.
    """
    try:
        traces = OtlpJsonLoader().load_from_dict(request.traces)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    if not traces:
        raise HTTPException(status_code=400, detail="No traces found in OTLP JSON")

    # No eval set supplied: reference-free evaluation.
    if not request.eval_set:
        return traces, None

    try:
        return traces, load_eval_set_from_dict(request.eval_set)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Invalid eval set: {exc}") from exc


def _check_json_body_size(raw_request: Request) -> None:
    """Reject requests whose declared body size exceeds the JSON limit.

    Args:
        raw_request: The incoming FastAPI request; only the
            ``Content-Length`` header is inspected (the body is not read).

    Raises:
        HTTPException: 413 when the declared size exceeds
            ``_MAX_JSON_BODY_BYTES``.
    """
    # Bug fix: a present-but-non-numeric Content-Length previously made
    # int(...) raise ValueError, surfacing as a 500. Treat a missing or
    # malformed header as zero and let the framework handle the body.
    try:
        content_length = int(raw_request.headers.get("content-length", 0))
    except ValueError:
        content_length = 0
    if content_length > _MAX_JSON_BODY_BYTES:
        raise HTTPException(
            status_code=413,
            detail=f"Request body exceeds {_MAX_JSON_BODY_BYTES // (1024 * 1024)}MB limit",
        )


def _sse_error(message: str) -> str:
    """Render *message* as a single SSE ``data:`` frame wrapping an error event."""
    payload = SSEErrorEvent(error=message).model_dump_json(by_alias=True)
    return "data: " + payload + "\n\n"


@router.post("/evaluate/json", response_model=StandardResponse[RunResult])
async def evaluate_traces_json(request: EvaluateJsonRequest, raw_request: Request):
    """Evaluate OTLP JSON traces passed in the request body.

    Returns a ``StandardResponse`` wrapping the camelCased run result;
    invalid input yields a 400 (via the parsing helpers) and any failure
    during evaluation yields a 500.
    """
    _check_json_body_size(raw_request)
    traces, eval_set = _parse_json_request(request)

    try:
        run_result = await run_evaluation_from_traces(
            traces=traces,
            config=request.config,
            eval_set=eval_set,
        )
        dumped = run_result.model_dump(by_alias=True)
        return StandardResponse(data=_camel_keys(dumped))
    except Exception as exc:
        logger.exception("JSON evaluation failed")
        raise HTTPException(status_code=500, detail=f"Internal error: {exc!s}") from exc


@router.post("/evaluate/json/stream")
async def evaluate_traces_json_stream(request: EvaluateJsonRequest, raw_request: Request):
    """Evaluate OTLP JSON traces with real-time progress via SSE.

    Event order: one best-effort ``performance_metrics`` event per trace,
    then interleaved ``progress`` / ``trace_progress`` events while the
    evaluation runs, and finally either a done event with the full result
    or an error event.
    """
    _check_json_body_size(raw_request)

    async def event_generator():
        try:
            try:
                traces, eval_set = _parse_json_request(request)
            except HTTPException as exc:
                yield _sse_error(exc.detail)
                return

            # Emit per-trace metrics up front so clients can render something
            # before the (potentially slow) evaluation finishes. Best effort:
            # a failure here is logged but does not abort the stream.
            for trace in traces:
                try:
                    extractor = get_extractor(trace)
                    perf_metrics = _camel_keys(extract_performance_metrics(trace, extractor))
                    trace_metadata = _camel_keys(extract_trace_metadata(trace, extractor))
                    evt = SSEPerformanceMetricsEvent(
                        trace_id=trace.trace_id,
                        performance_metrics=perf_metrics,
                        trace_metadata=trace_metadata,
                    )
                    yield f"event: performance_metrics\ndata: {evt.model_dump_json(by_alias=True)}\n\n"
                except Exception as e:
                    logger.error(f"Failed to extract early performance metrics: {e}")

            queue: asyncio.Queue = asyncio.Queue()

            async def progress_callback(message: str):
                await queue.put(("progress", message))

            async def trace_progress_callback(trace_result):
                await queue.put(("trace_progress", trace_result))

            async def run_with_progress():
                # Bug fix: an exception from run_evaluation_from_traces must be
                # forwarded through the queue — previously the task died silently
                # and the consumer below blocked on queue.get() forever.
                try:
                    result = await run_evaluation_from_traces(
                        traces=traces,
                        config=request.config,
                        eval_set=eval_set,
                        progress_callback=progress_callback,
                        trace_progress_callback=trace_progress_callback,
                    )
                except Exception as exc:
                    await queue.put(("error", exc))
                else:
                    await queue.put(("done", result))

            eval_task = asyncio.create_task(run_with_progress())

            try:
                while True:
                    tag, payload = await queue.get()

                    if tag == "done":
                        evt = SSEDoneEvent(
                            result=_camel_keys(payload.model_dump(by_alias=True)),
                        )
                        yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
                        break
                    elif tag == "error":
                        # Surface the evaluation failure to the client and end
                        # the stream instead of hanging.
                        logger.error("JSON evaluation failed", exc_info=payload)
                        yield _sse_error(str(payload))
                        break
                    elif tag == "trace_progress":
                        evt = SSETraceProgressEvent(
                            trace_progress=SSETraceProgress(
                                trace_id=payload.trace_id,
                                partial_result=_camel_keys(payload.model_dump(by_alias=True)),
                            )
                        )
                        yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
                    elif tag == "progress":
                        evt = SSEProgressEvent(message=payload)
                        yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
            finally:
                # Client disconnects cancel the generator mid-yield; make sure
                # the background evaluation does not keep running.
                if not eval_task.done():
                    eval_task.cancel()
                    try:
                        await eval_task
                    except asyncio.CancelledError:
                        pass

        except Exception as exc:
            logger.exception("JSON evaluation stream failed")
            yield _sse_error(str(exc))

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
53 changes: 35 additions & 18 deletions src/agentevals/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from pathlib import Path
from typing import Annotated, Any, Literal

from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic.alias_generators import to_camel


class BuiltinMetricDef(BaseModel):
Expand Down Expand Up @@ -99,13 +100,14 @@ def _validate_grader(cls, v: dict[str, Any]) -> dict[str, Any]:
]


class EvalRunConfig(BaseModel):
trace_files: list[str] = Field(description="Paths to trace files (Jaeger JSON or OTLP JSON).")
class EvalParams(BaseModel):
"""Evaluation parameters independent of how traces are provided.

eval_set_file: str | None = Field(
default=None,
description="Path to a golden eval set JSON file (ADK EvalSet format).",
)
Used by ``run_evaluation_from_traces`` for programmatic / API-driven
evaluation. ``EvalRunConfig`` inherits from this and adds file I/O fields.
"""

model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

metrics: list[str] = Field(
default_factory=lambda: ["tool_trajectory_avg_score"],
Expand All @@ -117,19 +119,16 @@ class EvalRunConfig(BaseModel):
description="Custom evaluator definitions.",
)

trace_format: str = Field(
default="jaeger-json",
description="Format of the trace files (jaeger-json or otlp-json).",
)

judge_model: str | None = Field(
default=None,
description="LLM model for judge-based metrics.",
)

threshold: float | None = Field(
default=None,
description="Score threshold for pass/fail.",
ge=0,
le=1,
description="Score threshold for pass/fail (0.0 to 1.0).",
)

trajectory_match_type: str | None = Field(
Expand All @@ -145,17 +144,35 @@ def _validate_trajectory_match_type(cls, v: str | None) -> str | None:
raise ValueError(f"Invalid trajectory_match_type '{v}'. Valid values: {sorted(valid)}")
return v.upper() if v is not None else v

output_format: str = Field(
default="table",
description="Output format: 'table', 'json', or 'summary'.",
)

max_concurrent_traces: int = Field(
default=10,
ge=1,
description="Maximum number of traces to evaluate concurrently.",
)

max_concurrent_evals: int = Field(
default=5,
ge=1,
description="Maximum number of concurrent metric evaluations (LLM API calls).",
)
Comment thread
krisztianfekete marked this conversation as resolved.


class EvalRunConfig(EvalParams):
    """Full configuration for file-based evaluation runs.

    Extends ``EvalParams`` (the transport-independent evaluation knobs)
    with file I/O concerns: which trace files to load, their on-disk
    format, an optional golden eval set file, and output rendering.
    """

    # Required: at least the list itself must be provided (no default).
    trace_files: list[str] = Field(description="Paths to trace files (Jaeger JSON or OTLP JSON).")

    eval_set_file: str | None = Field(
        default=None,
        description="Path to a golden eval set JSON file (ADK EvalSet format).",
    )

    # NOTE(review): free-form str, not validated against the known loader
    # names here — presumably checked where the loader is resolved.
    trace_format: str = Field(
        default="jaeger-json",
        description="Format of the trace files (jaeger-json or otlp-json).",
    )

    output_format: str = Field(
        default="table",
        description="Output format: 'table', 'json', or 'summary'.",
    )
Loading