diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java index 092bcb01e..244782024 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java @@ -14,6 +14,7 @@ public class RawSpanGrouperConstants { public static final String DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY = "dataflow.metriccollection.sampling.percent"; public static final String INFLIGHT_TRACE_MAX_SPAN_COUNT = "max.span.count"; + public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count"; public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans"; public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces"; } diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java index a1d8ea941..b165870ba 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java @@ -2,6 +2,7 @@ import static org.hypertrace.core.datamodel.shared.AvroBuilderCache.fastNewBuilder; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_MAX_SPAN_COUNT; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER; @@ -64,6 +65,7 @@ public class RawSpansProcessor private To outputTopic; private double dataflowSamplingPercent = -1; private static final Map maxSpanCountMap = new HashMap<>(); + private long defaultMaxSpanCountLimit = Long.MAX_VALUE; // counter for number of spans dropped per tenant private static final ConcurrentMap droppedSpansCounter = @@ -99,6 +101,10 @@ public void init(ProcessorContext context) { }); } + if (jobConfig.hasPath(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT)) { + defaultMaxSpanCountLimit = jobConfig.getLong(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT); + } + this.outputTopic = To.child(OUTPUT_TOPIC_PRODUCER); restorePunctuators(); } @@ -164,9 +170,16 @@ public KeyValue transform(TraceIdentity key, RawSpan va } private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { - if (traceState != null - && maxSpanCountMap.containsKey(key.getTenantId()) - && traceState.getSpanIds().size() >= maxSpanCountMap.get(key.getTenantId())) { + int inFlightSpansPerTrace = + traceState != null ? traceState.getSpanIds().size() : Integer.MIN_VALUE; + long maxSpanCountTenantLimit = + maxSpanCountMap.containsKey(key.getTenantId()) + ? maxSpanCountMap.get(key.getTenantId()) + : Long.MAX_VALUE; + + if (inFlightSpansPerTrace >= defaultMaxSpanCountLimit + || inFlightSpansPerTrace >= maxSpanCountTenantLimit) { + if (logger.isDebugEnabled()) { logger.debug( "Dropping span [{}] from tenant_id={}, trace_id={} after grouping {} spans", @@ -175,6 +188,7 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { HexUtils.getHex(key.getTraceId()), traceState.getSpanIds().size()); } + // increment the counter for dropped spans droppedSpansCounter .computeIfAbsent( @@ -183,20 +197,21 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { PlatformMetricsRegistry.registerCounter( DROPPED_SPANS_COUNTER, Map.of("tenantId", k))) .increment(); + + // increment the counter when the number of spans reaches the max.span.count limit. + if (inFlightSpansPerTrace == defaultMaxSpanCountLimit + || inFlightSpansPerTrace == maxSpanCountTenantLimit) { + truncatedTracesCounter + .computeIfAbsent( + key.getTenantId(), + k -> + PlatformMetricsRegistry.registerCounter( + TRUNCATED_TRACES_COUNTER, Map.of("tenantId", k))) + .increment(); + } + // drop the span as limit is reached return true; } - // increment the counter when the number of spans reaches the max.span.count limit. - if (traceState != null - && maxSpanCountMap.containsKey(key.getTenantId()) - && traceState.getSpanIds().size() == maxSpanCountMap.get(key.getTenantId())) { - truncatedTracesCounter - .computeIfAbsent( - key.getTenantId(), - k -> - PlatformMetricsRegistry.registerCounter( - TRUNCATED_TRACES_COUNTER, Map.of("tenantId", k))) - .increment(); - } return false; } diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java index 04bfa190b..f93123e41 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java @@ -77,6 +77,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir String tenantId = "tenant1"; + // create spans for trace-1 of tenant1 RawSpan span1 = RawSpan.newBuilder() .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) @@ -95,6 +96,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir .setCustomerId("tenant1") .setEvent(createEvent("event-3", "tenant1")) .build(); + + // create spans for trace-2 of tenant1 RawSpan span4 = RawSpan.newBuilder() .setTraceId(ByteBuffer.wrap("trace-2".getBytes())) @@ -107,6 +110,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir .setCustomerId("tenant1") .setEvent(createEvent("event-5", "tenant1")) .build(); + + // create spans for trace-3 of tenant1 RawSpan span6 = RawSpan.newBuilder() .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) @@ -144,6 +149,57 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir .setEvent(createEvent("event-11", "tenant1")) .build(); + // create 8 spans for tenant-2 for trace-4 + String tenant2 = "tenant2"; + RawSpan span12 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-12", tenant2)) + .build(); + RawSpan span13 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-13", tenant2)) + .build(); + RawSpan span14 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-14", tenant2)) + .build(); + RawSpan span15 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-15", tenant2)) + .build(); + RawSpan span16 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-16", tenant2)) + .build(); + RawSpan span17 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-17", tenant2)) + .build(); + RawSpan span18 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-18", tenant2)) + .build(); + RawSpan span19 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-19", tenant2)) + .build(); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1); inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4); td.advanceWallClockTime(Duration.ofSeconds(1)); @@ -199,6 +255,20 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir // trace should be truncated with 5 spans trace = (StructuredTrace) outputTopic.readValue(); assertEquals(5, trace.getEventList().size()); + + // input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only + // 6 + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span15); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span16); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span17); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19); + td.advanceWallClockTime(Duration.ofSeconds(35)); + trace = (StructuredTrace) outputTopic.readValue(); + assertEquals(6, trace.getEventList().size()); } private Event createEvent(String eventId, String tenantId) { diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf index 24ddf7725..1e1197a3a 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf +++ b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf @@ -28,6 +28,7 @@ span.window.store.retention.time.mins = ${?SPAN_WINDOW_STORE_RETENTION_TIME_MINS span.window.store.segment.size.mins = 20 span.window.store.segment.size.mins = ${?SPAN_WINDOW_STORE_SEGMENT_SIZE_MINS} +default.max.span.count = 6 max.span.count = { tenant1 = 5 }