From ff2c775d8c44dbd393f2d609f02086edc1c83115 Mon Sep 17 00:00:00 2001 From: Daniel Mohedano Date: Tue, 15 Jul 2025 12:11:28 +0200 Subject: [PATCH 1/3] fix: avoid race conditions on feature discovery --- .../ddagent/SharedCommunicationObjects.java | 12 +++- .../trace/common/writer/WriterFactory.java | 3 + .../java/datadog/trace/core/CoreTracer.java | 11 +++- .../common/writer/WriterFactoryTest.groovy | 59 ++++++++++++++++--- 4 files changed, 74 insertions(+), 11 deletions(-) diff --git a/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java b/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java index b17a94a9d05..ff5a8440031 100644 --- a/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java +++ b/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java @@ -138,6 +138,16 @@ public void setFeaturesDiscovery(DDAgentFeaturesDiscovery featuresDiscovery) { this.featuresDiscovery = featuresDiscovery; } + // for testing + public void setAgentUrl(HttpUrl agentUrl) { + this.agentUrl = agentUrl; + } + + // for testing + public void setOkHttpClient(OkHttpClient okHttpClient) { + this.okHttpClient = okHttpClient; + } + public DDAgentFeaturesDiscovery featuresDiscovery(Config config) { DDAgentFeaturesDiscovery ret = featuresDiscovery; if (ret == null) { @@ -159,7 +169,7 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) { ret.discover(); // safe to run on same thread } else { // avoid performing blocking I/O operation on application thread - AgentTaskScheduler.INSTANCE.execute(ret::discover); + AgentTaskScheduler.INSTANCE.execute(ret::discoverIfOutdated); } } featuresDiscovery = ret; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java index baa55d5dbf1..a3244fc3768 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java @@ -86,6 +86,7 @@ public static Writer createWriter( // The AgentWriter doesn't support the CI Visibility protocol. If CI Visibility is // enabled, check if we can use the IntakeWriter instead. if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isCiVisibilityEnabled())) { + featuresDiscovery.discoverIfOutdated(); if (featuresDiscovery.supportsEvpProxy() || config.isCiVisibilityAgentlessEnabled()) { configuredType = DD_INTAKE_WRITER_TYPE; } else { @@ -94,6 +95,7 @@ public static Writer createWriter( } } if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isLlmObsEnabled())) { + featuresDiscovery.discoverIfOutdated(); if (featuresDiscovery.supportsEvpProxy() || config.isLlmObsAgentlessEnabled()) { configuredType = DD_INTAKE_WRITER_TYPE; } else { @@ -186,6 +188,7 @@ private static RemoteApi createDDIntakeRemoteApi( SharedCommunicationObjects commObjects, DDAgentFeaturesDiscovery featuresDiscovery, TrackType trackType) { + featuresDiscovery.discoverIfOutdated(); boolean evpProxySupported = featuresDiscovery.supportsEvpProxy(); boolean useProxyApi = (evpProxySupported && TrackType.LLMOBS == trackType && !config.isLlmObsAgentlessEnabled()) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index 28ba0832e30..dd3cea27f6d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -669,6 +669,14 @@ private CoreTracer( this.writer = writer; } + DDAgentFeaturesDiscovery featuresDiscovery = + sharedCommunicationObjects.featuresDiscovery(config); + + if (config.isCiVisibilityEnabled()) { + // ensure updated discovery and sync if the another discovery currently being done + featuresDiscovery.discoverIfOutdated(); + } + if (config.isCiVisibilityEnabled() && (config.isCiVisibilityAgentlessEnabled() || sharedCommunicationObjects.featuresDiscovery(config).supportsEvpProxy())) { @@ -732,8 +740,7 @@ private CoreTracer( if (config.isCiVisibilityAgentlessEnabled()) { addTraceInterceptor(DDIntakeTraceInterceptor.INSTANCE); } else { - DDAgentFeaturesDiscovery featuresDiscovery = - sharedCommunicationObjects.featuresDiscovery(config); + featuresDiscovery.discoverIfOutdated(); if (!featuresDiscovery.supportsEvpProxy()) { // CI Test Cycle protocol is not available addTraceInterceptor(CiVisibilityApmProtocolInterceptor.INSTANCE); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy index e3d9ce9c427..926ec81907a 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy @@ -1,5 +1,7 @@ package datadog.trace.common.writer +import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE + import datadog.communication.ddagent.DDAgentFeaturesDiscovery import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.Config @@ -10,10 +12,16 @@ import datadog.trace.common.writer.ddintake.DDEvpProxyApi import datadog.trace.common.writer.ddintake.DDIntakeApi import datadog.trace.core.monitor.HealthMetrics import datadog.trace.test.util.DDSpecification - +import groovy.json.JsonBuilder import java.util.stream.Collectors - -import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE +import okhttp3.Call +import okhttp3.HttpUrl +import okhttp3.MediaType +import okhttp3.OkHttpClient +import okhttp3.Protocol +import okhttp3.Request +import okhttp3.Response +import okhttp3.ResponseBody class WriterFactoryTest extends DDSpecification { @@ -27,19 +35,30 @@ class WriterFactoryTest extends DDSpecification { config.isCiVisibilityEnabled() >> true config.isCiVisibilityCodeCoverageEnabled() >> false - def agentFeaturesDiscovery = Mock(DDAgentFeaturesDiscovery) - agentFeaturesDiscovery.getEvpProxyEndpoint() >> DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT - agentFeaturesDiscovery.supportsContentEncodingHeadersWithEvpProxy() >> evpProxySupportsCompression + // Mock agent info response + def response = buildHttpResponse(hasEvpProxy, evpProxySupportsCompression, HttpUrl.parse(config.agentUrl + "/info")) + + // Mock HTTP client that simulates delayed response for async feature discovery + def mockCall = Mock(Call) + def mockHttpClient = Mock(OkHttpClient) + mockCall.execute() >> { + // Add a delay + sleep(400) + return response + } + mockHttpClient.newCall(_ as Request) >> mockCall + // Create SharedCommunicationObjects with mocked HTTP client def sharedComm = new SharedCommunicationObjects() - sharedComm.setFeaturesDiscovery(agentFeaturesDiscovery) + sharedComm.okHttpClient = mockHttpClient + sharedComm.agentUrl = HttpUrl.parse(config.agentUrl) sharedComm.createRemaining(config) def sampler = Mock(Sampler) when: - agentFeaturesDiscovery.supportsEvpProxy() >> hasEvpProxy config.ciVisibilityAgentlessEnabled >> isCiVisibilityAgentlessEnabled + def writer = WriterFactory.createWriter(config, sharedComm, sampler, null, HealthMetrics.NO_OP, configuredType) def apis @@ -77,4 +96,28 @@ class WriterFactoryTest extends DDSpecification { "not-found" | false | false | true | DDIntakeWriter | [DDIntakeApi] | true "not-found" | false | false | false | DDAgentWriter | [DDAgentApi] | false } + + Response buildHttpResponse(boolean hasEvpProxy, boolean evpProxySupportsCompression, HttpUrl agentUrl) { + def endpoints = [] + if (hasEvpProxy && evpProxySupportsCompression) { + endpoints = [DDAgentFeaturesDiscovery.V4_EVP_PROXY_ENDPOINT] + } else if (hasEvpProxy) { + endpoints = [DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT] + } else { + endpoints = [DDAgentFeaturesDiscovery.V4_ENDPOINT] + } + + def response = [ + "version" : "7.40.0", + "endpoints" : endpoints, + ] + + def builder = new Response.Builder() + .code(200) + .message("OK") + .protocol(Protocol.HTTP_1_1) + .request(new Request.Builder().url(agentUrl.resolve("/info")).build()) + .body(ResponseBody.create(MediaType.parse("application/json"), new JsonBuilder(response).toString())) + return builder.build() + } } From a304e949af555867e5fbad62684fef1a7ebeaaa4 Mon Sep 17 00:00:00 2001 From: Daniel Mohedano Date: Tue, 15 Jul 2025 12:47:13 +0200 Subject: [PATCH 2/3] commit suggestion Co-authored-by: Nikita Tkachenko <121111529+nikita-tkachenko-datadog@users.noreply.github.com> --- dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index dd3cea27f6d..b767c9c7b53 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -679,7 +679,7 @@ private CoreTracer( if (config.isCiVisibilityEnabled() && (config.isCiVisibilityAgentlessEnabled() - || sharedCommunicationObjects.featuresDiscovery(config).supportsEvpProxy())) { + || featuresDiscovery.supportsEvpProxy())) { pendingTraceBuffer = PendingTraceBuffer.discarding(); traceCollectorFactory = new StreamingTraceCollector.Factory(this, this.timeSource, healthMetrics); From 4e880c76b9e23a4c2db71789f867252ebeedab2c Mon Sep 17 00:00:00 2001 From: Daniel Mohedano Date: Tue, 15 Jul 2025 13:05:05 +0200 Subject: [PATCH 3/3] fix: spotless and issue with groovy mocking --- .../ddagent/SharedCommunicationObjects.java | 10 ---------- .../src/main/java/datadog/trace/core/CoreTracer.java | 3 +-- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java b/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java index ff5a8440031..b855731bc08 100644 --- a/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java +++ b/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java @@ -138,16 +138,6 @@ public void setFeaturesDiscovery(DDAgentFeaturesDiscovery featuresDiscovery) { this.featuresDiscovery = featuresDiscovery; } - // for testing - public void setAgentUrl(HttpUrl agentUrl) { - this.agentUrl = agentUrl; - } - - // for testing - public void setOkHttpClient(OkHttpClient okHttpClient) { - this.okHttpClient = okHttpClient; - } - public DDAgentFeaturesDiscovery featuresDiscovery(Config config) { DDAgentFeaturesDiscovery ret = featuresDiscovery; if (ret == null) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index b767c9c7b53..498c8a9c27d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -678,8 +678,7 @@ private CoreTracer( } if (config.isCiVisibilityEnabled() - && (config.isCiVisibilityAgentlessEnabled() - || featuresDiscovery.supportsEvpProxy())) { + && (config.isCiVisibilityAgentlessEnabled() || featuresDiscovery.supportsEvpProxy())) { pendingTraceBuffer = PendingTraceBuffer.discarding(); traceCollectorFactory = new StreamingTraceCollector.Factory(this, this.timeSource, healthMetrics);