From 7d7ee06e362c4feaf489d398cb4955b6ce1c534f Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Wed, 8 Jan 2025 10:09:49 +0100 Subject: [PATCH 1/2] Reactor: early propagate span in context when subscribing --- .../groovy/ReactorCoreTest.groovy | 33 ++++++- .../reactor/core/ContextSpanHelper.java | 36 +++++++ .../core/CorePublisherInstrumentation.java | 99 +++++++++++++++++++ .../core/CoreSubscriberInstrumentation.java | 29 +++--- .../src/test/groovy/ReactorCoreTest.groovy | 47 +++++++-- 5 files changed, 216 insertions(+), 28 deletions(-) create mode 100644 dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java create mode 100644 dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy b/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy index 51d9484a424..a89315a16ab 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy @@ -15,13 +15,13 @@ import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription import reactor.core.publisher.Flux -import reactor.core.publisher.Hooks import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers import reactor.util.context.Context import spock.lang.Shared import java.time.Duration +import java.util.concurrent.CompletableFuture class ReactorCoreTest extends AgentTestRunner { @@ -443,12 +443,41 @@ class ReactorCoreTest extends AgentTestRunner { def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() { when: - def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count() + def mono = Flux.range(1, 100).windowUntil { it % 10 == 0 }.count() then: // we are not interested into asserting a trace structure but only that the instrumentation error count is 0 assert mono.block() == 10 } + def "span in the context has to be activated when the publisher subscribes"() { + when: + // the mono is subscribed (block) when first is active. + // However we expect that the span third will have second as parent and not first + // because we set the parent explicitly in the reactor context (dd.span key) + def result = runUnderTrace("first", { + runUnderTrace("second", { + def mono = Mono.defer { + Mono.fromCompletionStage(CompletableFuture.supplyAsync { + runUnderTrace("third", { + "hello world" + }) + }) + }.contextWrite(Context.of("dd.span", TEST_TRACER.activeSpan())) + mono + }) + .block() + }) + then: + assert result == "hello world" + assertTraces(1, { + trace(3, true) { + basicSpan(it, "first") + basicSpan(it, "second", span(0)) + basicSpan(it, "third", span(1)) + } + }) + } + @Trace(operationName = "trace-parent", resourceName = "trace-parent") def assemblePublisherUnderTrace(def publisherSupplier) { diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java new file mode 100644 index 00000000000..1926ac6db9d --- /dev/null +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java @@ -0,0 +1,36 @@ +package datadog.trace.instrumentation.reactor.core; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan; +import reactor.core.CoreSubscriber; +import reactor.util.context.Context; + +public class ContextSpanHelper { + private static final String DD_SPAN_KEY = "dd.span"; + + private ContextSpanHelper() {} + + public static AgentSpan extractSpanFromSubscriberContext(final CoreSubscriber subscriber) { + if (subscriber == null) { + return null; + } + Context context = null; + try { + context = subscriber.currentContext(); + } catch (Throwable ignored) { + } + if (context == null) { + return null; + } + if (context.hasKey(DD_SPAN_KEY)) { + Object maybeSpan = context.get(DD_SPAN_KEY); + if (maybeSpan instanceof WithAgentSpan) { + AgentSpan span = ((WithAgentSpan) maybeSpan).asAgentSpan(); + if (span != null) { + return span; + } + } + } + return null; + } +} diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java new file mode 100644 index 00000000000..8b2a7a02a9f --- /dev/null +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java @@ -0,0 +1,99 @@ +package datadog.trace.instrumentation.reactor.core; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasSuperType; +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractSpanFromSubscriberContext; +import static net.bytebuddy.matcher.ElementMatchers.isStatic; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.util.HashMap; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.CoreSubscriber; + +@AutoService(InstrumenterModule.class) +public class CorePublisherInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { + public CorePublisherInstrumentation() { + super("reactor-core"); + } + + @Override + public String hierarchyMarkerType() { + return "reactor.core.CoreSubscriber"; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return implementsInterface(named("reactor.core.CorePublisher")) // from 3.1.7 + .or( + hasSuperType( + namedOneOf( + "reactor.core.publisher.Mono", "reactor.core.publisher.Flux"))); // < 3.1.7 + } + + @Override + public Map contextStore() { + final Map ret = new HashMap<>(); + ret.put("org.reactivestreams.Subscriber", AgentSpan.class.getName()); + ret.put("org.reactivestreams.Publisher", AgentSpan.class.getName()); + return ret; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".ContextSpanHelper", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + named("subscribe") + .and(not(isStatic())) + .and(takesArguments(1)) + .and(takesArgument(0, named("reactor.core.CoreSubscriber"))), + getClass().getName() + "$PropagateContextSpanOnSubscribe"); + } + + public static class PropagateContextSpanOnSubscribe { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope before( + @Advice.This Publisher self, @Advice.Argument(0) final CoreSubscriber subscriber) { + final AgentSpan span = extractSpanFromSubscriberContext(subscriber); + + if (span != null) { + // we force storing the span state linked to publisher and subscriber to the one explicitly + // present in the context so that, if PublisherInstrumentation is kicking in after this + // advice, it won't override that active span + InstrumentationContext.get(Publisher.class, AgentSpan.class).put(self, span); + InstrumentationContext.get(Subscriber.class, AgentSpan.class).put(subscriber, span); + return activateSpan(span); + } + return null; + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void after(@Advice.Enter final AgentScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java index 6782d269ec0..269072b86de 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java @@ -4,18 +4,17 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractSpanFromSubscriberContext; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.CoreSubscriber; -import reactor.util.context.Context; @AutoService(InstrumenterModule.class) public class CoreSubscriberInstrumentation extends InstrumenterModule.Tracing @@ -34,6 +33,13 @@ public ElementMatcher hierarchyMatcher() { return implementsInterface(named(hierarchyMarkerType())); } + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".ContextSpanHelper", + }; + } + @Override public void methodAdvice(MethodTransformer transformer) { transformer.applyAdvice( @@ -44,22 +50,9 @@ public void methodAdvice(MethodTransformer transformer) { public static class PropagateSpanInScopeAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static AgentScope before(@Advice.This final CoreSubscriber self) { - Context context = null; - try { - context = self.currentContext(); - } catch (Throwable ignored) { - } - if (context == null) { - return null; - } - if (context.hasKey("dd.span")) { - Object maybeSpan = context.get("dd.span"); - if (maybeSpan instanceof WithAgentSpan) { - AgentSpan span = ((WithAgentSpan) maybeSpan).asAgentSpan(); - if (span != null) { - return activateSpan(span); - } - } + final AgentSpan span = extractSpanFromSubscriberContext(self); + if (span != null) { + return activateSpan(span); } return null; } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy b/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy index 9dd1c0616f6..1e59f5c4471 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy @@ -21,6 +21,7 @@ import reactor.util.context.Context import spock.lang.Shared import java.time.Duration +import java.util.concurrent.CompletableFuture class ReactorCoreTest extends AgentTestRunner { @@ -440,6 +441,44 @@ class ReactorCoreTest extends AgentTestRunner { }) } + def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() { + when: + def mono = Flux.range(1, 100).windowUntil { it % 10 == 0 }.count() + then: + // we are not interested into asserting a trace structure but only that the instrumentation error count is 0 + assert mono.block() == 11 + } + + + def "span in the context has to be activated when the publisher subscribes"() { + when: + // the mono is subscribed (block) when first is active. + // However we expect that the span third will have second as parent and not first + // because we set the parent explicitly in the reactor context (dd.span key) + def result = runUnderTrace("first", { + runUnderTrace("second", { + def mono = Mono.defer { + Mono.fromCompletionStage(CompletableFuture.supplyAsync { + runUnderTrace("third", { + "hello world" + }) + }) + }.subscriberContext(Context.of("dd.span", TEST_TRACER.activeSpan())) + mono + }) + .block() + }) + then: + assert result == "hello world" + assertTraces(1, { + trace(3, true) { + basicSpan(it, "first") + basicSpan(it, "second", span(0)) + basicSpan(it, "third", span(1)) + } + }) + } + @Trace(operationName = "trace-parent", resourceName = "trace-parent") def assemblePublisherUnderTrace(def publisherSupplier) { def span = startSpan("publisher-parent") @@ -490,14 +529,6 @@ class ReactorCoreTest extends AgentTestRunner { span.finish() } - def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() { - when: - def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count() - then: - // we are not interested into asserting a trace structure but only that the instrumentation error count is 0 - assert mono.block() == 11 - } - @Trace(operationName = "addOne", resourceName = "addOne") def static addOneFunc(int i) { return i + 1 From 5e09240cf42f2d660e10e5e643502ea330fdc550 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Wed, 8 Jan 2025 13:42:18 +0100 Subject: [PATCH 2/2] review --- .../instrumentation/reactor/core/ContextSpanHelper.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java index 1926ac6db9d..ed184afd022 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java @@ -2,6 +2,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan; +import javax.annotation.Nullable; import reactor.core.CoreSubscriber; import reactor.util.context.Context; @@ -10,6 +11,7 @@ public class ContextSpanHelper { private ContextSpanHelper() {} + @Nullable public static AgentSpan extractSpanFromSubscriberContext(final CoreSubscriber subscriber) { if (subscriber == null) { return null; @@ -25,10 +27,7 @@ public static AgentSpan extractSpanFromSubscriberContext(final CoreSubscriber if (context.hasKey(DD_SPAN_KEY)) { Object maybeSpan = context.get(DD_SPAN_KEY); if (maybeSpan instanceof WithAgentSpan) { - AgentSpan span = ((WithAgentSpan) maybeSpan).asAgentSpan(); - if (span != null) { - return span; - } + return ((WithAgentSpan) maybeSpan).asAgentSpan(); } } return null;