From 8d53b585134a317ea0618f734a33258e28780e5c Mon Sep 17 00:00:00 2001 From: Brice Dutheil Date: Tue, 15 Jul 2025 18:31:54 +0200 Subject: [PATCH 1/8] chore(css): Add peer tags, span kind and trace root flag to MetricKey bucket --- .../metrics/ConflatingMetricsAggregator.java | 18 +- .../trace/common/metrics/MetricKey.java | 49 ++- .../metrics/SerializingMetricWriter.java | 19 +- .../common/metrics/AggregateMetricTest.groovy | 4 +- .../ConflatingMetricAggregatorTest.groovy | 283 +++++++++++++++--- .../common/metrics/FootprintForkedTest.groovy | 2 + .../SerializingMetricWriterTest.groovy | 64 +++- .../trace/common/metrics/SimpleSpan.groovy | 34 ++- .../groovy/MetricsIntegrationTest.groovy | 4 +- 9 files changed, 405 insertions(+), 72 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 2f2d5118110..1bd535b8cb0 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -25,6 +25,8 @@ import datadog.trace.core.DDTraceCoreInfo; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; +import datadog.trace.util.TraceUtils; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -273,7 +275,10 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { span.getOperationName(), span.getType(), span.getHttpStatusCode(), - isSynthetic(span)); + isSynthetic(span), + span.isTopLevel(), + span.getTag(SPAN_KIND, ""), + getPeerTags(span)); boolean isNewKey = false; MetricKey key = keys.putIfAbsent(newKey, newKey); if (null == key) { @@ -308,6 +313,17 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return isNewKey || span.getError() > 0; } + private List getPeerTags(CoreSpan 
span) { + List peerTags = new ArrayList<>(); + for (String peerTag : features.peerTags()) { + Object value = span.getTag(peerTag); + if (value != null) { + peerTags.add(peerTag + ":" + TraceUtils.normalizeTag(value.toString())); + } + } + return peerTags; + } + private static boolean isSynthetic(CoreSpan span) { return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java index 4ba23db6d57..be992324464 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java @@ -3,6 +3,8 @@ import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.util.Arrays; +import java.util.List; /** The aggregation key for tracked metrics. */ public final class MetricKey { @@ -13,6 +15,9 @@ public final class MetricKey { private final int httpStatusCode; private final boolean synthetics; private final int hash; + private final boolean isTraceRoot; + private final UTF8BytesString spanKind; + private final UTF8BytesString[] peerTags; public MetricKey( CharSequence resource, @@ -20,18 +25,35 @@ public MetricKey( CharSequence operationName, CharSequence type, int httpStatusCode, - boolean synthetics) { + boolean synthetics, + boolean isTraceRoot, + CharSequence spanKind, + List peerTags) { this.resource = null == resource ? EMPTY : UTF8BytesString.create(resource); this.service = null == service ? EMPTY : UTF8BytesString.create(service); this.operationName = null == operationName ? EMPTY : UTF8BytesString.create(operationName); this.type = null == type ? 
EMPTY : UTF8BytesString.create(type); this.httpStatusCode = httpStatusCode; this.synthetics = synthetics; - // unrolled polynomial hashcode which avoids allocating varargs - // the constants are 31^5, 31^4, 31^3, 31^2, 31^1, 31^0 + this.isTraceRoot = isTraceRoot; + this.spanKind = null == spanKind ? EMPTY : UTF8BytesString.create(spanKind); + this.peerTags = new UTF8BytesString[peerTags.size()]; + for (int i = 0; i < peerTags.size(); i++) { + this.peerTags[i] = UTF8BytesString.create(peerTags.get(i)); + } + + // Unrolled polynomial hashcode to avoid varargs allocation + // and eliminate data dependency between iterations as in Arrays.hashCode. + // Coefficient constants are powers of 31, with integer overflow (hence negative numbers). + // See + // https://richardstartin.github.io/posts/collecting-rocks-and-benchmarks + // https://richardstartin.github.io/posts/still-true-in-java-9-handwritten-hash-codes-are-faster this.hash = - 28629151 * this.resource.hashCode() - + 923521 * this.service.hashCode() + -196513505 * Boolean.hashCode(this.isTraceRoot) + + -1807454463 * this.spanKind.hashCode() + + 887_503_681 * Arrays.hashCode(this.peerTags) + + 28_629_151 * this.resource.hashCode() + + 923_521 * this.service.hashCode() + 29791 * this.operationName.hashCode() + 961 * this.type.hashCode() + 31 * httpStatusCode @@ -62,6 +84,18 @@ public boolean isSynthetics() { return synthetics; } + public boolean isTraceRoot() { + return isTraceRoot; + } + + public UTF8BytesString getSpanKind() { + return spanKind; + } + + public UTF8BytesString[] getPeerTags() { + return peerTags; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -75,7 +109,10 @@ public boolean equals(Object o) { && resource.equals(metricKey.resource) && service.equals(metricKey.service) && operationName.equals(metricKey.operationName) - && type.equals(metricKey.type); + && type.equals(metricKey.type) + && isTraceRoot == metricKey.isTraceRoot + && spanKind.equals(metricKey.spanKind) + && 
Arrays.equals(peerTags, metricKey.peerTags); } return false; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 485b3e90bbc..82db7883386 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -31,6 +31,9 @@ public final class SerializingMetricWriter implements MetricWriter { private static final byte[] OK_SUMMARY = "OkSummary".getBytes(ISO_8859_1); private static final byte[] ERROR_SUMMARY = "ErrorSummary".getBytes(ISO_8859_1); private static final byte[] PROCESS_TAGS = "ProcessTags".getBytes(ISO_8859_1); + private static final byte[] IS_TRACE_ROOT = "IsTraceRoot".getBytes(ISO_8859_1); + private static final byte[] SPAN_KIND = "SpanKind".getBytes(ISO_8859_1); + private static final byte[] PEER_TAGS = "PeerTags".getBytes(ISO_8859_1); private final WellKnownTags wellKnownTags; private final WritableFormatter writer; @@ -93,8 +96,7 @@ public void startBucket(int metricCount, long start, long duration) { @Override public void add(MetricKey key, AggregateMetric aggregate) { - - writer.startMap(12); + writer.startMap(15); writer.writeUTF8(NAME); writer.writeUTF8(key.getOperationName()); @@ -114,6 +116,19 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.writeUTF8(SYNTHETICS); writer.writeBoolean(key.isSynthetics()); + writer.writeUTF8(IS_TRACE_ROOT); + writer.writeBoolean(key.isTraceRoot()); + + writer.writeUTF8(SPAN_KIND); + writer.writeUTF8(key.getSpanKind()); + + writer.writeUTF8(PEER_TAGS); + UTF8BytesString[] peerTags = key.getPeerTags(); + writer.startArray(peerTags.length); + for (UTF8BytesString peerTag : peerTags) { + writer.writeUTF8(peerTag); + } + writer.writeUTF8(HITS); writer.writeInt(aggregate.getHitCount()); diff --git 
a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy index f93e3af1540..17b3510c115 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy @@ -51,7 +51,7 @@ class AggregateMetricTest extends DDSpecification { given: AggregateMetric aggregate = new AggregateMetric().recordDurations(3, new AtomicLongArray(0L, 0L, 0L | ERROR_TAG | TOP_LEVEL_TAG)) - Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false)) + Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault"])) batch.add(0L, 10) batch.add(0L, 10) batch.add(0L, 10) @@ -126,7 +126,7 @@ class AggregateMetricTest extends DDSpecification { def "consistent under concurrent attempts to read and write"() { given: AggregateMetric aggregate = new AggregateMetric() - MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false) + MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault"]) BlockingDeque queue = new LinkedBlockingDeque<>(1000) ExecutorService reader = Executors.newSingleThreadExecutor() int writerCount = 10 diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 6c635dc393c..f6ddd1e7746 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -102,22 +102,36 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) 
features.supportsMetrics() >> true + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: CountDownLatch latch = new CountDownLatch(1) - aggregator.publish([new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 100, HTTP_OK)]) + aggregator.publish([ + new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "baz") + ]) aggregator.report() def latchTriggered = latch.await(2, SECONDS) then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey("resource", "service", "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 - } + 1 * writer.add(new MetricKey( + "resource", + "service", + "operation", + "type", + HTTP_OK, + false, + true, + "baz", + [] + ), _) >> { MetricKey key, AggregateMetric value -> + value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -131,14 +145,15 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.spanKindsToComputedStats() >> ["client", "server", "producer", "consumer"] + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: CountDownLatch latch = new CountDownLatch(1) - def span = Spy(new SimpleSpan("service", "operation", "resource", "type", false, false, false, 0, 100, HTTP_OK)) - span.getTag(SPAN_KIND) >> kind + def span = new SimpleSpan("service", "operation", 
"resource", "type", false, false, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, kind) aggregator.publish([span]) aggregator.report() def latchTriggered = latch.await(2, SECONDS) @@ -146,9 +161,20 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered == statsComputed (statsComputed ? 1 : 0) * writer.startBucket(1, _, _) - (statsComputed ? 1 : 0) * writer.add(new MetricKey("resource", "service", "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == 0 && value.getDuration() == 100 - } + (statsComputed ? 1 : 0) * writer.add( + new MetricKey( + "resource", + "service", + "operation", + "type", + HTTP_OK, + false, + false, + kind, + [] + ), { AggregateMetric aggregateMetric -> + aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 + }) (statsComputed ? 1 : 0) * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -162,14 +188,74 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null | false } + def "should create bucket for each set of peer tags"() { + setup: + MetricWriter writer = Mock(MetricWriter) + Sink sink = Stub(Sink) + DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) + features.supportsMetrics() >> true + features.spanKindsToComputedStats() >> ["grault"] + features.peerTags() >>> [["country"], ["country", "georegion"],] + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + features, sink, writer, 10, queueSize, reportingInterval, SECONDS) + aggregator.start() + + when: + CountDownLatch latch = new CountDownLatch(1) + aggregator.publish([ + new SimpleSpan("service", "operation", "resource", "type", false, false, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe"), + new SimpleSpan("service", "operation", "resource", "type", false, false, false, 0, 
100, HTTP_OK) + .setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe") + ]) + aggregator.report() + def latchTriggered = latch.await(2, SECONDS) + + then: + latchTriggered + 1 * writer.startBucket(2, _, _) + 1 * writer.add( + new MetricKey( + "resource", + "service", + "operation", + "type", + HTTP_OK, + false, + false, + "grault", + ["country:france"] + ), { AggregateMetric aggregateMetric -> + aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 + }) + 1 * writer.add( + new MetricKey( + "resource", + "service", + "operation", + "type", + HTTP_OK, + false, + false, + "grault", + ["country:france", "georegion:europe"] + ), { AggregateMetric aggregateMetric -> + aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 + }) + 1 * writer.finishBucket() >> { latch.countDown() } + + cleanup: + aggregator.close() + } + def "measured spans do not contribute to top level count"() { setup: MetricWriter writer = Mock(MetricWriter) Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, - sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() @@ -177,6 +263,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { CountDownLatch latch = new CountDownLatch(1) aggregator.publish([ new SimpleSpan("service", "operation", "resource", "type", measured, topLevel, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, "baz") ]) aggregator.report() def latchTriggered = latch.await(2, SECONDS) @@ -184,9 +271,19 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey("resource", "service", "operation", "type", 
HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100 - } + 1 * writer.add(new MetricKey( + "resource", + "service", + "operation", + "type", + HTTP_OK, + false, + topLevel, + "baz", + [] + ), { AggregateMetric value -> + value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100 + }) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -205,13 +302,15 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true + features.spanKindsToComputedStats() >> ["baz"] + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) long duration = 100 List trace = [ - new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK), - new SimpleSpan("service1", "operation1", "resource1", "type", false, false, false, 0, 0, HTTP_OK), - new SimpleSpan("service2", "operation2", "resource2", "type", true, false, false, 0, duration * 2, HTTP_OK) + new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK).setTag(SPAN_KIND, "baz"), + new SimpleSpan("service1", "operation1", "resource1", "type", false, false, false, 0, 0, HTTP_OK).setTag(SPAN_KIND, "baz"), + new SimpleSpan("service2", "operation2", "resource2", "type", true, false, false, 0, duration * 2, HTTP_OK).setTag(SPAN_KIND, "baz") ] aggregator.start() @@ -228,12 +327,32 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.finishBucket() >> { latch.countDown() } 1 * writer.startBucket(2, _, SECONDS.toNanos(reportingInterval)) - 1 * writer.add(new MetricKey("resource", "service", "operation", 
"type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == count && value.getDuration() == count * duration - } - 1 * writer.add(new MetricKey("resource2", "service2", "operation2", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == count && value.getDuration() == count * duration * 2 - } + 1 * writer.add(new MetricKey( + "resource", + "service", + "operation", + "type", + HTTP_OK, + false, + false, + "baz", + [] + ), { AggregateMetric value -> + value.getHitCount() == count && value.getDuration() == count * duration + }) + 1 * writer.add(new MetricKey( + "resource2", + "service2", + "operation2", + "type", + HTTP_OK, + false, + false, + "baz", + [] + ), { AggregateMetric value -> + value.getHitCount() == count && value.getDuration() == count * duration * 2 + }) cleanup: aggregator.close() @@ -249,6 +368,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true + features.spanKindsToComputedStats() >> ["baz"] + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 @@ -259,6 +380,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { for (int i = 0; i < 11; ++i) { aggregator.publish([ new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK) + .setTag(SPAN_KIND, "baz") ]) } aggregator.report() @@ -268,11 +390,31 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(10, _, SECONDS.toNanos(reportingInterval)) for (int i = 1; i < 11; ++i) { - 1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric 
value -> - value.getHitCount() == 1 && value.getDuration() == duration - } + 1 * writer.add(new MetricKey( + "resource", + "service" + i, + "operation", + "type", + HTTP_OK, + false, + true, + "baz", + [] + ), _) >> { MetricKey key, AggregateMetric value -> + value.getHitCount() == 1 && value.getDuration() == duration + } } - 0 * writer.add(new MetricKey("resource", "service0", "operation", "type", HTTP_OK, false), _) + 0 * writer.add(new MetricKey( + "resource", + "service0", + "operation", + "type", + HTTP_OK, + false, + true, + "baz", + [] + ), _) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -286,6 +428,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true + features.spanKindsToComputedStats() >> ["baz"] + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 @@ -296,6 +440,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { for (int i = 0; i < 5; ++i) { aggregator.publish([ new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK) + .setTag(SPAN_KIND, "baz") ]) } aggregator.report() @@ -305,9 +450,19 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - } + 1 * writer.add(new MetricKey( + "resource", + "service" + i, + "operation", + "type", + HTTP_OK, + false, + true, + "baz", + [] + ), { AggregateMetric value -> + value.getHitCount() == 1 && 
value.getDuration() == duration + }) } 1 * writer.finishBucket() >> { latch.countDown() } @@ -316,6 +471,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { for (int i = 1; i < 5; ++i) { aggregator.publish([ new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK) + .setTag(SPAN_KIND, "baz") ]) } aggregator.report() @@ -325,11 +481,31 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(4, _, SECONDS.toNanos(reportingInterval)) for (int i = 1; i < 5; ++i) { - 1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - } + 1 * writer.add(new MetricKey( + "resource", + "service" + i, + "operation", + "type", + HTTP_OK, + false, + true, + "baz", + [] + ),{ AggregateMetric value -> + value.getHitCount() == 1 && value.getDuration() == duration + }) } - 0 * writer.add(new MetricKey("resource", "service0", "operation", "type", HTTP_OK, false), _) + 0 * writer.add(new MetricKey( + "resource", + "service0", + "operation", + "type", + HTTP_OK, + false, + true, + "baz", + [] + ), _) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -343,6 +519,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true + features.spanKindsToComputedStats() >> ["quux"] + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 @@ -353,6 +531,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { for (int i = 0; i < 5; ++i) { aggregator.publish([ new SimpleSpan("service" + i, "operation", "resource", "type", false, 
true, false, 0, duration, HTTP_OK) + .setTag(SPAN_KIND, "quux") ]) } aggregator.report() @@ -362,9 +541,19 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - } + 1 * writer.add(new MetricKey( + "resource", + "service" + i, + "operation", + "type", + HTTP_OK, + false, + true, + "quux", + [] + ), { AggregateMetric value -> + value.getHitCount() == 1 && value.getDuration() == duration + }) } 1 * writer.finishBucket() >> { latch.countDown() } @@ -387,6 +576,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true + features.spanKindsToComputedStats() >> ["garply"] + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 @@ -397,6 +588,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { for (int i = 0; i < 5; ++i) { aggregator.publish([ new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK) + .setTag(SPAN_KIND, "garply") ]) } def latchTriggered = latch.await(2, SECONDS) @@ -405,9 +597,19 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(1)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - } + 1 * writer.add(new MetricKey( + 
"resource", + "service" + i, + "operation", + "type", + HTTP_OK, + false, + true, + "garply", + [] + ), { AggregateMetric value -> + value.getHitCount() == 1 && value.getDuration() == duration + }) } 1 * writer.finishBucket() >> { latch.countDown() } @@ -422,6 +624,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 @@ -460,6 +663,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 @@ -512,6 +716,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> false + features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS) final spans = [ diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index 4d50d7f156c..030183a3121 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -28,6 +28,8 @@ class FootprintForkedTest extends DDSpecification { ValidatingSink sink 
= new ValidatingSink(latch) DDAgentFeaturesDiscovery features = Stub(DDAgentFeaturesDiscovery) { it.supportsMetrics() >> true + it.spanKindsToComputedStats() >> [] + it.peerTags() >> [] } ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index d7c3e514ed6..994a4b7d046 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -43,11 +43,50 @@ class SerializingMetricWriterTest extends DDSpecification { where: content << [ [ - Pair.of(new MetricKey("resource1", "service1", "operation1", "type", 0, false), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))), - Pair.of(new MetricKey("resource2", "service2", "operation2", "type2", 200, true), new AggregateMetric().recordDurations(9, new AtomicLongArray(1L))) + Pair.of( + new MetricKey( + "resource1", + "service1", + "operation1", + "type", + 0, + false, + false, + "client", + ["country:canada", "georegion:amer", "peer.service:remote-service"] + ), + new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) + ), + Pair.of( + new MetricKey( + "resource2", + "service2", + "operation2", + "type2", + 200, + true, + false, + "producer", + ["country:canada", "georegion:amer", "peer.service:remote-service"] + ), + new AggregateMetric().recordDurations(9, new AtomicLongArray(1L)) + ) ], (0..10000).collect({ i -> - Pair.of(new MetricKey("resource" + i, "service" + i, "operation" + i, "type", 0, false), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))) + Pair.of( + new MetricKey( + "resource" + i, + "service" + i, + "operation" + i, + 
"type", + 0, + false, + false, + "producer", + ["messaging.destination" + i] + ), + new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) + ) }) ] withProcessTags << [true, false] @@ -107,8 +146,8 @@ class SerializingMetricWriterTest extends DDSpecification { for (Pair pair : content) { MetricKey key = pair.getLeft() AggregateMetric value = pair.getRight() - int size = unpacker.unpackMapHeader() - assert size == 12 + int metricMapSize = unpacker.unpackMapHeader() + assert metricMapSize == 15 int elementCount = 0 assert unpacker.unpackString() == "Name" assert unpacker.unpackString() == key.getOperationName() as String @@ -128,6 +167,19 @@ class SerializingMetricWriterTest extends DDSpecification { assert unpacker.unpackString() == "Synthetics" assert unpacker.unpackBoolean() == key.isSynthetics() ++elementCount + assert unpacker.unpackString() == "IsTraceRoot" + assert unpacker.unpackBoolean() == key.isTraceRoot() + ++elementCount + assert unpacker.unpackString() == "SpanKind" + assert unpacker.unpackString() == key.getSpanKind() as String + ++elementCount + assert unpacker.unpackString() == "PeerTags" + int peerTagsLength = unpacker.unpackArrayHeader() + assert peerTagsLength == key.getPeerTags().length + for (int i = 0; i < peerTagsLength; i++) { + assert unpacker.unpackString() == key.getPeerTags()[i] as String + } + ++elementCount assert unpacker.unpackString() == "Hits" assert unpacker.unpackInt() == value.getHitCount() ++elementCount @@ -146,7 +198,7 @@ class SerializingMetricWriterTest extends DDSpecification { assert unpacker.unpackString() == "ErrorSummary" validateSketch(unpacker) ++elementCount - assert elementCount == size + assert elementCount == metricMapSize } validated = true } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy index 008059398f7..a3fcd9e79d7 100644 --- 
a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy @@ -19,7 +19,10 @@ class SimpleSpan implements CoreSpan { private final long duration private final long startTime - SimpleSpan(String serviceName, + private final Map tags = [:] + + SimpleSpan( + String serviceName, String operationName, CharSequence resourceName, String type, @@ -28,7 +31,8 @@ class SimpleSpan implements CoreSpan { boolean error, long startTime, long duration, - int statusCode) { + int statusCode + ) { this.serviceName = serviceName this.operationName = operationName this.resourceName = resourceName @@ -38,7 +42,7 @@ class SimpleSpan implements CoreSpan { this.error = error this.startTime = startTime this.duration = duration - this.statusCode = (short)statusCode + this.statusCode = (short) statusCode } @Override @@ -118,57 +122,60 @@ class SimpleSpan implements CoreSpan { @Override SimpleSpan setTag(String tag, String value) { - return this + return setTag(tag, (Object) value) } @Override SimpleSpan setTag(String tag, boolean value) { - return this + return setTag(tag, (Object) value) } @Override SimpleSpan setTag(String tag, int value) { - return this + return setTag(tag, (Object) value) } @Override SimpleSpan setTag(String tag, long value) { - return this + return setTag(tag, (Object) value) } @Override SimpleSpan setTag(String tag, double value) { - return this + return setTag(tag, (Object) value) } @Override SimpleSpan setTag(String tag, Number value) { - return this + return setTag(tag, (Object) value) } @Override SimpleSpan setTag(String tag, CharSequence value) { - return this + return setTag(tag, (Object) value) } @Override SimpleSpan setTag(String tag, Object value) { + tags.put(tag, value) return this } @Override SimpleSpan removeTag(String tag) { + tags.remove(tag) return this } @Override U getTag(CharSequence name, U defaultValue) { - return defaultValue + def tagValue = 
tags.get(String.valueOf(name)) ?: defaultValue + return tagValue != null ? (U) tagValue : defaultValue } @Override U getTag(CharSequence name) { - return null + return getTag(name, null) } @Override @@ -197,8 +204,7 @@ class SimpleSpan implements CoreSpan { } @Override - void processTagsAndBaggage(MetadataConsumer consumer) { - } + void processTagsAndBaggage(MetadataConsumer consumer) {} @Override SimpleSpan setSamplingPriority(int samplingPriority, int samplingMechanism) { diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy index fa96bd90218..fcb0aa12ef7 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy @@ -34,11 +34,11 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest { ) writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10)) writer.add( - new MetricKey("resource1", "service1", "operation1", "sql", 0, false), + new MetricKey("resource1", "service1", "operation1", "sql", 0, false, true, "xyzzy", ["grault"]), new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) ) writer.add( - new MetricKey("resource2", "service2", "operation2", "web", 200, false), + new MetricKey("resource2", "service2", "operation2", "web", 200, false, true, "xyzzy", ["grault"]), new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) ) writer.finishBucket() From a4614dea7052755207ccd299d053cc65e42d4cb4 Mon Sep 17 00:00:00 2001 From: Brice Dutheil Date: Thu, 17 Jul 2025 16:18:27 +0200 Subject: [PATCH 2/8] chore(css): Use a map to create the metric key Also delays the creation of the Utf8ByteString at serialization time. Note that `writer.writeUTF8` emits a header corresponding to the length of the value being written, calling this method again will count as an entry in the array. 
A possible idea was to use a cache to store the computation of the peerTags, however `Utf8ByteString` is not concatenable / appendable, which is necessary to have the proper encoding. Creating a new "temporary" `Utf8ByteString`, was replaced by a direct call to `String: :getBytes`. --- .../metrics/ConflatingMetricsAggregator.java | 11 ++++---- .../trace/common/metrics/MetricKey.java | 20 ++++++------- .../metrics/SerializingMetricWriter.java | 20 ++++++++++--- .../common/metrics/AggregateMetricTest.groovy | 4 +-- .../ConflatingMetricAggregatorTest.groovy | 28 +++++++++---------- .../common/metrics/FootprintForkedTest.groovy | 3 +- .../SerializingMetricWriterTest.groovy | 13 +++++---- .../groovy/MetricsIntegrationTest.groovy | 4 +-- 8 files changed, 57 insertions(+), 46 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 1bd535b8cb0..1b38ec384d6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -25,9 +25,8 @@ import datadog.trace.core.DDTraceCoreInfo; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; -import datadog.trace.util.TraceUtils; -import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -293,7 +292,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { // returning false means that either the batch can't take any // more data, or it has already been consumed if (batch.add(tag, durationNanos)) { - // added to a pending batch prior to consumption + // added to a pending batch prior to consumption, // so skip publishing to the queue (we also know // the key isn't rare enough to override the sampler) 
return false; @@ -313,12 +312,12 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return isNewKey || span.getError() > 0; } - private List getPeerTags(CoreSpan span) { - List peerTags = new ArrayList<>(); + private Map getPeerTags(CoreSpan span) { + Map peerTags = new HashMap<>(); for (String peerTag : features.peerTags()) { Object value = span.getTag(peerTag); if (value != null) { - peerTags.add(peerTag + ":" + TraceUtils.normalizeTag(value.toString())); + peerTags.put(peerTag, value.toString()); } } return peerTags; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java index be992324464..de1d55b804b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java @@ -3,8 +3,8 @@ import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import java.util.Arrays; -import java.util.List; +import java.util.Collections; +import java.util.Map; /** The aggregation key for tracked metrics. */ public final class MetricKey { @@ -17,7 +17,7 @@ public final class MetricKey { private final int hash; private final boolean isTraceRoot; private final UTF8BytesString spanKind; - private final UTF8BytesString[] peerTags; + private final Map peerTags; public MetricKey( CharSequence resource, @@ -28,7 +28,7 @@ public MetricKey( boolean synthetics, boolean isTraceRoot, CharSequence spanKind, - List peerTags) { + Map peerTags) { this.resource = null == resource ? EMPTY : UTF8BytesString.create(resource); this.service = null == service ? EMPTY : UTF8BytesString.create(service); this.operationName = null == operationName ? 
EMPTY : UTF8BytesString.create(operationName); @@ -37,10 +37,7 @@ public MetricKey( this.synthetics = synthetics; this.isTraceRoot = isTraceRoot; this.spanKind = null == spanKind ? EMPTY : UTF8BytesString.create(spanKind); - this.peerTags = new UTF8BytesString[peerTags.size()]; - for (int i = 0; i < peerTags.size(); i++) { - this.peerTags[i] = UTF8BytesString.create(peerTags.get(i)); - } + this.peerTags = peerTags == null ? Collections.emptyMap() : peerTags; // Unrolled polynomial hashcode to avoid varargs allocation // and eliminate data dependency between iterations as in Arrays.hashCode. @@ -48,10 +45,11 @@ public MetricKey( // See // https://richardstartin.github.io/posts/collecting-rocks-and-benchmarks // https://richardstartin.github.io/posts/still-true-in-java-9-handwritten-hash-codes-are-faster + this.hash = -196513505 * Boolean.hashCode(this.isTraceRoot) + -1807454463 * this.spanKind.hashCode() - + 887_503_681 * Arrays.hashCode(this.peerTags) + + 887_503_681 * this.peerTags.hashCode() // possibly unroll here has well. 
+ 28_629_151 * this.resource.hashCode() + 923_521 * this.service.hashCode() + 29791 * this.operationName.hashCode() @@ -92,7 +90,7 @@ public UTF8BytesString getSpanKind() { return spanKind; } - public UTF8BytesString[] getPeerTags() { + public Map getPeerTags() { return peerTags; } @@ -112,7 +110,7 @@ public boolean equals(Object o) { && type.equals(metricKey.type) && isTraceRoot == metricKey.isTraceRoot && spanKind.equals(metricKey.spanKind) - && Arrays.equals(peerTags, metricKey.peerTags); + && peerTags.equals(metricKey.peerTags); } return false; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 82db7883386..7c20d158f2c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -1,6 +1,7 @@ package datadog.trace.common.metrics; import static java.nio.charset.StandardCharsets.ISO_8859_1; +import static java.nio.charset.StandardCharsets.UTF_8; import datadog.communication.serialization.GrowableBuffer; import datadog.communication.serialization.WritableFormatter; @@ -8,6 +9,8 @@ import datadog.trace.api.ProcessTags; import datadog.trace.api.WellKnownTags; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.util.TraceUtils; +import java.util.Map; public final class SerializingMetricWriter implements MetricWriter { @@ -123,10 +126,19 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.writeUTF8(key.getSpanKind()); writer.writeUTF8(PEER_TAGS); - UTF8BytesString[] peerTags = key.getPeerTags(); - writer.startArray(peerTags.length); - for (UTF8BytesString peerTag : peerTags) { - writer.writeUTF8(peerTag); + Map peerTags = key.getPeerTags(); + writer.startArray(peerTags.size()); + + StringBuilder peerTagBuilder = new StringBuilder(); + for 
(Map.Entry peerTag : peerTags.entrySet()) { + peerTagBuilder.setLength(0); + String toWrite = + peerTagBuilder + .append(peerTag.getKey()) + .append(':') + .append(TraceUtils.normalizeTag(peerTag.getValue())) + .toString(); + writer.writeUTF8(toWrite.getBytes(UTF_8)); } writer.writeUTF8(HITS); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy index 17b3510c115..14754d93886 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy @@ -51,7 +51,7 @@ class AggregateMetricTest extends DDSpecification { given: AggregateMetric aggregate = new AggregateMetric().recordDurations(3, new AtomicLongArray(0L, 0L, 0L | ERROR_TAG | TOP_LEVEL_TAG)) - Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault"])) + Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault":"quux"])) batch.add(0L, 10) batch.add(0L, 10) batch.add(0L, 10) @@ -126,7 +126,7 @@ class AggregateMetricTest extends DDSpecification { def "consistent under concurrent attempts to read and write"() { given: AggregateMetric aggregate = new AggregateMetric() - MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault"]) + MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault":"quux"]) BlockingDeque queue = new LinkedBlockingDeque<>(1000) ExecutorService reader = Executors.newSingleThreadExecutor() int writerCount = 10 diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index f6ddd1e7746..cf53da0671c 100644 --- 
a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -128,7 +128,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [] + [:] ), _) >> { MetricKey key, AggregateMetric value -> value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 } @@ -171,7 +171,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, kind, - [] + [:] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) @@ -224,7 +224,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "grault", - ["country:france"] + ["country":"france"] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) @@ -238,7 +238,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "grault", - ["country:france", "georegion:europe"] + ["country":"france", "georegion":"europe"] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) @@ -280,7 +280,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, topLevel, "baz", - [] + [:] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100 }) @@ -336,7 +336,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "baz", - [] + [:] ), { AggregateMetric value -> value.getHitCount() == count && value.getDuration() == count * duration }) @@ -349,7 +349,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "baz", - [] + [:] ), { 
AggregateMetric value -> value.getHitCount() == count && value.getDuration() == count * duration * 2 }) @@ -399,7 +399,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [] + [:] ), _) >> { MetricKey key, AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration } @@ -413,7 +413,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [] + [:] ), _) 1 * writer.finishBucket() >> { latch.countDown() } @@ -459,7 +459,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [] + [:] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) @@ -490,7 +490,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [] + [:] ),{ AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) @@ -504,7 +504,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [] + [:] ), _) 1 * writer.finishBucket() >> { latch.countDown() } @@ -550,7 +550,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "quux", - [] + [:] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) @@ -606,7 +606,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "garply", - [] + [:] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index 030183a3121..0f949f1ddc8 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -40,8 +40,7 @@ class FootprintForkedTest extends DDSpecification { 1000, 1000, 100, 
- SECONDS - ) + SECONDS) // Removing the 'features' as it's a mock, and mocks are heavyweight, e.g. around 22MiB def baseline = footprint(aggregator, features) aggregator.start() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 994a4b7d046..8e64253942c 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -53,7 +53,7 @@ class SerializingMetricWriterTest extends DDSpecification { false, false, "client", - ["country:canada", "georegion:amer", "peer.service:remote-service"] + ["country":"canada", "georegion":"amer", "peer.service":"remote-service"] ), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) ), @@ -67,7 +67,7 @@ class SerializingMetricWriterTest extends DDSpecification { true, false, "producer", - ["country:canada", "georegion:amer", "peer.service:remote-service"] + ["country":"canada", "georegion":"amer", "peer.service":"remote-service"] ), new AggregateMetric().recordDurations(9, new AtomicLongArray(1L)) ) @@ -83,7 +83,7 @@ class SerializingMetricWriterTest extends DDSpecification { false, false, "producer", - ["messaging.destination" + i] + ["messaging.destination" : "dest" + i] ), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) ) @@ -175,9 +175,12 @@ class SerializingMetricWriterTest extends DDSpecification { ++elementCount assert unpacker.unpackString() == "PeerTags" int peerTagsLength = unpacker.unpackArrayHeader() - assert peerTagsLength == key.getPeerTags().length + assert peerTagsLength == key.getPeerTags().size() for (int i = 0; i < peerTagsLength; i++) { - assert unpacker.unpackString() == key.getPeerTags()[i] as String + def string = unpacker.unpackString() + def separatorPos = string.indexOf(':') + 
def tagVal = key.getPeerTags()[string.substring(0, separatorPos)] + assert tagVal == string.substring(separatorPos + 1) } ++elementCount assert unpacker.unpackString() == "Hits" diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy index fcb0aa12ef7..b92ed985ca1 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy @@ -34,11 +34,11 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest { ) writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10)) writer.add( - new MetricKey("resource1", "service1", "operation1", "sql", 0, false, true, "xyzzy", ["grault"]), + new MetricKey("resource1", "service1", "operation1", "sql", 0, false, true, "xyzzy", ["grault":"quux"]), new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) ) writer.add( - new MetricKey("resource2", "service2", "operation2", "web", 200, false, true, "xyzzy", ["grault"]), + new MetricKey("resource2", "service2", "operation2", "web", 200, false, true, "xyzzy", ["grault":"quux"]), new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) ) writer.finishBucket() From 908548df2654cb267017c3b8e24cac0e8d181431 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Mon, 11 Aug 2025 11:14:00 +0200 Subject: [PATCH 3/8] Improves CSS peer tag aggregation (#9336) * Add jmh for metrics aggregation * Cache peer tags to avoid too many strings/utf8 conversions * Use span kind cache * Fix tests --- .../ConflatingMetricsAggregatorBenchmark.java | 102 ++++++++++++++++++ .../metrics/ConflatingMetricsAggregator.java | 29 ++++- .../trace/common/metrics/MetricKey.java | 10 +- .../metrics/SerializingMetricWriter.java | 18 +--- .../common/metrics/AggregateMetricTest.groovy | 5 +- .../ConflatingMetricAggregatorTest.groovy | 28 ++--- 
.../SerializingMetricWriterTest.groovy | 21 ++-- .../groovy/MetricsIntegrationTest.groovy | 5 +- 8 files changed, 169 insertions(+), 49 deletions(-) create mode 100644 dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java new file mode 100644 index 00000000000..e15d771fc0b --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -0,0 +1,102 @@ +package datadog.trace.common.metrics; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.communication.monitor.Monitoring; +import datadog.trace.api.WellKnownTags; +import datadog.trace.core.CoreSpan; +import datadog.trace.util.Strings; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@State(Scope.Benchmark) +@Warmup(iterations = 1, time = 30, timeUnit = SECONDS) +@Measurement(iterations = 3, time = 30, timeUnit = SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(MICROSECONDS) +@Fork(value = 1) +public class ConflatingMetricsAggregatorBenchmark { + private final DDAgentFeaturesDiscovery featuresDiscovery = + new 
FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()); + private final ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + featuresDiscovery, + new NullSink(), + 2048, + 2048); + private final List> spans = generateTrace(64); + + static List> generateTrace(int len) { + final List> trace = new ArrayList<>(); + for (int i = 0; i < len; i++) { + SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1); + span.setTag("peer.hostname", Strings.random(10)); + trace.add(span); + } + return trace; + } + + static class NullSink implements Sink { + + @Override + public void register(EventListener listener) {} + + @Override + public void accept(int messageCount, ByteBuffer buffer) {} + } + + static class FixedAgentFeaturesDiscovery extends DDAgentFeaturesDiscovery { + private final Set peerTags; + private final Set spanKinds; + + public FixedAgentFeaturesDiscovery(Set peerTags, Set spanKinds) { + // create a fixed discovery with metrics enabled + super(null, Monitoring.DISABLED, null, false, true); + this.peerTags = peerTags; + this.spanKinds = spanKinds; + } + + @Override + public void discover() { + // do nothing + } + + @Override + public boolean supportsMetrics() { + return true; + } + + @Override + public Set peerTags() { + return peerTags; + } + + @Override + public Set spanKindsToComputedStats() { + return spanKinds; + } + } + + @Benchmark + public void benchmark(Blackhole blackhole) { + blackhole.consume(aggregator.publish(spans)); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 1b38ec384d6..f2b6a96857c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ 
b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -15,6 +15,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.api.Pair; import datadog.trace.api.WellKnownTags; import datadog.trace.api.cache.DDCache; import datadog.trace.api.cache.DDCaches; @@ -25,8 +26,8 @@ import datadog.trace.core.DDTraceCoreInfo; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -34,6 +35,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.jctools.maps.NonBlockingHashMap; import org.jctools.queues.MpscCompoundQueue; import org.jctools.queues.SpmcArrayQueue; @@ -50,6 +52,21 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final DDCache SERVICE_NAMES = DDCaches.newFixedSizeCache(32); + private static final DDCache SPAN_KINDS = + DDCaches.newFixedSizeCache(16); + private static final DDCache< + String, Pair, Function>> + PEER_TAGS_CACHE = + DDCaches.newFixedSizeCache( + 64); // it can be unbounded since those values are returned by the agent and should be + // under control. 64 entries is enough in this case to contain all the peer tags. 
+ private static final Function< + String, Pair, Function>> + PEER_TAGS_CACHE_ADDER = + key -> + Pair.of( + DDCaches.newFixedSizeCache(512), + value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; private final Set ignoredResources; @@ -276,7 +293,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { span.getHttpStatusCode(), isSynthetic(span), span.isTopLevel(), - span.getTag(SPAN_KIND, ""), + SPAN_KINDS.computeIfAbsent(span.getTag(SPAN_KIND, ""), UTF8BytesString::create), getPeerTags(span)); boolean isNewKey = false; MetricKey key = keys.putIfAbsent(newKey, newKey); @@ -312,12 +329,14 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return isNewKey || span.getError() > 0; } - private Map getPeerTags(CoreSpan span) { - Map peerTags = new HashMap<>(); + private List getPeerTags(CoreSpan span) { + List peerTags = new ArrayList<>(); for (String peerTag : features.peerTags()) { Object value = span.getTag(peerTag); if (value != null) { - peerTags.put(peerTag, value.toString()); + final Pair, Function> pair = + PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); + peerTags.add(pair.getLeft().computeIfAbsent(value.toString(), pair.getRight())); } } return peerTags; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java index de1d55b804b..73aca7d6daf 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java @@ -4,7 +4,7 @@ import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import java.util.Collections; -import java.util.Map; +import java.util.List; /** The aggregation key for tracked metrics. 
*/ public final class MetricKey { @@ -17,7 +17,7 @@ public final class MetricKey { private final int hash; private final boolean isTraceRoot; private final UTF8BytesString spanKind; - private final Map peerTags; + private final List peerTags; public MetricKey( CharSequence resource, @@ -28,7 +28,7 @@ public MetricKey( boolean synthetics, boolean isTraceRoot, CharSequence spanKind, - Map peerTags) { + List peerTags) { this.resource = null == resource ? EMPTY : UTF8BytesString.create(resource); this.service = null == service ? EMPTY : UTF8BytesString.create(service); this.operationName = null == operationName ? EMPTY : UTF8BytesString.create(operationName); @@ -37,7 +37,7 @@ public MetricKey( this.synthetics = synthetics; this.isTraceRoot = isTraceRoot; this.spanKind = null == spanKind ? EMPTY : UTF8BytesString.create(spanKind); - this.peerTags = peerTags == null ? Collections.emptyMap() : peerTags; + this.peerTags = peerTags == null ? Collections.emptyList() : peerTags; // Unrolled polynomial hashcode to avoid varargs allocation // and eliminate data dependency between iterations as in Arrays.hashCode. 
@@ -90,7 +90,7 @@ public UTF8BytesString getSpanKind() { return spanKind; } - public Map getPeerTags() { + public List getPeerTags() { return peerTags; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 7c20d158f2c..964c51b2cbf 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -1,7 +1,6 @@ package datadog.trace.common.metrics; import static java.nio.charset.StandardCharsets.ISO_8859_1; -import static java.nio.charset.StandardCharsets.UTF_8; import datadog.communication.serialization.GrowableBuffer; import datadog.communication.serialization.WritableFormatter; @@ -9,8 +8,7 @@ import datadog.trace.api.ProcessTags; import datadog.trace.api.WellKnownTags; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import datadog.trace.util.TraceUtils; -import java.util.Map; +import java.util.List; public final class SerializingMetricWriter implements MetricWriter { @@ -126,19 +124,11 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.writeUTF8(key.getSpanKind()); writer.writeUTF8(PEER_TAGS); - Map peerTags = key.getPeerTags(); + final List peerTags = key.getPeerTags(); writer.startArray(peerTags.size()); - StringBuilder peerTagBuilder = new StringBuilder(); - for (Map.Entry peerTag : peerTags.entrySet()) { - peerTagBuilder.setLength(0); - String toWrite = - peerTagBuilder - .append(peerTag.getKey()) - .append(':') - .append(TraceUtils.normalizeTag(peerTag.getValue())) - .toString(); - writer.writeUTF8(toWrite.getBytes(UTF_8)); + for (UTF8BytesString peerTag : peerTags) { + writer.writeUTF8(peerTag); } writer.writeUTF8(HITS); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy 
b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy index 14754d93886..3c7a247cae3 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy @@ -1,5 +1,6 @@ package datadog.trace.common.metrics +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.test.util.DDSpecification import java.util.concurrent.BlockingDeque @@ -51,7 +52,7 @@ class AggregateMetricTest extends DDSpecification { given: AggregateMetric aggregate = new AggregateMetric().recordDurations(3, new AtomicLongArray(0L, 0L, 0L | ERROR_TAG | TOP_LEVEL_TAG)) - Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault":"quux"])) + Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")])) batch.add(0L, 10) batch.add(0L, 10) batch.add(0L, 10) @@ -126,7 +127,7 @@ class AggregateMetricTest extends DDSpecification { def "consistent under concurrent attempts to read and write"() { given: AggregateMetric aggregate = new AggregateMetric() - MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault":"quux"]) + MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")]) BlockingDeque queue = new LinkedBlockingDeque<>(1000) ExecutorService reader = Executors.newSingleThreadExecutor() int writerCount = 10 diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index cf53da0671c..5f40490a3a5 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ 
b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -128,7 +128,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), _) >> { MetricKey key, AggregateMetric value -> value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 } @@ -171,7 +171,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, kind, - [:] + [] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) @@ -224,7 +224,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "grault", - ["country":"france"] + [UTF8BytesString.create("country:france")] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) @@ -238,7 +238,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "grault", - ["country":"france", "georegion":"europe"] + [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) @@ -280,7 +280,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, topLevel, "baz", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100 }) @@ -336,7 +336,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "baz", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == count && value.getDuration() == count * duration }) @@ -349,7 +349,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "baz", - [:] + [] ), { AggregateMetric value -> 
value.getHitCount() == count && value.getDuration() == count * duration * 2 }) @@ -399,7 +399,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), _) >> { MetricKey key, AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration } @@ -413,7 +413,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), _) 1 * writer.finishBucket() >> { latch.countDown() } @@ -459,7 +459,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) @@ -490,7 +490,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ),{ AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) @@ -504,7 +504,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), _) 1 * writer.finishBucket() >> { latch.countDown() } @@ -550,7 +550,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "quux", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) @@ -606,7 +606,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "garply", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 8e64253942c..a36db50b441 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -4,6 +4,7 @@ import datadog.trace.api.Config import 
datadog.trace.api.ProcessTags import datadog.trace.api.WellKnownTags import datadog.trace.api.Pair +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.test.util.DDSpecification import org.msgpack.core.MessagePack import org.msgpack.core.MessageUnpacker @@ -53,7 +54,11 @@ class SerializingMetricWriterTest extends DDSpecification { false, false, "client", - ["country":"canada", "georegion":"amer", "peer.service":"remote-service"] + [ + UTF8BytesString.create("country:canada"), + UTF8BytesString.create("georegion:amer"), + UTF8BytesString.create("peer.service:remote-service") + ] ), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) ), @@ -67,7 +72,11 @@ class SerializingMetricWriterTest extends DDSpecification { true, false, "producer", - ["country":"canada", "georegion":"amer", "peer.service":"remote-service"] + [ + UTF8BytesString.create("country:canada"), + UTF8BytesString.create("georegion:amer"), + UTF8BytesString.create("peer.service:remote-service") + ], ), new AggregateMetric().recordDurations(9, new AtomicLongArray(1L)) ) @@ -83,7 +92,7 @@ class SerializingMetricWriterTest extends DDSpecification { false, false, "producer", - ["messaging.destination" : "dest" + i] + [UTF8BytesString.create("messaging.destination:dest" + i)] ), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) ) @@ -177,10 +186,8 @@ class SerializingMetricWriterTest extends DDSpecification { int peerTagsLength = unpacker.unpackArrayHeader() assert peerTagsLength == key.getPeerTags().size() for (int i = 0; i < peerTagsLength; i++) { - def string = unpacker.unpackString() - def separatorPos = string.indexOf(':') - def tagVal = key.getPeerTags()[string.substring(0, separatorPos)] - assert tagVal == string.substring(separatorPos + 1) + def unpackedPeerTag = unpacker.unpackString() + assert unpackedPeerTag == key.getPeerTags()[i].toString() } ++elementCount assert unpacker.unpackString() == "Hits" diff --git 
a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy index b92ed985ca1..9984b9700d0 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy @@ -2,6 +2,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery import datadog.communication.http.OkHttpUtils import datadog.trace.api.Config import datadog.trace.api.WellKnownTags +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.common.metrics.AggregateMetric import datadog.trace.common.metrics.EventListener import datadog.trace.common.metrics.MetricKey @@ -34,11 +35,11 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest { ) writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10)) writer.add( - new MetricKey("resource1", "service1", "operation1", "sql", 0, false, true, "xyzzy", ["grault":"quux"]), + new MetricKey("resource1", "service1", "operation1", "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")]), new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) ) writer.add( - new MetricKey("resource2", "service2", "operation2", "web", 200, false, true, "xyzzy", ["grault":"quux"]), + new MetricKey("resource2", "service2", "operation2", "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")]), new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) ) writer.finishBucket() From f565188a805cf443c984d79b6fa0f0601c7ec53e Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Mon, 11 Aug 2025 11:58:00 +0200 Subject: [PATCH 4/8] Hardcode eligible span kinds since agent-backpropagated span kinds are deprecated --- .../ddagent/DDAgentFeaturesDiscovery.java | 12 ------------ .../ddagent/DDAgentFeaturesDiscoveryTest.groovy | 6 ------ .../ConflatingMetricsAggregatorBenchmark.java | 5 -----
.../metrics/ConflatingMetricsAggregator.java | 15 ++++++++++++++- .../metrics/ConflatingMetricAggregatorTest.groovy | 15 +++++---------- .../common/metrics/FootprintForkedTest.groovy | 1 - 6 files changed, 19 insertions(+), 35 deletions(-) diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java index 789747216b0..3956030d0f6 100644 --- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java +++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java @@ -92,7 +92,6 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy { private volatile String version; private volatile String telemetryProxyEndpoint; private volatile Set peerTags = emptySet(); - private volatile Set spanKindsToComputedStats = emptySet(); private long lastTimeDiscovered; @@ -127,7 +126,6 @@ private void reset() { lastTimeDiscovered = 0; telemetryProxyEndpoint = null; peerTags = emptySet(); - spanKindsToComputedStats = emptySet(); } /** Run feature discovery, unconditionally. */ @@ -310,12 +308,6 @@ private boolean processInfoResponse(String response) { peer_tags instanceof List ? unmodifiableSet(new HashSet<>((List) peer_tags)) : emptySet(); - - Object span_kinds = map.get("span_kinds_stats_computed"); - spanKindsToComputedStats = - span_kinds instanceof List - ? 
unmodifiableSet(new HashSet<>((List) span_kinds)) - : emptySet(); } try { state = Strings.sha256(response); @@ -377,10 +369,6 @@ public Set peerTags() { return peerTags; } - public Set spanKindsToComputedStats() { - return spanKindsToComputedStats; - } - public String getMetricsEndpoint() { return metricsEndpoint; } diff --git a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy index 1ee382f5775..5ac884e0814 100644 --- a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy +++ b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy @@ -462,12 +462,6 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { "tablename", "topicname" ) - features.spanKindsToComputedStats().containsAll( - "client", - "consumer", - "producer", - "server" - ) } def "should send container id as header on the info request and parse the hash in the response"() { diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java index e15d771fc0b..cc3d037498d 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -88,11 +88,6 @@ public boolean supportsMetrics() { public Set peerTags() { return peerTags; } - - @Override - public Set spanKindsToComputedStats() { - return spanKinds; - } } @Benchmark diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index f2b6a96857c..0732ec47109 100644 --- 
a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -3,6 +3,10 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V6_METRICS_ENDPOINT; import static datadog.trace.api.Functions.UTF8_ENCODE; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; @@ -10,6 +14,7 @@ import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; import static datadog.trace.util.AgentThreadFactory.newAgentThread; +import static java.util.Collections.unmodifiableSet; import static java.util.concurrent.TimeUnit.SECONDS; import datadog.communication.ddagent.DDAgentFeaturesDiscovery; @@ -27,7 +32,9 @@ import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; @@ -69,6 +76,12 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; + private static final Set ELIGIBLE_SPAN_KINDS = + unmodifiableSet( + new HashSet<>( + 
Arrays.asList( + SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER))); + private final Set ignoredResources; private final Queue batchPool; private final NonBlockingHashMap pending; @@ -280,7 +293,7 @@ private boolean shouldComputeMetric(CoreSpan span) { private boolean spanKindEligible(CoreSpan span) { final Object spanKind = span.getTag(SPAN_KIND); // use toString since it could be a CharSequence... - return spanKind != null && features.spanKindsToComputedStats().contains(spanKind.toString()); + return spanKind != null && ELIGIBLE_SPAN_KINDS.contains(spanKind.toString()); } private boolean publish(CoreSpan span, boolean isTopLevel) { diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 5f40490a3a5..e9fb925cdaf 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -144,7 +144,6 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.spanKindsToComputedStats() >> ["client", "server", "producer", "consumer"] features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) @@ -183,6 +182,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { where: kind | statsComputed "client" | true + "producer" | true + "consumer" | true UTF8BytesString.create("server") | true "internal" | false null | false @@ -194,7 +195,6 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = 
Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.spanKindsToComputedStats() >> ["grault"] features.peerTags() >>> [["country"], ["country", "georegion"],] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, sink, writer, 10, queueSize, reportingInterval, SECONDS) @@ -203,9 +203,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { when: CountDownLatch latch = new CountDownLatch(1) aggregator.publish([ - new SimpleSpan("service", "operation", "resource", "type", false, false, false, 0, 100, HTTP_OK) + new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) .setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe"), - new SimpleSpan("service", "operation", "resource", "type", false, false, false, 0, 100, HTTP_OK) + new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) .setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe") ]) aggregator.report() @@ -302,7 +302,6 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.spanKindsToComputedStats() >> ["baz"] features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) @@ -368,7 +367,6 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.spanKindsToComputedStats() >> ["baz"] features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) @@ -428,7 
+426,6 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.spanKindsToComputedStats() >> ["baz"] features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) @@ -491,7 +488,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { true, "baz", [] - ),{ AggregateMetric value -> + ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) } @@ -519,7 +516,6 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.spanKindsToComputedStats() >> ["quux"] features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) @@ -576,7 +572,6 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.spanKindsToComputedStats() >> ["garply"] features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index 0f949f1ddc8..4a96460d604 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy 
@@ -28,7 +28,6 @@ class FootprintForkedTest extends DDSpecification { ValidatingSink sink = new ValidatingSink(latch) DDAgentFeaturesDiscovery features = Stub(DDAgentFeaturesDiscovery) { it.supportsMetrics() >> true - it.spanKindsToComputedStats() >> [] it.peerTags() >> [] } ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( From ce2279b504fa36e49ab758f627014e7b0273c497 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Tue, 12 Aug 2025 08:32:00 +0200 Subject: [PATCH 5/8] revisit peer tags aggregation rules according to the rfc --- .../metrics/ConflatingMetricsAggregator.java | 48 +++++++++++----- .../ConflatingMetricAggregatorTest.groovy | 57 +++++++++++++++++-- 2 files changed, 88 insertions(+), 17 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 0732ec47109..88e6607628a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -1,10 +1,12 @@ package datadog.trace.common.metrics; import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V6_METRICS_ENDPOINT; +import static datadog.trace.api.DDTags.BASE_SERVICE; import static datadog.trace.api.Functions.UTF8_ENCODE; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; @@ -76,12 
+78,15 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; - private static final Set ELIGIBLE_SPAN_KINDS = + private static final Set ELIGIBLE_SPAN_KINDS_FOR_METRICS = unmodifiableSet( new HashSet<>( Arrays.asList( SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER))); + private static final Set ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION = + unmodifiableSet(new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER))); + private final Set ignoredResources; private final Queue batchPool; private final NonBlockingHashMap pending; @@ -293,10 +298,11 @@ private boolean shouldComputeMetric(CoreSpan span) { private boolean spanKindEligible(CoreSpan span) { final Object spanKind = span.getTag(SPAN_KIND); // use toString since it could be a CharSequence... - return spanKind != null && ELIGIBLE_SPAN_KINDS.contains(spanKind.toString()); + return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString()); } private boolean publish(CoreSpan span, boolean isTopLevel) { + final CharSequence spanKind = span.getTag(SPAN_KIND, ""); MetricKey newKey = new MetricKey( span.getResourceName(), @@ -306,8 +312,9 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { span.getHttpStatusCode(), isSynthetic(span), span.isTopLevel(), - SPAN_KINDS.computeIfAbsent(span.getTag(SPAN_KIND, ""), UTF8BytesString::create), - getPeerTags(span)); + SPAN_KINDS.computeIfAbsent( + spanKind, UTF8BytesString::create), // save repeated utf8 conversions + getPeerTags(span, spanKind.toString())); boolean isNewKey = false; MetricKey key = keys.putIfAbsent(newKey, newKey); if (null == key) { @@ -342,17 +349,32 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return isNewKey || span.getError() > 0; } - private List getPeerTags(CoreSpan span) { - List peerTags = new ArrayList<>(); - for 
(String peerTag : features.peerTags()) { - Object value = span.getTag(peerTag); - if (value != null) { - final Pair, Function> pair = - PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); - peerTags.add(pair.getLeft().computeIfAbsent(value.toString(), pair.getRight())); + private List getPeerTags(CoreSpan span, String spanKind) { + if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { + List peerTags = new ArrayList<>(); + for (String peerTag : features.peerTags()) { + Object value = span.getTag(peerTag); + if (value != null) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); + peerTags.add( + cacheAndCreator + .getLeft() + .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); + } + } + return peerTags; + } else if (SPAN_KIND_INTERNAL.equals(spanKind)) { + // in this case only the base service should be aggregated if present + final String baseService = span.getTag(BASE_SERVICE); + if (baseService != null) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); + return Collections.singletonList( + cacheAndCreator.getLeft().computeIfAbsent(baseService, cacheAndCreator.getRight())); } } - return peerTags; + return Collections.emptyList(); } private static boolean isSynthetic(CoreSpan span) { diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index e9fb925cdaf..2e67cd2bece 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -204,9 +204,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { CountDownLatch latch = new CountDownLatch(1) aggregator.publish([ new SimpleSpan("service", 
"operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) - .setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe"), + .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"), new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) - .setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe") + .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe") ]) aggregator.report() def latchTriggered = latch.await(2, SECONDS) @@ -223,7 +223,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { HTTP_OK, false, false, - "grault", + "client", [UTF8BytesString.create("country:france")] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 @@ -237,7 +237,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { HTTP_OK, false, false, - "grault", + "client", [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 @@ -248,6 +248,55 @@ class ConflatingMetricAggregatorTest extends DDSpecification { aggregator.close() } + def "should aggregate the right peer tags for kind #kind"() { + setup: + MetricWriter writer = Mock(MetricWriter) + Sink sink = Stub(Sink) + DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) + features.supportsMetrics() >> true + features.peerTags() >> ["peer.hostname", "_dd.base_service"] + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + features, sink, writer, 10, queueSize, reportingInterval, SECONDS) + aggregator.start() + + when: + CountDownLatch latch = new CountDownLatch(1) + aggregator.publish([ + new SimpleSpan("service", 
"operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) + .setTag(SPAN_KIND, kind).setTag("peer.hostname", "localhost").setTag("_dd.base_service", "test") + ]) + aggregator.report() + def latchTriggered = latch.await(2, SECONDS) + + then: + latchTriggered + 1 * writer.startBucket(1, _, _) + 1 * writer.add( + new MetricKey( + "resource", + "service", + "operation", + "type", + HTTP_OK, + false, + false, + kind, + expectedPeerTags + ), { AggregateMetric aggregateMetric -> + aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 + }) + 1 * writer.finishBucket() >> { latch.countDown() } + + cleanup: + aggregator.close() + + where: + kind | expectedPeerTags + "client" | [UTF8BytesString.create("peer.hostname:localhost"), UTF8BytesString.create("_dd.base_service:test")] + "internal" | [UTF8BytesString.create("_dd.base_service:test")] + "server" | [] + } + def "measured spans do not contribute to top level count"() { setup: MetricWriter writer = Mock(MetricWriter) From 81d8a79de7fa542c566217469a50879601d639c0 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Wed, 20 Aug 2025 17:56:12 +0200 Subject: [PATCH 6/8] IsTraceRoot is a tristate --- .../datadog/trace/common/metrics/SerializingMetricWriter.java | 2 +- .../trace/common/metrics/SerializingMetricWriterTest.groovy | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 964c51b2cbf..48d688be700 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -118,7 +118,7 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.writeBoolean(key.isSynthetics()); writer.writeUTF8(IS_TRACE_ROOT); - 
writer.writeBoolean(key.isTraceRoot()); + writer.writeInt(key.isTraceRoot() ? 1 : 2); // tristate (0 unknown, 1 true, 2 false) writer.writeUTF8(SPAN_KIND); writer.writeUTF8(key.getSpanKind()); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index a36db50b441..bcc87fd66a7 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -177,7 +177,7 @@ class SerializingMetricWriterTest extends DDSpecification { assert unpacker.unpackBoolean() == key.isSynthetics() ++elementCount assert unpacker.unpackString() == "IsTraceRoot" - assert unpacker.unpackBoolean() == key.isTraceRoot() + assert unpacker.unpackInt() == (key.isTraceRoot() ? 1 : 2) ++elementCount assert unpacker.unpackString() == "SpanKind" assert unpacker.unpackString() == key.getSpanKind() as String From 34da211622e9f0ff1d839b11fcf228a4d6f2e699 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Thu, 21 Aug 2025 13:31:01 +0200 Subject: [PATCH 7/8] Don't confuse trace root with top levels --- .../metrics/ConflatingMetricsAggregator.java | 2 +- .../ConflatingMetricAggregatorTest.groovy | 22 +++++++++---------- .../trace/common/metrics/SimpleSpan.groovy | 7 ++++-- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 88e6607628a..cdc3039f910 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -311,7 +311,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { 
span.getType(), span.getHttpStatusCode(), isSynthetic(span), - span.isTopLevel(), + span.getParentId() == 0, SPAN_KINDS.computeIfAbsent( spanKind, UTF8BytesString::create), // save repeated utf8 conversions getPeerTags(span, spanKind.toString())); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 2e67cd2bece..155af9123a6 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -126,7 +126,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "type", HTTP_OK, false, - true, + false, "baz", [] ), _) >> { MetricKey key, AggregateMetric value -> @@ -197,7 +197,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >>> [["country"], ["country", "georegion"],] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -256,7 +256,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> ["peer.hostname", "_dd.base_service"] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -327,7 +327,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "type", HTTP_OK, false, - topLevel, + false, "baz", [] ), { AggregateMetric value -> @@ -444,7 +444,7 @@ class 
ConflatingMetricAggregatorTest extends DDSpecification { "type", HTTP_OK, false, - true, + false, "baz", [] ), _) >> { MetricKey key, AggregateMetric value -> @@ -458,7 +458,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "type", HTTP_OK, false, - true, + false, "baz", [] ), _) @@ -503,7 +503,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "type", HTTP_OK, false, - true, + false, "baz", [] ), { AggregateMetric value -> @@ -534,7 +534,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "type", HTTP_OK, false, - true, + false, "baz", [] ), { AggregateMetric value -> @@ -548,7 +548,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "type", HTTP_OK, false, - true, + false, "baz", [] ), _) @@ -593,7 +593,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "type", HTTP_OK, false, - true, + false, "quux", [] ), { AggregateMetric value -> @@ -631,7 +631,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { CountDownLatch latch = new CountDownLatch(1) for (int i = 0; i < 5; ++i) { aggregator.publish([ - new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK) + new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK, true) .setTag(SPAN_KIND, "garply") ]) } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy index a3fcd9e79d7..72250d64654 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy @@ -13,6 +13,7 @@ class SimpleSpan implements CoreSpan { private final String type private final boolean measured private final boolean topLevel + private final boolean traceRoot private final boolean error private final short statusCode @@ -31,7 +32,8 @@ class 
SimpleSpan implements CoreSpan { boolean error, long startTime, long duration, - int statusCode + int statusCode, + boolean traceRoot = false ) { this.serviceName = serviceName this.operationName = operationName @@ -39,6 +41,7 @@ class SimpleSpan implements CoreSpan { this.type = type this.measured = measured this.topLevel = topLevel + this.traceRoot = traceRoot this.error = error this.startTime = startTime this.duration = duration @@ -77,7 +80,7 @@ class SimpleSpan implements CoreSpan { @Override long getParentId() { - return DDSpanId.ZERO + return traceRoot ? DDSpanId.ZERO : 1L } @Override From bcf4d04f2bdcd7304b90288d53e4800f15fd4b1b Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Thu, 21 Aug 2025 13:45:41 +0200 Subject: [PATCH 8/8] Fix build after rebase --- .../common/metrics/ConflatingMetricsAggregatorBenchmark.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java index cc3d037498d..3ae8a050f32 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -7,6 +7,7 @@ import datadog.communication.monitor.Monitoring; import datadog.trace.api.WellKnownTags; import datadog.trace.core.CoreSpan; +import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.Strings; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -39,6 +40,7 @@ public class ConflatingMetricsAggregatorBenchmark { new WellKnownTags("", "", "", "", "", ""), Collections.emptySet(), featuresDiscovery, + HealthMetrics.NO_OP, new NullSink(), 2048, 2048);