feat: traces v0#11689
Conversation
v0 for traces includes:
- filters: status, token usage range, and datetime
- accordion rows per trace

Could add:
- more filter options. Examples: session_id, trace_id, and latency range
Walkthrough

A comprehensive tracing system is introduced, with backend API endpoints for querying execution traces, SQLModel database tables, a NativeTracer service capturing component and LangChain operations, and frontend UI components for trace visualization and filtering. This includes async database write operations, hierarchical span tree rendering, filtering/pagination, and removal of the deprecated logs system.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Component as Langflow Component
    participant NativeTracer as NativeTracer
    participant Callback as NativeCallbackHandler
    participant LangChain as LangChain Operation
    participant DB as Database Session
    Component->>NativeTracer: add_trace(trace_id, name, inputs)
    NativeTracer->>NativeTracer: Store in-memory span
    Component->>NativeTracer: get_langchain_callback()
    NativeTracer->>Callback: Return callback handler
    LangChain->>Callback: on_llm_start(prompts, run_id)
    Callback->>NativeTracer: add_langchain_span(span_id, name, inputs, tokens)
    LangChain->>Callback: on_llm_end(response, tokens)
    Callback->>NativeTracer: end_langchain_span(span_id, outputs, latency, tokens)
    Component->>NativeTracer: end_trace(trace_id, outputs, error)
    NativeTracer->>NativeTracer: Move to completed, compute status
    NativeTracer->>DB: _flush_to_database() [async]
    DB->>DB: Create TraceTable & SpanTable records
    DB->>DB: Aggregate tokens, set trace status
    NativeTracer->>NativeTracer: Clear completed spans
```
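The lifecycle in the diagram can be sketched as plain Python. This is a minimal, hypothetical stand-in, not the PR's actual `NativeTracer`: the method names come from the diagram, while the field names, statuses, and constructor are illustrative assumptions.

```python
from datetime import datetime, timezone
from uuid import uuid4


class InMemoryTracerSketch:
    """Hypothetical minimal stand-in for NativeTracer's in-memory span store."""

    def __init__(self):
        self._active = {}     # trace_id -> span dict, held in memory only
        self._completed = []  # finished spans awaiting an async DB flush

    def add_trace(self, trace_id, name, inputs):
        # Store the span in memory; nothing touches the database yet.
        self._active[trace_id] = {
            "span_id": uuid4(),
            "name": name,
            "inputs": inputs,
            "start_time": datetime.now(timezone.utc),
        }

    def end_trace(self, trace_id, outputs=None, error=None):
        # Move the span to the completed list and compute its status.
        span = self._active.pop(trace_id)
        span["outputs"] = outputs
        span["error"] = error
        span["status"] = "error" if error else "success"
        span["end_time"] = datetime.now(timezone.utc)
        self._completed.append(span)
        # A real implementation would now flush asynchronously, e.g.
        # asyncio.create_task(self._flush_to_database()), then clear _completed.


tracer = InMemoryTracerSketch()
tracer.add_trace("t1", "Chat Input", {"input_value": "hi"})
tracer.end_trace("t1", outputs={"text": "hello"})
print(tracer._completed[0]["status"])
```

The key design point the diagram implies is that span bookkeeping stays in memory during execution, and the database is only touched once per trace at `end_trace` time.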
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks: ❌ Failed checks (3 warnings) | ✅ Passed checks (4 passed)
✅ Migration Validation Passed: All migrations follow the Expand-Contract pattern correctly.
```python
if run_id not in self._spans:
    self._spans[run_id] = {"span_id": uuid4(), "start_time": datetime.now(timezone.utc)}
return self._spans[run_id]["span_id"]
```
⚡️Codeflash found 33% (0.33x) speedup for NativeCallbackHandler._get_span_id in src/backend/base/langflow/services/tracing/native_callback.py
⏱️ Runtime : 918 microseconds → 692 microseconds (best of 203 runs)
📝 Explanation and details
The optimization replaces the `if run_id not in self._spans` membership check with a try/except pattern, achieving a roughly 33% speedup by eliminating a redundant dictionary lookup.
Key Changes:
- Original approach: checks `if run_id not in self._spans` (1st lookup), then accesses `self._spans[run_id]["span_id"]` (2nd lookup) on every call
- Optimized approach: directly attempts `return self._spans[run_id]["span_id"]` and only handles the `KeyError` exception when the span doesn't exist
Why This Is Faster:
Dictionary lookups are cheap individually, but they dominate this hot path. The line profiler shows that in the original code the membership check (`not in`) consumed 45.6% of total time (6.26 ms) and the subsequent dictionary access took another 50.4% (6.91 ms): two dictionary lookups per call when the span exists.
The optimized version uses Python's "Easier to Ask for Forgiveness than Permission" (EAFP) pattern. For the common case where `run_id` already exists (5,215 out of 5,240 calls = 99.5% hit rate), it performs only one dictionary lookup (73.9% of time, 6.77 ms). The exception-handling overhead for the 25 cache misses is negligible (0.2% of time for the `KeyError` catch).
Performance Characteristics:
- Best for high-hit-rate scenarios: the test results show this optimization excels when `run_id`s are reused frequently (e.g., `test_repeated_calls_many_times_for_single_run_id_are_stable` with 1000 calls, `test_large_scale_unique_span_ids_and_idempotence` with repeated lookups)
- Minimal overhead for new entries: creating new spans (25 calls) adds only ~16 μs exception overhead, far less than the savings from eliminating redundant lookups
Impact on Workloads:
This is a callback handler for tracing LangChain operations (LLM calls, tool executions, chain runs). These operations typically generate repeated callbacks for the same run_id throughout their lifecycle, making the high hit rate assumption valid and this optimization highly beneficial for production tracing scenarios.
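The LBYL-to-EAFP change can be reproduced outside Langflow with a small micro-benchmark. The dictionary contents, hit rate, and iteration count below are illustrative assumptions; only the lookup pattern matches the PR's change.

```python
import timeit
from uuid import uuid4

# A dictionary shaped like the handler's _spans store: run_id -> span metadata.
spans = {uuid4(): {"span_id": uuid4()} for _ in range(1000)}
hot_key = next(iter(spans))  # simulate the 99.5%-hit-rate case


def lbyl(run_id):
    # Look Before You Leap: membership check plus a second lookup on every hit.
    if run_id not in spans:
        spans[run_id] = {"span_id": uuid4()}
    return spans[run_id]["span_id"]


def eafp(run_id):
    # Easier to Ask Forgiveness than Permission: one lookup on the hit path.
    try:
        return spans[run_id]["span_id"]
    except KeyError:
        span_id = uuid4()
        spans[run_id] = {"span_id": span_id}
        return span_id


print("LBYL:", timeit.timeit(lambda: lbyl(hot_key), number=100_000))
print("EAFP:", timeit.timeit(lambda: eafp(hot_key), number=100_000))
```

The trade-off is symmetric: EAFP wins when hits dominate, while a miss-heavy workload would pay the (much larger) cost of raising and catching `KeyError` on most calls.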
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 1021 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Click to see Generated Regression Tests
from datetime import datetime, timezone
from typing import Any
from uuid import UUID, uuid4
# imports
import pytest # used for our unit tests
from langflow.services.tracing.native_callback import NativeCallbackHandler
# function to test
# (The implementation is in langflow.services.tracing.native_callback.NativeCallbackHandler._get_span_id)
def _make_handler(tracer: Any = object()) -> NativeCallbackHandler:
"""Helper to create a NativeCallbackHandler with a trivial tracer.
We pass a plain object as the tracer because the handler only stores it;
the handler does not require any concrete tracer behavior for _get_span_id.
"""
return NativeCallbackHandler(tracer=tracer)
def test_basic_new_span_creation():
# Create handler with a simple tracer placeholder
handler = _make_handler()
# Generate a new run_id (UUID)
run_id = uuid4()
# Call _get_span_id to create a new span
codeflash_output = handler._get_span_id(run_id); span_id = codeflash_output
entry = handler._spans[run_id]
# The start_time should be very recent (within 5 seconds)
now = datetime.now(timezone.utc)
delta_seconds = (now - entry["start_time"]).total_seconds()
def test_same_run_id_returns_same_span_id_on_repeated_calls():
handler = _make_handler()
run_id = uuid4()
# First call creates a span_id
codeflash_output = handler._get_span_id(run_id); first_span = codeflash_output
# Second call for the same run_id should return the exact same UUID
codeflash_output = handler._get_span_id(run_id); second_span = codeflash_output
def test_different_run_ids_get_different_span_ids():
handler = _make_handler()
run_id_a = uuid4()
run_id_b = uuid4()
codeflash_output = handler._get_span_id(run_id_a); span_a = codeflash_output
codeflash_output = handler._get_span_id(run_id_b); span_b = codeflash_output
def test_equal_uuid_objects_as_keys_share_the_same_entry():
handler = _make_handler()
# Create two distinct UUID objects with the same value
original = uuid4()
same_value = UUID(str(original)) # creates a new UUID instance equal to original
# Calling with the first stores an entry
codeflash_output = handler._get_span_id(original); span_first = codeflash_output
# Calling with the second (equal) should return the same stored span_id
codeflash_output = handler._get_span_id(same_value); span_second = codeflash_output
@pytest.mark.parametrize("key", [None, "string_key", 12345])
def test_non_uuid_key_types_are_handled_and_stored(key):
# Although the type hint expects a UUID, the implementation uses the run_id as a dict key
handler = _make_handler()
# Call _get_span_id with a non-UUID key (None, str, int)
codeflash_output = handler._get_span_id(key); span = codeflash_output # type: ignore[arg-type]
def test_mutating_internal_entry_is_respected_by_get_span_id():
handler = _make_handler()
run_id = uuid4()
# Manually create an entry with a known span id and start_time
forced_span = uuid4()
forced_start = datetime(2000, 1, 1, tzinfo=timezone.utc)
handler._spans[run_id] = {"span_id": forced_span, "start_time": forced_start}
# _get_span_id should return the forced span_id and should not overwrite start_time
codeflash_output = handler._get_span_id(run_id); returned = codeflash_output
def test_large_scale_unique_span_ids_and_idempotence():
handler = _make_handler()
# Generate 1000 unique run IDs
run_ids = [uuid4() for _ in range(1000)]
# Collect span ids for each run id on first pass
span_ids_first = [handler._get_span_id(rid) for rid in run_ids]
# Call again for each run id and ensure ids are idempotent (unchanged)
span_ids_second = [handler._get_span_id(rid) for rid in run_ids]
def test_repeated_calls_many_times_for_single_run_id_are_stable():
handler = _make_handler()
run_id = uuid4()
# Call _get_span_id 1000 times and ensure the same UUID is returned each time
codeflash_output = handler._get_span_id(run_id); first = codeflash_output
for _ in range(999):
codeflash_output = handler._get_span_id(run_id)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
from datetime import datetime, timezone
from unittest.mock import MagicMock, Mock
from uuid import UUID, uuid4
# imports
import pytest
from langflow.services.tracing.native import NativeTracer
from langflow.services.tracing.native_callback import NativeCallbackHandler

To test or edit this optimization locally: git merge codeflash/optimize-pr11689-2026-02-09T23.28.49
Before:

```python
if run_id not in self._spans:
    self._spans[run_id] = {"span_id": uuid4(), "start_time": datetime.now(timezone.utc)}
return self._spans[run_id]["span_id"]
```

After:

```python
try:
    return self._spans[run_id]["span_id"]
except KeyError:
    span_id = uuid4()
    self._spans[run_id] = {"span_id": span_id, "start_time": datetime.now(timezone.utc)}
    return span_id
```
Looking good, @Adam-Aghili. @rodrigosnader already has the UI elements implemented in a branch; you should probably use them in this first implementation.
add sidebar buttons for logs and trace; remove logs canvas control
hopefully fix duplicate trace ID insertion on Windows
@archit-trainee I pushed up a hopeful fix. I don't have a Windows machine to test with; please take a look when you have a chance.
update tests and alembic tables for UTs
alembic + comment clean up
```python
chat_input = next((r for r in records if _CHAT_INPUT_SPAN_NAME in (r.name or "")), None)
input_value = None
if chat_input and chat_input.inputs:
    input_value = chat_input.inputs.get("input_value")

root_records = [r for r in records if r.parent_span_id is None and r.end_time]
output_value = None
if root_records:
    root_records_sorted = sorted(
        root_records,
        key=lambda r: r.end_time or _UTC_MIN,
        reverse=True,
    )
    if root_records_sorted[0].outputs:
        output_value = root_records_sorted[0].outputs
```
⚡️Codeflash found 29% (0.29x) speedup for _extract_trace_io in src/backend/base/langflow/services/tracing/formatting.py
⏱️ Runtime : 788 microseconds → 612 microseconds (best of 146 runs)
📝 Explanation and details
The optimized code achieves a 28% speedup by replacing multiple O(n) list comprehensions and an O(n log n) sort operation with a single O(n) pass through the records list.
Key optimizations:
- Single-pass algorithm: instead of three separate iterations (finding the chat input with `next()`, building the `root_records` list, and sorting), the optimized version uses one loop that tracks both the chat input and the best root record simultaneously.
- Eliminated expensive sorting: the original code collected all finished root records into a list and sorted them (O(n log n)), even when only the maximum was needed. The optimized version maintains just the best root record found so far through simple comparisons (O(n)).
- Reduced memory allocations: the original code created intermediate data structures (the `root_records` list and the `root_records_sorted` list). The optimized version only stores individual references (`best_root`, `best_end_time`), avoiding list construction overhead.
Why this matters:
Looking at the line profiler results:
- Original: 65.5% of time spent in the `sorted()` call (1.96 ms)
- Original: 15.3% building the `root_records` list comprehension (458 μs)
- Optimized: a single loop handles both tasks in ~5.6 ms total across all checks
The optimization is particularly effective for workloads with:
- Many root records (tests with 1000 root records benefit significantly)
- Large record lists where multiple passes are expensive
- Moderate numbers of finished roots where sorting overhead dominates
The annotated tests show consistent speedups across all scenarios, especially in large-scale tests (`test_large_scale_records_performance_and_correctness`, `test_1000_records_all_variations`) where the single-pass approach shines. The optimization maintains correctness for all edge cases, including empty lists, missing values, and complex nested structures.
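The core of the rewrite is the standard `sorted(...)[0]` to running-maximum substitution. A self-contained sketch of both shapes (the dict records here are a simplified stand-in for the PR's `_SpanIORecord`, not its real type):

```python
from datetime import datetime, timedelta, timezone

_UTC_MIN = datetime.min.replace(tzinfo=timezone.utc)
base = datetime(2024, 1, 1, tzinfo=timezone.utc)
records = [
    {"parent_span_id": None, "end_time": base + timedelta(seconds=i), "outputs": {"i": i}}
    for i in range(5)
]

# Original shape, O(n log n): collect all finished roots, sort, take the newest.
roots = [r for r in records if r["parent_span_id"] is None and r["end_time"]]
latest_sorted = sorted(roots, key=lambda r: r["end_time"] or _UTC_MIN, reverse=True)[0]

# Optimized shape, O(n): track the running maximum in one pass, no intermediate lists.
best, best_end = None, _UTC_MIN
for r in records:
    if r["parent_span_id"] is None and r["end_time"] and r["end_time"] > best_end:
        best, best_end = r, r["end_time"]

print(latest_sorted["outputs"], best["outputs"])  # both select the newest finished root
```

One subtlety worth noting: with ties on `end_time`, `sorted(..., reverse=True)` keeps the first tied record (stable sort) while a strict `>` comparison also keeps the first one seen, so the two shapes agree on tie-breaking as written.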
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 44 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Click to see Generated Regression Tests
from datetime import datetime # used to create deterministic timestamps
from datetime import timedelta, timezone
from typing import List # typing for readability in tests
# imports
import pytest # used for our unit tests
# Import the real function and the real record class from the module under test.
# Tests must construct real instances of the record type that the function expects.
from langflow.services.tracing.formatting import (_CHAT_INPUT_SPAN_NAME,
_extract_trace_io,
_SpanIORecord)
def make_dt(seconds: int) -> datetime:
"""Helper to create timezone-aware UTC datetimes deterministically."""
return datetime.fromtimestamp(seconds, tz=timezone.utc)
def test_basic_input_and_output_extraction():
# Build a chat input record that contains the user-facing input_value.
chat = _SpanIORecord(
# name must contain the CHAT_INPUT_SPAN_NAME substring
name=f"prefix {_CHAT_INPUT_SPAN_NAME} suffix",
inputs={"input_value": "hello world"}, # the input the heuristic should extract
outputs=None,
parent_span_id="some_parent", # not a root record
end_time=None,
)
# Build a root record that finished and contains outputs; should be chosen as trace output.
root = _SpanIORecord(
name="root-span",
inputs=None,
outputs={"result": 42},
parent_span_id=None, # root
end_time=make_dt(1000), # finished
)
# Call the function under test with the two records in a list.
codeflash_output = _extract_trace_io([chat, root]); result = codeflash_output
def test_no_input_when_inputs_missing_or_no_input_value():
# Chat input record with inputs set to None -> no input extracted.
chat_none_inputs = _SpanIORecord(
name=_CHAT_INPUT_SPAN_NAME,
inputs=None,
outputs=None,
parent_span_id=None,
end_time=None,
)
# Chat input record with inputs but no 'input_value' key -> no input extracted.
chat_empty_inputs = _SpanIORecord(
name=_CHAT_INPUT_SPAN_NAME,
inputs={}, # empty dict
outputs=None,
parent_span_id=None,
end_time=None,
)
def test_select_latest_finished_root_by_end_time():
# Create two finished root records with different end_time values.
earlier_root = _SpanIORecord(
name="root-earlier",
inputs=None,
outputs={"value": "earlier"},
parent_span_id=None,
end_time=make_dt(1000),
)
later_root = _SpanIORecord(
name="root-later",
inputs=None,
outputs={"value": "later"},
parent_span_id=None,
end_time=make_dt(2000),
)
# Also create an unfinished root which should be ignored.
unfinished_root = _SpanIORecord(
name="root-unfinished",
inputs=None,
outputs={"value": "unfinished"},
parent_span_id=None,
end_time=None, # unfinished -> should be excluded by heuristic
)
# When passed together, the later_root's outputs should be chosen.
codeflash_output = _extract_trace_io([earlier_root, unfinished_root, later_root]); res = codeflash_output
def test_unfinished_roots_result_in_no_output():
# A single root record that hasn't finished should not count as output.
root_unfinished = _SpanIORecord(
name="root-unfinished",
inputs=None,
outputs={"value": "should-not-be-used"},
parent_span_id=None,
end_time=None, # unfinished
)
# With no finished root records, output should be None.
codeflash_output = _extract_trace_io([root_unfinished]); result = codeflash_output
def test_chat_input_selection_is_first_matching_record():
# Two records both contain the Chat Input substring but with different input_value.
first_chat = _SpanIORecord(
name=f"{_CHAT_INPUT_SPAN_NAME} first",
inputs={"input_value": "first"},
outputs=None,
parent_span_id="p",
end_time=None,
)
second_chat = _SpanIORecord(
name=f"{_CHAT_INPUT_SPAN_NAME} second",
inputs={"input_value": "second"},
outputs=None,
parent_span_id="p",
end_time=None,
)
# The heuristic picks the first occurrence in the records list.
codeflash_output = _extract_trace_io([first_chat, second_chat]); res = codeflash_output
def test_name_substring_matching_and_special_characters():
# Ensure that substring matching works even with additional characters and unicode-like content.
name_with_extras = f">>>--({_CHAT_INPUT_SPAN_NAME})--<<<"
chat = _SpanIORecord(
name=name_with_extras,
inputs={"input_value": "Ω≈ç√∫˜µ≤≥÷"}, # special characters should be preserved
outputs=None,
parent_span_id=None,
end_time=None,
)
codeflash_output = _extract_trace_io([chat]); result = codeflash_output
def test_non_root_records_are_ignored_for_output_selection():
# A finished non-root (has parent_span_id) should not be considered for output selection.
non_root_finished = _SpanIORecord(
name="child-finished",
inputs=None,
outputs={"child": True},
parent_span_id="parent1", # not None -> not a root
end_time=make_dt(1500),
)
# A finished root with earlier time.
root_finished = _SpanIORecord(
name="root-finished",
inputs=None,
outputs={"root": True},
parent_span_id=None,
end_time=make_dt(1000),
)
codeflash_output = _extract_trace_io([non_root_finished, root_finished]); res = codeflash_output
def test_large_scale_records_performance_and_correctness():
# Build a large list of records (1000) to test scalability and deterministic correctness.
records: List[_SpanIORecord] = []
# Add many non-root, finished and unfinished records to simulate a busy trace.
for i in range(950):
records.append(
_SpanIORecord(
name=f"span-{i}",
inputs=None,
outputs={"i": i},
parent_span_id=f"parent-{i}" if i % 2 == 0 else None, # some roots some not
end_time=make_dt(1000 + i) if i % 3 != 0 else None, # some finished, some not
)
)
# Ensure there are a few explicit root finished spans with increasing end_time so we can assert the latest is chosen.
explicit_root_1 = _SpanIORecord(
name="explicit-root-1",
inputs=None,
outputs={"root": "first"},
parent_span_id=None,
end_time=make_dt(2000),
)
explicit_root_2 = _SpanIORecord(
name="explicit-root-2",
inputs=None,
outputs={"root": "second"},
parent_span_id=None,
end_time=make_dt(3000),
)
# Put the chat input somewhere in the middle to confirm ordering doesn't affect selection beyond "first match" semantics.
chat = _SpanIORecord(
name=f"User { _CHAT_INPUT_SPAN_NAME } entry",
inputs={"input_value": "large-scale"},
outputs=None,
parent_span_id="p",
end_time=None,
)
# Assemble records with chat in a non-zero position and explicit roots at the end.
records.insert(500, chat)
records.append(explicit_root_1)
records.append(explicit_root_2)
# Run the function - it should pick the chat input value and the outputs from explicit_root_2 (latest end_time).
codeflash_output = _extract_trace_io(records); out = codeflash_output
def test_many_roots_with_mixed_end_times_edge_case():
# Create 1000 root records where half are unfinished and half finished; ensure the latest finished is picked.
roots: List[_SpanIORecord] = []
base = 10000
for i in range(1000):
roots.append(
_SpanIORecord(
name=f"root-{i}",
inputs=None,
outputs={"n": i},
parent_span_id=None, # all are roots
end_time=make_dt(base + i) if i % 2 == 0 else None, # even indices finished
)
)
# Add a chat input at the front.
chat = _SpanIORecord(
name=_CHAT_INPUT_SPAN_NAME,
inputs={"input_value": "edge-large"},
outputs=None,
parent_span_id=None,
end_time=None,
)
# Compose full record list.
records = [chat] + roots
codeflash_output = _extract_trace_io(records); res = codeflash_output
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
from datetime import datetime, timezone
from typing import Any, NamedTuple
# imports
import pytest
from langflow.services.tracing.formatting import _extract_trace_io
# Define the _SpanIORecord class based on the function's usage
class _SpanIORecord(NamedTuple):
"""Represents a normalized span I/O record."""
name: str | None
inputs: dict[str, Any] | None
outputs: dict[str, Any] | None
parent_span_id: str | None
end_time: datetime | None
# Constants from the module
_UTC_MIN = datetime.min.replace(tzinfo=timezone.utc)
def test_empty_records_list():
"""Test with an empty list of records."""
codeflash_output = _extract_trace_io([]); result = codeflash_output
def test_single_chat_input_record_with_input_value():
"""Test with a single Chat Input record that has an input_value."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Chat Input",
inputs={"input_value": "Hello"},
outputs={"result": "world"},
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_single_root_record_with_output():
"""Test with a single root record that has output."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Root Span",
inputs=None,
outputs={"status": "success"},
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_chat_input_and_root_records():
"""Test with both Chat Input and root records."""
dt1 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
dt2 = datetime(2024, 1, 1, 12, 0, 1, tzinfo=timezone.utc)
chat_record = _SpanIORecord(
name="Chat Input",
inputs={"input_value": "test input"},
outputs=None,
parent_span_id=None,
end_time=dt1,
)
root_record = _SpanIORecord(
name="Root Span",
inputs=None,
outputs={"result": "test output"},
parent_span_id=None,
end_time=dt2,
)
codeflash_output = _extract_trace_io([chat_record, root_record]); result = codeflash_output
def test_multiple_root_records_selects_latest():
"""Test with multiple root records; latest end_time should be selected."""
dt1 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
dt2 = datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc)
dt3 = datetime(2024, 1, 1, 12, 0, 3, tzinfo=timezone.utc)
record1 = _SpanIORecord(
name="Root 1",
inputs=None,
outputs={"id": 1},
parent_span_id=None,
end_time=dt1,
)
record2 = _SpanIORecord(
name="Root 2",
inputs=None,
outputs={"id": 2},
parent_span_id=None,
end_time=dt2,
)
record3 = _SpanIORecord(
name="Root 3",
inputs=None,
outputs={"id": 3},
parent_span_id=None,
end_time=dt3,
)
codeflash_output = _extract_trace_io([record1, record2, record3]); result = codeflash_output
def test_non_root_records_ignored():
"""Test that records with parent_span_id are ignored for output."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
child_record = _SpanIORecord(
name="Child Span",
inputs=None,
outputs={"child_output": "value"},
parent_span_id="parent-123",
end_time=dt,
)
codeflash_output = _extract_trace_io([child_record]); result = codeflash_output
def test_chat_input_with_empty_inputs():
"""Test Chat Input record with empty inputs dict."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Chat Input",
inputs={},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_chat_input_name_substring_match():
"""Test that Chat Input is found via substring match in name."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Prefix Chat Input Suffix",
inputs={"input_value": "found it"},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_root_record_without_end_time():
"""Test that root records without end_time are excluded."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
finished_record = _SpanIORecord(
name="Finished",
inputs=None,
outputs={"status": "done"},
parent_span_id=None,
end_time=dt,
)
unfinished_record = _SpanIORecord(
name="Unfinished",
inputs=None,
outputs={"status": "pending"},
parent_span_id=None,
end_time=None,
)
codeflash_output = _extract_trace_io([finished_record, unfinished_record]); result = codeflash_output
def test_chat_input_none_name():
"""Test Chat Input search when record name is None."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name=None,
inputs={"input_value": "value"},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_chat_input_none_inputs():
"""Test Chat Input record with None inputs dict."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Chat Input",
inputs=None,
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_root_record_with_none_outputs():
"""Test root record with None outputs dict."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Root",
inputs=None,
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_root_record_with_empty_outputs():
"""Test root record with empty outputs dict."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Root",
inputs=None,
outputs={},
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_chat_input_input_value_is_none():
"""Test Chat Input where input_value key maps to None."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Chat Input",
inputs={"input_value": None},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_chat_input_input_value_is_zero():
"""Test Chat Input where input_value is 0 (falsy but valid)."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Chat Input",
inputs={"input_value": 0},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_chat_input_input_value_is_empty_string():
"""Test Chat Input where input_value is empty string (falsy)."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Chat Input",
inputs={"input_value": ""},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_chat_input_input_value_is_false():
"""Test Chat Input where input_value is False (falsy)."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Chat Input",
inputs={"input_value": False},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_chat_input_input_value_missing_key():
"""Test Chat Input where inputs dict doesn't have input_value key."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Chat Input",
inputs={"other_key": "value"},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_multiple_chat_input_records_first_selected():
"""Test that when multiple Chat Input records exist, the first is selected."""
dt1 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
dt2 = datetime(2024, 1, 1, 12, 0, 1, tzinfo=timezone.utc)
record1 = _SpanIORecord(
name="Chat Input",
inputs={"input_value": "first"},
outputs=None,
parent_span_id=None,
end_time=dt1,
)
record2 = _SpanIORecord(
name="Chat Input",
inputs={"input_value": "second"},
outputs=None,
parent_span_id=None,
end_time=dt2,
)
codeflash_output = _extract_trace_io([record1, record2]); result = codeflash_output
def test_case_sensitive_chat_input_match():
"""Test that Chat Input matching is case-sensitive."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="chat input", # lowercase
inputs={"input_value": "value"},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_end_time_equals_utc_min():
"""Test with end_time equal to _UTC_MIN."""
record = _SpanIORecord(
name="Root",
inputs=None,
outputs={"result": "value"},
parent_span_id=None,
end_time=_UTC_MIN,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_mixed_records_only_root_with_end_time_for_output():
"""Test that only root records with end_time contribute to output."""
dt1 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
dt2 = datetime(2024, 1, 1, 12, 0, 1, tzinfo=timezone.utc)
# Root record without end_time
root_no_time = _SpanIORecord(
name="Root No Time",
inputs=None,
outputs={"id": 1},
parent_span_id=None,
end_time=None,
)
# Child record with end_time
child_with_time = _SpanIORecord(
name="Child",
inputs=None,
outputs={"id": 2},
parent_span_id="parent-123",
end_time=dt1,
)
# Root record with end_time
root_with_time = _SpanIORecord(
name="Root With Time",
inputs=None,
outputs={"id": 3},
parent_span_id=None,
end_time=dt2,
)
codeflash_output = _extract_trace_io([root_no_time, child_with_time, root_with_time]); result = codeflash_output
def test_input_value_with_complex_type():
"""Test Chat Input with complex input_value (dict, list, etc.)."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
complex_input = {"nested": {"key": "value"}, "items": [1, 2, 3]}
record = _SpanIORecord(
name="Chat Input",
inputs={"input_value": complex_input},
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_output_with_complex_structure():
"""Test root record with complex output structure."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
complex_output = {
"nested": {"a": 1, "b": 2},
"items": [1, 2, 3],
"status": "success"
}
record = _SpanIORecord(
name="Root",
inputs=None,
outputs=complex_output,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_parent_span_id_empty_string():
"""Test that empty string parent_span_id is treated as truthy (not root)."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record = _SpanIORecord(
name="Span",
inputs=None,
outputs={"result": "value"},
parent_span_id="", # Empty string
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_identical_end_times_arbitrary_selection():
"""Test with multiple root records having identical end_time."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
record1 = _SpanIORecord(
name="Root 1",
inputs=None,
outputs={"id": 1},
parent_span_id=None,
end_time=dt,
)
record2 = _SpanIORecord(
name="Root 2",
inputs=None,
outputs={"id": 2},
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record1, record2]); result = codeflash_output
def test_large_number_of_non_root_records():
"""Test with 1000 non-root records (should be fast)."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
records = [
_SpanIORecord(
name=f"Child {i}",
inputs=None,
outputs={"id": i},
parent_span_id=f"parent-{i}",
end_time=dt,
)
for i in range(1000)
]
codeflash_output = _extract_trace_io(records); result = codeflash_output
def test_large_number_of_root_records():
"""Test with 1000 root records; should select the latest."""
base_dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
records = []
# Create 1000 root records with increasing end_times
for i in range(1000):
dt = datetime(
2024, 1, 1, 12, 0, i // 60, (i % 60), tzinfo=timezone.utc
)
records.append(
_SpanIORecord(
name=f"Root {i}",
inputs=None,
outputs={"id": i},
parent_span_id=None,
end_time=dt,
)
)
codeflash_output = _extract_trace_io(records); result = codeflash_output
def test_large_mixed_records_chat_input_search():
"""Test with 1000 mixed records to find Chat Input efficiently."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
records = []
# Add 500 non-Chat-Input records first
for i in range(500):
records.append(
_SpanIORecord(
name=f"Other {i}",
inputs=None,
outputs=None,
parent_span_id=f"parent-{i}",
end_time=dt,
)
)
# Add Chat Input record
chat_input_record = _SpanIORecord(
name="Chat Input",
inputs={"input_value": "found at index 500"},
outputs=None,
parent_span_id=None,
end_time=dt,
)
records.append(chat_input_record)
# Add 499 more non-Chat-Input records
for i in range(500, 999):
records.append(
_SpanIORecord(
name=f"Other {i}",
inputs=None,
outputs=None,
parent_span_id=f"parent-{i}",
end_time=dt,
)
)
codeflash_output = _extract_trace_io(records); result = codeflash_output
def test_large_inputs_dict_with_many_keys():
"""Test Chat Input with large inputs dict."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
large_inputs = {"input_value": "target"} | {f"key_{i}": f"value_{i}" for i in range(1000)}
record = _SpanIORecord(
name="Chat Input",
inputs=large_inputs,
outputs=None,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_large_outputs_dict_with_many_keys():
"""Test root record with large outputs dict."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
large_outputs = {f"key_{i}": f"value_{i}" for i in range(1000)}
record = _SpanIORecord(
name="Root",
inputs=None,
outputs=large_outputs,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_many_root_records_unsorted_order():
"""Test with 1000 root records in random-like order."""
base_dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
records = []
# Create records in non-monotonic order
indices = [500, 100, 900, 50, 999, 1, 750, 250, 999] # Last one has max
for idx in indices:
dt = datetime(
2024, 1, 1, 12, 0, idx // 60, (idx % 60), tzinfo=timezone.utc
)
records.append(
_SpanIORecord(
name=f"Root {idx}",
inputs=None,
outputs={"id": idx},
parent_span_id=None,
end_time=dt,
)
)
codeflash_output = _extract_trace_io(records); result = codeflash_output
def test_deeply_nested_output_structure():
"""Test with deeply nested output structure."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# Create deeply nested structure
nested = "deepest"
for i in range(100):
nested = {"level": nested}
record = _SpanIORecord(
name="Root",
inputs=None,
outputs=nested,
parent_span_id=None,
end_time=dt,
)
codeflash_output = _extract_trace_io([record]); result = codeflash_output
def test_1000_records_all_variations():
"""Comprehensive test with 1000 mixed records (all conditions)."""
base_dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
records = []
# Add Chat Input record at position 333
chat_input_record = _SpanIORecord(
name="Chat Input",
inputs={"input_value": "comprehensive test"},
outputs=None,
parent_span_id=None,
end_time=base_dt,
)
# Add root records without end_time
for i in range(100):
records.append(
_SpanIORecord(
name=f"Root No Time {i}",
inputs=None,
outputs={"id": f"no_time_{i}"},
parent_span_id=None,
end_time=None,
)
)
# Add child records with end_time (should be ignored)
for i in range(100):
records.append(
_SpanIORecord(
name=f"Child {i}",
inputs=None,
outputs={"id": f"child_{i}"},
parent_span_id=f"parent-{i}",
end_time=base_dt,
)
)
# Add Chat Input
records.append(chat_input_record)
# Add root records with various end_times
for i in range(700):
dt = datetime(
2024, 1, 1, 12, 0, i // 60, (i % 60), tzinfo=timezone.utc
)
records.append(
_SpanIORecord(
name=f"Root {i}",
inputs=None,
outputs={"id": i},
parent_span_id=None,
end_time=dt,
)
)
codeflash_output = _extract_trace_io(records); result = codeflash_output
def test_1000_chat_input_variations_only_first_used():
"""Test 1000 Chat Input records; only first input_value should be used."""
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
records = []
for i in range(1000):
records.append(
_SpanIORecord(
name="Chat Input",
inputs={"input_value": f"input_{i}"},
outputs=None,
parent_span_id=None,
end_time=dt,
)
)
codeflash_output = _extract_trace_io(records); result = codeflash_output
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To test or edit this optimization locally, run `git merge codeflash/optimize-pr11689-2026-02-28T00.21.53`.
Click to see suggested changes
```diff
-    chat_input = next((r for r in records if _CHAT_INPUT_SPAN_NAME in (r.name or "")), None)
-    input_value = None
-    if chat_input and chat_input.inputs:
-        input_value = chat_input.inputs.get("input_value")
-    root_records = [r for r in records if r.parent_span_id is None and r.end_time]
-    output_value = None
-    if root_records:
-        root_records_sorted = sorted(
-            root_records,
-            key=lambda r: r.end_time or _UTC_MIN,
-            reverse=True,
-        )
-        if root_records_sorted[0].outputs:
-            output_value = root_records_sorted[0].outputs
+    chat_input = None
+    input_value = None
+    best_root = None
+    best_end_time = None
+    for r in records:
+        if chat_input is None and _CHAT_INPUT_SPAN_NAME in (r.name or ""):
+            chat_input = r
+        if r.parent_span_id is None and r.end_time:
+            if best_root is None or r.end_time > best_end_time:
+                best_root = r
+                best_end_time = r.end_time
+    if chat_input and chat_input.inputs:
+        input_value = chat_input.inputs.get("input_value")
+    output_value = None
+    if best_root and best_root.outputs:
+        output_value = best_root.outputs
```
```python
    records = [
        _SpanIORecord(
            name=r[1],
            parent_span_id=r[2],
            end_time=r[3],
            inputs=r[4],
            outputs=r[5],
        )
        for r in rows
    ]
    return _extract_trace_io(records)
```
⚡️Codeflash found 843% (8.43x) speedup for extract_trace_io_from_rows in src/backend/base/langflow/services/tracing/formatting.py
⏱️ Runtime : 1.53 milliseconds → 162 microseconds (best of 96 runs)
📝 Explanation and details
The optimized code achieves an 843% speedup by eliminating expensive intermediate data structures and redundant operations through two key changes:
What Changed
1. Single-Pass Algorithm
Original: Used multiple iterations with list comprehensions, next(), filtering, sorting, and object creation:
- `next()` generator to find the chat input (iterates records)
- List comprehension to filter root records (iterates again)
- `sorted()` to find the latest root (O(n log n) with lambda overhead)
- Created `_SpanIORecord` objects for every row
Optimized: Single loop that tracks state:
- One iteration finds both input and output in the same pass
- Tracks `latest_end_time` and updates `output_value` when finding a newer root
- No sorting required, just comparison during iteration
2. Eliminated Object Creation Overhead
Original `extract_trace_io_from_rows`: Created `_SpanIORecord` objects for all rows (line profiler shows 50.4% of time spent on object instantiation)
Optimized `extract_trace_io_from_rows`: Directly accesses tuple indices (`r[1]`, `r[2]`, etc.), avoiding object creation entirely
Why It's Faster
- O(n) vs O(n log n): A single linear pass replaces filtering plus sorting, dropping the log factor
- Memory efficiency: No intermediate lists (`root_records`, `root_records_sorted`) or objects (`_SpanIORecord`), reducing allocation overhead
- Reduced function call overhead: Eliminates `sorted()`, lambda calls, and object constructors
- Early termination potential: Can stop checking for input once found (`if input_value is None`)
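The single-pass pattern described above can be sketched in isolation. This is a minimal illustration, not the actual Langflow source: the plain tuples stand in for `_SpanIORecord` rows, and the field order `(name, parent_span_id, end_time, inputs, outputs)` is an assumption made for the example.

```python
from datetime import datetime, timezone

def extract_io_single_pass(records):
    """One O(n) pass: the first 'Chat Input' span wins for input; the latest
    finished root span (parent_span_id is None, end_time set) wins for output."""
    input_value = None
    found_chat = False
    best_outputs = None
    best_end_time = None
    for name, parent_span_id, end_time, inputs, outputs in records:
        # First matching Chat Input is kept; later ones are ignored
        if not found_chat and "Chat Input" in (name or ""):
            found_chat = True
            if inputs:
                input_value = inputs.get("input_value")
        # Track the latest finished root span without sorting
        if parent_span_id is None and end_time and (
            best_end_time is None or end_time > best_end_time
        ):
            best_end_time = end_time
            best_outputs = outputs
    return {"input": input_value, "output": best_outputs}

dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
records = [
    ("Chat Input", None, dt, {"input_value": "hello"}, None),
    ("Root", None, datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc), None, {"result": "ok"}),
]
print(extract_io_single_pass(records))
# {'input': 'hello', 'output': {'result': 'ok'}}
```

Because both answers are accumulated during the same iteration, no intermediate list or sort is ever materialized, which is where the allocation and O(n log n) savings come from.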
Test Case Performance
The optimization excels when:
- Many rows (1000+): Avoids the compounding cost of multiple passes and per-row object creation
- Many child spans: Skips them efficiently with simple checks vs. building filtered lists
- Large payloads: Doesn't copy data into intermediate structures
From annotated tests, the test_thousand_rows_performance and test_many_child_spans_with_one_root cases benefit most, as they have high row counts where single-pass iteration and avoiding object creation provide the largest gains.
Impact on Workloads
Given that this code extracts trace I/O from database rows, the optimization is particularly valuable for:
- Bulk trace listings where many traces are processed
- Traces with many spans (large distributed systems)
- High-throughput monitoring dashboards refreshing frequently
The 843% speedup means trace listing operations complete ~9.4x faster, significantly improving dashboard responsiveness and reducing database load.
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | ✅ 48 Passed |
| 🌀 Generated Regression Tests | ✅ 41 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
⚙️ Click to see Existing Unit Tests
🌀 Click to see Generated Regression Tests
from datetime import datetime, timedelta, timezone
from typing import Any
# imports
import pytest # used for our unit tests
from langflow.services.tracing.formatting import extract_trace_io_from_rows
def test_basic_input_and_output_extraction_simple():
# Create a well-formed chat input row: (trace_id, name, parent_span_id, end_time, inputs, outputs)
chat_input_row = (
"trace-1",
"Chat Input - user message", # name contains the Chat Input substring
"parent-1", # non-root span (parent exists) - still valid for input extraction
datetime(2022, 1, 1, 12, 0, 0, tzinfo=timezone.utc), # finished span timestamp
{"input_value": "hello world"}, # inputs contain the input_value key
None, # outputs irrelevant for this row
)
# Create a root row that represents the overall trace output
root_row = (
"trace-1",
"root-span",
None, # root span
datetime(2022, 1, 1, 12, 0, 1, tzinfo=timezone.utc), # finished slightly later
None,
{"result": 123}, # outputs to be selected as trace-level output
)
# Call the function under test with the two rows
codeflash_output = extract_trace_io_from_rows([chat_input_row, root_row]); result = codeflash_output
def test_no_chat_input_returns_none_input_but_output_if_present():
# No row contains "Chat Input" in its name
rows = [
("t", "some-span", None, datetime(2022, 1, 1, 1, 0, 0, tzinfo=timezone.utc), None, {"x": 1}),
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_chat_input_without_input_value_or_empty_string_yields_none_input():
# Chat input exists but inputs is None -> no input_value
row_inputs_none = (
"t",
"Chat Input",
None,
datetime(2022, 1, 1, 2, 0, 0, tzinfo=timezone.utc),
None, # no inputs dict
{"o": "out"},
)
codeflash_output = extract_trace_io_from_rows([row_inputs_none]); res_none = codeflash_output
# Chat input exists with an empty-string input_value -> treated as falsy and yields None
row_empty_string = (
"t",
"Chat Input",
None,
datetime(2022, 1, 1, 2, 0, 1, tzinfo=timezone.utc),
{"input_value": ""}, # empty string is falsy in the implementation
{"o": "out2"},
)
codeflash_output = extract_trace_io_from_rows([row_empty_string]); res_empty = codeflash_output
def test_output_chooses_latest_root_by_end_time_and_ignores_unfinished_and_non_roots():
base = datetime(2022, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
# Unfinished root (end_time is None) -> should be ignored
unfinished_root = ("t", "root-a", None, None, None, {"a": 1})
# Finished root with earlier time
root_early = ("t", "root-b", None, base + timedelta(seconds=10), None, {"b": 2})
# Finished root with later time -> should be chosen
root_latest = ("t", "root-c", None, base + timedelta(seconds=20), None, {"c": 3})
# Non-root finished but later time -> should be ignored because parent_span_id is not None
child_late = ("t", "child", "root-c", base + timedelta(seconds=30), None, {"child": 99})
# Chat input somewhere else for input extraction
chat = ("t", "Chat Input", "someparent", base + timedelta(seconds=5), {"input_value": "X"}, None)
rows = [unfinished_root, root_early, root_latest, child_late, chat]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_empty_rows_returns_both_none():
# Empty input list should gracefully return no input and no output
codeflash_output = extract_trace_io_from_rows([]); result = codeflash_output
def test_chat_input_first_occurrence_selected_when_multiple_present():
# Two chat input rows, the first one in order should be selected per implementation
first_chat = ("t", "Chat Input first", None, datetime(2022, 1, 1, 0, 0, 1, tzinfo=timezone.utc), {"input_value": "first"}, None)
second_chat = ("t", "Chat Input later", None, datetime(2022, 1, 1, 0, 0, 2, tzinfo=timezone.utc), {"input_value": "second"}, None)
# Include a root row for output to be non-None
root = ("t", "root", None, datetime(2022, 1, 1, 0, 0, 3, tzinfo=timezone.utc), None, {"ok": True})
codeflash_output = extract_trace_io_from_rows([first_chat, second_chat, root]); result = codeflash_output
def test_large_scale_performance_and_correctness():
# Build 1000 rows to exercise scaling behavior deterministically
n = 1000
base = datetime(2022, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
rows = []
# Insert a Chat Input at a known position (e.g., 100th)
chat_index = 100
for i in range(n):
if i == chat_index:
# Chat Input row: ensure it has an input_value
rows.append((
f"trace-large",
f"Some prefix Chat Input suffix {i}", # contains the substring
f"p{i}",
base + timedelta(seconds=i),
{"input_value": f"value-{i}"},
None,
))
elif i == 900:
# Create a root finished span with the latest end_time to be selected as output
rows.append((
f"trace-large",
"root-latest",
None,
base + timedelta(seconds=10_000), # very late time
None,
{"selected": "yes"},
))
elif i % 50 == 0:
# Some other root finished spans with earlier times
rows.append((
f"trace-large",
f"root-{i}",
None,
base + timedelta(seconds=i),
None,
{"root_i": i},
))
else:
# Non-root or irrelevant spans
rows.append((
f"trace-large",
f"span-{i}",
f"parent-{i}",
base + timedelta(seconds=i),
None,
None,
))
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
from datetime import datetime, timezone
from typing import Any
# imports
import pytest
from langflow.services.tracing.formatting import extract_trace_io_from_rows
def test_basic_single_row_with_chat_input_and_root_output():
"""Test extraction with a single Chat Input row and a root output span."""
# Row format: (trace_id, name, parent_span_id, end_time, inputs, outputs)
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "hello"},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_basic_root_output_extraction():
"""Test extraction of output from a root span (parent_span_id is None)."""
rows = [
(
"trace-1",
"Root Process",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{},
{"result": "output_data"},
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_combined_input_and_output():
"""Test extraction of both input and output in a single trace."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "user query"},
None,
),
(
"trace-1",
"Processing",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
{"final_result": "processed"},
),
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_chat_input_substring_matching():
"""Test that Chat Input identification uses substring matching."""
# Span name contains "Chat Input" as substring but has additional text
rows = [
(
"trace-1",
"Pre-Chat Input Processing",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "found it"},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_latest_root_output_selected():
"""Test that the root span with the latest end_time is selected for output."""
rows = [
(
"trace-1",
"Process A",
None,
datetime(2024, 1, 1, 12, 0, 1, tzinfo=timezone.utc),
{},
{"from": "process_a"},
),
(
"trace-1",
"Process B",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
{"from": "process_b"},
),
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_empty_rows_list():
"""Test extraction with an empty rows list."""
rows = []
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_none_input_value_in_chat_input():
"""Test when Chat Input span exists but input_value is None."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": None},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_missing_input_value_key_in_chat_input():
"""Test when Chat Input span exists but input_value key is missing."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"other_key": "data"},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_empty_inputs_dict_in_chat_input():
"""Test when Chat Input span has empty inputs dict."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_none_inputs_dict_in_chat_input():
"""Test when Chat Input span has None inputs."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
None,
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_root_span_without_end_time_ignored():
"""Test that root spans without end_time are ignored for output."""
rows = [
(
"trace-1",
"Incomplete Root",
None,
None, # end_time is None
{},
{"data": "incomplete"},
),
(
"trace-1",
"Complete Root",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
{"data": "complete"},
),
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_all_root_spans_without_end_time():
"""Test when all root spans lack end_time."""
rows = [
(
"trace-1",
"Root A",
None,
None,
{},
{"from": "a"},
),
(
"trace-1",
"Root B",
None,
None,
{},
{"from": "b"},
),
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_child_spans_ignored_for_output():
"""Test that child spans (parent_span_id is not None) are ignored for output."""
rows = [
(
"trace-1",
"Child Process",
"parent-span-id",
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
{"from": "child"},
),
(
"trace-1",
"Root Process",
None,
datetime(2024, 1, 1, 12, 0, 6, tzinfo=timezone.utc),
{},
{"from": "root"},
),
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_none_outputs_dict_in_root_span():
"""Test when root span has None outputs."""
rows = [
(
"trace-1",
"Root Process",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_empty_outputs_dict_in_root_span():
"""Test when root span has empty outputs dict."""
rows = [
(
"trace-1",
"Root Process",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
{},
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_first_chat_input_selected():
"""Test that the first Chat Input span is selected when multiple exist."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "first"},
None,
),
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 1, tzinfo=timezone.utc),
{"input_value": "second"},
None,
),
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_case_sensitive_chat_input_matching():
"""Test that Chat Input matching is case-sensitive."""
rows = [
(
"trace-1",
"chat input", # lowercase
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "lowercase"},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_none_span_name():
"""Test when span name is None."""
rows = [
(
"trace-1",
None, # name is None
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "data"},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_special_characters_in_input_value():
"""Test input_value with special characters."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "Hello\nWorld\t!@#$%"},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_unicode_in_input_value():
"""Test input_value with Unicode characters."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "你好世界 🌍"},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_empty_string_input_value():
"""Test input_value with empty string (falsy but not None)."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": ""},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_zero_as_input_value():
"""Test input_value with 0 (falsy but valid)."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": 0},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_false_as_input_value():
"""Test input_value with False (falsy but valid)."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": False},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_list_as_input_value():
"""Test input_value with a list."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": ["item1", "item2"]},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_dict_as_input_value():
"""Test input_value with a dict."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": {"nested": "data"}},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_numeric_input_value():
"""Test input_value with numeric types."""
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": 42.5},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_output_dict_structure_preserved():
"""Test that output dict structure is preserved without modification."""
complex_output = {
"level1": {"level2": {"level3": "value"}},
"list": [1, 2, 3],
"mixed": [{"key": "value"}, "string", 123],
}
rows = [
(
"trace-1",
"Root Process",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
complex_output,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_many_child_spans_with_one_root():
"""Test performance with many child spans and one root span."""
rows = []
# Add Chat Input
rows.append(
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "query"},
None,
)
)
# Add 500 child spans
for i in range(500):
rows.append(
(
"trace-1",
f"Child Span {i}",
f"parent-{i}",
datetime(2024, 1, 1, 12, 0, 1, tzinfo=timezone.utc),
{},
{"result": f"data_{i}"},
)
)
# Add root output span
rows.append(
(
"trace-1",
"Root Process",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
{"final": "result"},
)
)
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_large_payload_dicts():
"""Test with large complex output payloads."""
# Create a large nested output structure
large_output = {
f"key_{i}": {
"nested": {f"data_{j}": f"value_{j}" for j in range(10)}
}
for i in range(100)
}
rows = [
(
"trace-1",
"Root Process",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
large_output,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_many_spans_without_end_time():
"""Test with many root spans that lack end_time."""
rows = []
# Add 300 root spans without end_time
for i in range(300):
rows.append(
(
"trace-1",
f"Incomplete Root {i}",
None,
None,
{},
{"data": f"incomplete_{i}"},
)
)
# Add one complete root span
rows.append(
(
"trace-1",
"Complete Root",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
{"data": "complete"},
)
)
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_very_long_input_value():
"""Test with very long input_value string."""
long_input = "x" * 100000 # 100,000 character string
rows = [
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": long_input},
None,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_thousand_rows_performance():
"""Test extraction performance with 1000 rows."""
rows = []
# Add Chat Input
rows.append(
(
"trace-1",
"Chat Input",
None,
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
{"input_value": "query"},
None,
)
)
# Add 998 child spans
for i in range(998):
rows.append(
(
"trace-1",
f"Child {i}",
f"parent-{i}",
datetime(2024, 1, 1, 12, 0, 1, tzinfo=timezone.utc),
{},
{"index": i},
)
)
# Add one root output
rows.append(
(
"trace-1",
"Root",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
{"output": "result"},
)
)
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
def test_deeply_nested_output_structure():
"""Test with deeply nested output structure."""
# Create a deeply nested dict
nested = {"level": 0}
current = nested
for i in range(1, 50):
current["nested"] = {"level": i}
current = current["nested"]
rows = [
(
"trace-1",
"Root Process",
None,
datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
{},
nested,
)
]
codeflash_output = extract_trace_io_from_rows(rows); result = codeflash_output
current = result["output"]
for i in range(1, 50):
current = current["nested"]
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To test or edit this optimization locally, run `git merge codeflash/optimize-pr11689-2026-02-28T00.39.56`.
Click to see suggested changes
```diff
-    records = [
-        _SpanIORecord(
-            name=r[1],
-            parent_span_id=r[2],
-            end_time=r[3],
-            inputs=r[4],
-            outputs=r[5],
-        )
-        for r in rows
-    ]
-    return _extract_trace_io(records)
+    input_value = None
+    output_value = None
+    latest_end_time = _UTC_MIN
+    for r in rows:
+        if input_value is None and r[1] and _CHAT_INPUT_SPAN_NAME in r[1]:
+            if r[4]:
+                input_value = r[4].get("input_value")
+        if r[2] is None and r[3] and r[3] > latest_end_time:
+            latest_end_time = r[3]
+            if r[5]:
+                output_value = r[5]
+    return {
+        "input": {"input_value": input_value} if input_value else None,
+        "output": output_value,
+    }
```
```python
    try:
        return int(value)
    except ValueError:
        try:
            parsed = float(value)
            return int(parsed) if math.isfinite(parsed) else 0
        except (ValueError, TypeError, OverflowError):
            return 0
```
⚡️Codeflash found 66% (0.66x) speedup for compute_leaf_token_total in src/backend/base/langflow/services/tracing/formatting.py
⏱️ Runtime : 2.60 milliseconds → 1.57 milliseconds (best of 129 runs)
📝 Explanation and details
The optimized code achieves a 65% speedup (from 2.60ms to 1.57ms) by adding a fast-path optimization in the safe_int_tokens function for parsing integer strings.
What Changed:
Added a fast-path check for string token values that don't contain decimal points or scientific notation markers (., e, E). When these markers are absent, the code attempts direct int() conversion first, avoiding the more expensive float() parsing path.
Why It's Faster:
- Avoids expensive float conversion: Converting strings like `"100"` directly to `int()` is significantly faster than converting to `float()` first and then to `int()`. The line profiler shows this optimization reduced time spent in string parsing from 19.2% to just 4.2% in the fast path.
- Early return for the common case: Based on the test suite, many token values are plain integer strings (like `"100"`, `"50"`). The fast path handles these efficiently without falling through to exception handling.
- Reduced exception handling overhead: For integer-like strings, we avoid the `ValueError` exception path entirely. Exception handling in Python is expensive, and eliminating it for the common case provides substantial savings.
Performance Impact by Test Type:
- Integer strings (tests like `test_single_leaf_span_with_llm_usage_tokens`): Maximum benefit, direct `int()` conversion
- Float strings (tests like `test_float_string_token_value` with `"100.9"`): Falls through to the original float parsing path, no regression
- Scientific notation (tests like `test_scientific_notation_token_value` with `"1e3"`): Correctly detected by checking for `'e'`/`'E'`, uses the float path
- Invalid strings (tests like `test_invalid_string_token_value`): Fast path rejects quickly, minimal overhead added
Key Insight:
The line profiler shows compute_leaf_token_total spends 75-77% of its time calling safe_int_tokens. By optimizing the dominant string-parsing case within that function, we achieved the 65% overall speedup. The optimization is particularly effective because it targets the most common real-world scenario: LLM token counts represented as plain integer strings.
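The fast-path idea can be sketched as a standalone function. This is an illustration of the technique only, not the actual `safe_int_tokens` source: the function name and the exact set of caught exceptions here are assumptions.

```python
import math

def fast_int_tokens(value):
    """Sketch of the fast path: plain integer strings go straight through
    int(); only strings containing '.', 'e', or 'E' pay for float() parsing."""
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        if "." not in value and "e" not in value and "E" not in value:
            # Fast path: no float round-trip for plain integer strings
            try:
                return int(value)
            except ValueError:
                return 0
        # Slow path: decimals and scientific notation need float()
        try:
            parsed = float(value)
            return int(parsed) if math.isfinite(parsed) else 0
        except (ValueError, OverflowError):
            return 0
    return 0

print(fast_int_tokens("100"))    # 100  (fast path)
print(fast_int_tokens("100.9"))  # 100  (float path, truncated)
print(fast_int_tokens("1e3"))    # 1000 (scientific notation)
print(fast_int_tokens("oops"))   # 0    (invalid input)
```

The marker check is cheap relative to exception handling, so the common case of a plain integer string never raises at all, which is where most of the measured savings come from.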
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 57 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Click to see Generated Regression Tests
import math
from typing import Any
# imports
import pytest
from langflow.services.tracing.formatting import compute_leaf_token_total
def test_basic_empty_inputs():
"""Test with empty inputs returns zero."""
codeflash_output = compute_leaf_token_total([], set(), {}); result = codeflash_output
def test_single_leaf_span_with_llm_usage_tokens():
"""Test a single leaf span with llm.usage.total_tokens attribute."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_single_leaf_span_with_total_tokens():
"""Test a single leaf span with total_tokens attribute."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"total_tokens": 50}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_llm_usage_tokens_takes_precedence_over_total_tokens():
"""Test that llm.usage.total_tokens is preferred when both are present."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 200, "total_tokens": 50}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_multiple_leaf_spans():
"""Test multiple non-parent spans sum their tokens."""
span_ids = ["span1", "span2", "span3"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100},
"span2": {"llm.usage.total_tokens": 50},
"span3": {"llm.usage.total_tokens": 25}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_parent_spans_excluded_from_count():
"""Test that parent spans are excluded from the total."""
span_ids = ["span1", "span2", "span3"]
parent_ids = {"span1", "span2"} # span1 and span2 are parents
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100},
"span2": {"llm.usage.total_tokens": 50},
"span3": {"llm.usage.total_tokens": 25}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_nested_hierarchy_leaf_only():
"""Test a realistic nested hierarchy where only leaf is counted."""
# Parent span calls child span, child is the only leaf
span_ids = ["parent_span", "child_span"]
parent_ids = {"parent_span"} # parent_span has a child
attributes_by_id = {
"parent_span": {"llm.usage.total_tokens": 500},
"child_span": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_mixed_string_and_int_tokens():
"""Test token values represented as strings are correctly parsed."""
span_ids = ["span1", "span2", "span3"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": "100"},
"span2": {"llm.usage.total_tokens": 50},
"span3": {"llm.usage.total_tokens": "25"}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_float_tokens_truncated_to_int():
"""Test that float token values are truncated to integers."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100.7},
"span2": {"llm.usage.total_tokens": 50.2}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_missing_span_in_attributes_treated_as_zero():
"""Test that missing span attributes are treated as zero tokens."""
span_ids = ["span1", "span2", "span3"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100},
# span2 is not in attributes_by_id
"span3": {"llm.usage.total_tokens": 25}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_span_with_no_token_attributes():
"""Test a leaf span with no token attributes returns 0 for that span."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"other_attr": "value"} # No token attributes
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_span_with_empty_attributes_dict():
"""Test a leaf span with empty attributes dict returns 0."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_zero_token_values():
"""Test that explicit zero token values are counted correctly."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 0},
"span2": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_negative_token_values():
"""Test that negative token values are included in the sum."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": -50},
"span2": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_none_token_value():
"""Test that None token values are treated as 0."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": None},
"span2": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_nan_token_value():
"""Test that NaN token values are treated as 0."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": float('nan')}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_infinity_token_value():
"""Test that infinity token values are treated as 0."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": float('inf')},
"span2": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_negative_infinity_token_value():
"""Test that negative infinity token values are treated as 0."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": float('-inf')}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_string_nan_token_value():
"""Test that string 'NaN' is treated as 0."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": "NaN"}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_string_inf_token_value():
"""Test that string 'inf' is treated as 0."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": "inf"}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_scientific_notation_token_value():
"""Test that scientific notation strings are parsed correctly."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": "1e3"}, # 1000
"span2": {"llm.usage.total_tokens": "2e2"} # 200
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_float_string_token_value():
"""Test that float strings are parsed correctly."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": "100.9"},
"span2": {"llm.usage.total_tokens": "50.1"}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_invalid_string_token_value():
"""Test that invalid string token values are treated as 0."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": "not_a_number"},
"span2": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_empty_string_token_value():
"""Test that empty string token values are treated as 0."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": ""}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_boolean_token_value():
"""Test that boolean token values are treated as 0."""
span_ids = ["span1", "span2", "span3"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": True},
"span2": {"llm.usage.total_tokens": False},
"span3": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_string_boolean_token_value():
"""Test that string boolean values are treated as 0."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": "True"},
"span2": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_whitespace_string_token_value():
"""Test that whitespace-only strings are treated as 0."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": " "},
"span2": {"llm.usage.total_tokens": 100}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_list_token_value():
"""Test that list values are treated as 0."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": [100]},
"span2": {"llm.usage.total_tokens": 50}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_dict_token_value():
"""Test that dict values are treated as 0."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": {"count": 100}}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_all_spans_are_parents():
"""Test when all spans are parents (no leaves), total is 0."""
span_ids = ["span1", "span2"]
parent_ids = {"span1", "span2"} # All are parents
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100},
"span2": {"llm.usage.total_tokens": 50}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_parent_ids_not_in_span_ids():
"""Test when parent_ids contains IDs not in span_ids."""
span_ids = ["span1", "span2"]
parent_ids = {"span0", "span1"} # span0 is not in span_ids
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100},
"span2": {"llm.usage.total_tokens": 50}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_integer_span_ids():
"""Test with integer span IDs instead of strings."""
span_ids = [1, 2, 3]
parent_ids = {1}
attributes_by_id = {
1: {"llm.usage.total_tokens": 100},
2: {"llm.usage.total_tokens": 50},
3: {"llm.usage.total_tokens": 25}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_mixed_span_id_types():
"""Test with mixed span ID types (strings and integers)."""
span_ids = ["span1", 2, "span3"]
parent_ids = {"span1"}
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100},
2: {"llm.usage.total_tokens": 50},
"span3": {"llm.usage.total_tokens": 25}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_very_large_token_value():
"""Test with very large token values."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 10**10}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_large_negative_token_value():
"""Test with very large negative token values."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": -(10**10)}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_order_preserved_when_summing():
"""Test that the order of spans does not affect the sum."""
attributes = {
"a": {"llm.usage.total_tokens": 100},
"b": {"llm.usage.total_tokens": 50},
"c": {"llm.usage.total_tokens": 25}
}
parent_ids = set()
# Test different orders
codeflash_output = compute_leaf_token_total(["a", "b", "c"], parent_ids, attributes); result1 = codeflash_output
codeflash_output = compute_leaf_token_total(["c", "b", "a"], parent_ids, attributes); result2 = codeflash_output
codeflash_output = compute_leaf_token_total(["b", "a", "c"], parent_ids, attributes); result3 = codeflash_output
def test_duplicate_span_ids_in_list():
"""Test behavior when span_ids contains duplicates."""
span_ids = ["span1", "span1", "span2"] # span1 appears twice
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100},
"span2": {"llm.usage.total_tokens": 50}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_special_characters_in_span_ids():
"""Test with special characters in span ID strings."""
span_ids = ["span:1", "span@2", "span#3"]
parent_ids = set()
attributes_by_id = {
"span:1": {"llm.usage.total_tokens": 100},
"span@2": {"llm.usage.total_tokens": 50},
"span#3": {"llm.usage.total_tokens": 25}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_unicode_span_ids():
"""Test with unicode characters in span IDs."""
span_ids = ["span_α", "span_β", "span_γ"]
parent_ids = set()
attributes_by_id = {
"span_α": {"llm.usage.total_tokens": 100},
"span_β": {"llm.usage.total_tokens": 50},
"span_γ": {"llm.usage.total_tokens": 25}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_case_sensitive_span_ids():
"""Test that span IDs are case-sensitive."""
span_ids = ["Span1", "span1"]
parent_ids = {"Span1"} # Only "Span1" is a parent, not "span1"
attributes_by_id = {
"Span1": {"llm.usage.total_tokens": 100},
"span1": {"llm.usage.total_tokens": 50}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_empty_span_id_strings():
"""Test with empty string as span ID."""
span_ids = ["", "span1"]
parent_ids = set()
attributes_by_id = {
"": {"llm.usage.total_tokens": 100},
"span1": {"llm.usage.total_tokens": 50}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_extra_attributes_ignored():
"""Test that extra attributes are ignored; only token attributes matter."""
span_ids = ["span1"]
parent_ids = set()
attributes_by_id = {
"span1": {
"llm.usage.total_tokens": 100,
"other_attr": "value",
"model": "gpt-4",
"duration": 1.5
}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_none_attributes_dict():
"""Test when attributes_by_id returns None for a span (handled by or {})."""
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": 100}
# span2 not in dict
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_many_leaf_spans():
"""Test with a large number of leaf spans."""
# Create 1000 leaf spans
span_ids = [f"span_{i}" for i in range(1000)]
parent_ids = set()
attributes_by_id = {
f"span_{i}": {"llm.usage.total_tokens": 10}
for i in range(1000)
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_many_parent_spans():
"""Test with many spans where most are parents."""
# 1000 spans, all are parents
span_ids = [f"span_{i}" for i in range(1000)]
parent_ids = {f"span_{i}" for i in range(1000)}
attributes_by_id = {
f"span_{i}": {"llm.usage.total_tokens": 10}
for i in range(1000)
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_large_hierarchy_few_leaves():
"""Test a large hierarchy where only a few spans are leaves."""
# 1000 spans, but 990 are parents and only 10 are leaves
span_ids = [f"span_{i}" for i in range(1000)]
parent_ids = {f"span_{i}" for i in range(990)}
attributes_by_id = {
f"span_{i}": {"llm.usage.total_tokens": 10}
for i in range(1000)
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_large_variety_of_token_values():
"""Test with a large variety of token value types."""
span_ids = []
attributes_by_id = {}
# 250 int values
for i in range(250):
span_id = f"span_int_{i}"
span_ids.append(span_id)
attributes_by_id[span_id] = {"llm.usage.total_tokens": i}
# 250 float values
for i in range(250):
span_id = f"span_float_{i}"
span_ids.append(span_id)
attributes_by_id[span_id] = {"llm.usage.total_tokens": float(i) + 0.5}
# 250 string int values
for i in range(250):
span_id = f"span_str_int_{i}"
span_ids.append(span_id)
attributes_by_id[span_id] = {"llm.usage.total_tokens": str(i)}
# 250 string float values
for i in range(250):
span_id = f"span_str_float_{i}"
span_ids.append(span_id)
attributes_by_id[span_id] = {"llm.usage.total_tokens": f"{i}.5"}
parent_ids = set()
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
# Sum of 0..249 (int) = 31125
# Sum of int(0.5)..int(249.5) = 0..249 = 31125
# Sum of 0..249 (str int) = 31125
# Sum of int(0.5)..int(249.5) (str float) = 0..249 = 31125
expected = 31125 + 31125 + 31125 + 31125
def test_deep_hierarchy_simulation():
"""Test a simulated deep call hierarchy."""
# Simulate a chain: parent1 -> parent2 -> parent3 -> ... -> leaf
# where only the deepest span is a leaf
depth = 100
span_ids = [f"span_{i}" for i in range(depth)]
# All but the last span are parents
parent_ids = {f"span_{i}" for i in range(depth - 1)}
attributes_by_id = {
f"span_{i}": {"llm.usage.total_tokens": 100}
for i in range(depth)
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_wide_hierarchy_many_leaves():
"""Test a wide hierarchy with many parallel leaf spans."""
# Simulate a parent with 500 direct children (all leaves)
span_ids = ["parent"] + [f"child_{i}" for i in range(500)]
parent_ids = {"parent"} # Only parent is a parent
attributes_by_id = {
"parent": {"llm.usage.total_tokens": 1000},
**{f"child_{i}": {"llm.usage.total_tokens": 10} for i in range(500)}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_alternating_parents_and_leaves():
"""Test with alternating parent and leaf spans."""
span_ids = []
attributes_by_id = {}
parent_ids = set()
for i in range(500):
span_id = f"span_{i}"
span_ids.append(span_id)
attributes_by_id[span_id] = {"llm.usage.total_tokens": 10}
# Even indices are parents, odd are leaves
if i % 2 == 0:
parent_ids.add(span_id)
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_string_token_parsing_at_scale():
"""Test parsing many string token values at scale."""
span_ids = []
attributes_by_id = {}
# Create 1000 spans with various string representations
for i in range(1000):
span_id = f"span_{i}"
span_ids.append(span_id)
# Cycle through different string formats
if i % 4 == 0:
token_value = str(i) # Plain integer string
elif i % 4 == 1:
token_value = f"{i}.0" # Float string
elif i % 4 == 2:
token_value = f"{i}e0" # Scientific notation
else:
token_value = f"{i / 10}" # Float division
attributes_by_id[span_id] = {"llm.usage.total_tokens": token_value}
parent_ids = set()
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
# Sum should be approximately sum of 0..999 (with truncation for floats)
# More precisely: sum of indices where i % 4 == 0 or 1 or 2, plus truncated values for i%4==3
expected = sum(i for i in range(1000) if i % 4 != 3) + sum(int(i / 10) for i in range(3, 1000, 4))
def test_complex_hierarchy_with_multiple_branches():
"""Test a complex tree with multiple branches and varying depths."""
# Create a tree structure:
# root
# ├─ branch1 (parent of child1_1, child1_2)
# ├─ branch2 (parent of child2_1, child2_2, child2_3)
# └─ child3 (leaf)
span_ids = [
"root", "branch1", "child1_1", "child1_2",
"branch2", "child2_1", "child2_2", "child2_3",
"child3"
]
parent_ids = {"root", "branch1", "branch2"}
attributes_by_id = {
"root": {"llm.usage.total_tokens": 1000},
"branch1": {"llm.usage.total_tokens": 800},
"child1_1": {"llm.usage.total_tokens": 100},
"child1_2": {"llm.usage.total_tokens": 100},
"branch2": {"llm.usage.total_tokens": 600},
"child2_1": {"llm.usage.total_tokens": 50},
"child2_2": {"llm.usage.total_tokens": 50},
"child2_3": {"llm.usage.total_tokens": 50},
"child3": {"llm.usage.total_tokens": 200}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_large_attributes_dict_sparse_usage():
"""Test with a large attributes dict but only few spans queried."""
# Create a large attributes dict
large_attributes = {
f"span_{i}": {"llm.usage.total_tokens": i}
for i in range(1000)
}
# Only query a few spans
span_ids = ["span_10", "span_500"]
parent_ids = set()
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, large_attributes); result = codeflash_output
def test_maximum_int_handling():
"""Test handling of very large integer values."""
max_int = 2**63 - 1 # Max 64-bit signed int
span_ids = ["span1", "span2"]
parent_ids = set()
attributes_by_id = {
"span1": {"llm.usage.total_tokens": max_int},
"span2": {"llm.usage.total_tokens": 1}
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
def test_accumulation_many_small_values():
"""Test accumulation of many small non-zero values."""
span_ids = [f"span_{i}" for i in range(1000)]
parent_ids = set()
attributes_by_id = {
f"span_{i}": {"llm.usage.total_tokens": 1}
for i in range(1000)
}
codeflash_output = compute_leaf_token_total(span_ids, parent_ids, attributes_by_id); result = codeflash_output
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
To test or edit this optimization locally: git merge codeflash/optimize-pr11689-2026-02-28T00.52.03
Click to see suggested changes
Before:

try:
    return int(value)
except ValueError:
    try:
        parsed = float(value)
        return int(parsed) if math.isfinite(parsed) else 0
    except (ValueError, TypeError, OverflowError):
        return 0

After:

# Fast-path: if the string doesn't contain '.' or exponent markers,
# try int() directly to avoid the more expensive float() conversion.
if "." not in value and "e" not in value and "E" not in value:
    try:
        return int(value)
    except ValueError:
        return 0
try:
    parsed = float(value)
    return int(parsed) if math.isfinite(parsed) else 0
except (ValueError, TypeError, OverflowError):
    return 0
    Returns:
        Model name string, or ``None`` if not present.
    """
    params = kwargs.get("invocation_params") or {}
⚡️Codeflash found 11% (0.11x) speedup for NativeCallbackHandler._extract_llm_model_name in src/backend/base/langflow/services/tracing/native_callback.py
⏱️ Runtime : 698 microseconds → 630 microseconds (best of 130 runs)
📝 Explanation and details
The optimization achieves a 10% speedup by eliminating an unnecessary empty dict allocation in the common case where invocation_params is absent or None.
Key Change:
- Original: `params = kwargs.get("invocation_params") or {}` - this always creates an empty dict `{}` when `invocation_params` is missing or falsey, then immediately calls `.get()` on it twice (which always returns `None`).
- Optimized: `params = kwargs.get("invocation_params")` followed by an early `if not params: return None` check - this short-circuits immediately when `invocation_params` is falsey, avoiding the dict allocation and the two subsequent dictionary lookups entirely.
Why This Is Faster:
In Python, object allocation (even for empty dicts) has overhead. The original code creates a throwaway {} on every call where invocation_params is missing/None, then performs two .get() calls on this empty dict, only to return None. The optimized version detects the falsey case upfront and returns immediately, skipping both the allocation and the lookups.
Performance Impact by Test Case:
- Tests like `test_missing_invocation_params_returns_none`, `test_invocation_params_is_none_returns_none`, and `test_empty_invocation_params_dict_returns_none` benefit most from the early-return path, avoiding wasted work.
- The line profiler shows that in the optimized version, 21% of calls (1091/5142) hit the early return, saving two dict lookups each time.
- For cases where `invocation_params` contains actual data, the optimized code performs identically (same two `.get()` calls), so there's no regression.
This callback handler is used during LangChain tracing to extract model names from LLM invocation parameters. Since this extraction happens frequently during AI workflow execution (potentially thousands of times per session), even small per-call savings compound significantly. The 10% speedup translates directly to reduced latency in tracing-heavy workloads, especially when many invocations lack model metadata.
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 3144 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Click to see Generated Regression Tests
import pytest # used for our unit tests
from langflow.services.tracing.native_callback import NativeCallbackHandler
def test_returns_model_name_when_present():
# Simple case: invocation_params contains "model_name" -> should return it.
kwargs = {"invocation_params": {"model_name": "openai-gpt-4"}}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_returns_model_when_model_name_missing():
# If "model_name" missing but "model" present -> should return "model".
kwargs = {"invocation_params": {"model": "anthropic-claude-2"}}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_prefers_model_name_over_model_when_both_present():
# When both keys exist, "model_name" should take precedence.
kwargs = {"invocation_params": {"model_name": "preferred-model", "model": "fallback-model"}}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_missing_invocation_params_returns_none():
# No "invocation_params" key at all -> should return None
kwargs = {"other_key": 123}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_invocation_params_is_none_returns_none():
# invocation_params explicitly set to None -> treated as absent -> return None
kwargs = {"invocation_params": None}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_empty_invocation_params_dict_returns_none():
# Empty dict -> no model keys -> return None
kwargs = {"invocation_params": {}}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_none_and_fallback_model_prefers_fallback():
# model_name present but None -> should fall back to "model"
kwargs = {"invocation_params": {"model_name": None, "model": "fallback"}}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_empty_strings_are_treated_as_false_and_can_result_in_none():
# Both values empty strings -> both falsy -> function should return None
kwargs = {"invocation_params": {"model_name": "", "model": ""}}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_empty_string_model_name_falls_back_to_model():
# model_name empty string (falsy) should cause fallback to model value.
kwargs = {"invocation_params": {"model_name": "", "model": "real-model"}}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_numeric_and_other_non_string_values():
# Numeric or other types: falsy numeric (0) should behave consistently.
kwargs = {"invocation_params": {"model_name": 0, "model": 123}}
# 0 is falsy so should return model (123). The function doesn't enforce type.
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_invocation_params_not_a_mapping_raises_attribute_error():
# If invocation_params is truthy but not a mapping (e.g., a list),
# params.get will raise AttributeError. The test documents that behavior.
kwargs = {"invocation_params": ["not", "a", "dict"]}
with pytest.raises(AttributeError):
NativeCallbackHandler._extract_llm_model_name(kwargs)
def test_many_varied_calls_in_loop_1000_iterations():
# Call the function 1000 times with alternating patterns to ensure consistent behavior.
results = []
for i in range(1000):
if i % 3 == 0:
kwargs = {"invocation_params": {"model_name": f"model-name-{i}"}}
expected = f"model-name-{i}"
elif i % 3 == 1:
kwargs = {"invocation_params": {"model": f"model-{i}"}}
expected = f"model-{i}"
else:
kwargs = {"invocation_params": {}}
expected = None
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); res = codeflash_output
results.append((res, expected))
# Assert all results match expectations deterministically
for res, expected in results:
pass
def test_large_collection_of_kwargs_processed_correctly():
# Build a list of 1000 distinct kwargs dicts (mixed cases) and process them,
# verifying the output list matches the expected values.
kwargs_list = []
expected_list = []
for i in range(1000):
if i % 4 == 0:
kwargs_list.append({"invocation_params": {"model_name": f"mn-{i}", "model": f"m-{i}"}})
expected_list.append(f"mn-{i}")
elif i % 4 == 1:
kwargs_list.append({"invocation_params": {"model": f"m-{i}"}})
expected_list.append(f"m-{i}")
elif i % 4 == 2:
kwargs_list.append({"invocation_params": {"model_name": None, "model": f"m-{i}"}})
expected_list.append(f"m-{i}")
else:
kwargs_list.append({"invocation_params": {}})
expected_list.append(None)
# Map function across list
results = [NativeCallbackHandler._extract_llm_model_name(k) for k in kwargs_list]
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
from uuid import UUID
# imports
import pytest
from langflow.services.tracing.native_callback import NativeCallbackHandler
def test_extract_llm_model_name_with_model_name_key():
"""Test extraction when 'model_name' is present in invocation_params."""
# Setup: Create kwargs with model_name in invocation_params (OpenAI-style)
kwargs = {
"invocation_params": {
"model_name": "gpt-4",
"temperature": 0.7
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_model_key():
"""Test extraction when 'model' is present in invocation_params."""
# Setup: Create kwargs with model in invocation_params (Anthropic-style)
kwargs = {
"invocation_params": {
"model": "claude-2"
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_prefers_model_name_over_model():
"""Test that 'model_name' takes precedence over 'model' when both are present."""
# Setup: Create kwargs with both model_name and model in invocation_params
kwargs = {
"invocation_params": {
"model_name": "gpt-4",
"model": "claude-2"
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_neither_key():
"""Test extraction when neither 'model_name' nor 'model' are present."""
# Setup: Create kwargs with invocation_params but no model keys
kwargs = {
"invocation_params": {
"temperature": 0.7,
"max_tokens": 100
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_empty_invocation_params():
"""Test extraction when invocation_params is an empty dict."""
# Setup: Create kwargs with empty invocation_params
kwargs = {
"invocation_params": {}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_various_model_names():
"""Test extraction with different valid model name formats."""
# Test various model name formats
model_names = [
"gpt-3.5-turbo",
"text-davinci-003",
"claude-instant-1",
"llama-2",
"palm-2"
]
# Execute and verify each model name
for model_name in model_names:
kwargs = {
"invocation_params": {
"model_name": model_name
}
}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_no_invocation_params_key():
"""Test extraction when 'invocation_params' key is not present in kwargs."""
# Setup: Create kwargs without invocation_params key
kwargs = {
"messages": [],
"temperature": 0.7
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_none_invocation_params():
"""Test extraction when invocation_params is explicitly None."""
# Setup: Create kwargs with invocation_params set to None
kwargs = {
"invocation_params": None
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_empty_kwargs():
"""Test extraction when kwargs is completely empty."""
# Setup: Create empty kwargs dict
kwargs = {}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_empty_string_model_name():
"""Test extraction when model_name is an empty string."""
# Setup: Create kwargs with empty string as model_name
kwargs = {
"invocation_params": {
"model_name": ""
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_empty_string_model():
"""Test extraction when model is an empty string but model_name is also missing."""
# Setup: Create kwargs with empty string as model
kwargs = {
"invocation_params": {
"model": ""
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_whitespace_model_name():
"""Test extraction when model_name contains only whitespace."""
# Setup: Create kwargs with whitespace as model_name
kwargs = {
"invocation_params": {
"model_name": " "
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_special_characters():
"""Test extraction with model names containing special characters."""
# Setup: Create kwargs with special characters in model_name
kwargs = {
"invocation_params": {
"model_name": "model-v1.0_alpha@latest"
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_numeric_string():
"""Test extraction with numeric string as model name."""
# Setup: Create kwargs with numeric string as model_name
kwargs = {
"invocation_params": {
"model_name": "12345"
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_zero_value():
"""Test extraction when model_name is 0 (falsy numeric value)."""
# Setup: Create kwargs with 0 as model_name
kwargs = {
"invocation_params": {
"model_name": 0
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_false_value():
"""Test extraction when model_name is False (falsy boolean)."""
# Setup: Create kwargs with False as model_name
kwargs = {
"invocation_params": {
"model_name": False
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_extra_kwargs_keys():
"""Test extraction with additional keys in kwargs that should be ignored."""
# Setup: Create kwargs with many extra keys
kwargs = {
"invocation_params": {
"model_name": "gpt-4"
},
"messages": ["hello"],
"tags": ["test"],
"metadata": {"key": "value"},
"run_id": "12345"
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_extra_invocation_params_keys():
"""Test extraction with additional keys in invocation_params that should be ignored."""
# Setup: Create kwargs with many extra keys in invocation_params
kwargs = {
"invocation_params": {
"model_name": "gpt-4",
"temperature": 0.7,
"max_tokens": 100,
"top_p": 0.9,
"frequency_penalty": 0.5
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_returns_type():
"""Test that return type is either str or None."""
# Test with model_name present
kwargs1 = {"invocation_params": {"model_name": "gpt-4"}}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs1); result1 = codeflash_output
# Test without model_name
kwargs2 = {"invocation_params": {}}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs2); result2 = codeflash_output
def test_extract_llm_model_name_with_unicode_characters():
"""Test extraction with unicode characters in model name."""
# Setup: Create kwargs with unicode characters
kwargs = {
"invocation_params": {
"model_name": "gpt-4-日本語"
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_long_model_name():
"""Test extraction with very long model name string."""
# Setup: Create kwargs with very long model_name
long_model_name = "a" * 1000
kwargs = {
"invocation_params": {
"model_name": long_model_name
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_nested_invocation_params():
"""Test extraction when invocation_params contains nested structures."""
# Setup: Create kwargs with nested dict in invocation_params
kwargs = {
"invocation_params": {
"model_name": "gpt-4",
"nested_config": {
"model_name": "should-be-ignored",
"other": "value"
}
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_model_key_with_fallback():
"""Test that model key is used as fallback when model_name is None or missing."""
# Setup: Create kwargs where model_name is None but model is present
kwargs = {
"invocation_params": {
"model_name": None,
"model": "claude-2"
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_performance_many_invocations():
"""Test extraction performance with many consecutive invocations."""
# Setup: Create a base kwargs dict
base_kwargs = {
"invocation_params": {
"model_name": "gpt-4",
"temperature": 0.7
}
}
# Execute: Call extraction method 1000 times
results = []
for i in range(1000):
codeflash_output = NativeCallbackHandler._extract_llm_model_name(base_kwargs); result = codeflash_output
results.append(result)
def test_extract_llm_model_name_with_large_invocation_params():
"""Test extraction with large invocation_params dictionary."""
# Setup: Create kwargs with many parameters
large_params = {"model_name": "gpt-4"}
for i in range(500):
large_params[f"param_{i}"] = f"value_{i}"
kwargs = {"invocation_params": large_params}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_with_large_kwargs():
"""Test extraction with large outer kwargs dictionary."""
# Setup: Create kwargs with many top-level keys
kwargs = {"invocation_params": {"model_name": "gpt-4"}}
for i in range(500):
kwargs[f"key_{i}"] = f"value_{i}"
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_multiple_calls_different_inputs():
"""Test extraction with many different model names in sequence."""
# Setup: Create list of different model names
model_names = [
f"model-{i}" for i in range(100)
]
# Execute: Call extraction for each model name
results = []
for model_name in model_names:
kwargs = {
"invocation_params": {
"model_name": model_name
}
}
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
results.append(result)
def test_extract_llm_model_name_stress_test_various_scenarios():
"""Stress test with 1000 different scenarios combining edge cases."""
# Setup: Create various test scenarios
scenarios = []
# Add scenarios with model_name
for i in range(250):
scenarios.append({
"invocation_params": {"model_name": f"model_{i}"}
})
# Add scenarios with model
for i in range(250):
scenarios.append({
"invocation_params": {"model": f"model_{i}"}
})
# Add scenarios with empty invocation_params
for i in range(250):
scenarios.append({
"invocation_params": {}
})
# Add scenarios with no invocation_params
for i in range(250):
scenarios.append({})
# Execute: Process all scenarios
results = []
for kwargs in scenarios:
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
results.append(result)
def test_extract_llm_model_name_with_deeply_nested_invocation_params_dict():
"""Test extraction robustness with large invocation_params containing many nested structures."""
# Setup: Create kwargs with complex nested structure
kwargs = {
"invocation_params": {
"model_name": "gpt-4",
"config": {
"nested": {
"deep": {
"structure": {
"model_name": "should-ignore"
}
}
}
}
}
}
# Execute: Call the extraction method
codeflash_output = NativeCallbackHandler._extract_llm_model_name(kwargs); result = codeflash_output
def test_extract_llm_model_name_consistency_across_repeated_calls():
"""Test that repeated calls with same input always return same result."""
# Setup: Create a kwargs dict
kwargs = {
"invocation_params": {
"model_name": "gpt-4",
"temperature": 0.7
}
}
# Execute: Call extraction method multiple times
results = [NativeCallbackHandler._extract_llm_model_name(kwargs) for _ in range(1000)]
    # codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To test or edit this optimization locally:

git merge codeflash/optimize-pr11689-2026-02-28T01.29.42
- params = kwargs.get("invocation_params") or {}
+ params = kwargs.get("invocation_params")
+ # If invocation_params is falsey (None, empty container, etc.), return None
+ # which mirrors the original behavior that would replace a falsey value
+ # with an empty dict and then find no model keys.
+ if not params:
+     return None
@Cristhianzl I think all issues are addressed from your comments except for L4 and L5. L5: I might misunderstand what is being asked, but I think this is a pre-existing issue; trying to fix it involved touching files we aren't currently changing in this PR. @archit-trainee: taking a look now: #11689 (comment)
@archit-trainee we have made a lot of changes since. Could you double check on your Windows machine whether it is still duplicating? I will also take a look at the cost issues you pointed out.
remove deprecated test_traces file; tests have all been moved to test_traces_api.py
fix test_trace_api: ge=0 is allowed now
remove unused traces cost flow
Investigated how we collect cost data. We don't, and calculating it is out of scope for this PR, so I have fully removed cost from the tables. I will open a separate issue on our end to look into creating a centralized model-pricing table that we maintain and can use to estimate token-usage cost.
The mapping is extensible. Current keys → OTel GenAI keys:
Not captured yet:
Locations to update:
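As an illustration of the renaming step, a minimal sketch: the left-hand keys are assumptions about the tracer's current metadata names (the actual list above was elided), while the gen_ai.* values are attribute names from the OTel GenAI semantic conventions.

```python
# Map assumed current metadata keys to OTel GenAI semantic-convention names.
OTEL_GENAI_KEY_MAP: dict[str, str] = {
    "model_name": "gen_ai.request.model",
    "input_tokens": "gen_ai.usage.input_tokens",
    "output_tokens": "gen_ai.usage.output_tokens",
}


def to_otel_attributes(metadata: dict[str, object]) -> dict[str, object]:
    """Rename known keys to their OTel names; unknown keys pass through."""
    return {OTEL_GENAI_KEY_MAP.get(key, key): value for key, value in metadata.items()}


print(to_otel_attributes({"model_name": "gpt-4", "input_tokens": 12, "custom": True}))
```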
serialized = serialized or {}
return serialized.get("name") or (serialized.get("id", [fallback])[-1] if serialized.get("id") else fallback)
⚡️Codeflash found 20% (0.20x) speedup for NativeCallbackHandler._extract_name in src/backend/base/langflow/services/tracing/native_callback.py
⏱️ Runtime : 423 microseconds → 353 microseconds (best of 42 runs)
📝 Explanation and details
The optimized code replaces a single-line chained expression (serialized or {} followed by nested .get() calls with a ternary) with explicit early-return branches, eliminating redundant dictionary lookups. The original evaluated serialized.get("id") up to twice per call and created a temporary empty dict for falsy inputs; the new version checks not serialized once upfront and calls .get("name") and .get("id") at most once each. Per-invocation profiler time moved from ~1064 ns to ~1393 ns, but that apparent increase is an artifact of different trace overhead; wall-clock runtime dropped 19%. Line profiler shows the original's single complex line consumed 65% of function time; breaking it into branches distributes the cost and avoids re-evaluating serialized.get("id") in the ternary fallback path.
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 1023 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests
import pytest # used for our unit tests
from langflow.services.tracing.native_callback import NativeCallbackHandler
def test_returns_name_when_name_present():
# Create a handler instance. The tracer argument is not used by _extract_name,
# so passing None is acceptable for these unit tests.
handler = NativeCallbackHandler(tracer=None)
# A simple serialized dict containing a 'name' should return that name.
serialized = {"name": "MyComponent", "id": ["ignored", "also_ignored"]}
codeflash_output = handler._extract_name(serialized, fallback="fallback"); result = codeflash_output
def test_uses_last_id_element_when_name_missing():
handler = NativeCallbackHandler(tracer=None)
# No 'name' key, but 'id' is a list -> should return the last element.
serialized = {"id": ["alpha", "beta", "gamma"]}
codeflash_output = handler._extract_name(serialized, fallback="fallback"); result = codeflash_output
def test_fallback_used_when_no_name_or_id():
handler = NativeCallbackHandler(tracer=None)
# Empty dict -> no name or id -> use fallback
serialized = {}
codeflash_output = handler._extract_name(serialized, fallback="the_fallback"); result = codeflash_output
def test_serialized_none_uses_fallback():
handler = NativeCallbackHandler(tracer=None)
# Passing None for serialized should be treated as empty and return fallback
codeflash_output = handler._extract_name(None, fallback="fb"); result = codeflash_output
def test_empty_name_with_id_uses_id_last_element():
handler = NativeCallbackHandler(tracer=None)
# If 'name' exists but is empty (falsy), the code should fall through to 'id'
serialized = {"name": "", "id": ["x", "y", "z"]}
codeflash_output = handler._extract_name(serialized, fallback="fb"); result = codeflash_output
def test_name_none_with_id_uses_id_last_element():
handler = NativeCallbackHandler(tracer=None)
# If 'name' exists but is None (falsy), the code should fall through to 'id'
serialized = {"name": None, "id": ["one", "two"]}
codeflash_output = handler._extract_name(serialized, fallback="fb"); result = codeflash_output
def test_id_empty_list_uses_fallback():
handler = NativeCallbackHandler(tracer=None)
# An empty list for 'id' is falsy -> should use fallback
serialized = {"id": []}
codeflash_output = handler._extract_name(serialized, fallback="fallback_value"); result = codeflash_output
def test_id_as_string_returns_last_character():
handler = NativeCallbackHandler(tracer=None)
# If 'id' is a string, indexing [-1] returns the last character.
serialized = {"id": "component"}
codeflash_output = handler._extract_name(serialized, fallback="fb"); result = codeflash_output
def test_id_as_tuple_returns_last_element():
handler = NativeCallbackHandler(tracer=None)
# Tuples support indexing; last element should be returned.
serialized = {"id": ("first", "second", "last_elem")}
codeflash_output = handler._extract_name(serialized, fallback="fb"); result = codeflash_output
def test_id_is_none_uses_fallback_even_if_name_missing():
handler = NativeCallbackHandler(tracer=None)
# If 'id' exists but is None (falsy), fallback should be used.
serialized = {"id": None}
codeflash_output = handler._extract_name(serialized, fallback="fb_value"); result = codeflash_output
def test_large_id_list_returns_last_element():
handler = NativeCallbackHandler(tracer=None)
# Create a large id list (1000 elements) and ensure the last element is returned.
large_id = [f"elem_{i}" for i in range(1000)]
serialized = {"id": large_id}
codeflash_output = handler._extract_name(serialized, fallback="fb"); result = codeflash_output
def test_many_iterations_with_varied_inputs_are_deterministic():
handler = NativeCallbackHandler(tracer=None)
# Prepare several serialized inputs to cycle through; this test calls the method
# many times (1000 iterations) to exercise potential edge cases repeatedly.
cases = [
({"name": "A"}, "fb", "A"),
({"name": "" , "id": ["i1", "i2"]}, "fb", "i2"),
({"id": ["only_last"]}, "fb", "only_last"),
({}, "fallback123", "fallback123"),
(None, "fallback_none", "fallback_none"),
({"id": tuple(str(i) for i in range(50))}, "fb", str(49)[-len(str(49)):]), # last element "49"
({"id": "somestring"}, "fb", "g"),
]
# Run 1000 iterations cycling through the cases to ensure consistent behavior.
for i in range(1000):
serialized, fallback, expected = cases[i % len(cases)]
codeflash_output = handler._extract_name(serialized, fallback=fallback); result = codeflash_output
# For the tuple case, expected was constructed in a slightly odd way above;
# normalize expected for the tuple case to be explicit.
if serialized is not None and isinstance(serialized.get("id"), tuple):
expected = serialized["id"][-1]
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
from uuid import UUID
# imports
import pytest
from langflow.services.tracing.native import NativeTracer
from langflow.services.tracing.native_callback import NativeCallbackHandler
# fixtures
@pytest.fixture
def mock_tracer():
"""Create a real NativeTracer instance for testing."""
return NativeTracer()
@pytest.fixture
def callback_handler(mock_tracer):
"""Create a NativeCallbackHandler instance with a real tracer."""
    return NativeCallbackHandler(tracer=mock_tracer)

To test or edit this optimization locally:

git merge codeflash/optimize-pr11689-2026-03-02T19.10.03
- serialized = serialized or {}
- return serialized.get("name") or (serialized.get("id", [fallback])[-1] if serialized.get("id") else fallback)
+ if not serialized:
+     return fallback
+ name = serialized.get("name")
+ if name:
+     return name
+ id_val = serialized.get("id")
+ if id_val:
+     return id_val[-1]
+ return fallback
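The two variants can be compared locally with a quick timeit sketch; absolute numbers will differ by machine, and the sample input here is an assumption.

```python
import timeit


def chained(serialized, fallback):
    # Original single-expression form.
    serialized = serialized or {}
    return serialized.get("name") or (
        serialized.get("id", [fallback])[-1] if serialized.get("id") else fallback
    )


def early_return(serialized, fallback):
    # Optimized form with explicit branches and at most one lookup per key.
    if not serialized:
        return fallback
    name = serialized.get("name")
    if name:
        return name
    id_val = serialized.get("id")
    if id_val:
        return id_val[-1]
    return fallback


sample = {"id": ["langchain", "llms", "OpenAI"]}  # no "name" key: exercises the id path
for fn in (chained, early_return):
    elapsed = timeit.timeit(lambda: fn(sample, "fallback"), number=100_000)
    print(f"{fn.__name__}: {elapsed:.4f}s")
```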
address Gabriel's OTel comment (latest)
ogabrielluiz
left a comment
OTel GenAI compliance looks good. All previous comments addressed.
    """

    UNSET = "unset"
    OK = "ok"
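The diff shows the UNSET and OK members; OTel's span status has exactly three codes (Unset, Ok, Error), so a standalone sketch of the full enum would look like the following. The ERROR member and its string value are assumptions taken from the OTel spec, not from the diff.

```python
from enum import Enum


class SpanStatus(str, Enum):
    """Span status codes mirroring OTel's StatusCode (Unset, Ok, Error)."""

    UNSET = "unset"
    OK = "ok"
    ERROR = "error"  # assumed from the OTel spec; not shown in the diff


# A str-backed Enum compares and serializes as a plain string,
# which is convenient for SQL columns and JSON API responses.
assert SpanStatus.OK == "ok"
assert SpanStatus("error") is SpanStatus.ERROR
```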
#12194
I just reported an issue about this enum
v0 for traces includes:
- filters: status, token usage range and datetime
- accordion rows per trace
Could add:
- more filter options. Examples: session_id, trace_id and latency range
Screen.Recording.2026-02-09.at.4.49.39.PM.mov