From 80ef63b7f9cf1313534301d8c0b375a5fb6ae3f8 Mon Sep 17 00:00:00 2001 From: Tony Le Date: Wed, 8 Apr 2026 21:12:23 -0400 Subject: [PATCH 1/3] feat(spans): enforce max segment bytes during ingestion --- src/sentry/options/defaults.py | 7 ++ src/sentry/scripts/spans/add-buffer.lua | 82 +++++++++++++++- src/sentry/spans/buffer.py | 39 +++++--- tests/sentry/spans/test_buffer.py | 118 ++++++++++++++++++++++++ 4 files changed, 234 insertions(+), 12 deletions(-) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 8725c6ad5c15d0..1d14e6adc9090e 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3218,6 +3218,13 @@ default=False, flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, ) +# Whether to enforce max-segment-bytes during ingestion via the Lua script. +register( + "spans.buffer.enforce-segment-size", + type=Bool, + default=False, + flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, +) # TTL for keys in Redis. This is a downside protection in case of bugs. register( "spans.buffer.redis-ttl", diff --git a/src/sentry/scripts/spans/add-buffer.lua b/src/sentry/scripts/spans/add-buffer.lua index fb5bfa944d66eb..503b3b2e7724d3 100644 --- a/src/sentry/scripts/spans/add-buffer.lua +++ b/src/sentry/scripts/spans/add-buffer.lua @@ -27,6 +27,8 @@ ARGS: - has_root_span -- "true" or "false" -- Whether the subsegment contains the root of the segment. - set_timeout -- int - byte_count -- int -- The total number of bytes in the subsegment. +- max_segment_bytes -- int -- Maximum allowed ingested bytes for a segment. 0 means no limit. +- salt -- str -- Salt to add to the segment key to avoid collisions. - *span_id -- str[] -- The span ids in the subsegment. 
RETURNS: @@ -45,7 +47,9 @@ local parent_span_id = ARGV[2] local has_root_span = ARGV[3] == "true" local set_timeout = tonumber(ARGV[4]) local byte_count = tonumber(ARGV[5]) -local NUM_ARGS = 5 +local max_segment_bytes = tonumber(ARGV[6]) +local salt = ARGV[7] or "" +local NUM_ARGS = 7 local function get_time_ms() local time = redis.call("TIME") @@ -100,6 +104,82 @@ redis.call("expire", main_redirect_key, set_timeout) local redirect_end_time_ms = get_time_ms() table.insert(latency_table, {"redirect_step_latency_ms", redirect_end_time_ms - start_time_ms}) +if salt ~= "" then + local ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key) + local ingested_byte_count = tonumber(redis.call("get", ingested_byte_count_key) or 0) + + for i = NUM_ARGS + 1, NUM_ARGS + num_spans do + local span_id = ARGV[i] + if span_id ~= parent_span_id then + local child_set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, span_id) + local child_ibc_key = string.format("span-buf:ibc:%s", child_set_key) + local child_ibc = tonumber(redis.call("get", child_ibc_key) or 0) + byte_count = byte_count + child_ibc + end + end + + -- If the segment is already too big, make this subsegment its own segment + -- with salt as the identifier. 
+ if max_segment_bytes > 0 and tonumber(ingested_byte_count) + byte_count > max_segment_bytes then + set_span_id = salt + set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, salt) + ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key) + end + + local ingested_count_key = string.format("span-buf:ic:%s", set_key) + local members_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, set_span_id) + + for i = NUM_ARGS + 1, NUM_ARGS + num_spans do + local span_id = ARGV[i] + if span_id ~= parent_span_id then + local child_set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, span_id) + + local child_ic_key = string.format("span-buf:ic:%s", child_set_key) + local child_ic = redis.call("get", child_ic_key) + if child_ic then + redis.call("incrby", ingested_count_key, child_ic) + redis.call("del", child_ic_key) + end + + local child_ibc_key = string.format("span-buf:ibc:%s", child_set_key) + local child_ibc = redis.call("get", child_ibc_key) + if child_ibc then + -- byte_count already holds the child's byte count, so we don't need to add again + redis.call("del", child_ibc_key) + end + + local child_members_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, span_id) + local child_members = redis.call("smembers", child_members_key) + if #child_members > 0 then + redis.call("sadd", members_key, unpack(child_members)) + redis.call("del", child_members_key) + end + end + end + + local merge_payload_keys_end_time_ms = get_time_ms() + table.insert(latency_table, {"merge_payload_keys_step_latency_ms", merge_payload_keys_end_time_ms - redirect_end_time_ms}) + + redis.call("sadd", members_key, salt) + redis.call("expire", members_key, set_timeout) + + -- Track total number of spans ingested for this segment + redis.call("incrby", ingested_count_key, num_spans) + redis.call("incrby", ingested_byte_count_key, byte_count) + redis.call("expire", ingested_count_key, set_timeout) + redis.call("expire", ingested_byte_count_key, 
set_timeout) + + local counter_merge_end_time_ms = get_time_ms() + table.insert(latency_table, {"counter_merge_step_latency_ms", counter_merge_end_time_ms - merge_payload_keys_end_time_ms}) + + -- Capture end time and calculate latency in milliseconds + local end_time_ms = get_time_ms() + local latency_ms = end_time_ms - start_time_ms + table.insert(latency_table, {"total_step_latency_ms", latency_ms}) + + return {set_key, has_root_span, latency_ms, latency_table, metrics_table} +end + -- Maintain member-keys (span-buf:mk) tracking sets so the flusher -- knows which payload keys to fetch. local member_keys_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, set_span_id) diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index 59cec919869165..983d45b55b5e10 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -76,6 +76,7 @@ import logging import math import time +import uuid from collections.abc import Generator, MutableMapping, Sequence from typing import Any, NamedTuple, cast @@ -146,6 +147,12 @@ def effective_parent_id(self): type SpanPayload = dict[str, Any] +class Subsegment(NamedTuple): + project_and_trace: tuple[str, str] + salt: str + subsegment: list[Span] + + class OutputSpan(NamedTuple): payload: SpanPayload @@ -254,6 +261,8 @@ def process_spans(self, spans: Sequence[Span], now: int): timeout = options.get("spans.buffer.timeout") root_timeout = options.get("spans.buffer.root-timeout") max_spans_per_evalsha = options.get("spans.buffer.max-spans-per-evalsha") + max_segment_bytes = options.get("spans.buffer.max-segment-bytes") + enforce_segment_size = options.get("spans.buffer.enforce-segment-size") result_meta = [] is_root_span_count = 0 @@ -263,15 +272,15 @@ def process_spans(self, spans: Sequence[Span], now: int): # Split large subsegments into chunks to avoid Lua unpack() limits. # Chunks share the same parent_span_id but are processed separately. 
- tree_items: list[tuple[tuple[str, str], list[Span]]] = [] + tree_items: list[Subsegment] = [] for key, subsegment in trees.items(): if max_spans_per_evalsha > 0 and len(subsegment) > max_spans_per_evalsha: for chunk in itertools.batched(subsegment, max_spans_per_evalsha): - tree_items.append((key, list(chunk))) + tree_items.append((key, uuid.uuid4().hex, list(chunk))) else: - tree_items.append((key, subsegment)) + tree_items.append((key, uuid.uuid4().hex, subsegment)) - tree_batches: Sequence[Sequence[tuple[tuple[str, str], list[Span]]]] + tree_batches: Sequence[Sequence[Subsegment]] if pipeline_batch_size > 0: tree_batches = list(itertools.batched(tree_items, pipeline_batch_size)) else: @@ -279,9 +288,11 @@ def process_spans(self, spans: Sequence[Span], now: int): for batch in tree_batches: with self.client.pipeline(transaction=False) as p: - for (project_and_trace, parent_span_id), subsegment in batch: + for (project_and_trace, parent_span_id), salt, subsegment in batch: set_members = self._prepare_payloads(subsegment) payload_key = self._get_payload_key(project_and_trace, parent_span_id) + if enforce_segment_size: + payload_key = self._get_payload_key(project_and_trace, salt) p.sadd(payload_key, *set_members) p.expire(payload_key, redis_ttl) @@ -296,7 +307,7 @@ def process_spans(self, spans: Sequence[Span], now: int): results: list[Any] = [] for batch in tree_batches: with self.client.pipeline(transaction=False) as p: - for (project_and_trace, parent_span_id), subsegment in batch: + for (project_and_trace, parent_span_id), salt, subsegment in batch: byte_count = sum(len(span.payload) for span in subsegment) try: @@ -323,6 +334,8 @@ def process_spans(self, spans: Sequence[Span], now: int): is_segment_span, redis_ttl, byte_count, + max_segment_bytes, + salt if enforce_segment_size else "", *span_ids, ) @@ -331,7 +344,7 @@ def process_spans(self, spans: Sequence[Span], now: int): # All spans in a subsegment share the same trace_id, # so they all came from the 
same Kafka partition. partition = subsegment[0].partition - result_meta.append((project_and_trace, parent_span_id, partition)) + result_meta.append((project_and_trace, parent_span_id, partition, salt)) results.extend(p.execute()) @@ -349,7 +362,9 @@ def process_spans(self, spans: Sequence[Span], now: int): assert len(result_meta) == len(results) - for (project_and_trace, parent_span_id, partition), result in zip(result_meta, results): + for (project_and_trace, parent_span_id, partition, salt), result in zip( + result_meta, results + ): ( segment_key, has_root_span, @@ -402,9 +417,11 @@ def process_spans(self, spans: Sequence[Span], now: int): subsegment_spans = trees[project_and_trace, parent_span_id] delete_set = queue_deletes.setdefault(queue_key, set()) - delete_set.update( - self._get_span_key(project_and_trace, span.span_id) for span in subsegment_spans - ) + if not segment_key.endswith(salt.encode("ascii")): + delete_set.update( + self._get_span_key(project_and_trace, span.span_id) + for span in subsegment_spans + ) delete_set.discard(segment_key) for result in results: diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py index 7e5f62209dc8b1..6f319f69fdaafc 100644 --- a/tests/sentry/spans/test_buffer.py +++ b/tests/sentry/spans/test_buffer.py @@ -38,6 +38,7 @@ "spans.buffer.evalsha-latency-threshold": 100, "spans.buffer.debug-traces": [], "spans.buffer.evalsha-cumulative-logger-enabled": True, + "spans.buffer.enforce-segment-size": False, "spans.process-segments.schema-validation": 1.0, "spans.buffer.chunk-oversized-segments": False, } @@ -842,6 +843,123 @@ def test_max_segment_spans_limit(mock_project_model, buffer: SpansBuffer) -> Non assert segment.spans == [] +@mock.patch("sentry.spans.buffer.Project") +def test_max_segment_bytes_detaches_over_limit(mock_project_model, buffer: SpansBuffer) -> None: + """When a segment's cumulative ingested bytes exceed max-segment-bytes, subsequent + subsegments are written to a detached 
(salted) key so that overflow spans are not lost.""" + mock_project = mock.Mock() + mock_project.id = 1 + mock_project.organization_id = 100 + mock_project_model.objects.get_from_cache.return_value = mock_project + + # Each payload is ~30 bytes. With limit=70, the first two batches merge + # normally (cumulative 60 <= 70), and the Lua script detaches the 3rd + # batch into its own salted segment (cumulative 90 > 70), so the normal + # segment keeps 2 spans and the overflow span is not lost. + batch1 = [ + Span( + payload=_payload("b" * 16), + trace_id="a" * 32, + span_id="b" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + ] + batch2 = [ + Span( + payload=_payload("c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + ] + batch3 = [ + Span( + payload=_payload("d" * 16), + trace_id="a" * 32, + span_id="d" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + ] + + with override_options( + {"spans.buffer.max-segment-bytes": 70, "spans.buffer.enforce-segment-size": True} + ): + buffer.process_spans(batch1, now=0) + buffer.process_spans(batch2, now=0) + buffer.process_spans(batch3, now=0) + rv = buffer.flush_segments(now=61) + + assert len(rv) == 2 + + normal_key = _segment_id(1, "a" * 32, "a" * 16) + normal_segment = rv.get(normal_key) + assert normal_segment is not None + assert len(normal_segment.spans) == 2 + + # The new segment (salt) contains the overflow span + detached_keys = [k for k in rv if k != normal_key] + assert len(detached_keys) == 1 + detached_segment = rv[detached_keys[0]] + assert len(detached_segment.spans) == 1 + assert detached_segment.spans[0].payload["span_id"] == "d" * 16 + + +@mock.patch("sentry.spans.buffer.Project") +def test_max_segment_bytes_under_limit_merges_normally( + mock_project_model, buffer: SpansBuffer +) -> None: + """When a segment is within max-segment-bytes, subsegments merge normally.""" + mock_project = mock.Mock() + mock_project.id = 1 + 
mock_project.organization_id = 100 + mock_project_model.objects.get_from_cache.return_value = mock_project + + batch1 = [ + Span( + payload=_payload("b" * 16), + trace_id="a" * 32, + span_id="b" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + ] + batch2 = [ + Span( + payload=_payload("c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + ] + + # Large limit so both batches fit + with override_options( + { + "spans.buffer.max-segment-bytes": 10 * 1024 * 1024, + "spans.buffer.enforce-segment-size": True, + } + ): + buffer.process_spans(batch1, now=0) + buffer.process_spans(batch2, now=0) + rv = buffer.flush_segments(now=61) + + # Both spans merged into a single segment + assert len(rv) == 1 + segment = rv[_segment_id(1, "a" * 32, "a" * 16)] + assert len(segment.spans) == 2 + span_ids = {s.payload["span_id"] for s in segment.spans} + assert span_ids == {"b" * 16, "c" * 16} + + @mock.patch("sentry.spans.buffer.Project") @mock.patch("sentry.spans.buffer.track_outcome") @mock.patch("sentry.spans.buffer.metrics.timing") From b4bdc26db8e86c0c45fa453813b3cbed737f4c1f Mon Sep 17 00:00:00 2001 From: Tony Le Date: Thu, 9 Apr 2026 11:10:55 -0400 Subject: [PATCH 2/3] fix: typing --- src/sentry/spans/buffer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index 983d45b55b5e10..b62d4f2e7dfbf9 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -276,9 +276,9 @@ def process_spans(self, spans: Sequence[Span], now: int): for key, subsegment in trees.items(): if max_spans_per_evalsha > 0 and len(subsegment) > max_spans_per_evalsha: for chunk in itertools.batched(subsegment, max_spans_per_evalsha): - tree_items.append((key, uuid.uuid4().hex, list(chunk))) + tree_items.append(Subsegment(key, uuid.uuid4().hex, list(chunk))) else: - tree_items.append((key, uuid.uuid4().hex, subsegment)) + 
tree_items.append(Subsegment(key, uuid.uuid4().hex, subsegment)) tree_batches: Sequence[Sequence[Subsegment]] if pipeline_batch_size > 0: From 6fff2a5b374e9cc6fdd04ddb20b01a57ffbeb12b Mon Sep 17 00:00:00 2001 From: Tony Le Date: Thu, 9 Apr 2026 12:05:17 -0400 Subject: [PATCH 3/3] add better documentation --- src/sentry/scripts/spans/add-buffer.lua | 3 ++- src/sentry/spans/buffer.py | 32 ++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/sentry/scripts/spans/add-buffer.lua b/src/sentry/scripts/spans/add-buffer.lua index 503b3b2e7724d3..de90a37e5dcba6 100644 --- a/src/sentry/scripts/spans/add-buffer.lua +++ b/src/sentry/scripts/spans/add-buffer.lua @@ -28,7 +28,8 @@ ARGS: - set_timeout -- int - byte_count -- int -- The total number of bytes in the subsegment. - max_segment_bytes -- int -- Maximum allowed ingested bytes for a segment. 0 means no limit. -- salt -- str -- Salt to add to the segment key to avoid collisions. +- salt -- str -- Unique identifier for this subsegment. When the segment exceeds max_segment_bytes, this subsegment + is detached into its own segment keyed by salt. Empty string disables this behavior. - *span_id -- str[] -- The span ids in the subsegment. RETURNS: diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index b62d4f2e7dfbf9..b0771943012d75 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -32,9 +32,16 @@ Now how does that look like in Redis? For each incoming span, we: -1. Store the span payload in a payload key: - "span-buf:s:{project_id:trace_id:span_id}:span_id". Each subsegment - gets its own key, distributed across Redis cluster nodes. +1. Store the span payload in a payload key. Each subsegment gets its own key, + distributed across Redis cluster nodes. + a. When segment size enforcement is disabled, the key uses the parent_span_id to + determine where to write span payloads to. 
+ Key: `span-buf:s:{project_id:trace_id:parent_span_id}:parent_span_id` + b. When segment size enforcement is enabled, the key uses a unique salt per + subsegment. This allows us to skip merging the subsegment into the parent segment + and not lose any data, since the subsegment will become its own separate segment + and be flushed out independently. + Key: `span-buf:s:{project_id:trace_id:salt}:salt` 2. The Lua script (add-buffer.lua) receives the span IDs and: a. Follows redirects from parent_span_id (hashmap at "span-buf:ssr:{project_id:trace_id}") to find the segment root. @@ -42,6 +49,9 @@ c. Merges member-keys indexes and counters (ingested count, byte count) from span IDs that were previously separate segment roots into the current segment root. + d. If segment size enforcement is enabled and the segment exceeds + max_segment_bytes, detaches the subsegment into its own segment + keyed by the salt. 3. To a "global queue", we write the segment key, sorted by timeout. Eventually, flushing cronjob looks at that global queue, and removes all timed @@ -58,6 +68,22 @@ or using spillover topics, especially when their new partition count is lower than the original topic. +Segment size enforcement: + +Segments can grow unboundedly as spans arrive. To prevent oversized segments from +consuming excessive memory during flush, the buffer enforces a maximum byte limit +per segment (controlled by `spans.buffer.max-segment-bytes` and gated behind +`spans.buffer.enforce-segment-size`). + +Each subsegment is assigned a unique salt (UUID). The Lua script tracks cumulative +ingested bytes per segment via `span-buf:ibc` keys. If adding a subsegment would +push the segment over the byte limit, the script detaches it into a new segment +keyed by the salt instead of merging it into the parent. The detached segment is +independently tracked and flushed. 
+ +During flush, segments that exceed `max-segment-bytes` are chunked into multiple +Kafka messages to stay within downstream size limits. + Glossary for types of keys: * span-buf:s:{project_id:trace_id:span_id}:span_id -- payload keys containing span payloads, distributed across cluster nodes.