diff --git a/src/agentunit/adapters/agentops_adapter.py b/src/agentunit/adapters/agentops_adapter.py
index f02d2b4..7ea075a 100644
--- a/src/agentunit/adapters/agentops_adapter.py
+++ b/src/agentunit/adapters/agentops_adapter.py
@@ -32,6 +32,8 @@

 logger = logging.getLogger(__name__)

+_LANGSMITH_UNSUPPORTED = "LangSmith features are not supported by AgentOpsAdapter"
+

 class AgentOpsAdapter(MultiAgentAdapter, ProductionIntegration):
     """
@@ -67,21 +69,9 @@ def __init__(
         self.project_id = project_id
         self.default_tags = default_tags or []
         self.auto_start_session = auto_start_session
+        self.enable_tracing: bool = kwargs.get("enable_tracing", True)
+        self.client: Any = None
         self.platform = MonitoringPlatform.AGENTOPS
-        """
-        Initialize LangSmith adapter.
-
-        Args:
-            api_key: LangSmith API key
-            project_name: Project name for organizing traces
-            endpoint: Optional custom LangSmith endpoint
-            enable_tracing: Whether to enable automatic tracing
-            enable_feedback: Whether to collect feedback data
-        """
-        self.api_key = api_key
-        self.project_id = project_id
-        self.default_tags = default_tags or []
-        self.auto_start_session = auto_start_session

         # Initialize AgentOps client
         self._initialize_agentops()
@@ -98,21 +88,18 @@ def _initialize_agentops(self):
         """Initialize AgentOps client and verify connection."""
         try:
             # Import AgentOps SDK
-            import agentops
-
-            self.agentops = agentops
+            import agentops  # type: ignore[import-not-found]

             # Initialize AgentOps
-            if self.api_key:
-                agentops.init(
-                    api_key=self.api_key,
-                    default_tags=self.default_tags,
-                    auto_start_session=self.auto_start_session,
-                )
-            else:
-                agentops.init(
-                    default_tags=self.default_tags, auto_start_session=self.auto_start_session
-                )
+            session = agentops.init(
+                api_key=self.api_key,
+                default_tags=self.default_tags,
+                auto_start_session=self.auto_start_session,
+            )
+
+            self.agentops = agentops
+            self.session = session
+            self.client = session

             logger.info("Successfully connected to AgentOps")

@@ -124,11 +111,6 @@ def _initialize_agentops(self):
             logger.error(f"Failed to connect to AgentOps: {e}")
             raise

-    @property
-    def platform(self) -> MonitoringPlatform:
-        """Return the monitoring platform type."""
-        return MonitoringPlatform.AGENTOPS
-
     def create_agent(self, role: AgentRole, agent_id: str | None = None, **kwargs) -> AgentMetadata:
         """
         Create an agent for AgentOps monitoring.
@@ -244,7 +226,7 @@ def send_message(
         interaction = AgentInteraction(
             interaction_id=interaction_id,
             from_agent=from_agent,
-            to_agent=to_agent,
+            to_agent=to_agent or "broadcast",
             content=message,
             timestamp=timestamp,
             metadata=metadata or {},
@@ -339,12 +321,15 @@ def calculate_coordination_metrics(self) -> dict[str, float]:
         for interaction in self.session_interactions:
             unique_agents.add(interaction.from_agent)
             if interaction.to_agent:
-                unique_agents.add(interaction.to_agent)
+                if isinstance(interaction.to_agent, list):
+                    unique_agents.update(interaction.to_agent)
+                else:
+                    unique_agents.add(interaction.to_agent)

         agent_participation = len(unique_agents)

         # Calculate message distribution
-        agent_counts = {}
+        agent_counts: dict[str, int] = {}
         for interaction in self.session_interactions:
             from_agent = interaction.from_agent
             agent_counts[from_agent] = agent_counts.get(from_agent, 0) + 1
@@ -374,34 +359,38 @@ def calculate_coordination_metrics(self) -> dict[str, float]:
             else 0.0,
         }

-    def run_scenario(self, scenario: Scenario) -> ScenarioResult:
+    async def run_scenario(self, scenario: Scenario) -> ScenarioResult:
         """
-        Run a scenario with LangSmith integration.
+        Run a scenario with AgentOps integration.

         Args:
             scenario: Scenario to execute

         Returns:
-            ScenarioResult: Execution results with LangSmith trace data
+            ScenarioResult: Execution results with AgentOps trace data
         """
-        logger.info(f"Running scenario with LangSmith: {scenario.name}")
+        if self.enable_tracing:
+            logger.warning(
+                "run_scenario tracing uses LangSmith-style APIs which are not fully supported by AgentOps"
+            )
+
+        logger.info(f"Running scenario with AgentOps: {scenario.name}")

-        # Start LangSmith run for the scenario
-        scenario_run_id = None
+        # Start AgentOps run for the scenario
+        agentops_trace = None
+        scenario_run_id: str | None = None
         if self.enable_tracing:
             try:
-                run = self.client.create_run(
-                    name=f"Scenario: {scenario.name}",
-                    run_type="chain",
-                    project_name=self.project_name,
-                    inputs={"scenario": scenario.name, "description": scenario.description},
-                    tags=["agentunit", "scenario"],
+                agentops_trace = self.agentops.start_trace(
+                    trace_name=f"Scenario: {scenario.name}",
+                    tags=[*self.default_tags, "agentunit", "scenario"],
                 )
-                scenario_run_id = str(run.id)
+                scenario_run_id = str(agentops_trace)
             except Exception as e:
-                logger.warning(f"Failed to create scenario run: {e}")
+                logger.warning(f"Failed to create scenario trace: {e}")

         # Execute scenario (this would typically involve running the actual test)
+        session_summary: dict[str, Any] = {}
         start_time = time.time()

         try:
@@ -459,9 +448,11 @@ def run_scenario(self, scenario: Scenario) -> ScenarioResult:
             # Create trace log
             from agentunit.core.trace import TraceLog

-            trace = TraceLog()
-            trace.record(
-                "scenario_complete", run_id=scenario_run_id, session_summary=session_summary
+            trace_log = TraceLog()
+            trace_log.record(
+                "scenario_complete",
+                run_id=scenario_run_id or "unknown",
+                session_summary=session_summary,
             )

             # Create scenario run
@@ -473,27 +464,32 @@ def run_scenario(self, scenario: Scenario) -> ScenarioResult:
                 success=True,
                 metrics=session_summary.get("metrics", {}),
                 duration_ms=execution_time * 1000,
-                trace=trace,
+                trace=trace_log,
             )

             # Create result
             result = ScenarioResult(name=scenario.name)
             result.add_run(scenario_run)

-            # Update LangSmith run with results
-            if scenario_run_id and self.enable_tracing:
+            # Update AgentOps run with results
+            trace_metadata = {
+                "scenario_name": scenario.name,
+                "metrics": session_summary.get("metrics", {}),
+                "success": True,
+            }
+
+            if agentops_trace and self.enable_tracing:
                 try:
-                    self.client.update_run(
-                        run_id=scenario_run_id,
-                        outputs={
-                            "result": result.passed,
-                            "execution_time": execution_time,
-                            "details": result.details,
-                        },
-                        end_time=datetime.now(timezone.utc),
+                    self.agentops.update_trace_metadata(
+                        trace_metadata,
+                        prefix="trace.metadata",
+                    )
+                    self.agentops.end_trace(
+                        trace=agentops_trace,
+                        end_state="success",
                     )
                 except Exception as e:
-                    logger.warning(f"Failed to update scenario run: {e}")
+                    logger.warning(f"Failed to finalize scenario trace: {e}")

             logger.info(f"Scenario completed: {scenario.name}")
             return result
@@ -504,8 +500,8 @@ def run_scenario(self, scenario: Scenario) -> ScenarioResult:
             # Create trace log
             from agentunit.core.trace import TraceLog

-            trace = TraceLog()
-            trace.record("scenario_error", error=str(e), run_id=scenario_run_id)
+            trace_log = TraceLog()
+            trace_log.record("scenario_error", error=str(e), run_id=scenario_run_id or "unknown")

             # Create scenario run
             from agentunit.reporting.results import ScenarioRun
@@ -516,7 +512,7 @@ def run_scenario(self, scenario: Scenario) -> ScenarioResult:
                 success=False,
                 metrics={},
                 duration_ms=(time.time() - start_time) * 1000,
-                trace=trace,
+                trace=trace_log,
                 error=str(e),
             )

@@ -524,22 +520,32 @@ def run_scenario(self, scenario: Scenario) -> ScenarioResult:
             result = ScenarioResult(name=scenario.name)
             result.add_run(scenario_run)

-            # Update LangSmith run with error
-            if scenario_run_id and self.enable_tracing:
+            # Update AgentOps run with error
+            trace_metadata = {
+                "scenario_name": scenario.name,
+                "metrics": session_summary.get("metrics", {}),
+                "success": False,
+                "error": str(e),
+            }
+
+            if agentops_trace and self.enable_tracing:
                 try:
-                    self.client.update_run(
-                        run_id=scenario_run_id,
-                        outputs={"error": str(e)},
-                        end_time=datetime.now(timezone.utc),
+                    self.agentops.update_trace_metadata(
+                        trace_metadata,
+                        prefix="trace.metadata",
+                    )
+                    self.agentops.end_trace(
+                        trace=agentops_trace,
+                        end_state="failed",
                     )
                 except Exception as e:
-                    logger.warning(f"Failed to update failed scenario run: {e}")
+                    logger.warning(f"Failed to finalize failed scenario trace: {e}")

             return result

     def collect_metrics(self, scenario: Any, result: Any, **kwargs) -> ProductionMetrics:
         """
-        Collect production metrics from LangSmith.
+        Collect production metrics from AgentOps.

         Args:
             scenario: The scenario being evaluated
@@ -549,9 +555,12 @@ def collect_metrics(self, scenario: Any, result: Any, **kwargs) -> ProductionMet
         Returns:
             ProductionMetrics: Current production metrics
         """
+
+        raise NotImplementedError(_LANGSMITH_UNSUPPORTED)
+
         try:
-            # Query recent runs from LangSmith
-            runs = list(self.client.list_runs(project_name=self.project_name, limit=100))
+            # Query recent runs from AgentOps
+            runs = list(self.client.list_runs(project_name=self.project_id, limit=100))

             if not runs:
                 return ProductionMetrics(
@@ -626,6 +635,8 @@ def establish_baseline(
         Returns:
             BaselineMetrics: Calculated baseline metrics
         """
+        raise NotImplementedError(_LANGSMITH_UNSUPPORTED)
+
         days = kwargs.get("days", 7)
         try:
             from datetime import timedelta
@@ -637,7 +648,7 @@ def establish_baseline(
             # Query historical runs
             runs = list(
                 self.client.list_runs(
-                    project_name=self.project_name, start_time=start_date, end_time=end_date
+                    project_name=self.project_id, start_time=start_date, end_time=end_date
                 )
             )

@@ -680,7 +691,7 @@ def establish_baseline(
             )

     def _extract_run_metrics(self, runs):
-        """Extract metrics from LangSmith runs."""
+        """Extract metrics from AgentOps runs."""
         durations = []
         token_counts = []
         success_count = 0
@@ -717,7 +728,7 @@ def create_evaluation_dataset(
         self, name: str, examples: list[dict[str, Any]], description: str | None = None
     ) -> str:
         """
-        Create an evaluation dataset in LangSmith.
+        Create an evaluation dataset in AgentOps.

         Args:
             name: Dataset name
@@ -727,6 +738,9 @@ def create_evaluation_dataset(
         Returns:
             str: Created dataset ID
         """
+
+        raise NotImplementedError(_LANGSMITH_UNSUPPORTED)
+
         try:
             dataset = self.client.create_dataset(
                 dataset_name=name,
@@ -749,9 +763,9 @@ def create_evaluation_dataset(
             logger.error(f"Failed to create LangSmith dataset: {e}")
             raise

-    def run_evaluation(self, dataset_id: str, evaluator_function: Any, **kwargs) -> dict[str, Any]:
+    def run_evaluation(self, dataset_id: str, evaluator_function: Any, **kwargs) -> Any:
         """
-        Run evaluation on a LangSmith dataset.
+        Run evaluation on an AgentOps dataset.

         Args:
             dataset_id: Dataset ID to evaluate
@@ -761,13 +775,16 @@ def run_evaluation(self, dataset_id: str, evaluator_function: Any, **kwargs) ->
         Returns:
             Dict[str, Any]: Evaluation results
         """
+
+        raise NotImplementedError(_LANGSMITH_UNSUPPORTED)
+
         try:
             from langsmith.evaluation import evaluate

             results = evaluate(
                 evaluator_function,
                 data=dataset_id,
-                project_name=f"{self.project_name}-evaluation",
+                experiment_prefix=f"{self.project_id}-evaluation",
                 **kwargs,
             )

diff --git a/src/agentunit/core/scenario.py b/src/agentunit/core/scenario.py
index 70bba2a..3523707 100644
--- a/src/agentunit/core/scenario.py
+++ b/src/agentunit/core/scenario.py
@@ -7,7 +7,7 @@

 import random
 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any

 from agentunit.datasets.registry import resolve_dataset

@@ -68,7 +68,7 @@ def from_openai_agents(
         adapter = OpenAIAgentsAdapter.from_flow(flow, **options)
         ds = resolve_dataset(dataset)
-        scenario_name = name or getattr(flow, "__name__", "openai-agents-scenario")
+        scenario_name = name or getattr(flow, "__name__", None) or "openai-agents-scenario"
         return cls(name=scenario_name, adapter=adapter, dataset=ds)

     @classmethod
@@ -95,7 +95,7 @@ def from_autogen(
         orchestrator: object,
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         """
         Create scenario from AutoGen orchestrator.
@@ -113,7 +113,7 @@ def from_haystack(
         pipeline: object,
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         from agentunit.adapters.haystack import HaystackAdapter

@@ -128,7 +128,7 @@ def from_llama_index(
         engine: object,
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         from agentunit.adapters.llama_index import LlamaIndexAdapter

@@ -143,7 +143,7 @@ def from_semantic_kernel(
         invoker: object,
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         from agentunit.adapters.semantic_kernel import SemanticKernelAdapter

@@ -158,7 +158,7 @@ def from_phidata(
         agent: object,
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         from agentunit.adapters.phidata import PhidataAdapter

@@ -173,7 +173,7 @@ def from_promptflow(
         flow: object,
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         from agentunit.adapters.promptflow import PromptFlowAdapter

@@ -188,7 +188,7 @@ def from_openai_swarm(
         swarm: object,
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         from agentunit.adapters.openai_swarm import OpenAISwarmAdapter

@@ -204,7 +204,7 @@ def from_anthropic_bedrock(
         model_id: str,
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         from agentunit.adapters.anthropic_bedrock import AnthropicBedrockAdapter

@@ -219,7 +219,7 @@ def from_mistral_server(
         base_url: str,
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         from agentunit.adapters.mistral_server import MistralServerAdapter

@@ -239,7 +239,7 @@ def from_rasa_endpoint(
         target: str | Callable[[dict], object],
         dataset: str | DatasetSource | None = None,
         name: str | None = None,
-        **options: object,
+        **options: Any,
     ) -> Scenario:
         from agentunit.adapters.rasa import RasaAdapter

diff --git a/src/agentunit/datasets/builtins.py b/src/agentunit/datasets/builtins.py
index 4b10a0f..4d67445 100644
--- a/src/agentunit/datasets/builtins.py
+++ b/src/agentunit/datasets/builtins.py
@@ -2,7 +2,9 @@

 from __future__ import annotations

-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, TypedDict
+
+from typing_extensions import NotRequired

 from .base import DatasetCase, DatasetSource

@@ -11,7 +13,18 @@
     from collections.abc import Iterable


-_GAIA_L1_SHOPPING: list[dict[str, object]] = [
+class DatasetRow(TypedDict):
+    """Typed structure for the built-in dataset rows."""
+
+    id: str
+    query: str
+    expected_output: str | None
+    tools: list[str] | None
+    context: list[str] | None
+    metadata: NotRequired[dict[str, object]]
+
+
+_GAIA_L1_SHOPPING: list[DatasetRow] = [
     {
         "id": "gaia-shopping-001",
         "query": "Find the best price for a pack of AA rechargeable batteries with at least 2500mAh capacity.",
@@ -28,7 +41,7 @@
     },
 ]

-_SWE_BENCH_LITE: list[dict[str, object]] = [
+_SWE_BENCH_LITE: list[DatasetRow] = [
     {
         "id": "swe-lite-001",
         "query": "Fix the bug where the API returns HTTP 500 when the username is missing.",
@@ -48,7 +61,7 @@
 ]


-def _build_loader(rows: list[dict[str, object]]) -> Iterable[DatasetCase]:
+def _build_loader(rows: list[DatasetRow]) -> Iterable[DatasetCase]:
     for row in rows:
         yield DatasetCase(
             id=row["id"],
diff --git a/src/agentunit/metrics/builtin.py b/src/agentunit/metrics/builtin.py
index 3369dc4..638d0c7 100644
--- a/src/agentunit/metrics/builtin.py
+++ b/src/agentunit/metrics/builtin.py
@@ -143,8 +143,9 @@ def evaluate(self, case: DatasetCase, trace: TraceLog, outcome: Any) -> MetricRe
         cost = 0.0

         # Check trace metadata
-        if trace.metadata and "cost" in trace.metadata:
-            cost = float(trace.metadata["cost"])
+        metadata = getattr(trace, "metadata", {})
+        if metadata and "cost" in metadata:
+            cost = float(metadata["cost"])

         # Check outcome
         elif hasattr(outcome, "cost"):
@@ -168,8 +169,9 @@ def evaluate(self, case: DatasetCase, trace: TraceLog, outcome: Any) -> MetricRe
         total_tokens = 0

         # Check trace metadata
-        if trace.metadata and "usage" in trace.metadata:
-            usage = trace.metadata["usage"]
+        metadata = getattr(trace, "metadata", {})
+        if metadata and "usage" in metadata:
+            usage = metadata["usage"]
             prompt_tokens = usage.get("prompt_tokens", 0)
             completion_tokens = usage.get("completion_tokens", 0)
             total_tokens = usage.get("total_tokens", 0)
diff --git a/src/agentunit/production/integrations.py b/src/agentunit/production/integrations.py
index f5ab9a7..a7199a1 100644
--- a/src/agentunit/production/integrations.py
+++ b/src/agentunit/production/integrations.py
@@ -97,7 +97,7 @@ def _calculate_baseline_stats(
         self, historical_data: list[dict[str, Any]], metrics: list[str]
     ) -> dict[str, dict[str, dict[str, float]]]:
         """Calculate baseline statistics from historical data."""
-        baseline_stats = {
+        baseline_stats: dict[str, dict[str, dict[str, float]]] = {
             "performance_baseline": {},
             "quality_baseline": {},
             "reliability_baseline": {},
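
run_scenario is now a coroutine, so existing callers must await it. A minimal caller-side sketch, assuming an already-constructed Scenario and illustrative adapter arguments (neither appears in this diff; AgentOps credentials are expected to come from the environment, e.g. AGENTOPS_API_KEY):

import asyncio

from agentunit.adapters.agentops_adapter import AgentOpsAdapter
from agentunit.core.scenario import Scenario


async def run_with_agentops(scenario: Scenario) -> None:
    # Illustrative arguments; the adapter initializes the agentops SDK in __init__.
    adapter = AgentOpsAdapter(api_key=None, default_tags=["agentunit"], auto_start_session=False)
    result = await adapter.run_scenario(scenario)  # must be awaited after this change
    print(result.name)

# Callers in synchronous code would wrap this with asyncio.run(run_with_agentops(scenario)).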
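
The LangSmith-only surface now fails fast with _LANGSMITH_UNSUPPORTED before touching self.client; establish_baseline behaves the same way. A pytest-style sketch of that contract, assuming an `adapter` fixture that builds an AgentOpsAdapter without live credentials (the fixture is an assumption, not part of this diff):

import pytest

from agentunit.adapters.agentops_adapter import AgentOpsAdapter


def test_langsmith_surface_is_stubbed(adapter: AgentOpsAdapter) -> None:
    # Each call raises immediately, so no AgentOps or LangSmith request is made.
    with pytest.raises(NotImplementedError):
        adapter.collect_metrics(scenario=None, result=None)
    with pytest.raises(NotImplementedError):
        adapter.create_evaluation_dataset("demo", examples=[])
    with pytest.raises(NotImplementedError):
        adapter.run_evaluation("dataset-id", evaluator_function=None)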
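
DatasetRow gives the built-in rows a checkable shape: every key except metadata is required, and None is only legal where the value type allows it. A row sketch with illustrative values:

from agentunit.datasets.builtins import DatasetRow

row: DatasetRow = {
    "id": "example-001",
    "query": "What is 2 + 2?",
    "expected_output": "4",
    "tools": None,
    "context": None,
    # "metadata" may be omitted entirely because it is NotRequired
}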