Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 100 additions & 83 deletions src/agentunit/adapters/agentops_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

logger = logging.getLogger(__name__)

_LANGSMITH_UNSUPPORTED = "LangSmith features are not supported by AgentOpsAdapter"


class AgentOpsAdapter(MultiAgentAdapter, ProductionIntegration):
"""
Expand Down Expand Up @@ -67,21 +69,9 @@ def __init__(
self.project_id = project_id
self.default_tags = default_tags or []
self.auto_start_session = auto_start_session
self.enable_tracing: bool = kwargs.get("enable_tracing", True)
self.client: Any = None
self.platform = MonitoringPlatform.AGENTOPS
"""
Initialize LangSmith adapter.

Args:
api_key: LangSmith API key
project_name: Project name for organizing traces
endpoint: Optional custom LangSmith endpoint
enable_tracing: Whether to enable automatic tracing
enable_feedback: Whether to collect feedback data
"""
self.api_key = api_key
self.project_id = project_id
self.default_tags = default_tags or []
self.auto_start_session = auto_start_session

# Initialize AgentOps client
self._initialize_agentops()
Expand All @@ -98,21 +88,18 @@ def _initialize_agentops(self):
"""Initialize AgentOps client and verify connection."""
try:
# Import AgentOps SDK
import agentops

self.agentops = agentops
import agentops # type: ignore[import-not-found]

# Initialize AgentOps
if self.api_key:
agentops.init(
api_key=self.api_key,
default_tags=self.default_tags,
auto_start_session=self.auto_start_session,
)
else:
agentops.init(
default_tags=self.default_tags, auto_start_session=self.auto_start_session
)
session = agentops.init(
api_key=self.api_key,
default_tags=self.default_tags,
auto_start_session=self.auto_start_session,
)

self.agentops = agentops
self.session = session
self.client = session

logger.info("Successfully connected to AgentOps")

Expand All @@ -124,11 +111,6 @@ def _initialize_agentops(self):
logger.error(f"Failed to connect to AgentOps: {e}")
raise

@property
def platform(self) -> MonitoringPlatform:
"""Return the monitoring platform type."""
return MonitoringPlatform.AGENTOPS

def create_agent(self, role: AgentRole, agent_id: str | None = None, **kwargs) -> AgentMetadata:
"""
Create an agent for AgentOps monitoring.
Expand Down Expand Up @@ -244,7 +226,7 @@ def send_message(
interaction = AgentInteraction(
interaction_id=interaction_id,
from_agent=from_agent,
to_agent=to_agent,
to_agent=to_agent or "broadcast",
content=message,
timestamp=timestamp,
metadata=metadata or {},
Expand Down Expand Up @@ -339,12 +321,15 @@ def calculate_coordination_metrics(self) -> dict[str, float]:
for interaction in self.session_interactions:
unique_agents.add(interaction.from_agent)
if interaction.to_agent:
unique_agents.add(interaction.to_agent)
if isinstance(interaction.to_agent, list):
unique_agents.update(interaction.to_agent)
else:
unique_agents.add(interaction.to_agent)

agent_participation = len(unique_agents)

# Calculate message distribution
agent_counts = {}
agent_counts: dict[str, int] = {}
for interaction in self.session_interactions:
from_agent = interaction.from_agent
agent_counts[from_agent] = agent_counts.get(from_agent, 0) + 1
Expand Down Expand Up @@ -374,34 +359,38 @@ def calculate_coordination_metrics(self) -> dict[str, float]:
else 0.0,
}

def run_scenario(self, scenario: Scenario) -> ScenarioResult:
async def run_scenario(self, scenario: Scenario) -> ScenarioResult:
"""
Run a scenario with LangSmith integration.
Run a scenario with AgentOps integration.

Args:
scenario: Scenario to execute

Returns:
ScenarioResult: Execution results with LangSmith trace data
ScenarioResult: Execution results with AgentOps trace data
"""
logger.info(f"Running scenario with LangSmith: {scenario.name}")
if self.enable_tracing:
logger.warning(
"run_scenario tracing uses LangSmith-style APIs which are not fully supported by AgentOps"
)

logger.info(f"Running scenario with AgentOps: {scenario.name}")

# Start LangSmith run for the scenario
scenario_run_id = None
# Start AgentOps run for the scenario
agentops_trace = None
scenario_run_id: str | None = None
if self.enable_tracing:
try:
run = self.client.create_run(
name=f"Scenario: {scenario.name}",
run_type="chain",
project_name=self.project_name,
inputs={"scenario": scenario.name, "description": scenario.description},
tags=["agentunit", "scenario"],
agentops_trace = self.agentops.start_trace(
trace_name=f"Scenario: {scenario.name}",
tags=[*self.default_tags, "agentunit", "scenario"],
)
scenario_run_id = str(run.id)
scenario_run_id = str(agentops_trace)
except Exception as e:
logger.warning(f"Failed to create scenario run: {e}")
logger.warning(f"Failed to create scenario trace: {e}")

# Execute scenario (this would typically involve running the actual test)
session_summary: dict[str, Any] = {}
start_time = time.time()

try:
Expand Down Expand Up @@ -459,9 +448,11 @@ def run_scenario(self, scenario: Scenario) -> ScenarioResult:
# Create trace log
from agentunit.core.trace import TraceLog

trace = TraceLog()
trace.record(
"scenario_complete", run_id=scenario_run_id, session_summary=session_summary
trace_log = TraceLog()
trace_log.record(
"scenario_complete",
run_id=scenario_run_id or "unknown",
session_summary=session_summary,
)

# Create scenario run
Expand All @@ -473,27 +464,32 @@ def run_scenario(self, scenario: Scenario) -> ScenarioResult:
success=True,
metrics=session_summary.get("metrics", {}),
duration_ms=execution_time * 1000,
trace=trace,
trace=trace_log,
)

# Create result
result = ScenarioResult(name=scenario.name)
result.add_run(scenario_run)

# Update LangSmith run with results
if scenario_run_id and self.enable_tracing:
# Update AgentOps run with results
trace_metadata = {
"scenario_name": scenario.name,
"metrics": session_summary.get("metrics", {}),
"success": True,
}

if trace_log and self.enable_tracing:
try:
self.client.update_run(
run_id=scenario_run_id,
outputs={
"result": result.passed,
"execution_time": execution_time,
"details": result.details,
},
end_time=datetime.now(timezone.utc),
self.agentops.update_trace_metadata(
trace_metadata,
prefix="trace.metadata",
)
self.agentops.end_trace(
trace=agentops_trace,
end_state="success",
)
except Exception as e:
logger.warning(f"Failed to update scenario run: {e}")
logger.warning(f"Failed to finalize scenario trace: {e}")

logger.info(f"Scenario completed: {scenario.name}")
return result
Expand All @@ -504,8 +500,8 @@ def run_scenario(self, scenario: Scenario) -> ScenarioResult:
# Create trace log
from agentunit.core.trace import TraceLog

trace = TraceLog()
trace.record("scenario_error", error=str(e), run_id=scenario_run_id)
trace_log = TraceLog()
trace_log.record("scenario_error", error=str(e), run_id=scenario_run_id or "unknown")

# Create scenario run
from agentunit.reporting.results import ScenarioRun
Expand All @@ -516,30 +512,40 @@ def run_scenario(self, scenario: Scenario) -> ScenarioResult:
success=False,
metrics={},
duration_ms=(time.time() - start_time) * 1000,
trace=trace,
trace=trace_log,
error=str(e),
)

# Create result
result = ScenarioResult(name=scenario.name)
result.add_run(scenario_run)

# Update LangSmith run with error
if scenario_run_id and self.enable_tracing:
# Update AgentOps run with error
trace_metadata = {
"scenario_name": scenario.name,
"metrics": session_summary.get("metrics", {}),
"success": False,
"error": str(e),
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if trace_log and self.enable_tracing:
try:
self.client.update_run(
run_id=scenario_run_id,
outputs={"error": str(e)},
end_time=datetime.now(timezone.utc),
self.agentops.update_trace_metadata(
trace_metadata,
prefix="trace.metadata",
)
self.agentops.end_trace(
trace=agentops_trace,
end_state="failed",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
except Exception as e:
logger.warning(f"Failed to update failed scenario run: {e}")
logger.warning(f"Failed to finalize failed scenario trace: {e}")

return result

def collect_metrics(self, scenario: Any, result: Any, **kwargs) -> ProductionMetrics:
"""
Collect production metrics from LangSmith.
Collect production metrics from AgentOps.

Args:
scenario: The scenario being evaluated
Expand All @@ -549,9 +555,12 @@ def collect_metrics(self, scenario: Any, result: Any, **kwargs) -> ProductionMet
Returns:
ProductionMetrics: Current production metrics
"""

raise NotImplementedError(_LANGSMITH_UNSUPPORTED)

try:
# Query recent runs from LangSmith
runs = list(self.client.list_runs(project_name=self.project_name, limit=100))
# Query recent runs from AgentOps
runs = list(self.client.list_runs(project_name=self.project_id, limit=100))

if not runs:
return ProductionMetrics(
Expand Down Expand Up @@ -626,6 +635,8 @@ def establish_baseline(
Returns:
BaselineMetrics: Calculated baseline metrics
"""
raise NotImplementedError(_LANGSMITH_UNSUPPORTED)

days = kwargs.get("days", 7)
try:
from datetime import timedelta
Expand All @@ -637,7 +648,7 @@ def establish_baseline(
# Query historical runs
runs = list(
self.client.list_runs(
project_name=self.project_name, start_time=start_date, end_time=end_date
project_name=self.project_id, start_time=start_date, end_time=end_date
)
)

Expand Down Expand Up @@ -680,7 +691,7 @@ def establish_baseline(
)

def _extract_run_metrics(self, runs):
"""Extract metrics from LangSmith runs."""
"""Extract metrics from AgentOps runs."""
durations = []
token_counts = []
success_count = 0
Expand Down Expand Up @@ -717,7 +728,7 @@ def create_evaluation_dataset(
self, name: str, examples: list[dict[str, Any]], description: str | None = None
) -> str:
"""
Create an evaluation dataset in LangSmith.
Create an evaluation dataset in AgentOps.

Args:
name: Dataset name
Expand All @@ -727,6 +738,9 @@ def create_evaluation_dataset(
Returns:
str: Created dataset ID
"""

raise NotImplementedError(_LANGSMITH_UNSUPPORTED)

try:
dataset = self.client.create_dataset(
dataset_name=name,
Expand All @@ -749,9 +763,9 @@ def create_evaluation_dataset(
logger.error(f"Failed to create LangSmith dataset: {e}")
raise

def run_evaluation(self, dataset_id: str, evaluator_function: Any, **kwargs) -> dict[str, Any]:
def run_evaluation(self, dataset_id: str, evaluator_function: Any, **kwargs) -> Any:
"""
Run evaluation on a LangSmith dataset.
Run evaluation on an AgentOps dataset.

Args:
dataset_id: Dataset ID to evaluate
Expand All @@ -761,13 +775,16 @@ def run_evaluation(self, dataset_id: str, evaluator_function: Any, **kwargs) ->
Returns:
Dict[str, Any]: Evaluation results
"""

raise NotImplementedError(_LANGSMITH_UNSUPPORTED)

try:
from langsmith.evaluation import evaluate

results = evaluate(
evaluator_function,
data=dataset_id,
project_name=f"{self.project_name}-evaluation",
experiment_prefix=f"{self.project_id}-evaluation",
**kwargs,
)

Expand Down
Loading