From c9b88fd9d5c76e327fb598ce44000ced872f9c6f Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 26 Jul 2024 14:57:13 +0200 Subject: [PATCH 1/3] Implements spring WebClient request body capturing support --- .../agent/impl/context/BodyCaptureImpl.java | 42 +++++- .../impl/context/BodyCaptureImplTest.java | 20 ++- .../serialize/DslJsonSerializerTest.java | 3 +- .../common/RequestBodyCaptureRegistry.java | 2 +- .../agent/httpclient/HttpClientHelper.java | 41 ++++-- .../RequestBodyRecordingHelper.java | 10 +- .../RequestBodyRecordingHelperTest.java | 3 +- .../springwebclient/BodyCaptureRegistry.java | 30 ++++ .../ClientHttpConnectorInstrumentation.java | 102 +++++++++++++ .../ClientHttpRequestInstrumentation.java | 134 ++++++++++++++++++ .../ClientRequestHeaderGetter.java | 29 ++++ ...ClientExchangeFunctionInstrumentation.java | 4 +- ...ic.apm.agent.sdk.ElasticApmInstrumentation | 2 + .../WebClientInstrumentationIT.java | 14 ++ .../WebClientInstrumentationTest.java | 55 +++++++ .../HttpUrlConnectionInstrumentation.java | 2 +- .../agent/tracer/metadata/BodyCapture.java | 53 +++++-- 17 files changed, 503 insertions(+), 43 deletions(-) create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/BodyCaptureRegistry.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpConnectorInstrumentation.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpRequestInstrumentation.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientRequestHeaderGetter.java diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/context/BodyCaptureImpl.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/context/BodyCaptureImpl.java index e6560f1b82..3e786b7334 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/context/BodyCaptureImpl.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/context/BodyCaptureImpl.java @@ -48,6 +48,8 @@ public void recycle(ByteBuffer object) { private enum CaptureState { NOT_ELIGIBLE, ELIGIBLE, + PRECONDITIONS_PASSED, + PRECONDITIONS_FAILED, STARTED } @@ -95,17 +97,43 @@ public boolean isEligibleForCapturing() { } @Override - public boolean startCapture(@Nullable String requestCharset, int numBytesToCapture) { + public boolean havePreconditionsBeenChecked() { + return state == CaptureState.PRECONDITIONS_PASSED + || state == CaptureState.PRECONDITIONS_FAILED + || state == CaptureState.STARTED; + } + + @Override + public void markPreconditionsFailed() { + synchronized (this) { + if (state != CaptureState.ELIGIBLE) { + throw new IllegalStateException("state is " + state); + } + state = CaptureState.PRECONDITIONS_FAILED; + } + } + + @Override + public void markPreconditionsPassed(@Nullable String requestCharset, int numBytesToCapture) { if (numBytesToCapture > WebConfiguration.MAX_BODY_CAPTURE_BYTES) { throw new IllegalArgumentException("Capturing " + numBytesToCapture + " bytes is not supported, maximum is " + WebConfiguration.MAX_BODY_CAPTURE_BYTES + " bytes"); } - if (state == CaptureState.ELIGIBLE) { + synchronized (this) { + if (state == CaptureState.ELIGIBLE) { + if (requestCharset != null) { + this.charset.append(requestCharset); + } + this.numBytesToCapture = numBytesToCapture; + state = CaptureState.PRECONDITIONS_PASSED; + } + } + } + + @Override + public boolean startCapture() { + if (state == CaptureState.PRECONDITIONS_PASSED) { synchronized (this) { - if (state == CaptureState.ELIGIBLE) { - if (requestCharset != null) { - this.charset.append(requestCharset); - } - this.numBytesToCapture = numBytesToCapture; + if (state == CaptureState.PRECONDITIONS_PASSED) { state = CaptureState.STARTED; return true; } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/context/BodyCaptureImplTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/context/BodyCaptureImplTest.java index c1e4e0ea31..26ea3172c0 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/context/BodyCaptureImplTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/context/BodyCaptureImplTest.java @@ -32,7 +32,8 @@ public class BodyCaptureImplTest { public void testAppendTruncation() { BodyCaptureImpl capture = new BodyCaptureImpl(); capture.markEligibleForCapturing(); - capture.startCapture("foobar", 10); + capture.markPreconditionsPassed("foobar", 10); + capture.startCapture(); assertThat(capture.isFull()).isFalse(); capture.append("123Hello World!".getBytes(StandardCharsets.UTF_8), 3, 5); @@ -55,21 +56,28 @@ public void testLifecycle() { BodyCaptureImpl capture = new BodyCaptureImpl(); assertThat(capture.isEligibleForCapturing()).isFalse(); - assertThat(capture.startCapture("foobar", 42)) + assertThat(capture.havePreconditionsBeenChecked()).isFalse(); + assertThat(capture.startCapture()) .isFalse(); assertThatThrownBy(() -> capture.append((byte) 42)).isInstanceOf(IllegalStateException.class); capture.markEligibleForCapturing(); assertThat(capture.isEligibleForCapturing()).isTrue(); + assertThat(capture.havePreconditionsBeenChecked()).isFalse(); assertThatThrownBy(() -> capture.append((byte) 42)).isInstanceOf(IllegalStateException.class); - assertThat(capture.startCapture("foobar", 42)) - .isTrue(); + capture.markPreconditionsPassed("foobar", 42); + assertThat(capture.isEligibleForCapturing()).isTrue(); + assertThat(capture.havePreconditionsBeenChecked()).isTrue(); + + + assertThat(capture.startCapture()).isTrue(); capture.append((byte) 42); //ensure no exception thrown // startCapture should return true only once - assertThat(capture.startCapture("foobar", 42)) - .isFalse(); + assertThat(capture.havePreconditionsBeenChecked()).isTrue(); + assertThat(capture.startCapture()).isFalse(); + assertThat(capture.havePreconditionsBeenChecked()).isTrue(); capture.resetState(); assertThat(capture.getCharset()).isNull(); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/DslJsonSerializerTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/DslJsonSerializerTest.java index e2c76649aa..834a9c7f26 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/DslJsonSerializerTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/DslJsonSerializerTest.java @@ -475,7 +475,8 @@ private SpanImpl createSpanWithRequestBody(@Nullable byte[] bodyBytes, @Nullable SpanImpl span = new SpanImpl(tracer); BodyCaptureImpl bodyCapture = span.getContext().getHttp().getRequestBody(); bodyCapture.markEligibleForCapturing(); - bodyCapture.startCapture(charset, WebConfiguration.MAX_BODY_CAPTURE_BYTES); + bodyCapture.markPreconditionsPassed(charset, WebConfiguration.MAX_BODY_CAPTURE_BYTES); + bodyCapture.startCapture(); if (bodyBytes != null) { bodyCapture.append(bodyBytes, 0, bodyBytes.length); diff --git a/apm-agent-plugins/apm-apache-httpclient/apm-apache-httpclient-common/src/main/java/co/elastic/apm/agent/httpclient/common/RequestBodyCaptureRegistry.java b/apm-agent-plugins/apm-apache-httpclient/apm-apache-httpclient-common/src/main/java/co/elastic/apm/agent/httpclient/common/RequestBodyCaptureRegistry.java index 052591ba6e..052626b80a 100644 --- a/apm-agent-plugins/apm-apache-httpclient/apm-apache-httpclient-common/src/main/java/co/elastic/apm/agent/httpclient/common/RequestBodyCaptureRegistry.java +++ b/apm-agent-plugins/apm-apache-httpclient/apm-apache-httpclient-common/src/main/java/co/elastic/apm/agent/httpclient/common/RequestBodyCaptureRegistry.java @@ -56,7 +56,7 @@ public static void potentiallyCaptureRequestBody( ApacheHttpClientEntityAccessor adapter, TextHeaderGetter headerGetter ) { - if (HttpClientHelper.startRequestBodyCapture(abstractSpan, request, headerGetter)) { + if (HttpClientHelper.checkAndStartRequestBodyCapture(abstractSpan, request, headerGetter)) { Span span = (Span) abstractSpan; byte[] simpleBytes = adapter.getSimpleBodyBytes(request); if (simpleBytes != null) { diff --git a/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/HttpClientHelper.java b/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/HttpClientHelper.java index 61bb7e7e66..19a8b560ed 100644 --- a/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/HttpClientHelper.java +++ b/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/HttpClientHelper.java @@ -27,6 +27,7 @@ import co.elastic.apm.agent.tracer.TraceState; import co.elastic.apm.agent.tracer.configuration.WebConfiguration; import co.elastic.apm.agent.tracer.dispatch.TextHeaderGetter; +import co.elastic.apm.agent.tracer.metadata.BodyCapture; import javax.annotation.Nullable; import java.net.URI; @@ -75,29 +76,41 @@ public static Span startHttpClientSpan(TraceState activeContext, String me return span; } - public static boolean startRequestBodyCapture(@Nullable AbstractSpan abstractSpan, R request, TextHeaderGetter headerGetter) { + public static void checkBodyCapturePreconditions(@Nullable AbstractSpan abstractSpan, R request, TextHeaderGetter headerGetter) { if (!(abstractSpan instanceof Span)) { - return false; + return; } Span span = (Span) abstractSpan; - if (!span.getContext().getHttp().getRequestBody().isEligibleForCapturing()) { - return false; + BodyCapture bodyCapture = span.getContext().getHttp().getRequestBody(); + if (!bodyCapture.isEligibleForCapturing()) { + return; + } + if (bodyCapture.havePreconditionsBeenChecked()) { + return; } WebConfiguration webConfig = GlobalTracer.get().getConfig(WebConfiguration.class); int byteCount = webConfig.getCaptureClientRequestBytes(); - if (byteCount == 0) { - return false; - } - List contentTypes = webConfig.getCaptureContentTypes(); - String contentTypeHeader = headerGetter.getFirstHeader("Content-Type", request); - if (contentTypeHeader == null) { - contentTypeHeader = ""; + if (byteCount > 0) { + List contentTypes = webConfig.getCaptureContentTypes(); + String contentTypeHeader = headerGetter.getFirstHeader("Content-Type", request); + if (contentTypeHeader == null) { + contentTypeHeader = ""; + } + if (WildcardMatcher.anyMatch(contentTypes, contentTypeHeader) != null) { + bodyCapture.markPreconditionsPassed(extractCharsetFromContentType(contentTypeHeader), byteCount); + return; + } } - if (WildcardMatcher.anyMatch(contentTypes, contentTypeHeader) == null) { + bodyCapture.markPreconditionsFailed(); + } + + public static boolean checkAndStartRequestBodyCapture(@Nullable AbstractSpan abstractSpan, R request, TextHeaderGetter headerGetter) { + if (!(abstractSpan instanceof Span)) { return false; } - String charset = extractCharsetFromContentType(contentTypeHeader); - return span.getContext().getHttp().getRequestBody().startCapture(charset, byteCount); + checkBodyCapturePreconditions(abstractSpan, request, headerGetter); + Span span = (Span) abstractSpan; + return span.getContext().getHttp().getRequestBody().startCapture(); } //Visible for testing diff --git a/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelper.java b/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelper.java index 789471ae47..80846353ed 100644 --- a/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelper.java +++ b/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelper.java @@ -4,7 +4,7 @@ import co.elastic.apm.agent.tracer.SpanEndListener; import co.elastic.apm.agent.tracer.metadata.BodyCapture; -class RequestBodyRecordingHelper implements SpanEndListener> { +public class RequestBodyRecordingHelper implements SpanEndListener> { /** * We do not need to participate in span reference counting here. @@ -21,17 +21,21 @@ public RequestBodyRecordingHelper(Span clientSpan) { } } - void appendToBody(byte b) { + + public boolean appendToBody(byte b) { if (clientSpan != null) { BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody(); requestBody.append(b); if (requestBody.isFull()) { releaseSpan(); + } else { + return true; } } + return false; } - void appendToBody(byte[] b, int off, int len) { + public void appendToBody(byte[] b, int off, int len) { if (clientSpan != null) { BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody(); requestBody.append(b, off, len); diff --git a/apm-agent-plugins/apm-httpclient-core/src/test/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelperTest.java b/apm-agent-plugins/apm-httpclient-core/src/test/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelperTest.java index 097b5b564f..3374936f60 100644 --- a/apm-agent-plugins/apm-httpclient-core/src/test/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelperTest.java +++ b/apm-agent-plugins/apm-httpclient-core/src/test/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelperTest.java @@ -40,7 +40,8 @@ public void ensureNoModificationAfterSpanEnd() { SpanImpl span = rootTx.createSpan(); BodyCaptureImpl spanBody = span.getContext().getHttp().getRequestBody(); spanBody.markEligibleForCapturing(); - spanBody.startCapture(null, 100); + spanBody.markPreconditionsPassed(null, 100); + spanBody.startCapture(); RequestBodyRecordingHelper helper = new RequestBodyRecordingHelper(span); helper.appendToBody(new byte[]{1, 2, 3, 4}, 1, 2); diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/BodyCaptureRegistry.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/BodyCaptureRegistry.java new file mode 100644 index 0000000000..6ef7dae3c6 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/BodyCaptureRegistry.java @@ -0,0 +1,30 @@ +package co.elastic.apm.agent.springwebclient; + +import co.elastic.apm.agent.httpclient.RequestBodyRecordingHelper; +import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent; +import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap; +import co.elastic.apm.agent.tracer.AbstractSpan; +import co.elastic.apm.agent.tracer.Span; +import org.springframework.http.client.reactive.ClientHttpRequest; + +import javax.annotation.Nullable; + +public class BodyCaptureRegistry { + + private static final WeakMap PENDING_RECORDINGS = WeakConcurrent.buildMap(); + + public static void maybeCaptureBodyFor(AbstractSpan abstractSpan, ClientHttpRequest request) { + if (!(abstractSpan instanceof Span)) { + return; + } + Span span = (Span) abstractSpan; + if (span.getContext().getHttp().getRequestBody().startCapture()) { + PENDING_RECORDINGS.put(request, new RequestBodyRecordingHelper(span)); + } + } + + @Nullable + public static RequestBodyRecordingHelper activateRecording(ClientHttpRequest request) { + return PENDING_RECORDINGS.remove(request); + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpConnectorInstrumentation.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpConnectorInstrumentation.java new file mode 100644 index 0000000000..ceeec4be33 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpConnectorInstrumentation.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebclient; + +import co.elastic.apm.agent.sdk.ElasticApmInstrumentation; +import co.elastic.apm.agent.tracer.GlobalTracer; +import co.elastic.apm.agent.tracer.TraceState; +import co.elastic.apm.agent.tracer.Tracer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.NamedElement; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.springframework.http.HttpMethod; +import org.springframework.http.client.reactive.ClientHttpRequest; +import reactor.core.publisher.Mono; + +import javax.annotation.Nullable; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.function.Function; + +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.nameContains; +import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +/** + * Instruments all {@link org.springframework.http.client.reactive.ClientHttpConnector} types, to preserve the span context + * within the callback passed to {@link org.springframework.http.client.reactive.ClientHttpConnector#connect(HttpMethod, URI, Function)}. + * If the span is ready for it, this will cause the request body to be captured. + */ +public class ClientHttpConnectorInstrumentation extends ElasticApmInstrumentation { + + private static final Tracer tracer = GlobalTracer.get(); + + @Override + public ElementMatcher getTypeMatcherPreFilter() { + return nameStartsWith("org.springframework.http.") + .and(nameContains("Connector")); + } + + @Override + public ElementMatcher getTypeMatcher() { + return hasSuperType(named("org.springframework.http.client.reactive.ClientHttpConnector")) + .and(not(isInterface())); + } + + @Override + public ElementMatcher getMethodMatcher() { + return named("connect") + .and(takesArgument(2, named("java.util.function.Function"))); + } + + @Override + public Collection getInstrumentationGroupNames() { + return Arrays.asList("http-client", "spring-webclient"); + } + + public static class AdviceClass { + + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + @Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(2)) + public static Function> onBefore(@Advice.Argument(2) Function> requestCallback) { + TraceState context = tracer.currentContext(); + if (context.isEmpty()) { + return requestCallback; + } + return new Function>() { + @Override + public Mono apply(ClientHttpRequest clientHttpRequest) { + // Note that even though ClientHttpRequest exposes headers via the interface, those are empty + // therefore we check the span capturing pre-conditions in the WebClientExchangeFunctionInstrumentation instead + BodyCaptureRegistry.maybeCaptureBodyFor(context.getSpan(), clientHttpRequest); + return requestCallback.apply(clientHttpRequest); + } + }; + } + + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpRequestInstrumentation.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpRequestInstrumentation.java new file mode 100644 index 0000000000..afe4b75ec5 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpRequestInstrumentation.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebclient; + +import co.elastic.apm.agent.httpclient.RequestBodyRecordingHelper; +import co.elastic.apm.agent.sdk.ElasticApmInstrumentation; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.NamedElement; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.client.reactive.ClientHttpRequest; +import reactor.core.publisher.Flux; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Collection; +import java.util.function.Consumer; +import java.util.function.Function; + +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.nameContains; +import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +/** + * Instruments both {@link ClientHttpRequest#writeWith(Publisher)} and {@link ClientHttpRequest#writeAndFlushWith(Publisher)} + * to capture the request body. + */ +public class ClientHttpRequestInstrumentation extends ElasticApmInstrumentation { + + @Override + public ElementMatcher getTypeMatcherPreFilter() { + return nameStartsWith("org.springframework.http.client.reactive") + .and(nameContains("HttpRequest")); + } + + @Override + public ElementMatcher getTypeMatcher() { + return hasSuperType(named("org.springframework.http.client.reactive.ClientHttpRequest")) + .and(not(isInterface())); + } + + @Override + public ElementMatcher getMethodMatcher() { + return takesArgument(0, named("org.reactivestreams.Publisher")).and( + named("writeWith").or(named("writeAndFlushWith"))); + } + + @Override + public Collection getInstrumentationGroupNames() { + return Arrays.asList("http-client", "spring-webclient"); + } + + public static class AdviceClass { + + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + @Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(0)) + @SuppressWarnings("unchecked") + public static Publisher onBefore( + @Advice.Origin("#m") String methodName, + @Advice.This ClientHttpRequest clientRequest, + @Advice.Argument(0) Publisher bodyPublisher + ) { + RequestBodyRecordingHelper activeRecording = BodyCaptureRegistry.activateRecording(clientRequest); + // Note that activateRecording would return null on subsequent calls for the same span + // This is important because writeAndFlushWith might be built on top of writeWith (or the other way round) + // The removal helps to not double capture the body in this case. + if (activeRecording != null) { + RecordingConsumer recordingConsumer = new RecordingConsumer(activeRecording); + if (methodName.equals("writeWith")) { + Publisher actualPublisher = (Publisher) bodyPublisher; + return Flux.from(actualPublisher) + .doOnNext(recordingConsumer); + } else if (methodName.equals("writeAndFlushWith")) { + Publisher> actualPublisher + = (Publisher>) bodyPublisher; + return Flux.from(actualPublisher) + .map(new Function, Publisher>() { + @Override + public Publisher apply(Publisher publisher) { + return Flux.from(publisher) + .doOnNext(recordingConsumer); + } + }); + } + } + return bodyPublisher; + } + } + + private static class RecordingConsumer implements Consumer { + + private final RequestBodyRecordingHelper recordTo; + + private RecordingConsumer(RequestBodyRecordingHelper recordTo) { + this.recordTo = recordTo; + } + + @Override + public void accept(DataBuffer dataBuffer) { + int positionBackUp = dataBuffer.readPosition(); + while (dataBuffer.readableByteCount() > 0) { + if (!recordTo.appendToBody(dataBuffer.read())) { + break; + } + } + dataBuffer.readPosition(positionBackUp); + } + } + +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientRequestHeaderGetter.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientRequestHeaderGetter.java new file mode 100644 index 0000000000..1bc6f13f66 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientRequestHeaderGetter.java @@ -0,0 +1,29 @@ +package co.elastic.apm.agent.springwebclient; + +import co.elastic.apm.agent.tracer.dispatch.TextHeaderGetter; +import org.springframework.web.reactive.function.client.ClientRequest; + +import javax.annotation.Nullable; +import java.util.List; + +public class ClientRequestHeaderGetter implements TextHeaderGetter { + + public static final ClientRequestHeaderGetter INSTANCE = new ClientRequestHeaderGetter(); + + @Nullable + @Override + public String getFirstHeader(String headerName, ClientRequest carrier) { + return carrier.headers().getFirst(headerName); + } + + @Override + public void forEach(String headerName, ClientRequest carrier, S state, HeaderConsumer consumer) { + List headerValues = carrier.headers().get(headerName); + if (headerValues == null) { + return; + } + for (String value : headerValues) { + consumer.accept(value, state); + } + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/WebClientExchangeFunctionInstrumentation.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/WebClientExchangeFunctionInstrumentation.java index 6ae015d439..aef6617f6e 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/WebClientExchangeFunctionInstrumentation.java +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/WebClientExchangeFunctionInstrumentation.java @@ -22,8 +22,8 @@ import co.elastic.apm.agent.sdk.ElasticApmInstrumentation; import co.elastic.apm.agent.tracer.GlobalTracer; import co.elastic.apm.agent.tracer.Span; -import co.elastic.apm.agent.tracer.Tracer; import co.elastic.apm.agent.tracer.TraceState; +import co.elastic.apm.agent.tracer.Tracer; import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.NamedElement; @@ -85,6 +85,8 @@ public static Object[] onBefore(@Advice.Argument(0) ClientRequest clientRequest) span.activate(); } + HttpClientHelper.checkBodyCapturePreconditions(tracer.getActive(), clientRequest, ClientRequestHeaderGetter.INSTANCE); + TraceState toPropagate = tracer.currentContext(); if (!toPropagate.isEmpty()) { ClientRequest.Builder builder = ClientRequest.from(clientRequest); diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation index 808ca3da32..7e3bb40c17 100755 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation @@ -1 +1,3 @@ co.elastic.apm.agent.springwebclient.WebClientExchangeFunctionInstrumentation +co.elastic.apm.agent.springwebclient.ClientHttpConnectorInstrumentation +co.elastic.apm.agent.springwebclient.ClientHttpRequestInstrumentation diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/test/java/co/elastic/apm/agent/springwebclient/WebClientInstrumentationIT.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/test/java/co/elastic/apm/agent/springwebclient/WebClientInstrumentationIT.java index 2a2fd670f1..126a999d5d 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/test/java/co/elastic/apm/agent/springwebclient/WebClientInstrumentationIT.java +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/test/java/co/elastic/apm/agent/springwebclient/WebClientInstrumentationIT.java @@ -24,6 +24,7 @@ import org.junit.jupiter.params.provider.ValueSource; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; import java.util.Arrays; @@ -82,5 +83,18 @@ public boolean isTestHttpCallWithUserInfoEnabled() { protected void performGet(String path) throws Exception { webClient.get().uri(path).exchangeToMono(response -> response.bodyToMono(String.class)).block(); } + + @Override + protected boolean isBodyCapturingSupported() { + return true; + } + + @Override + protected void performPost(String path, byte[] content, String contentTypeHeader) throws Exception { + webClient.post().uri(path) + .header("Content-Type", contentTypeHeader) + .body(Mono.just(content), byte[].class) + .exchangeToMono(response -> response.bodyToMono(String.class)).block(); + } } } diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/test/java/co/elastic/apm/agent/springwebclient/WebClientInstrumentationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/test/java/co/elastic/apm/agent/springwebclient/WebClientInstrumentationTest.java index 006f673732..d949a8b7e0 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/test/java/co/elastic/apm/agent/springwebclient/WebClientInstrumentationTest.java +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/test/java/co/elastic/apm/agent/springwebclient/WebClientInstrumentationTest.java @@ -26,6 +26,7 @@ import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @RunWith(Parameterized.class) @@ -86,6 +87,16 @@ protected void performGet(String path) throws Exception { strategy.execute(webClient, path); } + @Override + protected boolean isBodyCapturingSupported() { + return true; + } + + @Override + protected void performPost(String path, byte[] content, String contentTypeHeader) throws Exception { + strategy.execute(webClient, path, content, contentTypeHeader); + } + /** * Client-side API variants to cover. While we know that implementation details might delegate to the same method * we have to test for it to prevent regression in a future version @@ -98,12 +109,34 @@ void execute(Object client, String uri) { ((WebClient) client).get().uri(uri).exchange() // deprecated API .block(); } + + @Override + void execute(Object client, String uri, byte[] body, String contentTypeHeader) { + ((WebClient) client).post() + .uri(uri) + .header("Content-Type", contentTypeHeader) + .body(Mono.just(body), byte[].class) + .exchange() // deprecated API + .block(); + + } }, EXCHANGE_TO_FLUX { @Override void execute(Object client, String uri) { ((WebClient) client).get().uri(uri).exchangeToFlux(response -> response.bodyToFlux(String.class)).blockLast(); } + + @Override + void execute(Object client, String uri, byte[] body, String contentTypeHeader) { + ((WebClient) client).post() + .uri(uri) + .header("Content-Type", contentTypeHeader) + .body(Mono.just(body), byte[].class) + .exchangeToFlux(response -> response.bodyToFlux(String.class)) + .blockLast(); + + } }, EXCHANGE_TO_MONO { // TODO @@ -111,15 +144,37 @@ void execute(Object client, String uri) { void execute(Object client, String uri) { ((WebClient) client).get().uri(uri).exchangeToMono(response -> response.bodyToMono(String.class)).block(); } + + @Override + void execute(Object client, String uri, byte[] body, String contentTypeHeader) { + ((WebClient) client).post() + .uri(uri) + .header("Content-Type", contentTypeHeader) + .body(Mono.just(body), byte[].class) + .exchangeToMono(response -> response.bodyToMono(String.class)).block(); + + } }, RETRIEVE { @Override void execute(Object client, String uri) { ((WebClient) client).get().uri(uri).retrieve().bodyToMono(String.class).block(); } + + @Override + void execute(Object client, String uri, byte[] body, String contentTypeHeader) { + ((WebClient) client).post() + .uri(uri) + .header("Content-Type", contentTypeHeader) + .body(Mono.just(body), byte[].class) + .retrieve().bodyToMono(String.class).block(); + + } }; abstract void execute(Object client, String uri); + + abstract void execute(Object client, String uri, byte[] body, String contentTypeHeader); } public static class Clients { diff --git a/apm-agent-plugins/apm-urlconnection-plugin/src/main/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentation.java b/apm-agent-plugins/apm-urlconnection-plugin/src/main/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentation.java index 62e75d4d07..2105eee66c 100644 --- a/apm-agent-plugins/apm-urlconnection-plugin/src/main/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentation.java +++ b/apm-agent-plugins/apm-urlconnection-plugin/src/main/java/co/elastic/apm/agent/urlconnection/HttpUrlConnectionInstrumentation.java @@ -204,7 +204,7 @@ public static OutputStream exit(@Advice.This HttpURLConnection thiz, Span captureBodyFor = captureBodyForSpan.get(thiz); if (captureBodyFor == null) { AbstractSpan currentSpan = tracer.getActive(); - if (HttpClientHelper.startRequestBodyCapture(currentSpan, thiz, UrlConnectionPropertyAccessor.instance())) { + if (HttpClientHelper.checkAndStartRequestBodyCapture(currentSpan, thiz, UrlConnectionPropertyAccessor.instance())) { captureBodyFor = (Span) currentSpan; captureBodyForSpan.put(thiz, captureBodyFor); } diff --git a/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/metadata/BodyCapture.java b/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/metadata/BodyCapture.java index 0d005a126f..5a2070895f 100644 --- a/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/metadata/BodyCapture.java +++ b/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/metadata/BodyCapture.java @@ -20,6 +20,29 @@ import javax.annotation.Nullable; +/** + * Container class for managing request body capture and associated state. + * This class collects the body as encoded bytearray and a charset with which we'll attempt to decode the body. + *

+ * A span goes through several states to perform body-capture: + *

    + *
  • A span need to be marked as eligible for body capture. This is useful so that higher + * level instrumentation (E.g. Spring RestTemplate) can mark the span that they would like + * to capture the body without having to implement the capturing itself. Lower level instrumentations + * (e.g. for HTTPUrlConnection) will attempt to capture the body for the currently active span even + * if they didn't start a span themselves when the currently active span is marked as eligible. + *
  • + *
  • Even if a span is marked eligible, the body will only be captured if the preconditions have been checked. + * The preconditions check whether based on the agent configuration and the Content-Type header the body shall be captured or not. + * E.g. if the body capturing is disabled via the config, the preconditions will fail for every span. + *
  • + *
  • If the preconditions passed, capturing may be started via {@link #startCapture()}. + * Capturing will only start exactly once, {@link #startCapture()} will return false on subsequent calls. + * This prevents nested instrumentation being capable of capturing the body (e.g. Spring WebFlux WebClient + * and async Apache Http client) of capturing every byte multiple times and therefore reporting a garbage body. + *
  • + *
+ */ public interface BodyCapture { /** @@ -33,23 +56,37 @@ public interface BodyCapture { * @return true, if {@link #markEligibleForCapturing()} was called for this span. */ boolean isEligibleForCapturing(); - + + /** + * @return true, if either {@link #markPreconditionsFailed()} or {@link #markPreconditionsPassed(String, int)} have been called. + */ + boolean havePreconditionsBeenChecked(); + + /** + * Ensures that the no body capturing will be performed for this span, e.g. because it is disabled via the agent config. + */ + void markPreconditionsFailed(); + + /** + * Marks this span so that any capable instrumentation may start the capturing procedure via {@link #startCapture()} + */ + void markPreconditionsPassed(@Nullable String requestCharset, int numBytesToCapture); + + /** * This method acts as a protection mechanism so that only one instrumentation tries to capture the body. * It returns true, if the calling instrumentation shall start adding body byte via {@link #append(byte)}. *

- * For this to happen, {@link #markEligibleForCapturing()} must have been called first. + * For this to happen, both {@link #markEligibleForCapturing()} and {@link #markPreconditionsPassed(String, int)} + * must have been called first. *

- * After {@link #startCapture(String, int)} has returned true once, subsequent calls will return false. + * After {@link #startCapture()} has returned true once, subsequent calls will return false. * So for example if instrumentation A and B are active for the same span, only the first one will actually be capturing the body, - * because {@link #startCapture(String, int)} only returns true once. - * - * @param charset the charset (if available) with which the request-body is encoded. - * @param numBytesToCapture the number of bytes to capture, to configure the limit of the internal buffer + * because {@link #startCapture()} only returns true once. * * @return true, if the calling instrumentation should be capturing the body (by calling {@link #append(byte)} */ - boolean startCapture(@Nullable String charset, int numBytesToCapture); + boolean startCapture(); void append(byte b); From 2ec7da9c0ac138c3989a6110c118e280e902b4ea Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Mon, 26 Aug 2024 12:03:18 +0200 Subject: [PATCH 2/3] Fix cherry pick conflict --- .../apm/agent/httpclient/RequestBodyRecordingHelperTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apm-agent-plugins/apm-httpclient-core/src/test/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelperTest.java b/apm-agent-plugins/apm-httpclient-core/src/test/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelperTest.java index 3374936f60..ad534b361f 100644 --- a/apm-agent-plugins/apm-httpclient-core/src/test/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelperTest.java +++ b/apm-agent-plugins/apm-httpclient-core/src/test/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelperTest.java @@ -67,7 +67,8 @@ public void ensureLimitRespected() { SpanImpl span = rootTx.createSpan(); BodyCaptureImpl spanBody = span.getContext().getHttp().getRequestBody(); spanBody.markEligibleForCapturing(); - spanBody.startCapture(null, 3); + spanBody.markPreconditionsPassed(null, 3); + spanBody.startCapture(); RequestBodyRecordingHelper helper = new RequestBodyRecordingHelper(span); helper.appendToBody((byte) 1); From c0f338351bc9ee40d8fef686a037640eb578f917 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Tue, 27 Aug 2024 12:12:19 +0200 Subject: [PATCH 3/3] Review fixes --- .../agent/impl/context/BodyCaptureImpl.java | 15 ++++--- .../agent/httpclient/HttpClientHelper.java | 24 ++++++----- .../RequestBodyRecordingHelper.java | 4 ++ .../ClientHttpRequestInstrumentation.java | 40 ++++++++++--------- 4 files changed, 45 insertions(+), 38 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/context/BodyCaptureImpl.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/context/BodyCaptureImpl.java index 3e786b7334..8cdb1e4973 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/context/BodyCaptureImpl.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/context/BodyCaptureImpl.java @@ -46,11 +46,11 @@ public void recycle(ByteBuffer object) { }); private enum CaptureState { - NOT_ELIGIBLE, - ELIGIBLE, - PRECONDITIONS_PASSED, - PRECONDITIONS_FAILED, - STARTED + NOT_ELIGIBLE, // initial state + ELIGIBLE, // eligible but before preconditions evaluation + PRECONDITIONS_PASSED, // post preconditions (passed), can start capture + PRECONDITIONS_FAILED, // post preconditions (failed), no body will be captured + STARTED // the body capturing has been started, a buffer was acquired } private volatile CaptureState state; @@ -106,10 +106,9 @@ public boolean havePreconditionsBeenChecked() { @Override public void markPreconditionsFailed() { synchronized (this) { - if (state != CaptureState.ELIGIBLE) { - throw new IllegalStateException("state is " + state); + if (state == CaptureState.ELIGIBLE) { + state = CaptureState.PRECONDITIONS_FAILED; } - state = CaptureState.PRECONDITIONS_FAILED; } } diff --git a/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/HttpClientHelper.java b/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/HttpClientHelper.java index 19a8b560ed..2641adb865 100644 --- a/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/HttpClientHelper.java +++ b/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/HttpClientHelper.java @@ -90,18 +90,20 @@ public static void checkBodyCapturePreconditions(@Nullable AbstractSpan a } WebConfiguration webConfig = GlobalTracer.get().getConfig(WebConfiguration.class); int byteCount = webConfig.getCaptureClientRequestBytes(); - if (byteCount > 0) { - List contentTypes = webConfig.getCaptureContentTypes(); - String contentTypeHeader = headerGetter.getFirstHeader("Content-Type", request); - if (contentTypeHeader == null) { - contentTypeHeader = ""; - } - if (WildcardMatcher.anyMatch(contentTypes, contentTypeHeader) != null) { - bodyCapture.markPreconditionsPassed(extractCharsetFromContentType(contentTypeHeader), byteCount); - return; - } + if (byteCount == 0) { + bodyCapture.markPreconditionsFailed(); + return; + } + List contentTypes = webConfig.getCaptureContentTypes(); + String contentTypeHeader = headerGetter.getFirstHeader("Content-Type", request); + if (contentTypeHeader == null) { + contentTypeHeader = ""; + } + if (WildcardMatcher.anyMatch(contentTypes, contentTypeHeader) == null) { + bodyCapture.markPreconditionsFailed(); + return; } - bodyCapture.markPreconditionsFailed(); + bodyCapture.markPreconditionsPassed(extractCharsetFromContentType(contentTypeHeader), byteCount); } public static boolean checkAndStartRequestBodyCapture(@Nullable AbstractSpan abstractSpan, R request, TextHeaderGetter headerGetter) { diff --git a/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelper.java b/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelper.java index 80846353ed..a75274426a 100644 --- a/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelper.java +++ b/apm-agent-plugins/apm-httpclient-core/src/main/java/co/elastic/apm/agent/httpclient/RequestBodyRecordingHelper.java @@ -22,6 +22,10 @@ public RequestBodyRecordingHelper(Span clientSpan) { } + /** + * @param b the byte to append + * @return false, if the body buffer is full and future calls would be no-op. True otherwise. + */ public boolean appendToBody(byte b) { if (clientSpan != null) { BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody(); diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpRequestInstrumentation.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpRequestInstrumentation.java index afe4b75ec5..2a7757fced 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpRequestInstrumentation.java +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webclient-plugin/src/main/java/co/elastic/apm/agent/springwebclient/ClientHttpRequestInstrumentation.java @@ -88,26 +88,28 @@ public static Publisher onBefore( // Note that activateRecording would return null on subsequent calls for the same span // This is important because writeAndFlushWith might be built on top of writeWith (or the other way round) // The removal helps to not double capture the body in this case. - if (activeRecording != null) { - RecordingConsumer recordingConsumer = new RecordingConsumer(activeRecording); - if (methodName.equals("writeWith")) { - Publisher actualPublisher = (Publisher) bodyPublisher; - return Flux.from(actualPublisher) - .doOnNext(recordingConsumer); - } else if (methodName.equals("writeAndFlushWith")) { - Publisher> actualPublisher - = (Publisher>) bodyPublisher; - return Flux.from(actualPublisher) - .map(new Function, Publisher>() { - @Override - public Publisher apply(Publisher publisher) { - return Flux.from(publisher) - .doOnNext(recordingConsumer); - } - }); - } + if (activeRecording == null) { + return bodyPublisher; + } + RecordingConsumer recordingConsumer = new RecordingConsumer(activeRecording); + if (methodName.equals("writeWith")) { + Publisher actualPublisher = (Publisher) bodyPublisher; + return Flux.from(actualPublisher) + .doOnNext(recordingConsumer); + } else if (methodName.equals("writeAndFlushWith")) { + Publisher> actualPublisher + = (Publisher>) bodyPublisher; + return Flux.from(actualPublisher) + .map(new Function, Publisher>() { + @Override + public Publisher apply(Publisher publisher) { + return Flux.from(publisher) + .doOnNext(recordingConsumer); + } + }); + } else { + throw new IllegalStateException("This case should never happen"); } - return bodyPublisher; } }