From 5f7043d0926eb976e5a2657bf79d956dbaef9059 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 24 Nov 2021 16:36:19 +0530 Subject: [PATCH 1/3] feat: add global upper limit for max span count limit --- .../RawSpanGrouperConstants.java | 1 + .../rawspansgrouper/RawSpansProcessor.java | 45 ++++++++---- .../rawspansgrouper/RawSpansGrouperTest.java | 70 +++++++++++++++++++ .../raw-spans-grouper/application.conf | 1 + 4 files changed, 102 insertions(+), 15 deletions(-) diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java index 092bcb01e..5f01053d0 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java @@ -14,6 +14,7 @@ public class RawSpanGrouperConstants { public static final String DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY = "dataflow.metriccollection.sampling.percent"; public static final String INFLIGHT_TRACE_MAX_SPAN_COUNT = "max.span.count"; + public static final String UPPER_INFLIGHT_TRACE_MAX_SPAN_COUNT = "upper.max.span.count"; public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans"; public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces"; } diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java index a1d8ea941..39f5fd93d 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java @@ -10,6 +10,7 @@ import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRUNCATED_TRACES_COUNTER; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.UPPER_INFLIGHT_TRACE_MAX_SPAN_COUNT; import com.typesafe.config.Config; import io.micrometer.core.instrument.Counter; @@ -64,6 +65,7 @@ public class RawSpansProcessor private To outputTopic; private double dataflowSamplingPercent = -1; private static final Map maxSpanCountMap = new HashMap<>(); + private long maxSpanCountUpperLimit = Long.MAX_VALUE; // counter for number of spans dropped per tenant private static final ConcurrentMap droppedSpansCounter = @@ -99,6 +101,10 @@ public void init(ProcessorContext context) { }); } + if (jobConfig.hasPath(UPPER_INFLIGHT_TRACE_MAX_SPAN_COUNT)) { + maxSpanCountUpperLimit = jobConfig.getLong(UPPER_INFLIGHT_TRACE_MAX_SPAN_COUNT); + } + this.outputTopic = To.child(OUTPUT_TOPIC_PRODUCER); restorePunctuators(); } @@ -164,9 +170,16 @@ public KeyValue transform(TraceIdentity key, RawSpan va } private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { - if (traceState != null - && maxSpanCountMap.containsKey(key.getTenantId()) - && traceState.getSpanIds().size() >= maxSpanCountMap.get(key.getTenantId())) { + int inFlightSpansPerTrace = + traceState != null ? traceState.getSpanIds().size() : Integer.MIN_VALUE; + long maxSpanCountTenantLimit = + maxSpanCountMap.containsKey(key.getTenantId()) + ? maxSpanCountMap.get(key.getTenantId()) + : Long.MAX_VALUE; + + if (inFlightSpansPerTrace >= maxSpanCountUpperLimit + || inFlightSpansPerTrace >= maxSpanCountTenantLimit) { + if (logger.isDebugEnabled()) { logger.debug( "Dropping span [{}] from tenant_id={}, trace_id={} after grouping {} spans", @@ -175,6 +188,7 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { HexUtils.getHex(key.getTraceId()), traceState.getSpanIds().size()); } + // increment the counter for dropped spans droppedSpansCounter .computeIfAbsent( @@ -183,20 +197,21 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { PlatformMetricsRegistry.registerCounter( DROPPED_SPANS_COUNTER, Map.of("tenantId", k))) .increment(); + + // increment the counter when the number of spans reaches the max.span.count limit. + if (inFlightSpansPerTrace == maxSpanCountUpperLimit + || inFlightSpansPerTrace == maxSpanCountTenantLimit) { + truncatedTracesCounter + .computeIfAbsent( + key.getTenantId(), + k -> + PlatformMetricsRegistry.registerCounter( + TRUNCATED_TRACES_COUNTER, Map.of("tenantId", k))) + .increment(); + } + // drop the span as limit is reached return true; } - // increment the counter when the number of spans reaches the max.span.count limit. - if (traceState != null - && maxSpanCountMap.containsKey(key.getTenantId()) - && traceState.getSpanIds().size() == maxSpanCountMap.get(key.getTenantId())) { - truncatedTracesCounter - .computeIfAbsent( - key.getTenantId(), - k -> - PlatformMetricsRegistry.registerCounter( - TRUNCATED_TRACES_COUNTER, Map.of("tenantId", k))) - .increment(); - } return false; } diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java index 04bfa190b..4ccdced47 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java @@ -10,6 +10,7 @@ import java.nio.ByteBuffer; import java.nio.file.Path; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -77,6 +78,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir String tenantId = "tenant1"; + // create spans for trace-1 of tenant1 RawSpan span1 = RawSpan.newBuilder() .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) @@ -95,6 +97,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir .setCustomerId("tenant1") .setEvent(createEvent("event-3", "tenant1")) .build(); + + // create spans for trace-2 of tenant1 RawSpan span4 = RawSpan.newBuilder() .setTraceId(ByteBuffer.wrap("trace-2".getBytes())) @@ -107,6 +111,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir .setCustomerId("tenant1") .setEvent(createEvent("event-5", "tenant1")) .build(); + + // create spans for trace-3 of tenant1 RawSpan span6 = RawSpan.newBuilder() .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) @@ -144,6 +150,57 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir .setEvent(createEvent("event-11", "tenant1")) .build(); + // create 8 spans for tenant-2 for trace-4 + String tenant2 = "tenant2"; + RawSpan span12 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-12", tenant2)) + .build(); + RawSpan span13 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-13", tenant2)) + .build(); + RawSpan span14 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-14", tenant2)) + .build(); + RawSpan span15 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-15", tenant2)) + .build(); + RawSpan span16 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-16", tenant2)) + .build(); + RawSpan span17 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-17", tenant2)) + .build(); + RawSpan span18 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-18", tenant2)) + .build(); + RawSpan span19 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenant2) + .setEvent(createEvent("event-19", tenant2)) + .build(); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1); inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4); td.advanceWallClockTime(Duration.ofSeconds(1)); @@ -199,6 +256,19 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir // trace should be truncated with 5 spans trace = (StructuredTrace) outputTopic.readValue(); assertEquals(5, trace.getEventList().size()); + + // input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only 6 + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span15); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span16); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span17); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18); + inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19); + td.advanceWallClockTime(Duration.ofSeconds(35)); + trace = (StructuredTrace) outputTopic.readValue(); + assertEquals(6, trace.getEventList().size()); } private Event createEvent(String eventId, String tenantId) { diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf index 24ddf7725..83675f12a 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf +++ b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf @@ -28,6 +28,7 @@ span.window.store.retention.time.mins = ${?SPAN_WINDOW_STORE_RETENTION_TIME_MINS span.window.store.segment.size.mins = 20 span.window.store.segment.size.mins = ${?SPAN_WINDOW_STORE_SEGMENT_SIZE_MINS} +upper.max.span.count = 6 max.span.count = { tenant1 = 5 } From b2552c0e81ef09beb919cbb060680d57ad067e65 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 24 Nov 2021 16:37:24 +0530 Subject: [PATCH 2/3] fixed code style --- .../hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java index 4ccdced47..f93123e41 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java @@ -10,7 +10,6 @@ import java.nio.ByteBuffer; import java.nio.file.Path; import java.time.Duration; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -257,7 +256,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir trace = (StructuredTrace) outputTopic.readValue(); assertEquals(5, trace.getEventList().size()); - // input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only 6 + // input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only + // 6 inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12); inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13); inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14); From 56dc177eaf0892bd0b87984502c31f42757c14be Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 24 Nov 2021 17:37:03 +0530 Subject: [PATCH 3/3] renamed config name from upper.max.span.count to default.max.span.count for global limit --- .../rawspansgrouper/RawSpanGrouperConstants.java | 2 +- .../core/rawspansgrouper/RawSpansProcessor.java | 12 ++++++------ .../configs/raw-spans-grouper/application.conf | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java index 5f01053d0..244782024 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java @@ -14,7 +14,7 @@ public class RawSpanGrouperConstants { public static final String DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY = "dataflow.metriccollection.sampling.percent"; public static final String INFLIGHT_TRACE_MAX_SPAN_COUNT = "max.span.count"; - public static final String UPPER_INFLIGHT_TRACE_MAX_SPAN_COUNT = "upper.max.span.count"; + public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count"; public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans"; public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces"; } diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java index 39f5fd93d..b165870ba 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java @@ -2,6 +2,7 @@ import static org.hypertrace.core.datamodel.shared.AvroBuilderCache.fastNewBuilder; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_MAX_SPAN_COUNT; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER; @@ -10,7 +11,6 @@ import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRUNCATED_TRACES_COUNTER; -import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.UPPER_INFLIGHT_TRACE_MAX_SPAN_COUNT; import com.typesafe.config.Config; import io.micrometer.core.instrument.Counter; @@ -65,7 +65,7 @@ public class RawSpansProcessor private To outputTopic; private double dataflowSamplingPercent = -1; private static final Map maxSpanCountMap = new HashMap<>(); - private long maxSpanCountUpperLimit = Long.MAX_VALUE; + private long defaultMaxSpanCountLimit = Long.MAX_VALUE; // counter for number of spans dropped per tenant private static final ConcurrentMap droppedSpansCounter = @@ -101,8 +101,8 @@ public void init(ProcessorContext context) { }); } - if (jobConfig.hasPath(UPPER_INFLIGHT_TRACE_MAX_SPAN_COUNT)) { - maxSpanCountUpperLimit = jobConfig.getLong(UPPER_INFLIGHT_TRACE_MAX_SPAN_COUNT); + if (jobConfig.hasPath(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT)) { + defaultMaxSpanCountLimit = jobConfig.getLong(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT); } this.outputTopic = To.child(OUTPUT_TOPIC_PRODUCER); @@ -177,7 +177,7 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { ? maxSpanCountMap.get(key.getTenantId()) : Long.MAX_VALUE; - if (inFlightSpansPerTrace >= maxSpanCountUpperLimit + if (inFlightSpansPerTrace >= defaultMaxSpanCountLimit || inFlightSpansPerTrace >= maxSpanCountTenantLimit) { if (logger.isDebugEnabled()) { @@ -199,7 +199,7 @@ private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { .increment(); // increment the counter when the number of spans reaches the max.span.count limit. - if (inFlightSpansPerTrace == maxSpanCountUpperLimit + if (inFlightSpansPerTrace == defaultMaxSpanCountLimit || inFlightSpansPerTrace == maxSpanCountTenantLimit) { truncatedTracesCounter .computeIfAbsent( diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf index 83675f12a..1e1197a3a 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf +++ b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf @@ -28,7 +28,7 @@ span.window.store.retention.time.mins = ${?SPAN_WINDOW_STORE_RETENTION_TIME_MINS span.window.store.segment.size.mins = 20 span.window.store.segment.size.mins = ${?SPAN_WINDOW_STORE_SEGMENT_SIZE_MINS} -upper.max.span.count = 6 +default.max.span.count = 6 max.span.count = { tenant1 = 5 }