Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
hex_trace_id,
kind_name,
parse_retry_after,
partition_by_identity,
filter_and_partition_by_identity,
status_name,
truncate_span,
)
Expand Down Expand Up @@ -80,10 +80,10 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return SpanExportResult.FAILURE

try:
groups = partition_by_identity(spans)
groups = filter_and_partition_by_identity(spans)
if not groups:
# No spans with identity; treat as success
logger.info("No spans with tenant/agent identity found; nothing exported.")
# No eligible genAI spans to export after filtering/partitioning; treat as success
logger.info("No eligible genAI spans to export; nothing exported.")
return SpanExportResult.SUCCESS

# Log number of groups and total span count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,35 @@
from opentelemetry.trace import SpanKind, StatusCode

from ..constants import (
CHAT_OPERATION_NAME,
ENABLE_A365_OBSERVABILITY_EXPORTER,
EXECUTE_TOOL_OPERATION_NAME,
GEN_AI_AGENT_ID_KEY,
GEN_AI_OPERATION_NAME_KEY,
INVOKE_AGENT_OPERATION_NAME,
OUTPUT_MESSAGES_OPERATION_NAME,
TENANT_ID_KEY,
)
from ..inference_operation_type import InferenceOperationType

logger = logging.getLogger(__name__)

# Maximum allowed span size in bytes (250KB)
MAX_SPAN_SIZE_BYTES = 250 * 1024

# Allow-list of gen_ai.operation.name values. A span is eligible for export to
# the Agent 365 observability ingest service only when its operation name is
# one of these; every other span is dropped before export.
GEN_AI_OPERATION_NAMES: frozenset[str] = frozenset(
    (
        INVOKE_AGENT_OPERATION_NAME,
        EXECUTE_TOOL_OPERATION_NAME,
        OUTPUT_MESSAGES_OPERATION_NAME,
        CHAT_OPERATION_NAME,
        # Enum-provided spelling of the chat operation, accepted as-is.
        InferenceOperationType.CHAT.value,
    )
)


def hex_trace_id(value: int) -> str:
# 128-bit -> 32 hex chars
Expand Down Expand Up @@ -127,22 +146,44 @@ def truncate_span(span_dict: dict[str, Any]) -> dict[str, Any]:
return span_dict


def partition_by_identity(
def filter_and_partition_by_identity(
    spans: Sequence[ReadableSpan],
) -> dict[tuple[str, str], list[ReadableSpan]]:
    """
    Group export-eligible spans under their (tenantId, agentId) pair.

    A span is kept only when both of the following hold:

    * its ``gen_ai.operation.name`` attribute is a member of
      ``GEN_AI_OPERATION_NAMES`` (spans with no operation name, or with any
      other operation name, are dropped);
    * it carries both a tenant ID and an agent ID attribute.

    Dropped spans are only counted; the two counts are reported at debug
    level once the whole batch has been scanned.

    Returns a dict mapping ``(tenant, agent)`` to the matching spans in input
    order. The dict is empty when no span qualifies.
    """
    partitions: dict[tuple[str, str], list[ReadableSpan]] = {}
    dropped_for_operation = 0
    dropped_for_identity = 0

    for span in spans:
        span_attrs = span.attributes or {}

        # Gate 1: the operation name must be on the allow-list.
        op_name = as_str(span_attrs.get(GEN_AI_OPERATION_NAME_KEY))
        if not (op_name and op_name in GEN_AI_OPERATION_NAMES):
            dropped_for_operation += 1
            continue

        # Gate 2: both halves of the identity must be present.
        tenant_id = as_str(span_attrs.get(TENANT_ID_KEY))
        agent_id = as_str(span_attrs.get(GEN_AI_AGENT_ID_KEY))
        if not (tenant_id and agent_id):
            dropped_for_identity += 1
            continue

        identity = (tenant_id, agent_id)
        bucket = partitions.get(identity)
        if bucket is None:
            bucket = partitions[identity] = []
        bucket.append(span)

    if dropped_for_operation:
        logger.debug(
            "[Agent365Exporter] %d spans without an eligible gen_ai.operation.name filtered out",
            dropped_for_operation,
        )
    if dropped_for_identity:
        logger.debug(
            "[Agent365Exporter] %d spans skipped due to missing tenant or agent ID",
            dropped_for_identity,
        )
    return partitions


Expand Down
3 changes: 3 additions & 0 deletions tests/observability/core/exporters/test_payload_chunking.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from unittest.mock import Mock, patch

from microsoft_agents_a365.observability.core.constants import (
CHAT_OPERATION_NAME,
GEN_AI_AGENT_ID_KEY,
GEN_AI_OPERATION_NAME_KEY,
TENANT_ID_KEY,
)
from microsoft_agents_a365.observability.core.exporters.agent365_exporter import (
Expand Down Expand Up @@ -165,6 +167,7 @@ def _make_span(self, span_id: int, attribute_size: int) -> ReadableSpan:
mock_span.attributes = {
TENANT_ID_KEY: "tenant-1",
GEN_AI_AGENT_ID_KEY: "agent-1",
GEN_AI_OPERATION_NAME_KEY: CHAT_OPERATION_NAME,
"payload": "x" * attribute_size,
}
mock_span.events = []
Expand Down
109 changes: 103 additions & 6 deletions tests/observability/core/test_agent365_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
import unittest
from unittest.mock import Mock, patch

from microsoft_agents_a365.observability.core.constants import GEN_AI_AGENT_ID_KEY, TENANT_ID_KEY
from microsoft_agents_a365.observability.core.constants import (
GEN_AI_AGENT_ID_KEY,
GEN_AI_OPERATION_NAME_KEY,
INVOKE_AGENT_OPERATION_NAME,
TENANT_ID_KEY,
)
from microsoft_agents_a365.observability.core.exporters.agent365_exporter import (
DEFAULT_ENDPOINT_URL,
_Agent365Exporter,
Expand Down Expand Up @@ -54,6 +59,7 @@ def _create_mock_span(
scope_version: str = "1.0.0",
tenant_id: str = "test-tenant-123",
agent_id: str = "test-agent-456",
operation_name: str | None = INVOKE_AGENT_OPERATION_NAME,
) -> ReadableSpan:
"""Create a mock ReadableSpan for testing."""
mock_span = Mock(spec=ReadableSpan)
Expand All @@ -78,13 +84,15 @@ def _create_mock_span(
mock_span.kind = Mock()
mock_span.kind.name = "INTERNAL"

# Add identity attributes for partition_by_identity to work
# Add identity attributes for filter_and_partition_by_identity to work
span_attributes = attributes or {}
if tenant_id and agent_id:
span_attributes.update({
TENANT_ID_KEY: tenant_id,
GEN_AI_AGENT_ID_KEY: agent_id,
})
if operation_name is not None and GEN_AI_OPERATION_NAME_KEY not in span_attributes:
span_attributes[GEN_AI_OPERATION_NAME_KEY] = operation_name

mock_span.attributes = span_attributes
mock_span.events = []
Expand Down Expand Up @@ -350,10 +358,8 @@ def test_export_error_logging(self, mock_logger):
# Verify export succeeded (no identity spans are treated as success)
self.assertEqual(result, SpanExportResult.SUCCESS)

# Verify info log for no identity
mock_logger.info.assert_called_with(
"No spans with tenant/agent identity found; nothing exported."
)
# Verify info log for no eligible spans
mock_logger.info.assert_called_with("No eligible genAI spans to export; nothing exported.")

def test_exporter_is_internal(self):
"""Test that _Agent365Exporter is marked as internal/private.
Expand Down Expand Up @@ -657,6 +663,97 @@ def test_export_no_fallback_when_default_succeeds(self):
self.assertEqual(result, SpanExportResult.SUCCESS)
mock_post.assert_called_once()

def test_export_filters_out_non_genai_spans(self):
    """Only spans carrying a recognised gen_ai.operation.name are exported."""
    # Arrange: one eligible span plus two that must be dropped, one with no
    # operation name at all and one with an unrecognised operation name.
    eligible = self._create_mock_span("genai_span", trace_id=1, span_id=2)
    missing_op = self._create_mock_span("http_span", trace_id=3, span_id=4, operation_name=None)
    unrecognised_op = self._create_mock_span(
        "db_span", trace_id=5, span_id=6, operation_name="some_random_op"
    )
    batch = [eligible, missing_op, unrecognised_op]

    with patch.object(self.exporter, "_post_with_retries", return_value=True) as post_mock:
        # Act
        outcome = self.exporter.export(batch)

    # Assert: exactly one POST, whose payload holds only the eligible span
    self.assertEqual(outcome, SpanExportResult.SUCCESS)
    post_mock.assert_called_once()
    _first, raw_body, _third = post_mock.call_args.args
    payload = json.loads(raw_body)
    exported = payload["resourceSpans"][0]["scopeSpans"][0]["spans"]
    self.assertEqual(len(exported), 1)
    self.assertEqual(exported[0]["name"], "genai_span")

def test_export_filters_out_only_non_genai_spans_returns_success(self):
    """A batch with no eligible spans yields SUCCESS and triggers no HTTP call."""
    # Arrange: neither span has an operation name on the allow-list
    http_span = self._create_mock_span("http_span", operation_name=None)
    db_span = self._create_mock_span("db_span", operation_name="other")

    with patch.object(self.exporter, "_post_with_retries", return_value=True) as post_mock:
        # Act
        outcome = self.exporter.export([http_span, db_span])

    # Assert: nothing was sent, yet the export is reported as successful
    self.assertEqual(outcome, SpanExportResult.SUCCESS)
    post_mock.assert_not_called()

def test_export_includes_inference_operation_type_chat_spans(self):
    """A span whose operation name is 'Chat' is exported with that value untouched."""
    # Arrange: 'Chat' is the capitalised spelling of the chat operation
    capitalised_chat = self._create_mock_span(
        "chat_span", trace_id=1, span_id=2, operation_name="Chat"
    )

    with patch.object(self.exporter, "_post_with_retries", return_value=True) as post_mock:
        outcome = self.exporter.export([capitalised_chat])

    self.assertEqual(outcome, SpanExportResult.SUCCESS)
    post_mock.assert_called_once()
    _first, raw_body, _third = post_mock.call_args.args
    exported = json.loads(raw_body)["resourceSpans"][0]["scopeSpans"][0]["spans"]
    self.assertEqual(len(exported), 1)
    # The exporter must pass the value through verbatim (no case folding)
    exported_op = exported[0]["attributes"][GEN_AI_OPERATION_NAME_KEY]
    self.assertEqual(exported_op, "Chat")

def test_export_filters_out_unsupported_inference_operation_types(self):
    """TextCompletion and GenerateContent spans never reach the exporter's POST."""
    # Arrange: two spans whose operation names are not on the allow-list
    unsupported = [
        self._create_mock_span(
            "text_completion_span", trace_id=3, span_id=4, operation_name="TextCompletion"
        ),
        self._create_mock_span(
            "generate_content_span", trace_id=5, span_id=6, operation_name="GenerateContent"
        ),
    ]

    with patch.object(self.exporter, "_post_with_retries", return_value=True) as post_mock:
        outcome = self.exporter.export(unsupported)

    # Every span was dropped, so the export succeeds without sending anything
    self.assertEqual(outcome, SpanExportResult.SUCCESS)
    post_mock.assert_not_called()

def test_export_does_not_normalize_canonical_operation_names(self):
    """Each canonical operation name is exported exactly as it was set on the span."""
    canonical_names = ["invoke_agent", "execute_tool", "output_messages", "chat"]
    for op in canonical_names:
        with self.subTest(operation_name=op):
            candidate = self._create_mock_span(
                f"{op}_span", trace_id=1, span_id=2, operation_name=op
            )
            post_patch = patch.object(self.exporter, "_post_with_retries", return_value=True)
            with post_patch as post_mock:
                outcome = self.exporter.export([candidate])

            self.assertEqual(outcome, SpanExportResult.SUCCESS)
            _first, raw_body, _third = post_mock.call_args[0]
            payload = json.loads(raw_body)
            first_span = payload["resourceSpans"][0]["scopeSpans"][0]["spans"][0]
            # The operation name must come back unchanged
            self.assertEqual(first_span["attributes"][GEN_AI_OPERATION_NAME_KEY], op)


if __name__ == "__main__":
unittest.main()
Loading