@@ -180,6 +180,7 @@ def _configure_internal(
token_resolver=exporter_options.token_resolver,
cluster_category=exporter_options.cluster_category,
use_s2s_endpoint=exporter_options.use_s2s_endpoint,
max_payload_bytes=exporter_options.max_payload_bytes,
)

else:
@@ -18,7 +18,10 @@
from opentelemetry.trace import StatusCode

from .utils import (
DEFAULT_MAX_PAYLOAD_BYTES,
build_export_url,
chunk_by_size,
estimate_span_bytes,
get_validated_domain_override,
hex_span_id,
hex_trace_id,
@@ -56,6 +59,7 @@ def __init__(
token_resolver: Callable[[str, str], str | None],
cluster_category: str = "prod",
use_s2s_endpoint: bool = False,
max_payload_bytes: int = DEFAULT_MAX_PAYLOAD_BYTES,
):
if token_resolver is None:
raise ValueError("token_resolver must be provided.")
@@ -65,6 +69,7 @@ def __init__(
self._token_resolver = token_resolver
self._cluster_category = cluster_category
self._use_s2s_endpoint = use_s2s_endpoint
self._max_payload_bytes = max_payload_bytes
# Read domain override once at initialization
self._domain_override = get_validated_domain_override()

@@ -89,8 +94,21 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:

any_failure = False
for (tenant_id, agent_id), activities in groups.items():
payload = self._build_export_request(activities)
body = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
# Map and truncate spans first, then chunk by estimated byte size
mapped_spans = self._map_and_truncate_spans(activities)
resource_attrs = self._get_resource_attributes(activities)
chunks = chunk_by_size(
mapped_spans,
lambda ms: estimate_span_bytes(ms[0]),
self._max_payload_bytes,
)

if len(chunks) > 1:
# Logged at DEBUG to avoid leaking tenant/agent IDs in production logs.
logger.debug(
f"Split {len(activities)} spans into {len(chunks)} chunks "
f"for tenantId: {tenant_id}, agentId: {agent_id}"
)

# Resolve endpoint: domain override > default URL
if self._domain_override:
@@ -128,11 +146,40 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
any_failure = True
continue

# Basic retry loop
ok = self._post_with_retries(url, body, headers)

if not ok:
any_failure = True
# Send each chunk (all-or-nothing: fail group on first chunk failure)
group_failed = False
for i, chunk in enumerate(chunks):
payload = self._build_envelope(chunk, resource_attrs)
body = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
body_bytes = len(body.encode("utf-8"))
logger.debug(
f"Sending chunk {i + 1} of {len(chunks)} "
f"({len(chunk)} spans, {body_bytes} bytes)"
)
# Defensive check: the estimator covers per-span content but not
# envelope overhead (resource attributes, scope wrappers). Warn if
# the assembled body exceeds the configured limit so operators can
# observe estimator drift before the server starts rejecting requests.
if body_bytes > self._max_payload_bytes:
logger.warning(
f"Chunk {i + 1} of {len(chunks)} body size ({body_bytes} bytes) "
f"exceeds max_payload_bytes ({self._max_payload_bytes}); "
"estimator may be under-counting envelope overhead. "
f"Tenant: {tenant_id}, agent: {agent_id}, spans: {len(chunk)}."
)

ok = self._post_with_retries(url, body, headers)
if not ok:
logger.error(
f"Chunk {i + 1} of {len(chunks)} failed for "
f"tenant {tenant_id}, agent {agent_id}"
)
any_failure = True
group_failed = True
break

if group_failed:
continue

return SpanExportResult.FAILURE if any_failure else SpanExportResult.SUCCESS

@@ -231,32 +278,47 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bool:

# ------------- Payload mapping ------------------

def _build_export_request(self, spans: Sequence[ReadableSpan]) -> dict[str, Any]:
# Group by instrumentation scope (name, version)
scope_map: dict[tuple[str, str | None], list[dict[str, Any]]] = {}
def _map_and_truncate_spans(
self, spans: Sequence[ReadableSpan]
) -> list[tuple[dict[str, Any], str, str | None]]:
"""Map ReadableSpans to OTLP dicts and apply per-span truncation.

Returns a list of (mapped_span, scope_name, scope_version) tuples so
that envelope grouping by instrumentation scope can be performed
efficiently after byte-size chunking.
"""
result: list[tuple[dict[str, Any], str, str | None]] = []
for sp in spans:
scope = sp.instrumentation_scope
scope_key = (scope.name, scope.version)
scope_map.setdefault(scope_key, []).append(self._map_span(sp))

scope_spans: list[dict[str, Any]] = []
for (name, version), mapped_spans in scope_map.items():
scope_spans.append(
{
"scope": {
"name": name,
"version": version,
},
"spans": mapped_spans,
}
)
scope_name = scope.name if scope is not None else "unknown"
scope_version = scope.version if scope is not None else None
result.append((self._map_span(sp), scope_name, scope_version))
return result

# Resource attributes (from the first span – all spans in a batch usually share resource)
# If you need to merge across spans, adapt accordingly.
resource_attrs = {}
@staticmethod
def _get_resource_attributes(spans: Sequence[ReadableSpan]) -> dict[str, Any]:
"""Extract resource attributes from the first span in the batch."""
if spans:
resource_attrs = dict(getattr(spans[0].resource, "attributes", {}) or {})
return dict(getattr(spans[0].resource, "attributes", {}) or {})
return {}

def _build_envelope(
self,
mapped_spans: Sequence[tuple[dict[str, Any], str, str | None]],
resource_attrs: dict[str, Any],
) -> dict[str, Any]:
"""Build an OTLP export request envelope from pre-mapped spans."""
scope_map: dict[tuple[str, str | None], list[dict[str, Any]]] = {}
for mapped_span, scope_name, scope_version in mapped_spans:
scope_map.setdefault((scope_name, scope_version), []).append(mapped_span)

scope_spans: list[dict[str, Any]] = [
{
"scope": {"name": name, "version": version},
"spans": spans,
}
for (name, version), spans in scope_map.items()
]

return {
"resourceSpans": [
Expand Up @@ -3,6 +3,8 @@

from typing import Awaitable, Callable, Optional

from .utils import DEFAULT_MAX_PAYLOAD_BYTES


class Agent365ExporterOptions:
"""
@@ -19,6 +21,7 @@ def __init__(
scheduled_delay_ms: int = 5000,
exporter_timeout_ms: int = 30000,
max_export_batch_size: int = 512,
max_payload_bytes: int = DEFAULT_MAX_PAYLOAD_BYTES,
):
"""
Args:
@@ -29,6 +32,10 @@ def __init__(
scheduled_delay_ms: Delay between export batches (ms). Default is 5000.
exporter_timeout_ms: Timeout for the export operation (ms). Default is 30000.
max_export_batch_size: Maximum batch size for export operations. Default is 512.
max_payload_bytes: Upper bound on HTTP request body size in bytes. The exporter
splits per-identity batches into sub-batches whose estimated size stays under
this limit, providing headroom under the A365 1 MB server limit. Default is
900_000 (~100 KB headroom for estimator error and JSON envelope overhead).
"""
self.cluster_category = cluster_category
self.token_resolver = token_resolver
@@ -37,3 +44,4 @@ def __init__(
self.scheduled_delay_ms = scheduled_delay_ms
self.exporter_timeout_ms = exporter_timeout_ms
self.max_export_batch_size = max_export_batch_size
self.max_payload_bytes = max_payload_bytes
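For illustration only, a minimal sketch of wiring the new limit through the options. The resolver body and the 500_000 figure are placeholder assumptions, and this assumes token_resolver is the only required argument; it is not part of this change:

def example_token_resolver(tenant_id: str, agent_id: str) -> str | None:
    # Placeholder: look up a bearer token for this tenant/agent pair.
    return "example-token"

options = Agent365ExporterOptions(
    token_resolver=example_token_resolver,
    cluster_category="prod",
    max_payload_bytes=500_000,  # tighter than the 900_000 default
)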
@@ -6,8 +6,8 @@
import json
import logging
import os
from collections.abc import Sequence
from typing import Any
from collections.abc import Callable, Sequence
from typing import Any, TypeVar
from urllib.parse import urlparse

from opentelemetry.sdk.trace import ReadableSpan
@@ -262,3 +262,133 @@ def is_agent365_exporter_enabled() -> bool:
# Check environment variable
enable_exporter = os.getenv(ENABLE_A365_OBSERVABILITY_EXPORTER, "").lower()
return (enable_exporter) in ("true", "1", "yes", "on")


# ---------------------------------------------------------------------------
# Span size estimation and byte-level chunking
# ---------------------------------------------------------------------------

# Default upper bound on HTTP request body size in bytes. Provides ~100 KB
# headroom under the A365 1 MB server limit for estimator error and JSON/
# envelope overhead (e.g. resource attributes and scope wrappers).
DEFAULT_MAX_PAYLOAD_BYTES = 900_000

# Overhead constant for OTLP JSON span fixed fields (traceId, spanId,
# parentSpanId, kind, timestamps, status, scope wrapper, etc.). Intentionally
# generous to account for serializer variance.
_SPAN_BASE_OVERHEAD = 2000

# Overhead per attribute in OTLP JSON format. Covers key/value JSON wrapping.
_ATTR_OVERHEAD = 80

# Overhead per event in OTLP JSON.
_EVENT_OVERHEAD = 200


def _utf8_len(s: str) -> int:
return len(s.encode("utf-8"))


def estimate_value_bytes(value: Any) -> int:
"""Estimate the serialized byte size of a single attribute value in OTLP/HTTP JSON."""
if isinstance(value, str):
return 40 + _utf8_len(value)
# bool is a subclass of int; give it the fixed scalar estimate explicitly before the generic fallback below
if isinstance(value, bool):
return 40
if isinstance(value, (list, tuple)):
if len(value) == 0:
return 60
first = value[0]
if isinstance(first, str):
total = 60
for s in value:
total += 40 + _utf8_len(str(s))
return total
return 60 + 50 * len(value)
return 40 # int/float/None/other


def estimate_span_bytes(span: dict[str, Any]) -> int:
"""Heuristic estimator for the serialized size of an OTLP span in HTTP JSON.

Uses generous constants tuned to over-estimate by ~25-50%, providing
headroom for JSON serializer variance (whitespace, enum representation,
integer-as-string).
"""
total = _SPAN_BASE_OVERHEAD
name = span.get("name")
if isinstance(name, str):
total += _utf8_len(name)

attributes = span.get("attributes")
if attributes:
for key, value in attributes.items():
total += _ATTR_OVERHEAD
total += _utf8_len(str(key))
total += estimate_value_bytes(value)

events = span.get("events")
if events:
for ev in events:
total += _EVENT_OVERHEAD
ev_name = ev.get("name") if isinstance(ev, dict) else None
if isinstance(ev_name, str):
total += _utf8_len(ev_name)
ev_attrs = ev.get("attributes") if isinstance(ev, dict) else None
if ev_attrs:
for key, value in ev_attrs.items():
total += _ATTR_OVERHEAD
total += _utf8_len(str(key))
total += estimate_value_bytes(value)
return total
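A rough sanity check of the estimator above; the span name and attribute values are made up for illustration:

span_dict = {
    "name": "db.query",  # 8 bytes
    "attributes": {
        "db.system": "postgresql",   # 80 + 9 + (40 + 10) = 139
        "db.statement": "SELECT 1",  # 80 + 12 + (40 + 8) = 140
    },
}
# 2000 (base overhead) + 8 + 139 + 140 = 2287 bytes, deliberately
# well above the real OTLP JSON size of such a span.
assert estimate_span_bytes(span_dict) == 2287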


T = TypeVar("T")


def chunk_by_size(
items: Sequence[T],
get_size: Callable[[T], int],
max_chunk_bytes: int,
) -> list[list[T]]:
"""Split items into sub-batches whose cumulative estimated size stays under ``max_chunk_bytes``.

Multi-item chunks are guaranteed to stay within the limit. A single item
whose estimated size exceeds ``max_chunk_bytes`` forms its own one-item
chunk (never silently dropped) even though that chunk exceeds the limit.

Invariants:
- Input order is preserved across chunks.
- Empty input produces empty output.
- No item is ever dropped.
- No chunk is ever empty.

Raises:
ValueError: If ``max_chunk_bytes`` is not positive, or if ``get_size``
returns a negative value for any item.
"""
if max_chunk_bytes <= 0:
raise ValueError(f"max_chunk_bytes must be positive, got {max_chunk_bytes}")

chunks: list[list[T]] = []
current: list[T] = []
current_bytes = 0

for item in items:
item_bytes = get_size(item)
if item_bytes < 0:
raise ValueError(
f"get_size returned a negative value ({item_bytes}); sizes must be non-negative"
)
if current and current_bytes + item_bytes > max_chunk_bytes:
chunks.append(current)
current = []
current_bytes = 0
current.append(item)
current_bytes += item_bytes

if current:
chunks.append(current)

return chunks
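A small usage sketch of the chunking helper; the item sizes and the 400-byte limit are arbitrary. In the exporter itself the size function is estimate_span_bytes applied to each mapped span:

items = ["a" * 100, "b" * 300, "c" * 900, "d" * 50]
chunks = chunk_by_size(items, len, max_chunk_bytes=400)
# -> [[100-byte, 300-byte], [900-byte], [50-byte]]
# 100 + 300 == 400 fits exactly; the 900-byte item exceeds the limit,
# so it becomes its own oversized one-item chunk instead of being
# dropped, and input order is preserved throughout.
assert [len(c) for c in chunks] == [2, 1, 1]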