Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -3218,6 +3218,13 @@
default=False,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# Whether to enforce max-segment-bytes during ingestion via the Lua script.
register(
"spans.buffer.enforce-segment-size",
type=Bool,
default=False,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# TTL for keys in Redis. This is a downside protection in case of bugs.
register(
"spans.buffer.redis-ttl",
Expand Down
83 changes: 82 additions & 1 deletion src/sentry/scripts/spans/add-buffer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ ARGS:
- has_root_span -- "true" or "false" -- Whether the subsegment contains the root of the segment.
- set_timeout -- int
- byte_count -- int -- The total number of bytes in the subsegment.
- max_segment_bytes -- int -- Maximum allowed ingested bytes for a segment. 0 means no limit.
- salt -- str -- Unique identifier for this subsegment. When the segment exceeds max_segment_bytes, this subsegment
is detached into its own segment keyed by salt. Empty string disables this behavior.
- *span_id -- str[] -- The span ids in the subsegment.

RETURNS:
Expand All @@ -45,7 +48,9 @@ local parent_span_id = ARGV[2]
local has_root_span = ARGV[3] == "true"
local set_timeout = tonumber(ARGV[4])
local byte_count = tonumber(ARGV[5])
local NUM_ARGS = 5
local max_segment_bytes = tonumber(ARGV[6])
local salt = ARGV[7] or ""
local NUM_ARGS = 7

local function get_time_ms()
local time = redis.call("TIME")
Expand Down Expand Up @@ -100,6 +105,82 @@ redis.call("expire", main_redirect_key, set_timeout)
local redirect_end_time_ms = get_time_ms()
table.insert(latency_table, {"redirect_step_latency_ms", redirect_end_time_ms - start_time_ms})

-- Segment-size-enforcement path: only taken when the caller passed a
-- non-empty salt (the Python side sends "" when
-- "spans.buffer.enforce-segment-size" is disabled).
if salt ~= "" then
    -- Cumulative bytes already ingested for the current segment root.
    local ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key)
    local ingested_byte_count = tonumber(redis.call("get", ingested_byte_count_key) or 0)

    -- Fold the byte counters of child subsegments (span ids in this
    -- subsegment that were previously their own segment roots) into
    -- byte_count BEFORE the size check, so the check sees the full size
    -- this merge would produce.
    for i = NUM_ARGS + 1, NUM_ARGS + num_spans do
        local span_id = ARGV[i]
        if span_id ~= parent_span_id then
            local child_set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, span_id)
            local child_ibc_key = string.format("span-buf:ibc:%s", child_set_key)
            local child_ibc = tonumber(redis.call("get", child_ibc_key) or 0)
            byte_count = byte_count + child_ibc
        end
    end

    -- If the segment is already too big, make this subsegment its own segment
    -- with salt as the identifier. max_segment_bytes == 0 means "no limit".
    -- (ingested_byte_count is already a number; the extra tonumber is a no-op.)
    if max_segment_bytes > 0 and tonumber(ingested_byte_count) + byte_count > max_segment_bytes then
        set_span_id = salt
        set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, salt)
        ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key)
    end

    -- Counter and member-keys index for the (possibly re-pointed) segment root.
    local ingested_count_key = string.format("span-buf:ic:%s", set_key)
    local members_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, set_span_id)

    -- Merge each child subsegment's bookkeeping (span count, byte count,
    -- member-keys set) into the segment root, deleting the child keys so
    -- they are not flushed independently.
    for i = NUM_ARGS + 1, NUM_ARGS + num_spans do
        local span_id = ARGV[i]
        if span_id ~= parent_span_id then
            local child_set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, span_id)

            local child_ic_key = string.format("span-buf:ic:%s", child_set_key)
            local child_ic = redis.call("get", child_ic_key)
            if child_ic then
                redis.call("incrby", ingested_count_key, child_ic)
                redis.call("del", child_ic_key)
            end

            local child_ibc_key = string.format("span-buf:ibc:%s", child_set_key)
            local child_ibc = redis.call("get", child_ibc_key)
            if child_ibc then
                -- byte_count already holds the child's byte count (added in the
                -- first loop above), so we don't need to add it again here.
                redis.call("del", child_ibc_key)
            end

            local child_members_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, span_id)
            local child_members = redis.call("smembers", child_members_key)
            if #child_members > 0 then
                redis.call("sadd", members_key, unpack(child_members))
                redis.call("del", child_members_key)
            end
        end
    end

    local merge_payload_keys_end_time_ms = get_time_ms()
    table.insert(latency_table, {"merge_payload_keys_step_latency_ms", merge_payload_keys_end_time_ms - redirect_end_time_ms})

    -- Register this subsegment's payload key with the flusher. NOTE(review):
    -- when enforcement is on, the Python producer writes the payload under the
    -- salt-keyed payload key, so salt is the member to record here — confirm
    -- against _get_payload_key in buffer.py.
    redis.call("sadd", members_key, salt)
    redis.call("expire", members_key, set_timeout)

    -- Track total number of spans and bytes ingested for this segment.
    redis.call("incrby", ingested_count_key, num_spans)
    redis.call("incrby", ingested_byte_count_key, byte_count)
    redis.call("expire", ingested_count_key, set_timeout)
    redis.call("expire", ingested_byte_count_key, set_timeout)

    local counter_merge_end_time_ms = get_time_ms()
    table.insert(latency_table, {"counter_merge_step_latency_ms", counter_merge_end_time_ms - merge_payload_keys_end_time_ms})

    -- Capture end time and calculate latency in milliseconds.
    local end_time_ms = get_time_ms()
    local latency_ms = end_time_ms - start_time_ms
    table.insert(latency_table, {"total_step_latency_ms", latency_ms})

    -- Early return: the non-salt code path below this block is skipped entirely.
    return {set_key, has_root_span, latency_ms, latency_table, metrics_table}
end

-- Maintain member-keys (span-buf:mk) tracking sets so the flusher
-- knows which payload keys to fetch.
local member_keys_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, set_span_id)
Expand Down
71 changes: 57 additions & 14 deletions src/sentry/spans/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,26 @@

Now what does that look like in Redis? For each incoming span, we:

1. Store the span payload in a payload key:
"span-buf:s:{project_id:trace_id:span_id}:span_id". Each subsegment
gets its own key, distributed across Redis cluster nodes.
1. Store the span payload in a payload key. Each subsegment gets its own key,
distributed across Redis cluster nodes.
a. When segment size enforcement is disabled, the key uses the parent_span_id to
determine where span payloads are written.
Key: `span-buf:s:{project_id:trace_id:parent_span_id}:parent_span_id`
b. When segment size enforcement is enabled, the key uses a unique salt per
subsegment. This allows us to skip merging the subsegment into the parent segment
and not lose any data, since the subsegment will become its own separate segment
and be flushed out independently.
Key: `span-buf:s:{project_id:trace_id:salt}:salt`
2. The Lua script (add-buffer.lua) receives the span IDs and:
a. Follows redirects from parent_span_id (hashmap at
"span-buf:ssr:{project_id:trace_id}") to find the segment root.
b. Updates the redirect table so future spans can find the segment root.
c. Merges member-keys indexes and counters (ingested count, byte count)
from span IDs that were previously separate segment roots into the
current segment root.
d. If segment size enforcement is enabled and the segment exceeds
max_segment_bytes, detaches the subsegment into its own segment
keyed by the salt.
3. To a "global queue", we write the segment key, sorted by timeout.

Eventually, flushing cronjob looks at that global queue, and removes all timed
Expand All @@ -58,6 +68,22 @@
or using spillover topics, especially when their new partition count is lower
than the original topic.

Segment size enforcement:

Segments can grow unboundedly as spans arrive. To prevent oversized segments from
consuming excessive memory during flush, the buffer enforces a maximum byte limit
per segment (controlled by `spans.buffer.max-segment-bytes` and gated behind
`spans.buffer.enforce-segment-size`).

Each subsegment is assigned a unique salt (UUID). The Lua script tracks cumulative
ingested bytes per segment via `span-buf:ibc` keys. If adding a subsegment would
push the segment over the byte limit, the script detaches it into a new segment
keyed by the salt instead of merging it into the parent. The detached segment is
independently tracked and flushed.

During flush, segments that exceed `max-segment-bytes` are chunked into multiple
Kafka messages to stay within downstream size limits.

Glossary for types of keys:

* span-buf:s:{project_id:trace_id:span_id}:span_id -- payload keys containing span payloads, distributed across cluster nodes.
Expand All @@ -76,6 +102,7 @@
import logging
import math
import time
import uuid
from collections.abc import Generator, MutableMapping, Sequence
from typing import Any, NamedTuple, cast

Expand Down Expand Up @@ -146,6 +173,12 @@ def effective_parent_id(self):
type SpanPayload = dict[str, Any]


class Subsegment(NamedTuple):
    """A chunk of spans sharing one parent, processed by a single Lua call."""

    # (project_id, trace_id) pair identifying the trace this chunk belongs to.
    project_and_trace: tuple[str, str]
    # Unique identifier for this chunk (uuid4 hex). When segment-size
    # enforcement is on and the segment is over its byte limit, the Lua
    # script detaches the chunk into its own segment keyed by this value.
    salt: str
    # The spans contained in this chunk.
    subsegment: list[Span]


class OutputSpan(NamedTuple):
    """Wrapper around a single parsed span payload dict."""

    # Parsed span payload (SpanPayload is an alias for dict[str, Any]).
    payload: SpanPayload

Expand Down Expand Up @@ -254,6 +287,8 @@ def process_spans(self, spans: Sequence[Span], now: int):
timeout = options.get("spans.buffer.timeout")
root_timeout = options.get("spans.buffer.root-timeout")
max_spans_per_evalsha = options.get("spans.buffer.max-spans-per-evalsha")
max_segment_bytes = options.get("spans.buffer.max-segment-bytes")
enforce_segment_size = options.get("spans.buffer.enforce-segment-size")
result_meta = []
is_root_span_count = 0

Expand All @@ -263,25 +298,27 @@ def process_spans(self, spans: Sequence[Span], now: int):

# Split large subsegments into chunks to avoid Lua unpack() limits.
# Chunks share the same parent_span_id but are processed separately.
tree_items: list[tuple[tuple[str, str], list[Span]]] = []
tree_items: list[Subsegment] = []
for key, subsegment in trees.items():
if max_spans_per_evalsha > 0 and len(subsegment) > max_spans_per_evalsha:
for chunk in itertools.batched(subsegment, max_spans_per_evalsha):
tree_items.append((key, list(chunk)))
tree_items.append(Subsegment(key, uuid.uuid4().hex, list(chunk)))
else:
tree_items.append((key, subsegment))
tree_items.append(Subsegment(key, uuid.uuid4().hex, subsegment))

tree_batches: Sequence[Sequence[tuple[tuple[str, str], list[Span]]]]
tree_batches: Sequence[Sequence[Subsegment]]
if pipeline_batch_size > 0:
tree_batches = list(itertools.batched(tree_items, pipeline_batch_size))
else:
tree_batches = [tree_items]

for batch in tree_batches:
with self.client.pipeline(transaction=False) as p:
for (project_and_trace, parent_span_id), subsegment in batch:
for (project_and_trace, parent_span_id), salt, subsegment in batch:
set_members = self._prepare_payloads(subsegment)
payload_key = self._get_payload_key(project_and_trace, parent_span_id)
if enforce_segment_size:
payload_key = self._get_payload_key(project_and_trace, salt)
p.sadd(payload_key, *set_members)
p.expire(payload_key, redis_ttl)

Expand All @@ -296,7 +333,7 @@ def process_spans(self, spans: Sequence[Span], now: int):
results: list[Any] = []
for batch in tree_batches:
with self.client.pipeline(transaction=False) as p:
for (project_and_trace, parent_span_id), subsegment in batch:
for (project_and_trace, parent_span_id), salt, subsegment in batch:
byte_count = sum(len(span.payload) for span in subsegment)

try:
Expand All @@ -323,6 +360,8 @@ def process_spans(self, spans: Sequence[Span], now: int):
is_segment_span,
redis_ttl,
byte_count,
max_segment_bytes,
salt if enforce_segment_size else "",
*span_ids,
)

Expand All @@ -331,7 +370,7 @@ def process_spans(self, spans: Sequence[Span], now: int):
# All spans in a subsegment share the same trace_id,
# so they all came from the same Kafka partition.
partition = subsegment[0].partition
result_meta.append((project_and_trace, parent_span_id, partition))
result_meta.append((project_and_trace, parent_span_id, partition, salt))

results.extend(p.execute())

Expand All @@ -349,7 +388,9 @@ def process_spans(self, spans: Sequence[Span], now: int):

assert len(result_meta) == len(results)

for (project_and_trace, parent_span_id, partition), result in zip(result_meta, results):
for (project_and_trace, parent_span_id, partition, salt), result in zip(
result_meta, results
):
(
segment_key,
has_root_span,
Expand Down Expand Up @@ -402,9 +443,11 @@ def process_spans(self, spans: Sequence[Span], now: int):

subsegment_spans = trees[project_and_trace, parent_span_id]
delete_set = queue_deletes.setdefault(queue_key, set())
delete_set.update(
self._get_span_key(project_and_trace, span.span_id) for span in subsegment_spans
)
if not segment_key.endswith(salt.encode("ascii")):
delete_set.update(
self._get_span_key(project_and_trace, span.span_id)
for span in subsegment_spans
)
delete_set.discard(segment_key)

for result in results:
Expand Down
Loading
Loading