From 88ce8991ceb089b40cd5b8e26c3bfe8544464a64 Mon Sep 17 00:00:00 2001 From: Felix Kampfer Date: Tue, 16 Dec 2025 13:45:19 +0100 Subject: [PATCH 1/7] refactor: use built-in testkube expression to get name of workflow (see also: https://docs.testkube.io/articles/test-workflows-expressions) --- .github/workflows/ci.yml | 1 - CLAUDE.md | 2 -- README.md | 2 -- deploy/base/templates/publish-template.yaml | 5 +---- deploy/local/ragas-evaluation-workflow.yaml | 6 ------ 5 files changed, 1 insertion(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fc0fed4..259201c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -136,7 +136,6 @@ jobs: --config datasetUrl="http://data-server.data-server:8000/dataset.csv" \ --config agentUrl="http://agent-gateway-krakend.agent-gateway-krakend:10000/weather-agent" \ --config metrics="nv_accuracy context_recall" \ - --config workflowName="Testworkflow-Name" \ --config image="${{ steps.extract-tag.outputs.image-tag }}" \ -n testkube \ --watch diff --git a/CLAUDE.md b/CLAUDE.md index 179c46d..0558890 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -75,7 +75,6 @@ kubectl testkube run testworkflow ragas-evaluation-workflow \ --config datasetUrl="http://data-server.data-server:8000/dataset.csv" \ --config agentUrl="http://weather-agent.sample-agents:8000" \ --config metrics="nv_accuracy context_recall" \ - --config workflowName="Test-Run" \ -n testkube # Watch workflow execution @@ -155,7 +154,6 @@ Observability Backend (Grafana) - `agentUrl` - A2A endpoint of agent to evaluate - `model` - LLM model for RAGAS evaluation (e.g., `gemini-2.5-flash-lite`) - `metrics` - Space-separated RAGAS metrics (e.g., `faithfulness context_recall`) -- `workflowName` - Label for published metrics - `otlpEndpoint` - OpenTelemetry collector URL (default: `http://lgtm.monitoring:4318`) - `image` - Docker image to use (default: `ghcr.io/agentic-layer/testbench/testworkflows:latest`) diff --git a/README.md 
b/README.md index 46a7b24..aa6cf48 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,6 @@ kubectl testkube run testworkflow ragas-evaluation-workflow \ --config datasetUrl="http://data-server.data-server:8000/dataset.csv" \ --config agentUrl="http://agent-gateway-krakend.agent-gateway-krakend:10000/weather-agent" \ --config metrics="nv_accuracy context_recall" \ - --config workflowName="Testworkflow-Name" \ --config image="ghcr.io/agentic-layer/testbench/testworkflows:latest" \ -n testkube ``` @@ -144,7 +143,6 @@ kubectl testkube run testworkflow ragas-evaluation-workflow \ --config datasetUrl="http://data-server.data-server:8000/dataset.csv" \ --config agentUrl="http://agent-gateway-krakend.agent-gateway-krakend:10000/weather-agent" \ --config metrics="nv_accuracy context_recall" \ - --config workflowName="Testworkflow-Name" \ --config image="ghcr.io/agentic-layer/testbench/testworkflows:latest" \ --config model="gemini/gemini-2.5-flash" \ --config otlpEndpoint="http://otlp-endpoint:4093" \ diff --git a/deploy/base/templates/publish-template.yaml b/deploy/base/templates/publish-template.yaml index 32ea61b..3bc4cd0 100644 --- a/deploy/base/templates/publish-template.yaml +++ b/deploy/base/templates/publish-template.yaml @@ -9,9 +9,6 @@ metadata: spec: # Configuration parameters that can be overridden config: - workflowName: - type: string - description: "Name of the test workflow (used as label)" otlpEndpoint: type: string description: "URL of the OTLP endpoint" @@ -27,5 +24,5 @@ spec: run: args: - publish.py - - "{{ config.workflowName }}" + - "{{ workflow.name }}" - "{{ config.otlpEndpoint }}" diff --git a/deploy/local/ragas-evaluation-workflow.yaml b/deploy/local/ragas-evaluation-workflow.yaml index 5462082..817a7c0 100644 --- a/deploy/local/ragas-evaluation-workflow.yaml +++ b/deploy/local/ragas-evaluation-workflow.yaml @@ -49,11 +49,6 @@ spec: default: "nv_accuracy context_recall" # Publishing configuration - workflowName: - type: string - description: 
"Workflow name for metrics" - default: "ragas-test-workflow" - otlpEndpoint: type: string description: "OTLP endpoint URL" @@ -97,6 +92,5 @@ spec: template: name: ragas-publish-template config: - workflowName: "{{ config.workflowName }}" otlpEndpoint: "{{ config.otlpEndpoint }}" image: "{{ config.image }}" From 65514fefbcfa06cf1c133fc60ca143da06d97750 Mon Sep 17 00:00:00 2001 From: Felix Kampfer Date: Fri, 12 Dec 2025 16:59:54 +0100 Subject: [PATCH 2/7] chore: linting --- scripts/run.py | 3 +-- tests/test_run.py | 8 ++------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/scripts/run.py b/scripts/run.py index b93f592..d1860f4 100644 --- a/scripts/run.py +++ b/scripts/run.py @@ -16,11 +16,10 @@ ) from opentelemetry import trace from opentelemetry.trace import Status, StatusCode +from otel_setup import setup_otel from pydantic import BaseModel from ragas import Dataset, experiment -from otel_setup import setup_otel - # Set up module-level logger logging.basicConfig(level=logging.INFO) logger: Logger = logging.getLogger(__name__) diff --git a/tests/test_run.py b/tests/test_run.py index bfbbe60..8cacc30 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -111,9 +111,7 @@ def mock_httpx_client(): # Call the function result = await run_agent_experiment.func( - test_row, - agent_url="http://test-agent:8000", - workflow_name="test-workflow" + test_row, agent_url="http://test-agent:8000", workflow_name="test-workflow" ) # Verify result structure @@ -156,9 +154,7 @@ def mock_httpx_client(): # Call the function result = await run_agent_experiment.func( - test_row, - agent_url="http://test-agent:8000", - workflow_name="test-workflow" + test_row, agent_url="http://test-agent:8000", workflow_name="test-workflow" ) # Verify error is captured in response From 0b05a405d1153200a38f10bfd3d28199f1c8434d Mon Sep 17 00:00:00 2001 From: Felix Kampfer Date: Mon, 15 Dec 2025 16:10:38 +0100 Subject: [PATCH 3/7] fix(ci): configure initialDelaySeconds on 
deploy/local/lgtm.yaml --- deploy/local/lgtm.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/deploy/local/lgtm.yaml b/deploy/local/lgtm.yaml index 7edc3b9..583db19 100644 --- a/deploy/local/lgtm.yaml +++ b/deploy/local/lgtm.yaml @@ -58,6 +58,9 @@ spec: command: - cat - /tmp/ready + initialDelaySeconds: 30 + periodSeconds: 10 + failureThreshold: 30 volumeMounts: - name: tempo-data mountPath: /data/tempo From cd790e7ea095500e69eb7819628659c0c8ed6f10 Mon Sep 17 00:00:00 2001 From: Felix Kampfer Date: Mon, 15 Dec 2025 16:22:26 +0100 Subject: [PATCH 4/7] fix(ci): clean up stuck helm releases before retrying --- .github/workflows/ci.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 259201c..22cb5bb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -122,8 +122,12 @@ jobs: echo "Tilt CI failed after 5 attempts, exiting." exit 1 fi - echo "Tilt CI failed, retrying... ($i/3)" - # Wait a bit for resource to stabilize / cleanup + echo "Tilt CI failed, retrying... 
($i/4)" + + # Clean up stuck Helm releases before retrying + helm rollback testkube -n testkube 2>/dev/null || helm uninstall testkube -n testkube 2>/dev/null || true + + # Wait a bit for resources to stabilize sleep 10 done From a63d983e4b1d5881b9692df032f3fd8bdb3b0af3 Mon Sep 17 00:00:00 2001 From: Felix Kampfer Date: Mon, 15 Dec 2025 16:34:21 +0100 Subject: [PATCH 5/7] fix(ci): increase Tilt timeout to 10 minutes --- Tiltfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tiltfile b/Tiltfile index c852cb2..01d7200 100644 --- a/Tiltfile +++ b/Tiltfile @@ -1,7 +1,7 @@ # -*- mode: Python -*- -# Increase Kubernetes upsert timeout for CRD installations -update_settings(max_parallel_updates=10) +# Increase Kubernetes upsert timeout for CRD installations and slow Helm charts (testkube) +update_settings(max_parallel_updates=10, k8s_upsert_timeout_secs=600) # Load .env file for environment variables load('ext://dotenv', 'dotenv') From 263931014bc1f0488268c927a8df90670e37ff7b Mon Sep 17 00:00:00 2001 From: Felix Kampfer Date: Wed, 17 Dec 2025 13:26:47 +0100 Subject: [PATCH 6/7] feat: PAAL-233 refactor gauges to differentiate metrics and execution id --- DetailedUsageAndTroubleshooting.md | 23 ++- README.md | 2 +- deploy/base/templates/publish-template.yaml | 1 + scripts/publish.py | 183 +++++++++++------ tests/test_publish.py | 211 ++++++++++++++------ 5 files changed, 299 insertions(+), 121 deletions(-) diff --git a/DetailedUsageAndTroubleshooting.md b/DetailedUsageAndTroubleshooting.md index 1672c8e..d3c3dd4 100644 --- a/DetailedUsageAndTroubleshooting.md +++ b/DetailedUsageAndTroubleshooting.md @@ -164,12 +164,13 @@ Publishes evaluation metrics to an OpenTelemetry OTLP endpoint for monitoring. 
**Syntax:** ```shell -python3 scripts/publish.py <workflow_name> [otlp_endpoint] +python3 scripts/publish.py <workflow_name> <execution_id> [otlp_endpoint] ``` **Arguments:** - `workflow_name` (required): Name of the test workflow (used as metric label) +- `execution_id` (required): Testkube execution ID for this workflow run - `otlp_endpoint` (optional): OTLP HTTP endpoint URL (default: `localhost:4318`) **Input:** @@ -178,17 +179,29 @@ python3 scripts/publish.py <workflow_name> [otlp_endpoint] **Published Metrics:** -Each RAGAS metric is published as a gauge with the workflow name as an attribute: +Three gauge types are published to the OTLP endpoint: + +| Gauge Name | Description | Attributes | +|------------|-------------|------------| +| `testbench_evaluation_metric` | Per-sample evaluation scores | `name`, `workflow_name`, `execution_id`, `trace_id` | +| `testbench_evaluation_token_usage` | Token counts from evaluation | `type` (input_tokens/output_tokens), `workflow_name`, `execution_id` | +| `testbench_evaluation_cost` | Total evaluation cost in USD | `workflow_name`, `execution_id` | + +**Example output:** ``` -ragas_evaluation_faithfulness{workflow_name="weather-assistant-eval"} = 0.85 -ragas_evaluation_answer_relevancy{workflow_name="weather-assistant-eval"} = 0.92 +testbench_evaluation_metric{name="faithfulness", workflow_name="weather-eval", execution_id="exec-123", trace_id="abc123..."} = 0.85 +testbench_evaluation_metric{name="context_recall", workflow_name="weather-eval", execution_id="exec-123", trace_id="abc123..."} = 1.0 +testbench_evaluation_token_usage{type="input_tokens", workflow_name="weather-eval", execution_id="exec-123"} = 1500 +testbench_evaluation_token_usage{type="output_tokens", workflow_name="weather-eval", execution_id="exec-123"} = 500 +testbench_evaluation_cost{workflow_name="weather-eval", execution_id="exec-123"} = 0.015 ``` **Notes:** - Sends metrics to `/v1/metrics` endpoint - Uses resource with `service.name="ragas-evaluation"` +- The `trace_id` attribute links metrics to distributed
traces for debugging - Forces flush to ensure delivery before exit @@ -254,4 +267,4 @@ user_input,retrieved_contexts,reference - Verify your dataset includes all required fields for the metrics you're using - Check the RAGAS documentation for metric-specific requirements ----- \ No newline at end of file +---- diff --git a/README.md b/README.md index aa6cf48..cb213c2 100644 --- a/README.md +++ b/README.md @@ -175,7 +175,7 @@ uv run python3 scripts/run.py "http://localhost:11010" uv run python3 scripts/evaluate.py gemini-2.5-flash-lite faithfulness answer_relevancy # 4. Publish metrics to OpenTelemetry -uv run python3 scripts/publish.py "my-agent-evaluation" +uv run python3 scripts/publish.py "my-agent-evaluation" "local-exec-001" ``` ---- diff --git a/deploy/base/templates/publish-template.yaml b/deploy/base/templates/publish-template.yaml index 3bc4cd0..d5c2fc0 100644 --- a/deploy/base/templates/publish-template.yaml +++ b/deploy/base/templates/publish-template.yaml @@ -25,4 +25,5 @@ spec: args: - publish.py - "{{ workflow.name }}" + - "{{ execution.id }}" - "{{ config.otlpEndpoint }}" diff --git a/scripts/publish.py b/scripts/publish.py index 599b66b..ed79302 100644 --- a/scripts/publish.py +++ b/scripts/publish.py @@ -1,7 +1,10 @@ import argparse import json import logging +import math +from dataclasses import dataclass from logging import Logger +from typing import Any, TypeGuard from opentelemetry import metrics from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter @@ -14,98 +17,159 @@ logger: Logger = logging.getLogger(__name__) -def get_overall_scores(file_path: str) -> dict[str, float]: - """Load the evaluation_scores.json file and return the 'overall_scores' metrics.""" - with open(file_path, "r") as file: - return json.load(file).get("overall_scores", {}) +@dataclass +class EvaluationData: + """Container for all evaluation data to be published as metrics.""" + + individual_results: list[dict[str, Any]] + total_tokens: 
dict[str, int] + total_cost: float -def create_and_push_metrics(overall_scores: dict[str, float], workflow_name: str, otlp_endpoint: str) -> None: +def load_evaluation_data(file_path: str) -> EvaluationData: + """Load the evaluation_scores.json file and return the relevant data for metrics.""" + with open(file_path, "r") as file: + data = json.load(file) + return EvaluationData( + individual_results=data.get("individual_results", []), + total_tokens=data.get("total_tokens", {"input_tokens": 0, "output_tokens": 0}), + total_cost=data.get("total_cost", 0.0), + ) + + +def _is_metric_value(value: Any) -> TypeGuard[int | float]: + """Check if a value is a valid metric score (numeric and not NaN).""" + if not isinstance(value, (int, float)): + return False + if isinstance(value, float) and math.isnan(value): + return False + return True + + +def create_and_push_metrics( + evaluation_data: EvaluationData, workflow_name: str, execution_id: str, otlp_endpoint: str +) -> None: """ - Create OpenTelemetry metrics for each overall score and push via OTLP. + Create OpenTelemetry metrics for evaluation results and push via OTLP. + + Creates per-sample gauges for each metric, plus token usage and cost gauges. 
Args: - overall_scores: Dictionary of metric names to scores + evaluation_data: Container with individual results, token counts, and cost workflow_name: Name of the test workflow (used as label to distinguish workflows) + execution_id: Testkube execution ID for this workflow run otlp_endpoint: URL of the OTLP endpoint (e.g., 'http://localhost:4318') """ - # Ensure the endpoint has the correct protocol if not otlp_endpoint.startswith("http://") and not otlp_endpoint.startswith("https://"): otlp_endpoint = f"http://{otlp_endpoint}" - # Create OTLP exporter exporter = OTLPMetricExporter(endpoint=f"{otlp_endpoint}/v1/metrics") - - # Create a metric reader that exports immediately - reader = PeriodicExportingMetricReader( - exporter=exporter, - export_interval_millis=1000, # Export every second - ) - - # Create resource with workflow metadata + reader = PeriodicExportingMetricReader(exporter=exporter, export_interval_millis=1000) resource = Resource.create({"service.name": "ragas-evaluation", "workflow.name": workflow_name}) - - # Create MeterProvider with the exporter and resource provider = MeterProvider(resource=resource, metric_readers=[reader]) metrics.set_meter_provider(provider) - - # Get a meter meter = metrics.get_meter("ragas.evaluation", "1.0.0") - # Create and record metrics try: logger.info(f"Pushing metrics to OTLP endpoint at {otlp_endpoint}...") - for metric_name, score in overall_scores.items(): - # Create a Gauge - gauge = meter.create_gauge( - name=f"ragas_evaluation_{metric_name}", - description=f"Overall {metric_name} score from RAGAS evaluation", - unit="1", - ) - - # Set the gauge value with workflow_name as an attribute - gauge.set(score, {"workflow_name": workflow_name}) - logger.info(f"Set metric 'ragas_evaluation_{metric_name}{{workflow_name=\"{workflow_name}\"}}' to {score}") + # Collect metric names from individual results (any numeric field is a metric) + metric_names: set[str] = set() + for result in evaluation_data.individual_results: + 
for key, value in result.items(): + if _is_metric_value(value): + metric_names.add(key) + + # Single gauge for all evaluation metrics, differentiated by 'name' attribute + metric_gauge = meter.create_gauge( + name="testbench_evaluation_metric", + description="Evaluation metric from RAGAS testbench", + unit="", + ) + + # Set per-sample values for each metric + for metric_name in sorted(metric_names): + for result in evaluation_data.individual_results: + score = result.get(metric_name) + if not _is_metric_value(score): + logger.debug(f"Skipping invalid metric value for {metric_name}: {score}") + continue + trace_id = result.get("trace_id") + if not trace_id: + logger.warning(f"Missing trace_id for sample in execution {execution_id}") + trace_id = "missing-trace-id" + attributes = { + "name": metric_name, + "workflow_name": workflow_name, + "execution_id": execution_id, + "trace_id": trace_id, + } + metric_gauge.set(score, attributes) + logger.info(f"testbench_evaluation_metric{attributes} = {score}") + + # Token usage gauge with 'type' attribute + token_gauge = meter.create_gauge( + name="testbench_evaluation_token_usage", + description="Token usage from RAGAS evaluation", + unit="", + ) + + input_tokens = evaluation_data.total_tokens.get("input_tokens", 0) + token_gauge.set( + input_tokens, {"type": "input_tokens", "workflow_name": workflow_name, "execution_id": execution_id} + ) + logger.info( + f"testbench_evaluation_token_usage{{type=input_tokens, workflow_name={workflow_name}, execution_id={execution_id}}} = {input_tokens}" + ) + + output_tokens = evaluation_data.total_tokens.get("output_tokens", 0) + token_gauge.set( + output_tokens, {"type": "output_tokens", "workflow_name": workflow_name, "execution_id": execution_id} + ) + logger.info( + f"testbench_evaluation_token_usage{{type=output_tokens, workflow_name={workflow_name}, execution_id={execution_id}}} = {output_tokens}" + ) + + # Total cost gauge + cost_gauge = meter.create_gauge( + 
name="testbench_evaluation_cost", + description="Total cost of RAGAS evaluation in USD", + unit="", + ) + cost_gauge.set(evaluation_data.total_cost, {"workflow_name": workflow_name, "execution_id": execution_id}) + logger.info( + f"testbench_evaluation_cost{{workflow_name={workflow_name}, execution_id={execution_id}}} = {evaluation_data.total_cost}" + ) - # Force flush to ensure metrics are sent provider.force_flush() - - logger.info("✓ Metrics successfully pushed via OTLP") + logger.info("Metrics successfully pushed via OTLP") except Exception as e: - logger.error(f"✗ Error pushing metrics via OTLP: {e}") + logger.error(f"Error pushing metrics via OTLP: {e}") raise finally: - # Shutdown the provider provider.shutdown() - logger.info("Published metrics:") - for metric_name, score in overall_scores.items(): - logger.info(f' - ragas_evaluation_{metric_name}{{workflow_name="{workflow_name}"}}: {score}') - -def publish_metrics(input_file: str, workflow_name: str, otlp_endpoint: str) -> None: +def publish_metrics(input_file: str, workflow_name: str, execution_id: str, otlp_endpoint: str) -> None: """ Publish evaluation metrics via OpenTelemetry OTLP. Args: - input_file: Path to the evaluation scores + input_file: Path to the evaluation scores JSON file workflow_name: Name of the test workflow (e.g., 'weather-assistant-test'). + execution_id: Testkube execution ID for this workflow run. otlp_endpoint: URL of the OTLP endpoint (e.g., 'http://localhost:4318'). 
""" + logger.info(f"Loading evaluation data from {input_file}...") + evaluation_data = load_evaluation_data(input_file) - # Load overall scores from the evaluation file - logger.info(f"Loading evaluation scores from {input_file}...") - overall_scores = get_overall_scores(input_file) - - if not overall_scores: - logger.warning("No overall scores found in evaluation_scores.json") + if not evaluation_data.individual_results: + logger.warning("No individual results found in evaluation_scores.json") return - # Create and push OpenTelemetry metrics - logger.info(f"Creating OpenTelemetry metrics for {len(overall_scores)} scores...") - logger.info(f"Workflow: {workflow_name}") - create_and_push_metrics(overall_scores, workflow_name, otlp_endpoint) + logger.info(f"Publishing metrics for {len(evaluation_data.individual_results)} samples...") + logger.info(f"Workflow: {workflow_name}, Execution: {execution_id}") + create_and_push_metrics(evaluation_data, workflow_name, execution_id, otlp_endpoint) if __name__ == "__main__": @@ -114,11 +178,12 @@ def publish_metrics(input_file: str, workflow_name: str, otlp_endpoint: str) -> Args: workflow_name: Name of the test workflow + execution_id: Testkube execution ID for this workflow run otlp_endpoint: (OPTIONAL) URL to the OTLP endpoint (default: localhost:4318) Examples: - python3 scripts/publish.py weather-assistant-test - python3 scripts/publish.py weather-assistant-test http://localhost:4318 + python3 scripts/publish.py weather-assistant-test exec-123 + python3 scripts/publish.py weather-assistant-test exec-123 http://localhost:4318 """ parser = argparse.ArgumentParser(description="Publish RAGAS evaluation metrics via OpenTelemetry OTLP") @@ -126,6 +191,10 @@ def publish_metrics(input_file: str, workflow_name: str, otlp_endpoint: str) -> "workflow_name", help="Name of the test workflow (e.g., 'weather-assistant-test')", ) + parser.add_argument( + "execution_id", + help="Testkube execution ID for this workflow run", + ) 
parser.add_argument( "otlp_endpoint", nargs="?", @@ -135,9 +204,9 @@ def publish_metrics(input_file: str, workflow_name: str, otlp_endpoint: str) -> args = parser.parse_args() - # Call 'publish_metrics' with hardcoded input file and specified 'workflow_name' & 'otlp_endpoint' publish_metrics( input_file="data/results/evaluation_scores.json", workflow_name=args.workflow_name, + execution_id=args.execution_id, otlp_endpoint=args.otlp_endpoint, ) diff --git a/tests/test_publish.py b/tests/test_publish.py index 9eab789..040c5a0 100644 --- a/tests/test_publish.py +++ b/tests/test_publish.py @@ -5,6 +5,7 @@ """ import json +import math import shutil import sys import tempfile @@ -14,7 +15,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent / "scripts")) -from publish import create_and_push_metrics, get_overall_scores, publish_metrics +from publish import EvaluationData, _is_metric_value, create_and_push_metrics, load_evaluation_data, publish_metrics # Mock classes for OpenTelemetry meter provider (used by HTTPXClientInstrumentor) @@ -58,13 +59,28 @@ def temp_dir(): @pytest.fixture def evaluation_scores_file(temp_dir): - """Create a test evaluation scores file""" + """Create a test evaluation scores file with individual results""" test_file = Path(temp_dir) / "evaluation_scores.json" test_data = { "overall_scores": {"faithfulness": 0.85, "answer_relevancy": 0.90}, - "individual_results": [], - "total_tokens": {"input_tokens": 0, "output_tokens": 0}, - "total_cost": 0.0, + "individual_results": [ + { + "user_input": "What is the weather?", + "response": "It is sunny.", + "faithfulness": 0.85, + "answer_relevancy": 0.90, + "trace_id": "a1b2c3d4e5f6789012345678901234aa", + }, + { + "user_input": "What is the time?", + "response": "It is noon.", + "faithfulness": 0.80, + "answer_relevancy": 0.95, + "trace_id": "b2c3d4e5f6a7890123456789012345bb", + }, + ], + "total_tokens": {"input_tokens": 1000, "output_tokens": 200}, + "total_cost": 0.05, } with open(test_file, "w") as 
f: @@ -90,10 +106,13 @@ def realistic_scores_file(temp_dir): "response": "It is sunny.", "faithfulness": 0.85, "answer_relevancy": 0.90, + "context_precision": 0.78, + "context_recall": 0.82, + "trace_id": "c3d4e5f6a7b8901234567890123456cc", } ], - "total_tokens": {"input_tokens": 0, "output_tokens": 0}, - "total_cost": 0.0, + "total_tokens": {"input_tokens": 500, "output_tokens": 100}, + "total_cost": 0.025, } with open(test_file, "w") as f: @@ -102,25 +121,62 @@ def realistic_scores_file(temp_dir): return test_file -# TestGetOverallScores tests -def test_loads_overall_scores(evaluation_scores_file): - """Test that get_overall_scores loads the overall_scores section""" - scores = get_overall_scores(str(evaluation_scores_file)) +# Test _is_metric_value +def test_is_metric_value_with_float(): + """Test that valid floats are recognized as metric values""" + assert _is_metric_value(0.85) is True + assert _is_metric_value(1.0) is True + assert _is_metric_value(0.0) is True - assert scores["faithfulness"] == 0.85 - assert scores["answer_relevancy"] == 0.90 + +def test_is_metric_value_with_int(): + """Test that integers are recognized as metric values""" + assert _is_metric_value(1) is True + assert _is_metric_value(0) is True + + +def test_is_metric_value_with_nan(): + """Test that NaN is not recognized as a metric value""" + assert _is_metric_value(float("nan")) is False + assert _is_metric_value(math.nan) is False + + +def test_is_metric_value_with_non_numeric(): + """Test that non-numeric values are not recognized as metric values""" + assert _is_metric_value("string") is False + assert _is_metric_value(["list"]) is False + assert _is_metric_value({"dict": "value"}) is False + assert _is_metric_value(None) is False + + +# Test load_evaluation_data +def test_loads_evaluation_data(evaluation_scores_file): + """Test that load_evaluation_data loads all required fields""" + data = load_evaluation_data(str(evaluation_scores_file)) + + assert len(data.individual_results) == 
2 + assert data.total_tokens["input_tokens"] == 1000 + assert data.total_tokens["output_tokens"] == 200 + assert data.total_cost == 0.05 def test_file_not_found(temp_dir): """Test behavior when file doesn't exist""" with pytest.raises(FileNotFoundError): - get_overall_scores(str(Path(temp_dir) / "nonexistent.json")) + load_evaluation_data(str(Path(temp_dir) / "nonexistent.json")) # TestCreateAndPushMetrics tests def test_creates_gauges_for_each_metric(monkeypatch): - """Test that a Gauge is created for each metric""" - overall_scores = {"faithfulness": 0.85, "answer_relevancy": 0.90} + """Test that a Gauge is created for each metric plus token/cost gauges""" + evaluation_data = EvaluationData( + individual_results=[ + {"user_input": "Question 1", "faithfulness": 0.85, "answer_relevancy": 0.90, "trace_id": "trace1"}, + {"user_input": "Question 2", "faithfulness": 0.80, "answer_relevancy": 0.95, "trace_id": "trace2"}, + ], + total_tokens={"input_tokens": 1000, "output_tokens": 200}, + total_cost=0.05, + ) # Mock the meter and gauge create_gauge_calls = [] @@ -167,34 +223,46 @@ def mock_exporter_init(endpoint): monkeypatch.setattr("publish.OTLPMetricExporter", mock_exporter_init) create_and_push_metrics( - overall_scores=overall_scores, + evaluation_data=evaluation_data, workflow_name="test-workflow", + execution_id="exec-test-123", otlp_endpoint="localhost:4318", ) - # Verify create_gauge was called for each metric - assert len(create_gauge_calls) == 2 + # Verify gauges created: 1 metric gauge + 1 token gauge + 1 cost gauge = 3 + assert len(create_gauge_calls) == 3 # Verify gauge names gauge_names = [call["name"] for call in create_gauge_calls] - assert "ragas_evaluation_faithfulness" in gauge_names - assert "ragas_evaluation_answer_relevancy" in gauge_names + assert "testbench_evaluation_metric" in gauge_names + assert "testbench_evaluation_token_usage" in gauge_names + assert "testbench_evaluation_cost" in gauge_names -def test_sets_gauge_values(monkeypatch): - 
"""Test that gauge values are set correctly""" - overall_scores = {"faithfulness": 0.85} +def test_sets_per_sample_gauge_values(monkeypatch): + """Test that gauge values are set for each sample with execution_id and trace_id attributes""" + evaluation_data = EvaluationData( + individual_results=[ + {"user_input": "Question 1", "faithfulness": 0.85, "trace_id": "d4e5f6a7b8c9012345678901234567dd"}, + {"user_input": "Question 2", "faithfulness": 0.80, "trace_id": "e5f6a7b8c9d0123456789012345678ee"}, + ], + total_tokens={"input_tokens": 0, "output_tokens": 0}, + total_cost=0.0, + ) # Mock the meter and gauge set_calls = [] class MockGauge: + def __init__(self, name): + self.name = name + def set(self, value, attributes): - set_calls.append({"value": value, "attributes": attributes}) + set_calls.append({"name": self.name, "value": value, "attributes": attributes}) class MockMeter: def create_gauge(self, name, unit=None, description=None): - return MockGauge() + return MockGauge(name) mock_meter = MockMeter() @@ -229,20 +297,38 @@ def mock_exporter_init(endpoint): monkeypatch.setattr("publish.OTLPMetricExporter", mock_exporter_init) create_and_push_metrics( - overall_scores=overall_scores, + evaluation_data=evaluation_data, workflow_name="test-workflow", + execution_id="exec-test-123", otlp_endpoint="localhost:4318", ) - # Verify gauge.set was called with correct value and attributes - assert len(set_calls) == 1 - assert set_calls[0]["value"] == 0.85 - assert set_calls[0]["attributes"] == {"workflow_name": "test-workflow"} + # Filter to faithfulness metric calls only (name attribute = "faithfulness") + faithfulness_calls = [ + c + for c in set_calls + if c["name"] == "testbench_evaluation_metric" and c["attributes"].get("name") == "faithfulness" + ] + assert len(faithfulness_calls) == 2 + + # Verify gauge.set was called with correct values, execution_id, and trace_id attributes + assert faithfulness_calls[0]["value"] == 0.85 + assert 
faithfulness_calls[0]["attributes"]["workflow_name"] == "test-workflow" + assert faithfulness_calls[0]["attributes"]["execution_id"] == "exec-test-123" + assert faithfulness_calls[0]["attributes"]["trace_id"] == "d4e5f6a7b8c9012345678901234567dd" + + assert faithfulness_calls[1]["value"] == 0.80 + assert faithfulness_calls[1]["attributes"]["execution_id"] == "exec-test-123" + assert faithfulness_calls[1]["attributes"]["trace_id"] == "e5f6a7b8c9d0123456789012345678ee" def test_pushes_via_otlp(monkeypatch): """Test that metrics are pushed via OTLP""" - overall_scores = {"faithfulness": 0.85} + evaluation_data = EvaluationData( + individual_results=[{"user_input": "Q1", "faithfulness": 0.85, "trace_id": "f6a7b8c9d0e1234567890123456789ff"}], + total_tokens={"input_tokens": 100, "output_tokens": 50}, + total_cost=0.01, + ) # Mock the meter and gauge class MockGauge: @@ -288,8 +374,9 @@ def mock_exporter_init(endpoint): monkeypatch.setattr("publish.OTLPMetricExporter", mock_exporter_init) create_and_push_metrics( - overall_scores=overall_scores, + evaluation_data=evaluation_data, workflow_name="test-workflow", + execution_id="exec-test-123", otlp_endpoint="localhost:4318", ) @@ -304,21 +391,14 @@ def mock_exporter_init(endpoint): def test_handles_push_error(monkeypatch): """Test error handling when OTLP export fails""" - overall_scores = {"faithfulness": 0.85} - - # Mock the meter and gauge - class MockGauge: - def set(self, value, attributes): - pass - - class MockMeter: - def create_gauge(self, name, unit=None, description=None): - return MockGauge() - - mock_meter = MockMeter() + evaluation_data = EvaluationData( + individual_results=[{"user_input": "Q1", "faithfulness": 0.85, "trace_id": "a7b8c9d0e1f2345678901234567890aa"}], + total_tokens={"input_tokens": 0, "output_tokens": 0}, + total_cost=0.0, + ) def mock_get_meter(*args, **kwargs): - return mock_meter + return _OtelMockMeter() # Mock the provider to raise an exception on force_flush shutdown_calls = [] @@ 
-347,8 +427,9 @@ def mock_exporter_init(endpoint): with pytest.raises(Exception, match="Connection refused"): create_and_push_metrics( - overall_scores=overall_scores, + evaluation_data=evaluation_data, workflow_name="test-workflow", + execution_id="exec-test-123", otlp_endpoint="localhost:4318", ) @@ -361,11 +442,12 @@ def test_publish_metrics_calls_create_and_push(evaluation_scores_file, monkeypat """Test that publish_metrics calls create_and_push_metrics""" create_push_calls = [] - def mock_create_push(overall_scores, workflow_name, otlp_endpoint): + def mock_create_push(evaluation_data, workflow_name, execution_id, otlp_endpoint): create_push_calls.append( { - "overall_scores": overall_scores, + "evaluation_data": evaluation_data, "workflow_name": workflow_name, + "execution_id": execution_id, "otlp_endpoint": otlp_endpoint, } ) @@ -375,6 +457,7 @@ def mock_create_push(overall_scores, workflow_name, otlp_endpoint): publish_metrics( input_file=str(evaluation_scores_file), workflow_name="test-workflow", + execution_id="exec-test-123", otlp_endpoint="localhost:4318", ) @@ -382,16 +465,21 @@ def mock_create_push(overall_scores, workflow_name, otlp_endpoint): assert len(create_push_calls) == 1 # Verify parameters - assert create_push_calls[0]["overall_scores"]["faithfulness"] == 0.85 - assert create_push_calls[0]["overall_scores"]["answer_relevancy"] == 0.90 + assert len(create_push_calls[0]["evaluation_data"].individual_results) == 2 assert create_push_calls[0]["workflow_name"] == "test-workflow" + assert create_push_calls[0]["execution_id"] == "exec-test-123" assert create_push_calls[0]["otlp_endpoint"] == "localhost:4318" -def test_publish_metrics_with_empty_scores(temp_dir, monkeypatch): - """Test behavior when overall_scores is empty""" - # Create file with empty overall_scores - test_data = {"overall_scores": {}, "individual_results": []} +def test_publish_metrics_with_empty_results(temp_dir, monkeypatch): + """Test behavior when individual_results is empty""" 
+ # Create file with empty individual_results + test_data = { + "overall_scores": {}, + "individual_results": [], + "total_tokens": {"input_tokens": 0, "output_tokens": 0}, + "total_cost": 0.0, + } empty_file = Path(temp_dir) / "empty_scores.json" with open(empty_file, "w") as f: @@ -399,7 +487,7 @@ def test_publish_metrics_with_empty_scores(temp_dir, monkeypatch): create_push_calls = [] - def mock_create_push(overall_scores, workflow_name, otlp_endpoint): + def mock_create_push(evaluation_data, workflow_name, execution_id, otlp_endpoint): create_push_calls.append(True) monkeypatch.setattr("publish.create_and_push_metrics", mock_create_push) @@ -407,6 +495,7 @@ def mock_create_push(overall_scores, workflow_name, otlp_endpoint): publish_metrics( input_file=str(empty_file), workflow_name="test-workflow", + execution_id="exec-test-123", otlp_endpoint="localhost:4318", ) @@ -467,11 +556,17 @@ def mock_exporter_init(endpoint): publish_metrics( input_file=str(realistic_scores_file), workflow_name="weather-assistant-test", + execution_id="exec-weather-456", otlp_endpoint="localhost:4318", ) # Verify OTLPMetricExporter was called assert len(exporter_calls) == 1 - # Verify 4 metrics were created (faithfulness, answer_relevancy, context_precision, context_recall) - assert len(create_gauge_calls) == 4 + # Verify 3 gauges: 1 metric gauge + 1 token gauge + 1 cost gauge + assert len(create_gauge_calls) == 3 + + gauge_names = [call["name"] for call in create_gauge_calls] + assert "testbench_evaluation_metric" in gauge_names + assert "testbench_evaluation_token_usage" in gauge_names + assert "testbench_evaluation_cost" in gauge_names From 3ebbe260ff170baed0ded34c1def24d404286acd Mon Sep 17 00:00:00 2001 From: Felix Kampfer Date: Wed, 17 Dec 2025 13:27:32 +0100 Subject: [PATCH 7/7] fix: port-forward lgtm:4318 in Tiltfile to test local otel interaction --- Tiltfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tiltfile b/Tiltfile index 01d7200..7829ff8 100644 --- 
a/Tiltfile +++ b/Tiltfile @@ -39,7 +39,7 @@ k8s_yaml(kustomize('deploy/local')) k8s_resource('ai-gateway-litellm', port_forwards=['11001:4000']) k8s_resource('weather-agent', port_forwards='11010:8000', labels=['agents'], resource_deps=['agent-runtime']) -k8s_resource('lgtm', port_forwards=['11000:3000']) +k8s_resource('lgtm', port_forwards=['11000:3000', '4318:4318']) k8s_resource('data-server', port_forwards='11020:8000') # Declare Testkube resources