Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class RawSpanGrouperConstants {
public static final String DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY =
"dataflow.metriccollection.sampling.percent";
public static final String INFLIGHT_TRACE_MAX_SPAN_COUNT = "max.span.count";
public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count";
public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans";
public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces";
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.hypertrace.core.datamodel.shared.AvroBuilderCache.fastNewBuilder;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_MAX_SPAN_COUNT;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class RawSpansProcessor
private To outputTopic;
private double dataflowSamplingPercent = -1;
private static final Map<String, Long> maxSpanCountMap = new HashMap<>();
private long defaultMaxSpanCountLimit = Long.MAX_VALUE;

// counter for number of spans dropped per tenant
private static final ConcurrentMap<String, Counter> droppedSpansCounter =
Expand Down Expand Up @@ -99,6 +101,10 @@ public void init(ProcessorContext context) {
});
}

if (jobConfig.hasPath(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT)) {
defaultMaxSpanCountLimit = jobConfig.getLong(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT);
}

this.outputTopic = To.child(OUTPUT_TOPIC_PRODUCER);
restorePunctuators();
}
Expand Down Expand Up @@ -164,9 +170,16 @@ public KeyValue<String, StructuredTrace> transform(TraceIdentity key, RawSpan va
}

private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
if (traceState != null
&& maxSpanCountMap.containsKey(key.getTenantId())
&& traceState.getSpanIds().size() >= maxSpanCountMap.get(key.getTenantId())) {
int inFlightSpansPerTrace =
traceState != null ? traceState.getSpanIds().size() : Integer.MIN_VALUE;
long maxSpanCountTenantLimit =
maxSpanCountMap.containsKey(key.getTenantId())
? maxSpanCountMap.get(key.getTenantId())
: Long.MAX_VALUE;

if (inFlightSpansPerTrace >= defaultMaxSpanCountLimit
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought tenant specific config acts as an override the default.
But the condition here doesn't seem to indicate that. We are picking the least of both.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What @ravisingal & I had discussed offline of having a global upper limit, and so I had prior notatin of upper.max.span.count. But, what you are saying is that for a specific tenant, we want a higher limit, that makes sense.

Let me know @avinashkolluru @ravisingal if we want to modify and go with that if that is making more sense.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest made the tenant specific config as an override. It shouldn't matter whether its is lower or higher than the default.

|| inFlightSpansPerTrace >= maxSpanCountTenantLimit) {

if (logger.isDebugEnabled()) {
logger.debug(
"Dropping span [{}] from tenant_id={}, trace_id={} after grouping {} spans",
Expand All @@ -175,6 +188,7 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
HexUtils.getHex(key.getTraceId()),
traceState.getSpanIds().size());
}

// increment the counter for dropped spans
droppedSpansCounter
.computeIfAbsent(
Expand All @@ -183,20 +197,21 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
PlatformMetricsRegistry.registerCounter(
DROPPED_SPANS_COUNTER, Map.of("tenantId", k)))
.increment();

// increment the counter when the number of spans reaches the max.span.count limit.
if (inFlightSpansPerTrace == defaultMaxSpanCountLimit
|| inFlightSpansPerTrace == maxSpanCountTenantLimit) {
truncatedTracesCounter
.computeIfAbsent(
key.getTenantId(),
k ->
PlatformMetricsRegistry.registerCounter(
TRUNCATED_TRACES_COUNTER, Map.of("tenantId", k)))
.increment();
}
// drop the span as limit is reached
return true;
}
// increment the counter when the number of spans reaches the max.span.count limit.
if (traceState != null
&& maxSpanCountMap.containsKey(key.getTenantId())
&& traceState.getSpanIds().size() == maxSpanCountMap.get(key.getTenantId())) {
truncatedTracesCounter
.computeIfAbsent(
key.getTenantId(),
k ->
PlatformMetricsRegistry.registerCounter(
TRUNCATED_TRACES_COUNTER, Map.of("tenantId", k)))
.increment();
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir

String tenantId = "tenant1";

// create spans for trace-1 of tenant1
RawSpan span1 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-1".getBytes()))
Expand All @@ -95,6 +96,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
.setCustomerId("tenant1")
.setEvent(createEvent("event-3", "tenant1"))
.build();

// create spans for trace-2 of tenant1
RawSpan span4 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-2".getBytes()))
Expand All @@ -107,6 +110,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
.setCustomerId("tenant1")
.setEvent(createEvent("event-5", "tenant1"))
.build();

// create spans for trace-3 of tenant1
RawSpan span6 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-3".getBytes()))
Expand Down Expand Up @@ -144,6 +149,57 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
.setEvent(createEvent("event-11", "tenant1"))
.build();

// create 8 spans for tenant-2 for trace-4
String tenant2 = "tenant2";
RawSpan span12 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
.setCustomerId(tenant2)
.setEvent(createEvent("event-12", tenant2))
.build();
RawSpan span13 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
.setCustomerId(tenant2)
.setEvent(createEvent("event-13", tenant2))
.build();
RawSpan span14 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
.setCustomerId(tenant2)
.setEvent(createEvent("event-14", tenant2))
.build();
RawSpan span15 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
.setCustomerId(tenant2)
.setEvent(createEvent("event-15", tenant2))
.build();
RawSpan span16 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
.setCustomerId(tenant2)
.setEvent(createEvent("event-16", tenant2))
.build();
RawSpan span17 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
.setCustomerId(tenant2)
.setEvent(createEvent("event-17", tenant2))
.build();
RawSpan span18 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
.setCustomerId(tenant2)
.setEvent(createEvent("event-18", tenant2))
.build();
RawSpan span19 =
RawSpan.newBuilder()
.setTraceId(ByteBuffer.wrap("trace-4".getBytes()))
.setCustomerId(tenant2)
.setEvent(createEvent("event-19", tenant2))
.build();

inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1);
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4);
td.advanceWallClockTime(Duration.ofSeconds(1));
Expand Down Expand Up @@ -199,6 +255,20 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
// trace should be truncated with 5 spans
trace = (StructuredTrace) outputTopic.readValue();
assertEquals(5, trace.getEventList().size());

// input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only
// 6
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12);
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13);
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14);
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span15);
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span16);
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span17);
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18);
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19);
td.advanceWallClockTime(Duration.ofSeconds(35));
trace = (StructuredTrace) outputTopic.readValue();
assertEquals(6, trace.getEventList().size());
}

private Event createEvent(String eventId, String tenantId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ span.window.store.retention.time.mins = ${?SPAN_WINDOW_STORE_RETENTION_TIME_MINS
span.window.store.segment.size.mins = 20
span.window.store.segment.size.mins = ${?SPAN_WINDOW_STORE_SEGMENT_SIZE_MINS}

default.max.span.count = 6
max.span.count = {
tenant1 = 5
}
Expand Down