diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py
index ce451328..e1757127 100644
--- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py
+++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py
@@ -180,6 +180,7 @@ def _configure_internal(
             token_resolver=exporter_options.token_resolver,
             cluster_category=exporter_options.cluster_category,
             use_s2s_endpoint=exporter_options.use_s2s_endpoint,
+            max_payload_bytes=exporter_options.max_payload_bytes,
         )
     else:
diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py
index fb170cec..e3c6e73b 100644
--- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py
+++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py
@@ -18,7 +18,10 @@ from opentelemetry.trace import StatusCode
 from .utils import (
+    DEFAULT_MAX_PAYLOAD_BYTES,
     build_export_url,
+    chunk_by_size,
+    estimate_span_bytes,
     get_validated_domain_override,
     hex_span_id,
     hex_trace_id,
@@ -56,6 +59,7 @@ def __init__(
         token_resolver: Callable[[str, str], str | None],
         cluster_category: str = "prod",
         use_s2s_endpoint: bool = False,
+        max_payload_bytes: int = DEFAULT_MAX_PAYLOAD_BYTES,
     ):
         if token_resolver is None:
             raise ValueError("token_resolver must be provided.")
@@ -65,6 +69,7 @@ def __init__(
         self._token_resolver = token_resolver
         self._cluster_category = cluster_category
         self._use_s2s_endpoint = use_s2s_endpoint
+        self._max_payload_bytes = max_payload_bytes
 
         # Read domain override once at initialization
         self._domain_override = get_validated_domain_override()
@@ -89,8 +94,21 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
         any_failure = False
         for (tenant_id, agent_id), activities in groups.items():
-            payload = self._build_export_request(activities)
-            body = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
+            # Map and truncate spans first, then chunk by estimated byte size
+            mapped_spans = self._map_and_truncate_spans(activities)
+            resource_attrs = self._get_resource_attributes(activities)
+            chunks = chunk_by_size(
+                mapped_spans,
+                lambda ms: estimate_span_bytes(ms[0]),
+                self._max_payload_bytes,
+            )
+
+            if len(chunks) > 1:
+                # Logged at DEBUG to avoid leaking tenant/agent IDs in production logs.
+                logger.debug(
+                    f"Split {len(activities)} spans into {len(chunks)} chunks "
+                    f"for tenantId: {tenant_id}, agentId: {agent_id}"
+                )
 
             # Resolve endpoint: domain override > default URL
             if self._domain_override:
@@ -128,11 +146,40 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
                 any_failure = True
                 continue
 
-            # Basic retry loop
-            ok = self._post_with_retries(url, body, headers)
-
-            if not ok:
-                any_failure = True
+            # Send each chunk, failing fast: on the first chunk failure, skip the
+            # group's remaining chunks (chunks already sent are not rolled back).
+            group_failed = False
+            for i, chunk in enumerate(chunks):
+                payload = self._build_envelope(chunk, resource_attrs)
+                body = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
+                body_bytes = len(body.encode("utf-8"))
+                logger.debug(
+                    f"Sending chunk {i + 1} of {len(chunks)} "
+                    f"({len(chunk)} spans, {body_bytes} bytes)"
+                )
+                # Defensive check: the estimator covers per-span content but not
+                # envelope overhead (resource attributes, scope wrappers). Warn if
+                # the assembled body exceeds the configured limit so operators can
+                # observe estimator drift before the server starts rejecting requests.
+                if body_bytes > self._max_payload_bytes:
+                    logger.warning(
+                        f"Chunk {i + 1} of {len(chunks)} body size ({body_bytes} bytes) "
+                        f"exceeds max_payload_bytes ({self._max_payload_bytes}); "
+                        "estimator may be under-counting envelope overhead. "
+                        f"Tenant: {tenant_id}, agent: {agent_id}, spans: {len(chunk)}."
+                    )
+
+                ok = self._post_with_retries(url, body, headers)
+                if not ok:
+                    logger.error(
+                        f"Chunk {i + 1} of {len(chunks)} failed for "
+                        f"tenant {tenant_id}, agent {agent_id}"
+                    )
+                    any_failure = True
+                    group_failed = True
+                    break
+
+            if group_failed:
+                continue
 
         return SpanExportResult.FAILURE if any_failure else SpanExportResult.SUCCESS
 
@@ -231,32 +278,47 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo
 
     # ------------- Payload mapping ------------------
 
-    def _build_export_request(self, spans: Sequence[ReadableSpan]) -> dict[str, Any]:
-        # Group by instrumentation scope (name, version)
-        scope_map: dict[tuple[str, str | None], list[dict[str, Any]]] = {}
+    def _map_and_truncate_spans(
+        self, spans: Sequence[ReadableSpan]
+    ) -> list[tuple[dict[str, Any], str, str | None]]:
+        """Map ReadableSpans to OTLP dicts and apply per-span truncation.
+
+        Returns a list of (mapped_span, scope_name, scope_version) tuples so
+        that envelope grouping by instrumentation scope can be performed
+        efficiently after byte-size chunking.
+        """
+        result: list[tuple[dict[str, Any], str, str | None]] = []
         for sp in spans:
             scope = sp.instrumentation_scope
-            scope_key = (scope.name, scope.version)
-            scope_map.setdefault(scope_key, []).append(self._map_span(sp))
-
-        scope_spans: list[dict[str, Any]] = []
-        for (name, version), mapped_spans in scope_map.items():
-            scope_spans.append(
-                {
-                    "scope": {
-                        "name": name,
-                        "version": version,
-                    },
-                    "spans": mapped_spans,
-                }
-            )
+            scope_name = scope.name if scope is not None else "unknown"
+            scope_version = scope.version if scope is not None else None
+            result.append((self._map_span(sp), scope_name, scope_version))
+        return result
 
-        # Resource attributes (from the first span – all spans in a batch usually share resource)
-        # If you need to merge across spans, adapt accordingly.
-        resource_attrs = {}
+    @staticmethod
+    def _get_resource_attributes(spans: Sequence[ReadableSpan]) -> dict[str, Any]:
+        """Extract resource attributes from the first span in the batch."""
         if spans:
-            resource_attrs = dict(getattr(spans[0].resource, "attributes", {}) or {})
+            return dict(getattr(spans[0].resource, "attributes", {}) or {})
+        return {}
+
+    def _build_envelope(
+        self,
+        mapped_spans: Sequence[tuple[dict[str, Any], str, str | None]],
+        resource_attrs: dict[str, Any],
+    ) -> dict[str, Any]:
+        """Build an OTLP export request envelope from pre-mapped spans."""
+        scope_map: dict[tuple[str, str | None], list[dict[str, Any]]] = {}
+        for mapped_span, scope_name, scope_version in mapped_spans:
+            scope_map.setdefault((scope_name, scope_version), []).append(mapped_span)
+
+        scope_spans: list[dict[str, Any]] = [
+            {
+                "scope": {"name": name, "version": version},
+                "spans": spans,
+            }
+            for (name, version), spans in scope_map.items()
+        ]
 
         return {
             "resourceSpans": [
diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter_options.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter_options.py
index 894f017d..a3981df8 100644
--- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter_options.py
+++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter_options.py
@@ -3,6 +3,8 @@
 
 from typing import Awaitable, Callable, Optional
 
+from .utils import DEFAULT_MAX_PAYLOAD_BYTES
+
 
 class Agent365ExporterOptions:
     """
@@ -19,6 +21,7 @@ def __init__(
         scheduled_delay_ms: int = 5000,
         exporter_timeout_ms: int = 30000,
         max_export_batch_size: int = 512,
+        max_payload_bytes: int = DEFAULT_MAX_PAYLOAD_BYTES,
     ):
         """
         Args:
@@ -29,6 +32,10 @@ def __init__(
             scheduled_delay_ms: Delay between export batches (ms). Default is 5000.
             exporter_timeout_ms: Timeout for the export operation (ms). Default is 30000.
             max_export_batch_size: Maximum batch size for export operations. Default is 512.
+            max_payload_bytes: Upper bound on HTTP request body size in bytes. The exporter
+                splits per-identity batches into sub-batches whose estimated size stays under
+                this limit, providing headroom under the A365 1 MB server limit. Default is
+                900_000 (~100 KB headroom for estimator error and JSON envelope overhead).
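+                For example, with max_payload_bytes=500_000 a batch whose spans
+                are estimated at 1.2 MB in total is sent as at least three requests.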
""" self.cluster_category = cluster_category self.token_resolver = token_resolver @@ -37,3 +44,4 @@ def __init__( self.scheduled_delay_ms = scheduled_delay_ms self.exporter_timeout_ms = exporter_timeout_ms self.max_export_batch_size = max_export_batch_size + self.max_payload_bytes = max_payload_bytes diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py index aabe9953..36906f0d 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py @@ -6,8 +6,8 @@ import json import logging import os -from collections.abc import Sequence -from typing import Any +from collections.abc import Callable, Sequence +from typing import Any, TypeVar from urllib.parse import urlparse from opentelemetry.sdk.trace import ReadableSpan @@ -262,3 +262,133 @@ def is_agent365_exporter_enabled() -> bool: # Check environment variable enable_exporter = os.getenv(ENABLE_A365_OBSERVABILITY_EXPORTER, "").lower() return (enable_exporter) in ("true", "1", "yes", "on") + + +# --------------------------------------------------------------------------- +# Span size estimation and byte-level chunking +# --------------------------------------------------------------------------- + +# Default upper bound on HTTP request body size in bytes. Provides ~100 KB +# headroom under the A365 1 MB server limit for estimator error and JSON/ +# envelope overhead (e.g. resource attributes and scope wrappers). +DEFAULT_MAX_PAYLOAD_BYTES = 900_000 + +# Overhead constant for OTLP JSON span fixed fields (traceId, spanId, +# parentSpanId, kind, timestamps, status, scope wrapper, etc.). Intentionally +# generous to account for serializer variance. +_SPAN_BASE_OVERHEAD = 2000 + +# Overhead per attribute in OTLP JSON format. Covers key/value JSON wrapping. +_ATTR_OVERHEAD = 80 + +# Overhead per event in OTLP JSON. +_EVENT_OVERHEAD = 200 + + +def _utf8_len(s: str) -> int: + return len(s.encode("utf-8")) + + +def estimate_value_bytes(value: Any) -> int: + """Estimate the serialized byte size of a single attribute value in OTLP/HTTP JSON.""" + if isinstance(value, str): + return 40 + _utf8_len(value) + # bool is a subclass of int; check before sequence/list handling below + if isinstance(value, bool): + return 40 + if isinstance(value, (list, tuple)): + if len(value) == 0: + return 60 + first = value[0] + if isinstance(first, str): + total = 60 + for s in value: + total += 40 + _utf8_len(str(s)) + return total + return 60 + 50 * len(value) + return 40 # int/float/None/other + + +def estimate_span_bytes(span: dict[str, Any]) -> int: + """Heuristic estimator for the serialized size of an OTLP span in HTTP JSON. + + Uses generous constants tuned to over-estimate by ~25-50%, providing + headroom for JSON serializer variance (whitespace, enum representation, + integer-as-string). 
+ """ + total = _SPAN_BASE_OVERHEAD + name = span.get("name") + if isinstance(name, str): + total += _utf8_len(name) + + attributes = span.get("attributes") + if attributes: + for key, value in attributes.items(): + total += _ATTR_OVERHEAD + total += _utf8_len(str(key)) + total += estimate_value_bytes(value) + + events = span.get("events") + if events: + for ev in events: + total += _EVENT_OVERHEAD + ev_name = ev.get("name") if isinstance(ev, dict) else None + if isinstance(ev_name, str): + total += _utf8_len(ev_name) + ev_attrs = ev.get("attributes") if isinstance(ev, dict) else None + if ev_attrs: + for key, value in ev_attrs.items(): + total += _ATTR_OVERHEAD + total += _utf8_len(str(key)) + total += estimate_value_bytes(value) + return total + + +T = TypeVar("T") + + +def chunk_by_size( + items: Sequence[T], + get_size: Callable[[T], int], + max_chunk_bytes: int, +) -> list[list[T]]: + """Split items into sub-batches whose cumulative estimated size stays under ``max_chunk_bytes``. + + Multi-item chunks are guaranteed to stay within the limit. A single item + whose estimated size exceeds ``max_chunk_bytes`` forms its own one-item + chunk (never silently dropped) even though that chunk exceeds the limit. + + Invariants: + - Input order is preserved across chunks. + - Empty input produces empty output. + - No item is ever dropped. + - No chunk is ever empty. + + Raises: + ValueError: If ``max_chunk_bytes`` is not positive, or if ``get_size`` + returns a negative value for any item. + """ + if max_chunk_bytes <= 0: + raise ValueError(f"max_chunk_bytes must be positive, got {max_chunk_bytes}") + + chunks: list[list[T]] = [] + current: list[T] = [] + current_bytes = 0 + + for item in items: + item_bytes = get_size(item) + if item_bytes < 0: + raise ValueError( + f"get_size returned a negative value ({item_bytes}); sizes must be non-negative" + ) + if current and current_bytes + item_bytes > max_chunk_bytes: + chunks.append(current) + current = [] + current_bytes = 0 + current.append(item) + current_bytes += item_bytes + + if current: + chunks.append(current) + + return chunks diff --git a/tests/observability/core/exporters/test_payload_chunking.py b/tests/observability/core/exporters/test_payload_chunking.py new file mode 100644 index 00000000..4ffe7702 --- /dev/null +++ b/tests/observability/core/exporters/test_payload_chunking.py @@ -0,0 +1,241 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +"""Tests for payload byte-size chunking in the Agent 365 exporter.""" + +from __future__ import annotations + +import json +import unittest +from unittest.mock import Mock, patch + +from microsoft_agents_a365.observability.core.constants import ( + GEN_AI_AGENT_ID_KEY, + TENANT_ID_KEY, +) +from microsoft_agents_a365.observability.core.exporters.agent365_exporter import ( + _Agent365Exporter, +) +from microsoft_agents_a365.observability.core.exporters.utils import ( + chunk_by_size, + estimate_span_bytes, + estimate_value_bytes, + truncate_span, +) +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExportResult +from opentelemetry.trace import StatusCode + + +def _make_otlp_span(attrs: dict, name: str = "test") -> dict: + return { + "traceId": "00000000000000000000000000000001", + "spanId": "0000000000000002", + "name": name, + "kind": "INTERNAL", + "startTimeUnixNano": 1000000000, + "endTimeUnixNano": 2000000000, + "attributes": attrs, + "events": None, + "links": None, + "status": {"code": "UNSET", "message": ""}, + } + + +class TestEstimateSpanBytes(unittest.TestCase): + def test_over_estimates_relative_to_actual_json_size(self) -> None: + span = _make_otlp_span({ + "gen_ai.system": "openai", + "gen_ai.tool.arguments": "x" * 1000, + "gen_ai.tool.call_result": "y" * 1000, + }) + actual = len(json.dumps(span).encode("utf-8")) + self.assertGreaterEqual(estimate_span_bytes(span), actual) + + def test_grows_with_attribute_content(self) -> None: + small = _make_otlp_span({"key": "val"}) + large = _make_otlp_span({"key": "x" * 10000}) + self.assertGreater(estimate_span_bytes(large), estimate_span_bytes(small)) + + def test_accounts_for_events(self) -> None: + base = _make_otlp_span({}) + with_events = {**base, "events": [{"name": "ev", "attributes": {"k": "v"}}]} + self.assertGreater(estimate_span_bytes(with_events), estimate_span_bytes(base)) + + +class TestEstimateValueBytes(unittest.TestCase): + def test_handles_all_value_types(self) -> None: + self.assertEqual(estimate_value_bytes("hello"), 40 + 5) + self.assertEqual(estimate_value_bytes([]), 60) + self.assertEqual(estimate_value_bytes(["a", "bb"]), 60 + (40 + 1) + (40 + 2)) + self.assertEqual(estimate_value_bytes([1, 2]), 60 + 50 * 2) + self.assertEqual(estimate_value_bytes(True), 40) + self.assertEqual(estimate_value_bytes(42), 40) + self.assertEqual(estimate_value_bytes(None), 40) + + +class TestChunkBySize(unittest.TestCase): + def _get_size(self, item: dict) -> int: + return int(item["size"]) + + def test_empty_input_returns_empty_output(self) -> None: + self.assertEqual(chunk_by_size([], self._get_size, 900_000), []) + + def test_small_items_fit_in_one_chunk(self) -> None: + items = [{"id": f"s{i}", "size": 100} for i in range(10)] + chunks = chunk_by_size(items, self._get_size, 900_000) + self.assertEqual(len(chunks), 1) + self.assertEqual(len(chunks[0]), 10) + + def test_splits_when_cumulative_exceeds_limit_and_preserves_order(self) -> None: + items = [{"id": f"s{i}", "size": 300_000} for i in range(5)] + chunks = chunk_by_size(items, self._get_size, 900_000) + self.assertGreaterEqual(len(chunks), 2) + flat_ids = [item["id"] for chunk in chunks for item in chunk] + self.assertEqual(flat_ids, ["s0", "s1", "s2", "s3", "s4"]) + + def test_oversize_single_item_gets_its_own_chunk(self) -> None: + chunks = chunk_by_size([{"id": "big", "size": 2_000_000}], self._get_size, 900_000) + self.assertEqual(len(chunks), 1) + self.assertEqual(chunks[0][0]["id"], "big") + + def 
test_multi_item_chunks_respect_limit_no_chunk_is_empty(self) -> None: + items = [{"id": f"s{i}", "size": 200_000} for i in range(5)] + chunks = chunk_by_size(items, self._get_size, 500_000) + for chunk in chunks: + self.assertGreater(len(chunk), 0) + if len(chunk) > 1: + self.assertLessEqual(sum(item["size"] for item in chunk), 500_000) + + def test_rejects_zero_max_chunk_bytes(self) -> None: + with self.assertRaises(ValueError): + chunk_by_size([{"id": "s", "size": 1}], self._get_size, 0) + + def test_rejects_negative_max_chunk_bytes(self) -> None: + with self.assertRaises(ValueError): + chunk_by_size([{"id": "s", "size": 1}], self._get_size, -1) + + def test_rejects_negative_item_size(self) -> None: + with self.assertRaises(ValueError): + chunk_by_size([{"id": "s", "size": -1}], self._get_size, 100) + + +class TestTruncateSpanVerification(unittest.TestCase): + MAX = 250 * 1024 + + def test_leaves_small_span_unchanged(self) -> None: + span = _make_otlp_span({"k": "small"}) + self.assertEqual(truncate_span(span)["attributes"]["k"], "small") + + def test_shrinks_oversize_span_below_limit(self) -> None: + span = _make_otlp_span({"small": "ok", "fat": "x" * 300_000}) + result = truncate_span(span) + self.assertLessEqual(len(json.dumps(result).encode("utf-8")), self.MAX) + self.assertEqual(result["attributes"]["small"], "ok") + self.assertEqual(result["attributes"]["fat"], "TRUNCATED") + + +class TestExporterChunking(unittest.TestCase): + """End-to-end test: the exporter splits oversized batches into multiple HTTP requests.""" + + def setUp(self) -> None: + self.token_resolver = Mock(return_value="test_token") + + def _make_span(self, span_id: int, attribute_size: int) -> ReadableSpan: + mock_span = Mock(spec=ReadableSpan) + mock_span.name = f"span_{span_id}" + + ctx = Mock() + ctx.trace_id = 0x1 + ctx.span_id = span_id + mock_span.context = ctx + mock_span.parent = None + mock_span.start_time = 1640995200000000000 + mock_span.end_time = 1640995260000000000 + + status = Mock() + status.status_code = StatusCode.OK + status.description = "" + mock_span.status = status + + kind = Mock() + kind.name = "INTERNAL" + mock_span.kind = kind + + mock_span.attributes = { + TENANT_ID_KEY: "tenant-1", + GEN_AI_AGENT_ID_KEY: "agent-1", + "payload": "x" * attribute_size, + } + mock_span.events = [] + mock_span.links = [] + + scope = Mock() + scope.name = "test.scope" + scope.version = "1.0" + mock_span.instrumentation_scope = scope + + resource = Mock() + resource.attributes = {"service.name": "test"} + mock_span.resource = resource + return mock_span + + def test_oversized_batch_is_split_into_multiple_requests(self) -> None: + exporter = _Agent365Exporter( + token_resolver=self.token_resolver, + cluster_category="test", + max_payload_bytes=300_000, + ) + + # Each span carries ~200 KB of payload; with a 300 KB chunk limit, + # 5 spans should yield at least 2 chunks. + spans = [self._make_span(span_id=i + 1, attribute_size=200_000) for i in range(5)] + + with patch.object(exporter, "_post_with_retries", return_value=True) as mock_post: + result = exporter.export(spans) + + self.assertEqual(result, SpanExportResult.SUCCESS) + self.assertGreaterEqual(mock_post.call_count, 2) + + # Each chunk body must be valid JSON with the expected envelope structure. 
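+        # Spans may be regrouped by instrumentation scope inside each chunk, so
+        # count across every scopeSpans entry rather than assuming one per request.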
+        total_spans_sent = 0
+        for call in mock_post.call_args_list:
+            url, body, headers = call.args
+            data = json.loads(body)
+            self.assertIn("resourceSpans", data)
+            scope_spans = data["resourceSpans"][0]["scopeSpans"]
+            for ss in scope_spans:
+                total_spans_sent += len(ss["spans"])
+        self.assertEqual(total_spans_sent, len(spans))
+
+    def test_chunk_failure_short_circuits_remaining_chunks(self) -> None:
+        exporter = _Agent365Exporter(
+            token_resolver=self.token_resolver,
+            cluster_category="test",
+            max_payload_bytes=300_000,
+        )
+        spans = [self._make_span(span_id=i + 1, attribute_size=200_000) for i in range(5)]
+
+        with patch.object(exporter, "_post_with_retries", return_value=False) as mock_post:
+            result = exporter.export(spans)
+
+        self.assertEqual(result, SpanExportResult.FAILURE)
+        # First chunk fails; remaining chunks must not be sent.
+        self.assertEqual(mock_post.call_count, 1)
+
+    def test_small_batch_uses_single_request(self) -> None:
+        exporter = _Agent365Exporter(
+            token_resolver=self.token_resolver,
+            cluster_category="test",
+        )
+        spans = [self._make_span(span_id=1, attribute_size=100)]
+
+        with patch.object(exporter, "_post_with_retries", return_value=True) as mock_post:
+            result = exporter.export(spans)
+
+        self.assertEqual(result, SpanExportResult.SUCCESS)
+        self.assertEqual(mock_post.call_count, 1)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/observability/core/test_agent365.py b/tests/observability/core/test_agent365.py
index ac8adaed..5ae441c5 100644
--- a/tests/observability/core/test_agent365.py
+++ b/tests/observability/core/test_agent365.py
@@ -120,6 +120,7 @@ def test_batch_span_processor_and_exporter_called_with_correct_values(
             token_resolver=self.mock_token_resolver,
             cluster_category="staging",
             use_s2s_endpoint=True,
+            max_payload_bytes=900_000,
         )
 
         # Verify BatchSpanProcessor was called with correct parameters from exporter_options
diff --git a/tests/observability/core/test_spectra_exporter.py b/tests/observability/core/test_spectra_exporter.py
index e4d19603..b454d61b 100644
--- a/tests/observability/core/test_spectra_exporter.py
+++ b/tests/observability/core/test_spectra_exporter.py
@@ -188,6 +188,7 @@ def test_configure_with_agent365_options_unchanged(self, mock_is_enabled, mock_e
             token_resolver=mock_token_resolver,
             cluster_category="staging",
             use_s2s_endpoint=True,
+            max_payload_bytes=900_000,
         )
 
     @patch("microsoft_agents_a365.observability.core.config.OTLPSpanExporter")
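
A minimal usage sketch of the new surface (illustrative, not part of the patch): `my_token_resolver` is a hypothetical stand-in, the item sizes are made up, and the import paths follow the modules patched above.

# Hypothetical example; names flagged below are stand-ins, not library API.
from microsoft_agents_a365.observability.core.exporters.agent365_exporter_options import (
    Agent365ExporterOptions,
)
from microsoft_agents_a365.observability.core.exporters.utils import chunk_by_size


def my_token_resolver(tenant_id: str, agent_id: str) -> str | None:
    return "token"  # stand-in for real token acquisition


# Tighten the per-request ceiling below the 900_000-byte default; config.py
# forwards this value to the exporter, which then splits each per-identity
# batch into correspondingly smaller requests.
options = Agent365ExporterOptions(
    token_resolver=my_token_resolver,
    max_payload_bytes=500_000,
)

# chunk_by_size accumulates greedily, preserves order, and never drops items;
# an item larger than the limit still ships as its own one-item chunk.
items = [{"size": s} for s in (400_000, 400_000, 400_000, 2_000_000)]
chunks = chunk_by_size(items, lambda it: it["size"], 900_000)
assert [[it["size"] for it in c] for c in chunks] == [
    [400_000, 400_000],  # 800_000 stays at or below the 900_000 limit
    [400_000],           # adding the oversize item would exceed the limit
    [2_000_000],         # oversize item becomes its own chunk
]

The greedy accumulation closes a chunk only when the next item would push it over the limit, which is why the middle chunk above ends before the oversize item rather than being padded up to the cap.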