302 changes: 168 additions & 134 deletions benchmarks/utils/evaluation.py
@@ -13,7 +13,7 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, List, Optional, Tuple
from typing import Any, Callable, List, Optional, Tuple
from uuid import UUID

from lmnr import Laminar
@@ -369,19 +369,20 @@ def attempt_on_result(instance: EvalInstance, out: EvalOutput) -> None:
pending_instances: dict[Future, PendingInstance] = {}
try:
for index, inst in enumerate(instances_to_process):
datapoint_id, lmnr_span_ctx = (
LaminarService.get().create_evaluation_datapoint(
self.metadata.lmnr.eval_id,
inst.id,
self.metadata.model_dump(mode="json"),
index,
session_id=self._laminar_session_id,
trace_metadata=self._laminar_trace_meta,
)
datapoint_id = LaminarService.get().create_evaluation_datapoint(
self.metadata.lmnr.eval_id,
inst.id,
self.metadata.model_dump(mode="json"),
index,
)

fut = pool.submit(
self._process_one_mp, inst, lmnr_span_ctx, attempt
self._process_one_mp,
inst,
attempt,
lmnr_session_id=self._laminar_session_id,
lmnr_trace_metadata=self._laminar_trace_meta,
lmnr_datapoint_id=datapoint_id,
)
futures.append(fut)
pending_instances[fut] = PendingInstance(
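Reviewer note on the hunk above: `create_evaluation_datapoint` no longer returns a serialized span context. The parent still creates the datapoint (so ordering by `index` is preserved), but now forwards only plain, pickleable values to the child worker. A minimal sketch of that handoff, with hypothetical simplified names (`process_one`, the literal session id, and the metadata dict stand in for `_process_one_mp` and the real Laminar values):

```python
from concurrent.futures import ProcessPoolExecutor
from uuid import UUID, uuid4

def process_one(
    instance_id: str,
    *,
    datapoint_id: UUID | None = None,
    session_id: str | None = None,
    trace_metadata: dict | None = None,
) -> str:
    # Runs in the child process; the real worker starts its own
    # root span here and links it back via datapoint_id.
    return f"{instance_id}: datapoint={datapoint_id} session={session_id}"

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = []
        for index, instance_id in enumerate(["inst-a", "inst-b"]):
            # Created in the parent, like create_evaluation_datapoint.
            datapoint_id = uuid4()
            futures.append(
                pool.submit(
                    process_one,
                    instance_id,
                    datapoint_id=datapoint_id,
                    session_id="session-123",
                    trace_metadata={"index": index},
                )
            )
        for fut in futures:
            print(fut.result())
```

Per the comment added in the later hunk, the point is that the root span is now created in the child so the trace timeline reflects when execution actually started; only the identifiers needed to link things back travel through `submit`.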
@@ -557,7 +558,12 @@ def _calculate_resource_factor(self, runtime_failure_count: int) -> int:

# --- Worker-side method (executed in child processes) ---------------------------
def _process_one_mp(
self, instance: EvalInstance, eval_span_ctx: str | None, critic_attempt: int
self,
instance: EvalInstance,
critic_attempt: int,
lmnr_session_id: str | None = None,
lmnr_trace_metadata: dict[str, Any] | None = None,
lmnr_datapoint_id: UUID | None = None,
) -> Tuple[EvalInstance, EvalOutput]:
"""Execute one instance in a child process with retry logic.

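The new optional `lmnr_datapoint_id` parameter lets the child link the trace it creates back to the parent-made datapoint. As the next hunk shows, the linkage converts the OpenTelemetry trace id, a 128-bit integer, into a `UUID`. A self-contained sketch of that conversion, with an illustrative value:

```python
from uuid import UUID

# A span exposes its trace id as a 128-bit int via
# span.get_span_context().trace_id; the diff wraps it in a UUID
# before calling update_datapoint_trace_id. Illustrative value:
trace_id_int = 0x0123456789ABCDEF0123456789ABCDEF
trace_id = UUID(int=trace_id_int)
assert trace_id.int == trace_id_int  # lossless round-trip
print(trace_id)  # 01234567-89ab-cdef-0123-456789abcdef
```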
@@ -578,148 +584,176 @@ def _process_one_mp(
with redirect_stdout_stderr(log_file):
logger.info("[child] start id=%s", instance.id)

# Create root "Evaluation" span in the child so the timeline
# reflects actual execution start, then update the datapoint
# with the span's trace_id.
eval_span = Laminar.start_active_span(
"Evaluation",
span_type="EVALUATION", # type: ignore
session_id=lmnr_session_id,
metadata=lmnr_trace_metadata,
)
eval_span_ctx = Laminar.get_laminar_span_context(eval_span)

if lmnr_datapoint_id is not None and self.metadata.lmnr is not None:
trace_id = UUID(int=eval_span.get_span_context().trace_id)
LaminarService.get().initialize()
LaminarService.get().update_datapoint_trace_id(
eval_id=self.metadata.lmnr.eval_id,
datapoint_id=lmnr_datapoint_id,
trace_id=trace_id,
)

retry_count = 0
runtime_failure_count = 0
last_error = None
max_retries = self.metadata.max_retries
runtime_runs: list[RemoteRuntimeAllocation] = []

while retry_count <= max_retries:
workspace = None

# Start Laminar execution span and inject context into os.environ so workspace can pick it up
# Escape the serialized context to safely pass as a cli argument
lmnr_span = Laminar.start_active_span(
"Execution",
span_type="EXECUTOR", # type: ignore
parent_span_context=Laminar.deserialize_span_context(eval_span_ctx)
if eval_span_ctx
else None,
)
exec_span_ctx = json.dumps(Laminar.serialize_span_context(lmnr_span))
os.environ["LMNR_SPAN_CONTEXT"] = exec_span_ctx or ""

try:
# Calculate resource factor based on runtime failures
resource_factor = self._calculate_resource_factor(
runtime_failure_count
try:
while retry_count <= max_retries:
workspace = None

# Start Laminar execution span and inject context into os.environ so workspace can pick it up
# Escape the serialized context to safely pass as a cli argument
exec_span = Laminar.start_active_span(
"Execution",
span_type="EXECUTOR", # type: ignore
parent_span_context=eval_span_ctx,
)
if runtime_failure_count > 0:
logger.warning(
f"[child] Instance {instance.id}: "
f"attempt {retry_count + 1}/{max_retries + 1}, "
f"runtime_failure_count={runtime_failure_count}, "
f"resource_factor={resource_factor}"
)

workspace = self.prepare_workspace(
instance,
resource_factor=resource_factor,
forward_env=LMNR_ENV_VARS,
exec_span_ctx = json.dumps(
Laminar.serialize_span_context(exec_span)
)
os.environ["LMNR_SPAN_CONTEXT"] = exec_span_ctx or ""

# Record runtime/pod mapping only for remote runtimes
if isinstance(workspace, APIRemoteWorkspace):
retry_number = retry_count + 1 # 1-indexed for readability
runtime_run = RemoteRuntimeAllocation(
runtime_id=getattr(workspace, "_runtime_id", None),
session_id=getattr(workspace, "session_id", None),
runtime_url=getattr(workspace, "_runtime_url", None),
resource_factor=resource_factor,
critic_attempt=critic_attempt,
retry=retry_number,
started_at=datetime.now(timezone.utc),
)
runtime_runs.append(runtime_run)
logger.info(
"[child] runtime allocated instance=%s attempt=%d retry=%d workspace=%s runtime_id=%s session_id=%s resource_factor=%s",
instance.id,
critic_attempt,
retry_number,
workspace.__class__.__name__,
runtime_run.runtime_id,
runtime_run.session_id,
runtime_run.resource_factor,
try:
# Calculate resource factor based on runtime failures
resource_factor = self._calculate_resource_factor(
runtime_failure_count
)
out = self.evaluate_instance(instance, workspace)
if runtime_runs:
out.runtime_runs = runtime_runs
logger.info("[child] done id=%s", instance.id)
return instance, out
except Exception as e:
last_error = e
retry_count += 1
lmnr_span.record_exception(e)
if runtime_failure_count > 0:
logger.warning(
f"[child] Instance {instance.id}: "
f"attempt {retry_count + 1}/{max_retries + 1}, "
f"runtime_failure_count={runtime_failure_count}, "
f"resource_factor={resource_factor}"
)

# Log structured runtime allocation/init failures so we can trace instance -> runtime/pod
runtime_id = (
getattr(workspace, "_runtime_id", None) if workspace else None
)
session_id = (
getattr(workspace, "session_id", None) if workspace else None
)
if isinstance(workspace, APIRemoteWorkspace) or (
"Runtime not yet ready" in str(e)
):
logger.warning(
"[child] runtime init failure instance=%s attempt=%d retry=%d runtime_id=%s session_id=%s error=%s",
instance.id,
critic_attempt,
retry_count,
runtime_id,
session_id,
str(e),
workspace = self.prepare_workspace(
instance,
resource_factor=resource_factor,
forward_env=LMNR_ENV_VARS,
)

# TODO(#277): add an exception classifier to decide when to bump resources
runtime_failure_count += 1
logger.warning(
f"[child] Instance {instance.id}: runtime_failure_count="
f"{runtime_failure_count}"
)

if retry_count <= max_retries:
logger.warning(
f"[child] Instance {instance.id} failed "
f"(attempt {retry_count}/{max_retries}): "
f"{str(e)}"
)
else:
logger.error(
f"[child] Instance {instance.id} failed after "
f"{max_retries} retries. Last error: {str(e)}",
exc_info=True,
# Record runtime/pod mapping only for remote runtimes
if isinstance(workspace, APIRemoteWorkspace):
retry_number = retry_count + 1 # 1-indexed for readability
runtime_run = RemoteRuntimeAllocation(
runtime_id=getattr(workspace, "_runtime_id", None),
session_id=getattr(workspace, "session_id", None),
runtime_url=getattr(workspace, "_runtime_url", None),
resource_factor=resource_factor,
critic_attempt=critic_attempt,
retry=retry_number,
started_at=datetime.now(timezone.utc),
)
runtime_runs.append(runtime_run)
logger.info(
"[child] runtime allocated instance=%s attempt=%d retry=%d workspace=%s runtime_id=%s session_id=%s resource_factor=%s",
instance.id,
critic_attempt,
retry_number,
workspace.__class__.__name__,
runtime_run.runtime_id,
runtime_run.session_id,
runtime_run.resource_factor,
)
out = self.evaluate_instance(instance, workspace)
if runtime_runs:
out.runtime_runs = runtime_runs
logger.info("[child] done id=%s", instance.id)
return instance, out
except Exception as e:
last_error = e
retry_count += 1
exec_span.record_exception(e)

# Log structured runtime allocation/init failures so we can trace instance -> runtime/pod
runtime_id = (
getattr(workspace, "_runtime_id", None)
if workspace
else None
)
# Create error output for final failure
error_output = self._create_error_output(
instance, last_error, max_retries
session_id = (
getattr(workspace, "session_id", None)
if workspace
else None
)
if runtime_runs:
error_output.runtime_runs = runtime_runs
return instance, error_output
finally:
# Ensure workspace cleanup happens regardless of success or failure
if workspace is not None:
try:
self._capture_conversation_archive(workspace, instance)
except Exception as archive_error:
if isinstance(workspace, APIRemoteWorkspace) or (
"Runtime not yet ready" in str(e)
):
logger.warning(
"[child] Failed to capture conversation archive for %s: %s",
"[child] runtime init failure instance=%s attempt=%d retry=%d runtime_id=%s session_id=%s error=%s",
instance.id,
archive_error,
)
try:
# Use the context manager protocol for cleanup
workspace.__exit__(None, None, None)
logger.debug(
"[child] cleaned up workspace for id=%s", instance.id
critic_attempt,
retry_count,
runtime_id,
session_id,
str(e),
)
except Exception as cleanup_error:

# TODO(#277): add an exception classifier to decide when to bump resources
runtime_failure_count += 1
logger.warning(
f"[child] Instance {instance.id}: runtime_failure_count="
f"{runtime_failure_count}"
)

if retry_count <= max_retries:
logger.warning(
f"[child] Failed to cleanup workspace for {instance.id}: "
f"{str(cleanup_error)[:50]}"
f"[child] Instance {instance.id} failed "
f"(attempt {retry_count}/{max_retries}): "
f"{str(e)}"
)
else:
logger.error(
f"[child] Instance {instance.id} failed after "
f"{max_retries} retries. Last error: {str(e)}",
exc_info=True,
)
# Create error output for final failure
error_output = self._create_error_output(
instance, last_error, max_retries
)
lmnr_span.end()
if runtime_runs:
error_output.runtime_runs = runtime_runs
return instance, error_output
finally:
# Ensure workspace cleanup happens regardless of success or failure
if workspace is not None:
try:
self._capture_conversation_archive(workspace, instance)
except Exception as archive_error:
logger.warning(
"[child] Failed to capture conversation archive for %s: %s",
instance.id,
archive_error,
)
try:
# Use the context manager protocol for cleanup
workspace.__exit__(None, None, None)
logger.debug(
"[child] cleaned up workspace for id=%s",
instance.id,
)
except Exception as cleanup_error:
logger.warning(
f"[child] Failed to cleanup workspace for {instance.id}: "
f"{str(cleanup_error)[:50]}"
)
exec_span.end()
finally:
eval_span.end()

# This should never be reached, but added for type safety
error_output = self._create_error_output(
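Taken together, the final hunk's restructuring is mostly re-indentation: the retry loop moves inside an outer `try`/`finally` so the root "Evaluation" span always ends, while each attempt's "Execution" span and the workspace are torn down in the inner `finally`. A runnable skeleton of that control flow, assuming stand-in objects (`FakeSpan`, the simulated failure) in place of the real Laminar and workspace APIs:

```python
class FakeSpan:
    """Stand-in for a Laminar span; only the calls used in the diff."""

    def __init__(self, name: str) -> None:
        self.name = name

    def record_exception(self, exc: Exception) -> None:
        print(f"{self.name}: recorded {exc!r}")

    def end(self) -> None:
        print(f"{self.name}: ended")

def run_with_spans(max_retries: int = 1) -> str:
    eval_span = FakeSpan("Evaluation")  # root span, created once per instance
    retry_count = 0
    try:
        while retry_count <= max_retries:
            workspace = None
            exec_span = FakeSpan("Execution")  # one span per attempt
            try:
                workspace = object()  # stands in for prepare_workspace(...)
                if retry_count == 0:
                    # Simulated runtime init failure on the first attempt.
                    raise RuntimeError("Runtime not yet ready")
                return "ok"
            except Exception as e:
                exec_span.record_exception(e)
                retry_count += 1
            finally:
                if workspace is not None:
                    pass  # workspace.__exit__(None, None, None) in the real code
                exec_span.end()
        return "failed"  # retries exhausted; real code builds an error output
    finally:
        eval_span.end()

print(run_with_spans())
```

In the real code the inner `finally` also captures the conversation archive before calling `workspace.__exit__(None, None, None)`, so teardown runs whether an attempt succeeds, raises, or exhausts its retries.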