From 3fcfafc3bed9748934cd72ac035b110a4bad902e Mon Sep 17 00:00:00 2001 From: krisztianfekete Date: Thu, 16 Apr 2026 16:39:17 +0200 Subject: [PATCH 1/3] improve trace load logic --- src/agentevals/api/models.py | 10 +++ src/agentevals/api/routes.py | 142 +++++++++++++++++++++++++++++++++- src/agentevals/config.py | 47 +++++++---- src/agentevals/loader/otlp.py | 68 ++++++++++++---- src/agentevals/runner.py | 87 ++++++++++++++------- 5 files changed, 295 insertions(+), 59 deletions(-) diff --git a/src/agentevals/api/models.py b/src/agentevals/api/models.py index 1e6a1a7..98c1e8e 100644 --- a/src/agentevals/api/models.py +++ b/src/agentevals/api/models.py @@ -11,6 +11,8 @@ from pydantic import BaseModel, ConfigDict, Field from pydantic.alias_generators import to_camel +from ..config import EvalParams + T = TypeVar("T") @@ -134,6 +136,14 @@ class ConvertTracesData(CamelModel): traces: list[TraceConversionEntry] +class EvaluateJsonRequest(CamelModel): + """Request body for JSON-based trace evaluation (``POST /evaluate/json``).""" + + traces: dict = Field(description="OTLP JSON export with resourceSpans structure.") + config: EvalParams = Field(default_factory=EvalParams, description="Evaluation parameters.") + eval_set: dict | None = Field(default=None, description="Optional ADK EvalSet JSON.") + + # --------------------------------------------------------------------------- # SSE evaluation event models # --------------------------------------------------------------------------- diff --git a/src/agentevals/api/routes.py b/src/agentevals/api/routes.py index a1a46a3..0ce2dbb 100644 --- a/src/agentevals/api/routes.py +++ b/src/agentevals/api/routes.py @@ -27,13 +27,22 @@ ) from ..converter import convert_traces from ..extraction import get_extractor -from ..runner import RunResult, get_loader, load_eval_set, run_evaluation +from ..loader.otlp import OtlpJsonLoader +from ..runner import ( + RunResult, + get_loader, + load_eval_set, + load_eval_set_from_dict, + run_evaluation, + run_evaluation_from_traces, +) from ..trace_metrics import extract_performance_metrics, extract_trace_metadata from .models import ( ApiKeyStatus, ConfigData, ConvertTracesData, EvalSetValidation, + EvaluateJsonRequest, HealthData, MetricInfo, SSEDoneEvent, @@ -729,3 +738,134 @@ async def run_with_progress(): "Connection": "keep-alive", }, ) + + +@router.post("/evaluate/json", response_model=StandardResponse[RunResult]) +async def evaluate_traces_json(request: EvaluateJsonRequest): + """Evaluate OTLP JSON traces passed in the request body.""" + try: + traces = OtlpJsonLoader().load_from_dict(request.traces) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + + if not traces: + raise HTTPException(status_code=400, detail="No traces found in OTLP JSON") + + eval_set = None + if request.eval_set: + try: + eval_set = load_eval_set_from_dict(request.eval_set) + except Exception as exc: + raise HTTPException(status_code=400, detail=f"Invalid eval set: {exc}") from exc + + try: + result = await run_evaluation_from_traces( + traces=traces, config=request.config, eval_set=eval_set, + ) + return StandardResponse(data=_camel_keys(result.model_dump(by_alias=True))) + except Exception as exc: + logger.exception("JSON evaluation failed") + raise HTTPException(status_code=500, detail=f"Internal error: {exc!s}") from exc + + +@router.post("/evaluate/json/stream") +async def evaluate_traces_json_stream(request: EvaluateJsonRequest): + """Evaluate OTLP JSON traces with real-time progress via SSE.""" 
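+    # Event flow for event_generator() below:
+    #   1. one early "performance_metrics" event per loaded trace,
+    #   2. "progress" / "trace_progress" events while evaluation runs,
+    #   3. a single terminating "done" event (or an "error" event).
+    # Each frame is an SSE "data: <json>" payload; performance_metrics
+    # frames additionally carry an explicit "event:" field.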
+ + async def event_generator(): + try: + try: + traces = OtlpJsonLoader().load_from_dict(request.traces) + except ValueError as exc: + yield f"data: {SSEErrorEvent(error=str(exc)).model_dump_json(by_alias=True)}\n\n" + return + + if not traces: + yield f"data: {SSEErrorEvent(error='No traces found in OTLP JSON').model_dump_json(by_alias=True)}\n\n" + return + + eval_set = None + if request.eval_set: + try: + eval_set = load_eval_set_from_dict(request.eval_set) + except Exception as exc: + yield f"data: {SSEErrorEvent(error=f'Invalid eval set: {exc}').model_dump_json(by_alias=True)}\n\n" + return + + for trace in traces: + try: + extractor = get_extractor(trace) + perf_metrics = _camel_keys(extract_performance_metrics(trace, extractor)) + trace_metadata = _camel_keys(extract_trace_metadata(trace, extractor)) + evt = SSEPerformanceMetricsEvent( + trace_id=trace.trace_id, + performance_metrics=perf_metrics, + trace_metadata=trace_metadata, + ) + yield f"event: performance_metrics\ndata: {evt.model_dump_json(by_alias=True)}\n\n" + except Exception as e: + logger.error(f"Failed to extract early performance metrics: {e}") + + queue: asyncio.Queue = asyncio.Queue() + + async def progress_callback(message: str): + await queue.put(("progress", message)) + + async def trace_progress_callback(trace_result): + await queue.put(("trace_progress", trace_result)) + + async def run_with_progress(): + result = await run_evaluation_from_traces( + traces=traces, + config=request.config, + eval_set=eval_set, + progress_callback=progress_callback, + trace_progress_callback=trace_progress_callback, + ) + await queue.put(("done", result)) + + eval_task = asyncio.create_task(run_with_progress()) + + try: + while True: + msg = await queue.get() + tag, payload = msg + + if tag == "done": + evt = SSEDoneEvent( + result=_camel_keys(payload.model_dump(by_alias=True)), + ) + yield f"data: {evt.model_dump_json(by_alias=True)}\n\n" + break + elif tag == "trace_progress": + evt = SSETraceProgressEvent( + trace_progress=SSETraceProgress( + trace_id=payload.trace_id, + partial_result=_camel_keys(payload.model_dump(by_alias=True)), + ) + ) + yield f"data: {evt.model_dump_json(by_alias=True)}\n\n" + elif tag == "progress": + evt = SSEProgressEvent(message=payload) + yield f"data: {evt.model_dump_json(by_alias=True)}\n\n" + finally: + if not eval_task.done(): + eval_task.cancel() + try: + await eval_task + except asyncio.CancelledError: + pass + + except Exception as exc: + logger.exception("JSON evaluation stream failed") + evt = SSEErrorEvent(error=str(exc)) + yield f"data: {evt.model_dump_json(by_alias=True)}\n\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) diff --git a/src/agentevals/config.py b/src/agentevals/config.py index f7a3149..2c65dec 100644 --- a/src/agentevals/config.py +++ b/src/agentevals/config.py @@ -5,7 +5,8 @@ from pathlib import Path from typing import Annotated, Any, Literal -from pydantic import BaseModel, Field, field_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator +from pydantic.alias_generators import to_camel class BuiltinMetricDef(BaseModel): @@ -99,13 +100,14 @@ def _validate_grader(cls, v: dict[str, Any]) -> dict[str, Any]: ] -class EvalRunConfig(BaseModel): - trace_files: list[str] = Field(description="Paths to trace files (Jaeger JSON or OTLP JSON).") +class EvalParams(BaseModel): + """Evaluation parameters independent of how traces are 
provided. - eval_set_file: str | None = Field( - default=None, - description="Path to a golden eval set JSON file (ADK EvalSet format).", - ) + Used by ``run_evaluation_from_traces`` for programmatic / API-driven + evaluation. ``EvalRunConfig`` inherits from this and adds file I/O fields. + """ + + model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True) metrics: list[str] = Field( default_factory=lambda: ["tool_trajectory_avg_score"], @@ -117,11 +119,6 @@ class EvalRunConfig(BaseModel): description="Custom evaluator definitions.", ) - trace_format: str = Field( - default="jaeger-json", - description="Format of the trace files (jaeger-json or otlp-json).", - ) - judge_model: str | None = Field( default=None, description="LLM model for judge-based metrics.", @@ -145,11 +142,6 @@ def _validate_trajectory_match_type(cls, v: str | None) -> str | None: raise ValueError(f"Invalid trajectory_match_type '{v}'. Valid values: {sorted(valid)}") return v.upper() if v is not None else v - output_format: str = Field( - default="table", - description="Output format: 'table', 'json', or 'summary'.", - ) - max_concurrent_traces: int = Field( default=10, description="Maximum number of traces to evaluate concurrently.", @@ -159,3 +151,24 @@ def _validate_trajectory_match_type(cls, v: str | None) -> str | None: default=5, description="Maximum number of concurrent metric evaluations (LLM API calls).", ) + + +class EvalRunConfig(EvalParams): + """Full configuration for file-based evaluation runs.""" + + trace_files: list[str] = Field(description="Paths to trace files (Jaeger JSON or OTLP JSON).") + + eval_set_file: str | None = Field( + default=None, + description="Path to a golden eval set JSON file (ADK EvalSet format).", + ) + + trace_format: str = Field( + default="jaeger-json", + description="Format of the trace files (jaeger-json or otlp-json).", + ) + + output_format: str = Field( + default="table", + description="Output format: 'table', 'json', or 'summary'.", + ) diff --git a/src/agentevals/loader/otlp.py b/src/agentevals/loader/otlp.py index 05f2e29..35ddfd3 100644 --- a/src/agentevals/loader/otlp.py +++ b/src/agentevals/loader/otlp.py @@ -56,6 +56,12 @@ def load(self, source: str) -> list[Trace]: logger.info("Loaded %d trace(s) from %s", len(traces), source) return traces + def load_from_dict(self, data: dict) -> list[Trace]: + """Load traces from an OTLP JSON dict (resourceSpans structure).""" + if "resourceSpans" not in data: + raise ValueError("Expected OTLP JSON with 'resourceSpans' key") + return self._parse_otlp_export(data) + def _parse_otlp_export(self, data: dict) -> list[Trace]: """Parse full OTLP export structure with resourceSpans.""" all_spans = [] @@ -122,23 +128,40 @@ def _promote_genai_event_attributes(self, span_data: dict, attributes: dict) -> Some SDKs (e.g. Strands) store message content in span events rather than span attributes. This promotes those values so the converter can find them via normal attribute lookups. + + Accepts events in OTLP array format or flat/nested dict format. """ for event in span_data.get("events", []): - for attr in event.get("attributes", []): - key = attr.get("key", "") - if key in self._GENAI_EVENT_KEYS and key not in attributes: - value_obj = attr.get("value", {}) - if "stringValue" in value_obj: - attributes[key] = value_obj["stringValue"] - - def _extract_attributes(self, attrs_list: list[dict]) -> dict: - """Convert OTLP attributes array to flat dict. 
- - OTLP attributes are [{key, value: {stringValue|intValue|...}}] - We flatten to {key: value} for easier use. + event_attrs = event.get("attributes", []) + if isinstance(event_attrs, dict): + flat = self._flatten_nested_dict(event_attrs) + for key in self._GENAI_EVENT_KEYS: + if key in flat and key not in attributes: + attributes[key] = flat[key] + else: + for attr in event_attrs: + key = attr.get("key", "") + if key in self._GENAI_EVENT_KEYS and key not in attributes: + value_obj = attr.get("value", {}) + if "stringValue" in value_obj: + attributes[key] = value_obj["stringValue"] + + def _extract_attributes(self, attrs) -> dict: + """Convert attributes to a flat ``{key: value}`` dict. + + Accepts three formats: + 1. OTLP array: ``[{key, value: {stringValue|intValue|...}}]`` + 2. Flat dict: ``{"gen_ai.operation.name": "chat"}`` + 3. Nested dict (ClickHouse JSON column): ``{"gen_ai": {"operation": {"name": "chat"}}}`` + + Formats 2 and 3 are auto-detected by checking whether *attrs* is a dict. + Nested dicts are recursively flattened to dot-notation keys. """ + if isinstance(attrs, dict): + return self._flatten_nested_dict(attrs) + result = {} - for attr in attrs_list: + for attr in attrs: key = attr.get("key", "") value_obj = attr.get("value", {}) @@ -157,6 +180,25 @@ def _extract_attributes(self, attrs_list: list[dict]) -> dict: return result + @staticmethod + def _flatten_nested_dict(d: dict, prefix: str = "") -> dict: + """Recursively flatten a nested dict to dot-notation keys. + + ``{"gen_ai": {"operation": {"name": "chat"}}}`` + becomes ``{"gen_ai.operation.name": "chat"}``. + + Already-flat keys (e.g. ``{"service.name": "agent"}``) pass through + unchanged. + """ + result = {} + for key, value in d.items(): + full_key = f"{prefix}{key}" if not prefix else f"{prefix}.{key}" + if isinstance(value, dict): + result.update(OtlpJsonLoader._flatten_nested_dict(value, full_key)) + else: + result[full_key] = value + return result + def _build_traces(self, all_spans: list[Span]) -> list[Trace]: """Group spans by trace_id and build parent-child relationships.""" traces_by_id: dict[str, list[Span]] = {} diff --git a/src/agentevals/runner.py b/src/agentevals/runner.py index 93bf232..4c0ac59 100644 --- a/src/agentevals/runner.py +++ b/src/agentevals/runner.py @@ -17,10 +17,11 @@ from .builtin_metrics import evaluate_builtin_metric from .config import ( CustomEvaluatorDef, + EvalParams, EvalRunConfig, ) from .converter import ConversionResult, convert_traces -from .loader.base import TraceLoader +from .loader.base import Trace, TraceLoader from .loader.jaeger import JaegerJsonLoader from .loader.otlp import OtlpJsonLoader from .trace_metrics import _calc_percentiles, extract_performance_metrics @@ -77,45 +78,33 @@ def load_eval_set(path: str) -> EvalSet: return EvalSet.model_validate(data) -async def run_evaluation( - config: EvalRunConfig, +def load_eval_set_from_dict(data: dict) -> EvalSet: + """Parse an ADK EvalSet from a dict (for programmatic / API use).""" + return EvalSet.model_validate(data) + + +async def run_evaluation_from_traces( + traces: list[Trace], + config: EvalParams, + eval_set: EvalSet | None = None, progress_callback: ProgressCallback | None = None, trace_progress_callback: TraceProgressCallback | None = None, ) -> RunResult: + """Evaluate pre-loaded traces. 
Skips file loading.""" result = RunResult() - loader = get_loader(config.trace_format) - all_traces = [] - for trace_file in config.trace_files: - try: - traces = loader.load(trace_file) - all_traces.extend(traces) - except Exception as exc: - msg = f"Failed to load trace file '{trace_file}': {exc}" - logger.error(msg) - result.errors.append(msg) - - if not all_traces: - result.errors.append("No traces loaded.") + if not traces: + result.errors.append("No traces provided.") return result - conversion_results = convert_traces(all_traces) + conversion_results = convert_traces(traces) - trace_map = {t.trace_id: t for t in all_traces} + trace_map = {t.trace_id: t for t in traces} perf_metrics_map: dict[str, dict[str, Any]] = {} - for trace in all_traces: + for trace in traces: perf_metrics_map[trace.trace_id] = extract_performance_metrics(trace) - eval_set: EvalSet | None = None - if config.eval_set_file: - try: - eval_set = load_eval_set(config.eval_set_file) - except Exception as exc: - msg = f"Failed to load eval set '{config.eval_set_file}': {exc}" - logger.error(msg) - result.errors.append(msg) - total_traces = len(conversion_results) if progress_callback: await progress_callback(f"Evaluating {total_traces} trace{'s' if total_traces != 1 else ''}...") @@ -226,6 +215,48 @@ async def _evaluate_trace_bounded(idx: int, conv_result: ConversionResult) -> Tr return result +async def run_evaluation( + config: EvalRunConfig, + progress_callback: ProgressCallback | None = None, + trace_progress_callback: TraceProgressCallback | None = None, +) -> RunResult: + """Load traces from files, then evaluate. Delegates to ``run_evaluation_from_traces``.""" + load_errors: list[str] = [] + + loader = get_loader(config.trace_format) + all_traces: list[Trace] = [] + for trace_file in config.trace_files: + try: + all_traces.extend(loader.load(trace_file)) + except Exception as exc: + msg = f"Failed to load trace file '{trace_file}': {exc}" + logger.error(msg) + load_errors.append(msg) + + if not all_traces: + return RunResult(errors=[*load_errors, "No traces loaded."]) + + eval_set: EvalSet | None = None + if config.eval_set_file: + try: + eval_set = load_eval_set(config.eval_set_file) + except Exception as exc: + msg = f"Failed to load eval set '{config.eval_set_file}': {exc}" + logger.error(msg) + load_errors.append(msg) + + result = await run_evaluation_from_traces( + traces=all_traces, + config=config, + eval_set=eval_set, + progress_callback=progress_callback, + trace_progress_callback=trace_progress_callback, + ) + if load_errors: + result.errors = load_errors + result.errors + return result + + async def _evaluate_trace( conv_result: ConversionResult, metrics: list[str], From 93e3e522e500aff999c640860122f1b774d0287d Mon Sep 17 00:00:00 2001 From: krisztianfekete Date: Thu, 16 Apr 2026 16:58:04 +0200 Subject: [PATCH 2/3] address review comments --- src/agentevals/api/app.py | 3 +- src/agentevals/api/routes.py | 25 +++- src/agentevals/config.py | 2 + tests/test_api.py | 221 +++++++++++++++++++++++++++++++ tests/test_otlp_loader.py | 244 +++++++++++++++++++++++++++++++++++ 5 files changed, 489 insertions(+), 6 deletions(-) diff --git a/src/agentevals/api/app.py b/src/agentevals/api/app.py index 25f827b..ec3b3dd 100644 --- a/src/agentevals/api/app.py +++ b/src/agentevals/api/app.py @@ -10,8 +10,7 @@ from pathlib import Path from typing import TYPE_CHECKING -from fastapi import FastAPI, Request -from fastapi import WebSocket +from fastapi import FastAPI, Request, WebSocket from fastapi.middleware.cors import 
CORSMiddleware from fastapi.responses import StreamingResponse diff --git a/src/agentevals/api/routes.py b/src/agentevals/api/routes.py index 0ce2dbb..14a6cf6 100644 --- a/src/agentevals/api/routes.py +++ b/src/agentevals/api/routes.py @@ -11,7 +11,7 @@ import tempfile from typing import Any -from fastapi import APIRouter, File, Form, HTTPException, UploadFile +from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile from fastapi.responses import StreamingResponse from pydantic.alias_generators import to_camel @@ -70,6 +70,8 @@ def _camel_keys(obj: Any) -> Any: router = APIRouter() +_MAX_JSON_BODY_BYTES = 50 * 1024 * 1024 # 50 MB (multipart endpoints allow 10 MB per file) + _TYPE_TO_MODEL = { "builtin": BuiltinMetricDef, "code": CodeEvaluatorDef, @@ -741,8 +743,15 @@ async def run_with_progress(): @router.post("/evaluate/json", response_model=StandardResponse[RunResult]) -async def evaluate_traces_json(request: EvaluateJsonRequest): +async def evaluate_traces_json(request: EvaluateJsonRequest, raw_request: Request): """Evaluate OTLP JSON traces passed in the request body.""" + content_length = int(raw_request.headers.get("content-length", 0)) + if content_length > _MAX_JSON_BODY_BYTES: + raise HTTPException( + status_code=413, + detail=f"Request body exceeds {_MAX_JSON_BODY_BYTES // (1024 * 1024)}MB limit", + ) + try: traces = OtlpJsonLoader().load_from_dict(request.traces) except ValueError as exc: @@ -760,7 +769,9 @@ async def evaluate_traces_json(request: EvaluateJsonRequest): try: result = await run_evaluation_from_traces( - traces=traces, config=request.config, eval_set=eval_set, + traces=traces, + config=request.config, + eval_set=eval_set, ) return StandardResponse(data=_camel_keys(result.model_dump(by_alias=True))) except Exception as exc: @@ -769,8 +780,14 @@ async def evaluate_traces_json(request: EvaluateJsonRequest): @router.post("/evaluate/json/stream") -async def evaluate_traces_json_stream(request: EvaluateJsonRequest): +async def evaluate_traces_json_stream(request: EvaluateJsonRequest, raw_request: Request): """Evaluate OTLP JSON traces with real-time progress via SSE.""" + content_length = int(raw_request.headers.get("content-length", 0)) + if content_length > _MAX_JSON_BODY_BYTES: + raise HTTPException( + status_code=413, + detail=f"Request body exceeds {_MAX_JSON_BODY_BYTES // (1024 * 1024)}MB limit", + ) async def event_generator(): try: diff --git a/src/agentevals/config.py b/src/agentevals/config.py index 2c65dec..1f61b8b 100644 --- a/src/agentevals/config.py +++ b/src/agentevals/config.py @@ -144,11 +144,13 @@ def _validate_trajectory_match_type(cls, v: str | None) -> str | None: max_concurrent_traces: int = Field( default=10, + ge=1, description="Maximum number of traces to evaluate concurrently.", ) max_concurrent_evals: int = Field( default=5, + ge=1, description="Maximum number of concurrent metric evaluations (LLM API calls).", ) diff --git a/tests/test_api.py b/tests/test_api.py index fd513f2..80f1593 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -588,6 +588,227 @@ def test_stream_done_event(self, mock_loader, mock_eval): assert "traceResults" in done["result"] +# --------------------------------------------------------------------------- +# POST /api/evaluate/json +# --------------------------------------------------------------------------- + + +def _make_otlp_json_payload() -> dict: + return { + "resourceSpans": [ + { + "resource": {"attributes": [{"key": "service.name", "value": {"stringValue": "test"}}]}, + "scopeSpans": [ 
+ { + "scope": {"name": "gcp.vertex.agent"}, + "spans": [ + { + "traceId": "abc123", + "spanId": "span1", + "name": "invoke_agent test", + "startTimeUnixNano": "1000000000", + "endTimeUnixNano": "2000000000", + "attributes": [ + {"key": "gen_ai.operation.name", "value": {"stringValue": "invoke_agent"}}, + ], + } + ], + } + ], + } + ], + } + + +class TestEvaluateJson: + @classmethod + def setup_class(cls): + cls.client = TestClient(_make_app()) + + @patch("agentevals.api.routes.run_evaluation_from_traces", new_callable=AsyncMock) + def test_evaluate_json_success(self, mock_eval): + mock_eval.return_value = _make_run_result() + resp = self.client.post( + "/api/evaluate/json", + json={ + "traces": _make_otlp_json_payload(), + "config": {"metrics": ["tool_trajectory_avg_score"]}, + }, + ) + body = _assert_envelope(resp) + assert "traceResults" in body["data"] + assert len(body["data"]["traceResults"]) == 1 + + @patch("agentevals.api.routes.run_evaluation_from_traces", new_callable=AsyncMock) + def test_evaluate_json_camel_case_config(self, mock_eval): + mock_eval.return_value = _make_run_result() + resp = self.client.post( + "/api/evaluate/json", + json={ + "traces": _make_otlp_json_payload(), + "config": { + "metrics": ["hallucinations_v1"], + "judgeModel": "gemini-2.0-flash", + "maxConcurrentTraces": 5, + }, + }, + ) + body = _assert_envelope(resp) + assert "traceResults" in body["data"] + call_config = mock_eval.call_args.kwargs["config"] + assert call_config.judge_model == "gemini-2.0-flash" + assert call_config.max_concurrent_traces == 5 + + def test_evaluate_json_missing_resource_spans(self): + resp = self.client.post( + "/api/evaluate/json", + json={"traces": {"foo": "bar"}, "config": {}}, + ) + assert resp.status_code == 400 + assert "resourceSpans" in resp.json()["detail"] + + def test_evaluate_json_empty_traces(self): + resp = self.client.post( + "/api/evaluate/json", + json={"traces": {"resourceSpans": []}, "config": {}}, + ) + assert resp.status_code == 400 + assert "No traces" in resp.json()["detail"] + + @patch("agentevals.api.routes.run_evaluation_from_traces", new_callable=AsyncMock) + def test_evaluate_json_with_eval_set(self, mock_eval): + mock_eval.return_value = _make_run_result() + resp = self.client.post( + "/api/evaluate/json", + json={ + "traces": _make_otlp_json_payload(), + "config": {"metrics": ["tool_trajectory_avg_score"]}, + "evalSet": { + "eval_set_id": "test", + "eval_cases": [ + { + "eval_id": "c1", + "conversation": [ + { + "invocation_id": "inv1", + "user_content": {"role": "user", "parts": [{"text": "hi"}]}, + "final_response": {"role": "model", "parts": [{"text": "hello"}]}, + } + ], + } + ], + }, + }, + ) + _assert_envelope(resp) + assert mock_eval.call_args.kwargs["eval_set"] is not None + + def test_evaluate_json_invalid_eval_set(self): + resp = self.client.post( + "/api/evaluate/json", + json={ + "traces": _make_otlp_json_payload(), + "config": {}, + "evalSet": {"not_valid": True}, + }, + ) + assert resp.status_code == 400 + assert "eval set" in resp.json()["detail"].lower() + + def test_evaluate_json_invalid_concurrency(self): + resp = self.client.post( + "/api/evaluate/json", + json={ + "traces": _make_otlp_json_payload(), + "config": {"maxConcurrentTraces": 0}, + }, + ) + assert resp.status_code == 422 + + @patch("agentevals.api.routes.run_evaluation_from_traces", new_callable=AsyncMock) + def test_evaluate_json_camel_keys_in_result(self, mock_eval): + mock_eval.return_value = _make_run_result() + resp = self.client.post( + "/api/evaluate/json", + 
json={ + "traces": _make_otlp_json_payload(), + "config": {"metrics": ["tool_trajectory_avg_score"]}, + }, + ) + body = resp.json() + _assert_all_keys_camel(body) + + @patch("agentevals.api.routes.run_evaluation_from_traces", new_callable=AsyncMock) + def test_evaluate_json_default_config(self, mock_eval): + mock_eval.return_value = _make_run_result() + resp = self.client.post( + "/api/evaluate/json", + json={"traces": _make_otlp_json_payload()}, + ) + body = _assert_envelope(resp) + assert "traceResults" in body["data"] + + +# --------------------------------------------------------------------------- +# POST /api/evaluate/json/stream (SSE) +# --------------------------------------------------------------------------- + + +class TestEvaluateJsonStream: + @classmethod + def setup_class(cls): + cls.client = TestClient(_make_app()) + + @patch("agentevals.api.routes.run_evaluation_from_traces", new_callable=AsyncMock) + @patch("agentevals.api.routes.OtlpJsonLoader") + def test_stream_content_type(self, mock_loader_cls, mock_eval): + mock_loader_cls.return_value.load_from_dict.return_value = [] + mock_eval.return_value = _make_run_result() + resp = self.client.post( + "/api/evaluate/json/stream", + json={"traces": _make_otlp_json_payload(), "config": {}}, + ) + assert resp.headers["content-type"].startswith("text/event-stream") + + @patch("agentevals.api.routes.run_evaluation_from_traces", new_callable=AsyncMock) + @patch("agentevals.api.routes.OtlpJsonLoader") + def test_stream_done_event(self, mock_loader_cls, mock_eval): + mock_trace = MagicMock() + mock_trace.trace_id = "abc123" + mock_loader_cls.return_value.load_from_dict.return_value = [mock_trace] + mock_eval.return_value = _make_run_result() + + resp = self.client.post( + "/api/evaluate/json/stream", + json={"traces": _make_otlp_json_payload(), "config": {}}, + ) + lines = resp.text.strip().split("\n") + data_lines = [line for line in lines if line.startswith("data: ")] + done_events = [json.loads(line[6:]) for line in data_lines if '"done"' in line] + assert len(done_events) == 1 + assert done_events[0]["done"] is True + assert "traceResults" in done_events[0]["result"] + + def test_stream_error_on_invalid_traces(self): + resp = self.client.post( + "/api/evaluate/json/stream", + json={"traces": {"no_resource_spans": True}, "config": {}}, + ) + assert resp.status_code == 200 + body = resp.text + assert '"error"' in body + assert "resourceSpans" in body + + def test_stream_error_on_empty_traces(self): + resp = self.client.post( + "/api/evaluate/json/stream", + json={"traces": {"resourceSpans": []}, "config": {}}, + ) + body = resp.text + assert '"error"' in body + assert "No traces" in body + + # --------------------------------------------------------------------------- # GET /api/streaming/sessions # --------------------------------------------------------------------------- diff --git a/tests/test_otlp_loader.py b/tests/test_otlp_loader.py index 0c61478..aad3937 100644 --- a/tests/test_otlp_loader.py +++ b/tests/test_otlp_loader.py @@ -208,3 +208,247 @@ def test_otlp_loader_empty_file(): finally: Path(temp_path).unlink() + + +class TestLoadFromDict: + """Tests for OtlpJsonLoader.load_from_dict().""" + + def test_load_from_dict_basic(self): + loader = OtlpJsonLoader() + data = { + "resourceSpans": [ + { + "resource": {"attributes": [{"key": "service.name", "value": {"stringValue": "test-agent"}}]}, + "scopeSpans": [ + { + "scope": {"name": "gcp.vertex.agent", "version": "1.0"}, + "spans": [ + { + "traceId": "abc123", + "spanId": 
"span1", + "name": "invoke_agent", + "startTimeUnixNano": "1000000000", + "endTimeUnixNano": "2000000000", + "attributes": [{"key": "gen_ai.agent.name", "value": {"stringValue": "my_agent"}}], + } + ], + } + ], + } + ], + } + traces = loader.load_from_dict(data) + + assert len(traces) == 1 + assert traces[0].trace_id == "abc123" + span = traces[0].all_spans[0] + assert span.tags["gen_ai.agent.name"] == "my_agent" + assert span.tags["service.name"] == "test-agent" + assert span.tags["otel.scope.name"] == "gcp.vertex.agent" + + def test_load_from_dict_missing_resource_spans(self): + loader = OtlpJsonLoader() + with pytest.raises(ValueError, match="resourceSpans"): + loader.load_from_dict({"foo": "bar"}) + + def test_load_from_dict_empty_resource_spans(self): + loader = OtlpJsonLoader() + traces = loader.load_from_dict({"resourceSpans": []}) + assert traces == [] + + +class TestFlatDictAttributes: + """Tests for flat dict attribute format (e.g. from simplified producers).""" + + def test_span_attributes_as_flat_dict(self): + loader = OtlpJsonLoader() + data = { + "resourceSpans": [ + { + "resource": {"attributes": {"service.name": "my-agent"}}, + "scopeSpans": [ + { + "scope": {"name": "test-scope"}, + "spans": [ + { + "traceId": "t1", + "spanId": "s1", + "name": "test", + "startTimeUnixNano": "1000000000", + "endTimeUnixNano": "2000000000", + "attributes": { + "gen_ai.operation.name": "chat", + "gen_ai.usage.input_tokens": 167, + "gen_ai.usage.output_tokens": 42, + "enabled": True, + }, + } + ], + } + ], + } + ], + } + traces = loader.load_from_dict(data) + span = traces[0].all_spans[0] + + assert span.tags["gen_ai.operation.name"] == "chat" + assert span.tags["gen_ai.usage.input_tokens"] == 167 + assert span.tags["gen_ai.usage.output_tokens"] == 42 + assert span.tags["enabled"] is True + assert span.tags["service.name"] == "my-agent" + + def test_resource_attributes_as_flat_dict(self): + loader = OtlpJsonLoader() + data = { + "resourceSpans": [ + { + "resource": {"attributes": {"service.name": "agent", "k8s.namespace.name": "default"}}, + "scopeSpans": [ + { + "scope": {}, + "spans": [ + { + "traceId": "t1", + "spanId": "s1", + "name": "test", + "startTimeUnixNano": "0", + "endTimeUnixNano": "0", + "attributes": [], + } + ], + } + ], + } + ], + } + traces = loader.load_from_dict(data) + span = traces[0].all_spans[0] + assert span.tags["service.name"] == "agent" + assert span.tags["k8s.namespace.name"] == "default" + + +class TestNestedDictAttributes: + """Tests for ClickHouse JSON column format (nested dicts auto-flattened).""" + + def test_nested_dict_flattened_to_dot_notation(self): + loader = OtlpJsonLoader() + data = { + "resourceSpans": [ + { + "resource": { + "attributes": { + "service": {"name": "my-agent"}, + "k8s": {"namespace": {"name": "prod"}}, + "cluster_name": "mgmt", + } + }, + "scopeSpans": [ + { + "scope": {"name": "strands.telemetry.tracer"}, + "spans": [ + { + "traceId": "t1", + "spanId": "s1", + "name": "invoke_agent", + "startTimeUnixNano": "1000000000", + "endTimeUnixNano": "2000000000", + "attributes": { + "gen_ai": { + "operation": {"name": "invoke_agent"}, + "agent": {"name": "dice_agent"}, + "request": {"model": "gpt-4o"}, + "usage": {"input_tokens": 167, "output_tokens": 11}, + }, + }, + } + ], + } + ], + } + ], + } + traces = loader.load_from_dict(data) + span = traces[0].all_spans[0] + + assert span.tags["gen_ai.operation.name"] == "invoke_agent" + assert span.tags["gen_ai.agent.name"] == "dice_agent" + assert span.tags["gen_ai.request.model"] == "gpt-4o" + 
assert span.tags["gen_ai.usage.input_tokens"] == 167 + assert span.tags["gen_ai.usage.output_tokens"] == 11 + assert span.tags["service.name"] == "my-agent" + assert span.tags["k8s.namespace.name"] == "prod" + assert span.tags["cluster_name"] == "mgmt" + + def test_nested_event_attributes_flattened(self): + loader = OtlpJsonLoader() + messages_json = '[{"role": "user", "parts": [{"text": "Hello"}]}]' + data = { + "resourceSpans": [ + { + "resource": {"attributes": {}}, + "scopeSpans": [ + { + "scope": {}, + "spans": [ + { + "traceId": "t1", + "spanId": "s1", + "name": "chat", + "startTimeUnixNano": "0", + "endTimeUnixNano": "0", + "attributes": {}, + "events": [ + { + "timeUnixNano": "0", + "name": "gen_ai.client.inference.operation.details", + "attributes": { + "gen_ai": { + "input": {"messages": messages_json}, + }, + }, + } + ], + } + ], + } + ], + } + ], + } + traces = loader.load_from_dict(data) + span = traces[0].all_spans[0] + assert span.tags["gen_ai.input.messages"] == messages_json + + def test_mixed_nested_and_flat_keys(self): + """Keys that are already flat should pass through unchanged.""" + loader = OtlpJsonLoader() + data = { + "resourceSpans": [ + { + "resource": {"attributes": {}}, + "scopeSpans": [ + { + "scope": {}, + "spans": [ + { + "traceId": "t1", + "spanId": "s1", + "name": "test", + "startTimeUnixNano": "0", + "endTimeUnixNano": "0", + "attributes": { + "simple_key": "simple_value", + "nested": {"deep": {"key": 42}}, + }, + } + ], + } + ], + } + ], + } + traces = loader.load_from_dict(data) + span = traces[0].all_spans[0] + assert span.tags["simple_key"] == "simple_value" + assert span.tags["nested.deep.key"] == 42 From d020d3bc8b2d74394e15d2fcf241eb8f9225a6a1 Mon Sep 17 00:00:00 2001 From: krisztianfekete Date: Fri, 17 Apr 2026 09:12:32 +0200 Subject: [PATCH 3/3] address suggestions from review --- src/agentevals/api/routes.py | 63 ++++++++++++++++++------------------ src/agentevals/config.py | 4 ++- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/src/agentevals/api/routes.py b/src/agentevals/api/routes.py index 14a6cf6..97fed6a 100644 --- a/src/agentevals/api/routes.py +++ b/src/agentevals/api/routes.py @@ -742,16 +742,11 @@ async def run_with_progress(): ) -@router.post("/evaluate/json", response_model=StandardResponse[RunResult]) -async def evaluate_traces_json(request: EvaluateJsonRequest, raw_request: Request): - """Evaluate OTLP JSON traces passed in the request body.""" - content_length = int(raw_request.headers.get("content-length", 0)) - if content_length > _MAX_JSON_BODY_BYTES: - raise HTTPException( - status_code=413, - detail=f"Request body exceeds {_MAX_JSON_BODY_BYTES // (1024 * 1024)}MB limit", - ) +def _parse_json_request(request: EvaluateJsonRequest): + """Parse traces and eval set from an EvaluateJsonRequest. + Returns (traces, eval_set). Raises HTTPException on invalid input. 
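+
+    Both JSON endpoints call it the same way::
+
+        traces, eval_set = _parse_json_request(request)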
+ """ try: traces = OtlpJsonLoader().load_from_dict(request.traces) except ValueError as exc: @@ -767,6 +762,28 @@ async def evaluate_traces_json(request: EvaluateJsonRequest, raw_request: Reques except Exception as exc: raise HTTPException(status_code=400, detail=f"Invalid eval set: {exc}") from exc + return traces, eval_set + + +def _check_json_body_size(raw_request: Request): + content_length = int(raw_request.headers.get("content-length", 0)) + if content_length > _MAX_JSON_BODY_BYTES: + raise HTTPException( + status_code=413, + detail=f"Request body exceeds {_MAX_JSON_BODY_BYTES // (1024 * 1024)}MB limit", + ) + + +def _sse_error(message: str) -> str: + return f"data: {SSEErrorEvent(error=message).model_dump_json(by_alias=True)}\n\n" + + +@router.post("/evaluate/json", response_model=StandardResponse[RunResult]) +async def evaluate_traces_json(request: EvaluateJsonRequest, raw_request: Request): + """Evaluate OTLP JSON traces passed in the request body.""" + _check_json_body_size(raw_request) + traces, eval_set = _parse_json_request(request) + try: result = await run_evaluation_from_traces( traces=traces, @@ -782,33 +799,16 @@ async def evaluate_traces_json(request: EvaluateJsonRequest, raw_request: Reques @router.post("/evaluate/json/stream") async def evaluate_traces_json_stream(request: EvaluateJsonRequest, raw_request: Request): """Evaluate OTLP JSON traces with real-time progress via SSE.""" - content_length = int(raw_request.headers.get("content-length", 0)) - if content_length > _MAX_JSON_BODY_BYTES: - raise HTTPException( - status_code=413, - detail=f"Request body exceeds {_MAX_JSON_BODY_BYTES // (1024 * 1024)}MB limit", - ) + _check_json_body_size(raw_request) async def event_generator(): try: try: - traces = OtlpJsonLoader().load_from_dict(request.traces) - except ValueError as exc: - yield f"data: {SSEErrorEvent(error=str(exc)).model_dump_json(by_alias=True)}\n\n" + traces, eval_set = _parse_json_request(request) + except HTTPException as exc: + yield _sse_error(exc.detail) return - if not traces: - yield f"data: {SSEErrorEvent(error='No traces found in OTLP JSON').model_dump_json(by_alias=True)}\n\n" - return - - eval_set = None - if request.eval_set: - try: - eval_set = load_eval_set_from_dict(request.eval_set) - except Exception as exc: - yield f"data: {SSEErrorEvent(error=f'Invalid eval set: {exc}').model_dump_json(by_alias=True)}\n\n" - return - for trace in traces: try: extractor = get_extractor(trace) @@ -875,8 +875,7 @@ async def run_with_progress(): except Exception as exc: logger.exception("JSON evaluation stream failed") - evt = SSEErrorEvent(error=str(exc)) - yield f"data: {evt.model_dump_json(by_alias=True)}\n\n" + yield _sse_error(str(exc)) return StreamingResponse( event_generator(), diff --git a/src/agentevals/config.py b/src/agentevals/config.py index 1f61b8b..8c7d01a 100644 --- a/src/agentevals/config.py +++ b/src/agentevals/config.py @@ -126,7 +126,9 @@ class EvalParams(BaseModel): threshold: float | None = Field( default=None, - description="Score threshold for pass/fail.", + ge=0, + le=1, + description="Score threshold for pass/fail (0.0 to 1.0).", ) trajectory_match_type: str | None = Field(