diff --git a/dd-java-agent/agent-bootstrap/src/jmh/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamBenchmark.java b/dd-java-agent/agent-bootstrap/src/jmh/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamBenchmark.java index 8b90b4678a7..8f52f13ee86 100644 --- a/dd-java-agent/agent-bootstrap/src/jmh/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamBenchmark.java +++ b/dd-java-agent/agent-bootstrap/src/jmh/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamBenchmark.java @@ -51,7 +51,7 @@ public class InjectingPipeOutputStreamBenchmark { public void withPipe() throws Exception { try (final PrintWriter out = new PrintWriter( - new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content, null))) { + new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content))) { htmlContent.forEach(out::println); } } diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java index 8f91a8e38cd..7221a85d007 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.function.LongConsumer; import javax.annotation.concurrent.NotThreadSafe; /** @@ -23,18 +24,41 @@ public class InjectingPipeOutputStream extends OutputStream { private final Runnable onContentInjected; private final int bulkWriteThreshold; private final OutputStream downstream; + private final LongConsumer onBytesWritten; + private final LongConsumer onInjectionTime; + private long bytesWritten = 0; /** + * This constructor is typically used for testing where we care about the logic and not the + * telemetry. + * + * @param downstream the delegate output stream + * @param marker the marker to find in the stream. Must at least be one byte. + * @param contentToInject the content to inject once before the marker if found. + */ + public InjectingPipeOutputStream( + final OutputStream downstream, final byte[] marker, final byte[] contentToInject) { + this(downstream, marker, contentToInject, null, null, null); + } + + /** + * This constructor contains the full set of parameters. + * * @param downstream the delegate output stream * @param marker the marker to find in the stream. Must at least be one byte. * @param contentToInject the content to inject once before the marker if found. * @param onContentInjected callback called when and if the content is injected. + * @param onBytesWritten callback called when stream is closed to report total bytes written. + * @param onInjectionTime callback called with the time in milliseconds taken to write the + * injection content. */ public InjectingPipeOutputStream( final OutputStream downstream, final byte[] marker, final byte[] contentToInject, - final Runnable onContentInjected) { + final Runnable onContentInjected, + final LongConsumer onBytesWritten, + final LongConsumer onInjectionTime) { this.downstream = downstream; this.marker = marker; this.lookbehind = new byte[marker.length]; @@ -46,6 +70,8 @@ public InjectingPipeOutputStream( this.filter = true; this.contentToInject = contentToInject; this.onContentInjected = onContentInjected; + this.onBytesWritten = onBytesWritten; + this.onInjectionTime = onInjectionTime; this.bulkWriteThreshold = marker.length * 2 - 2; } @@ -57,11 +83,13 @@ public void write(int b) throws IOException { drain(); } downstream.write(b); + bytesWritten++; return; } if (count == lookbehind.length) { downstream.write(lookbehind[pos]); + bytesWritten++; } else { count++; } @@ -72,7 +100,12 @@ public void write(int b) throws IOException { if (marker[matchingPos++] == b) { if (matchingPos == marker.length) { filter = false; + long injectionStart = System.nanoTime(); downstream.write(contentToInject); + long injectionEnd = System.nanoTime(); + if (onInjectionTime != null) { + onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L); + } if (onContentInjected != null) { onContentInjected.run(); } @@ -91,6 +124,7 @@ public void write(byte[] array, int off, int len) throws IOException { drain(); } downstream.write(array, off, len); + bytesWritten += len; return; } @@ -103,12 +137,21 @@ public void write(byte[] array, int off, int len) throws IOException { // we have a full match. just write everything filter = false; drain(); - downstream.write(array, off, idx); + int bytesToWrite = idx; + downstream.write(array, off, bytesToWrite); + bytesWritten += bytesToWrite; + long injectionStart = System.nanoTime(); downstream.write(contentToInject); + long injectionEnd = System.nanoTime(); + if (onInjectionTime != null) { + onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L); + } if (onContentInjected != null) { onContentInjected.run(); } - downstream.write(array, off + idx, len - idx); + bytesToWrite = len - idx; + downstream.write(array, off + idx, bytesToWrite); + bytesWritten += bytesToWrite; } else { // we don't have a full match. write everything in a bulk except the lookbehind buffer // sequentially @@ -120,7 +163,9 @@ public void write(byte[] array, int off, int len) throws IOException { // will be reset if no errors after the following write filter = false; - downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold); + int bytesToWrite = len - bulkWriteThreshold; + downstream.write(array, off + marker.length - 1, bytesToWrite); + bytesWritten += bytesToWrite; filter = wasFiltering; for (int i = len - marker.length + 1; i < len; i++) { write(array[i]); @@ -163,6 +208,7 @@ private void drain() throws IOException { int cnt = count; for (int i = 0; i < cnt; i++) { downstream.write(lookbehind[(start + i) % lookbehind.length]); + bytesWritten++; count--; } filter = wasFiltering; @@ -185,6 +231,11 @@ public void flush() throws IOException { public void close() throws IOException { try { commit(); + // report the size of the original HTTP response before injecting via callback + if (onBytesWritten != null) { + onBytesWritten.accept(bytesWritten); + } + bytesWritten = 0; } finally { downstream.close(); } diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java index 7a3d4a75f19..8d2c222d924 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.io.Writer; +import java.util.function.LongConsumer; import javax.annotation.concurrent.NotThreadSafe; /** @@ -23,18 +24,41 @@ public class InjectingPipeWriter extends Writer { private final Runnable onContentInjected; private final int bulkWriteThreshold; private final Writer downstream; + private final LongConsumer onBytesWritten; + private final LongConsumer onInjectionTime; + private long bytesWritten = 0; /** + * This constructor is typically used for testing where we care about the logic and not the + * telemetry. + * + * @param downstream the delegate writer + * @param marker the marker to find in the stream. Must at least be one char. + * @param contentToInject the content to inject once before the marker if found. + */ + public InjectingPipeWriter( + final Writer downstream, final char[] marker, final char[] contentToInject) { + this(downstream, marker, contentToInject, null, null, null); + } + + /** + * This constructor contains the full set of parameters. + * * @param downstream the delegate writer * @param marker the marker to find in the stream. Must at least be one char. * @param contentToInject the content to inject once before the marker if found. * @param onContentInjected callback called when and if the content is injected. + * @param onBytesWritten callback called when writer is closed to report total bytes written. + * @param onInjectionTime callback called with the time in milliseconds taken to write the + * injection content. */ public InjectingPipeWriter( final Writer downstream, final char[] marker, final char[] contentToInject, - final Runnable onContentInjected) { + final Runnable onContentInjected, + final LongConsumer onBytesWritten, + final LongConsumer onInjectionTime) { this.downstream = downstream; this.marker = marker; this.lookbehind = new char[marker.length]; @@ -46,6 +70,8 @@ public InjectingPipeWriter( this.filter = true; this.contentToInject = contentToInject; this.onContentInjected = onContentInjected; + this.onBytesWritten = onBytesWritten; + this.onInjectionTime = onInjectionTime; this.bulkWriteThreshold = marker.length * 2 - 2; } @@ -57,11 +83,13 @@ public void write(int c) throws IOException { drain(); } downstream.write(c); + bytesWritten++; return; } if (count == lookbehind.length) { downstream.write(lookbehind[pos]); + bytesWritten++; } else { count++; } @@ -72,7 +100,12 @@ public void write(int c) throws IOException { if (marker[matchingPos++] == c) { if (matchingPos == marker.length) { filter = false; + long injectionStart = System.nanoTime(); downstream.write(contentToInject); + long injectionEnd = System.nanoTime(); + if (onInjectionTime != null) { + onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L); + } if (onContentInjected != null) { onContentInjected.run(); } @@ -91,6 +124,7 @@ public void write(char[] array, int off, int len) throws IOException { drain(); } downstream.write(array, off, len); + bytesWritten += len; return; } @@ -103,12 +137,21 @@ public void write(char[] array, int off, int len) throws IOException { // we have a full match. just write everything filter = false; drain(); - downstream.write(array, off, idx); + int bytesToWrite = idx; + downstream.write(array, off, bytesToWrite); + bytesWritten += bytesToWrite; + long injectionStart = System.nanoTime(); downstream.write(contentToInject); + long injectionEnd = System.nanoTime(); + if (onInjectionTime != null) { + onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L); + } if (onContentInjected != null) { onContentInjected.run(); } - downstream.write(array, off + idx, len - idx); + bytesToWrite = len - idx; + downstream.write(array, off + idx, bytesToWrite); + bytesWritten += bytesToWrite; } else { // we don't have a full match. write everything in a bulk except the lookbehind buffer // sequentially @@ -120,7 +163,9 @@ public void write(char[] array, int off, int len) throws IOException { // will be reset if no errors after the following write filter = false; - downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold); + int bytesToWrite = len - bulkWriteThreshold; + downstream.write(array, off + marker.length - 1, bytesToWrite); + bytesWritten += bytesToWrite; filter = wasFiltering; for (int i = len - marker.length + 1; i < len; i++) { @@ -164,6 +209,7 @@ private void drain() throws IOException { int cnt = count; for (int i = 0; i < cnt; i++) { downstream.write(lookbehind[(start + i) % lookbehind.length]); + bytesWritten++; count--; } filter = wasFiltering; @@ -188,6 +234,11 @@ public void close() throws IOException { commit(); } finally { downstream.close(); + // report the size of the original HTTP response before injecting via callback + if (onBytesWritten != null) { + onBytesWritten.accept(bytesWritten); + } + bytesWritten = 0; } } diff --git a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy index 9b04234ad3d..fcf4699075d 100644 --- a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy +++ b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy @@ -1,8 +1,12 @@ package datadog.trace.bootstrap.instrumentation.buffer import datadog.trace.test.util.DDSpecification +import java.util.function.LongConsumer class InjectingPipeOutputStreamTest extends DDSpecification { + static final byte[] MARKER_BYTES = "".getBytes("UTF-8") + static final byte[] CONTEXT_BYTES = "".getBytes("UTF-8") + static class GlitchedOutputStream extends FilterOutputStream { int glitchesPos int count @@ -33,10 +37,18 @@ class InjectingPipeOutputStreamTest extends DDSpecification { } } + static class Counter { + int value = 0 + + def incr(long count) { + this.value += count + } + } + def 'should filter a buffer and inject if found #found'() { setup: def downstream = new ByteArrayOutputStream() - def piped = new OutputStreamWriter(new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null), + def piped = new OutputStreamWriter(new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8")), "UTF-8") when: try (def closeme = piped) { @@ -55,7 +67,7 @@ class InjectingPipeOutputStreamTest extends DDSpecification { setup: def baos = new ByteArrayOutputStream() def downstream = new GlitchedOutputStream(baos, glichesAt) - def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null) + def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8")) when: try { for (String line : body) { @@ -87,4 +99,116 @@ class InjectingPipeOutputStreamTest extends DDSpecification { // expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content [""] | "" | "" | 3 | " counter.incr(bytes) }, null) + + when: + piped.write(testBytes) + piped.close() + + then: + counter.value == testBytes.length + downstream.toByteArray() == testBytes + } + + def 'should count bytes correctly when writing bytes individually'() { + setup: + def testBytes = "test".getBytes("UTF-8") + def downstream = new ByteArrayOutputStream() + def counter = new Counter() + def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES, null, { long bytes -> counter.incr(bytes) }, null) + + when: + for (int i = 0; i < testBytes.length; i++) { + piped.write((int) testBytes[i]) + } + piped.close() + + then: + counter.value == testBytes.length + downstream.toByteArray() == testBytes + } + + def 'should count bytes correctly with multiple writes'() { + setup: + def testBytes = "test content" + def downstream = new ByteArrayOutputStream() + def counter = new Counter() + def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES, null, { long bytes -> counter.incr(bytes) }, null) + + when: + piped.write(testBytes[0..4].getBytes("UTF-8")) + piped.write(testBytes[5..5].getBytes("UTF-8")) + piped.write(testBytes[6..-1].getBytes("UTF-8")) + piped.close() + + then: + counter.value == testBytes.length() + downstream.toByteArray() == testBytes.getBytes("UTF-8") + } + + def 'should be resilient to exceptions when onBytesWritten callback is null'() { + setup: + def testBytes = "test content".getBytes("UTF-8") + def downstream = new ByteArrayOutputStream() + def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES) + + when: + piped.write(testBytes) + piped.close() + + then: + noExceptionThrown() + downstream.toByteArray() == testBytes + } + + def 'should call timing callback when injection happens'() { + setup: + def downstream = Mock(OutputStream) { + write(_) >> { args -> + Thread.sleep(1) // simulate slow write + } + } + def timingCallback = Mock(LongConsumer) + def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES, null, null, timingCallback) + + when: + piped.write("".getBytes("UTF-8")) + piped.close() + + then: + 1 * timingCallback.accept({ it > 0 }) + } + + def 'should not call timing callback when no injection happens'() { + setup: + def downstream = new ByteArrayOutputStream() + def timingCallback = Mock(LongConsumer) + def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES, null, null, timingCallback) + + when: + piped.write("no marker here".getBytes("UTF-8")) + piped.close() + + then: + 0 * timingCallback.accept(_) + } + + def 'should be resilient to exceptions when timing callback is null'() { + setup: + def downstream = new ByteArrayOutputStream() + def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES, null, null, null) + + when: + piped.write("".getBytes("UTF-8")) + piped.close() + + then: + noExceptionThrown() + } } diff --git a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriterTest.groovy b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriterTest.groovy index d115f81a403..7466839f7c8 100644 --- a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriterTest.groovy +++ b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriterTest.groovy @@ -1,8 +1,12 @@ package datadog.trace.bootstrap.instrumentation.buffer import datadog.trace.test.util.DDSpecification +import java.util.function.LongConsumer class InjectingPipeWriterTest extends DDSpecification { + static final char[] MARKER_CHARS = "".toCharArray() + static final char[] CONTEXT_CHARS = "".toCharArray() + static class GlitchedWriter extends FilterWriter { int glitchesPos int count @@ -33,10 +37,18 @@ class InjectingPipeWriterTest extends DDSpecification { } } + static class Counter { + int value = 0 + + def incr(long count) { + this.value += count + } + } + def 'should filter a buffer and inject if found #found using write'() { setup: def downstream = new StringWriter() - def piped = new PrintWriter(new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null)) + def piped = new PrintWriter(new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray())) when: try (def closeme = piped) { piped.write(body) @@ -53,7 +65,7 @@ class InjectingPipeWriterTest extends DDSpecification { def 'should filter a buffer and inject if found #found using append'() { setup: def downstream = new StringWriter() - def piped = new PrintWriter(new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null)) + def piped = new PrintWriter(new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray())) when: try (def closeme = piped) { piped.append(body) @@ -71,7 +83,7 @@ class InjectingPipeWriterTest extends DDSpecification { setup: def writer = new StringWriter() def downstream = new GlitchedWriter(writer, glichesAt) - def piped = new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null) + def piped = new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray()) when: try { for (String line : body) { @@ -103,4 +115,116 @@ class InjectingPipeWriterTest extends DDSpecification { // expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content [""] | "" | "" | 3 | " counter.incr(bytes) }, null) + def testBytes = "test content" + + when: + piped.write(testBytes.toCharArray()) + piped.close() + + then: + counter.value == testBytes.length() + downstream.toString() == testBytes + } + + def 'should count bytes correctly when writing characters individually'() { + setup: + def downstream = new StringWriter() + def counter = new Counter() + def piped = new InjectingPipeWriter(downstream, MARKER_CHARS, CONTEXT_CHARS, null, { long bytes -> counter.incr(bytes) }, null) + def testBytes = "test" + + when: + for (int i = 0; i < testBytes.length(); i++) { + piped.write((int) testBytes.charAt(i)) + } + piped.close() + + then: + counter.value == testBytes.length() + downstream.toString() == testBytes + } + + def 'should count bytes correctly with multiple writes'() { + setup: + def downstream = new StringWriter() + def counter = new Counter() + def piped = new InjectingPipeWriter(downstream, MARKER_CHARS, CONTEXT_CHARS, null, { long bytes -> counter.incr(bytes) }, null) + def testBytes = "test content" + + when: + piped.write(testBytes[0..4].toCharArray()) + piped.write(testBytes[5..5].toCharArray()) + piped.write(testBytes[6..-1].toCharArray()) + piped.close() + + then: + counter.value == testBytes.length() + downstream.toString() == testBytes + } + + def 'should be resilient to exceptions when onBytesWritten callback is null'() { + setup: + def downstream = new StringWriter() + def piped = new InjectingPipeWriter(downstream, MARKER_CHARS, CONTEXT_CHARS) + def testBytes = "test content" + + when: + piped.write(testBytes.toCharArray()) + piped.close() + + then: + noExceptionThrown() + downstream.toString() == testBytes + } + + def 'should call timing callback when injection happens'() { + setup: + def downstream = Mock(Writer) { + write(_) >> { args -> + Thread.sleep(1) // simulate slow write + } + } + def timingCallback = Mock(LongConsumer) + def piped = new InjectingPipeWriter(downstream, MARKER_CHARS, CONTEXT_CHARS, null, null, timingCallback) + + when: + piped.write("".toCharArray()) + piped.close() + + then: + 1 * timingCallback.accept({ it > 0 }) + } + + def 'should not call timing callback when no injection happens'() { + setup: + def downstream = new StringWriter() + def timingCallback = Mock(LongConsumer) + def piped = new InjectingPipeWriter(downstream, MARKER_CHARS, CONTEXT_CHARS, null, null, timingCallback) + + when: + piped.write("no marker here".toCharArray()) + piped.close() + + then: + 0 * timingCallback.accept(_) + } + + def 'should be resilient to exceptions when timing callback is null'() { + setup: + def downstream = new StringWriter() + def piped = new InjectingPipeWriter(downstream, MARKER_CHARS, CONTEXT_CHARS, null, null, null) + + when: + piped.write("".toCharArray()) + piped.close() + + then: + noExceptionThrown() + } } diff --git a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletRequestWrapper.java b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletRequestWrapper.java index e0b4240097e..36be99b0a7f 100644 --- a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletRequestWrapper.java +++ b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletRequestWrapper.java @@ -37,7 +37,8 @@ public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse se ServletResponse actualResponse = servletResponse; // rewrap it if (servletResponse instanceof HttpServletResponse) { - actualResponse = new RumHttpServletResponseWrapper((HttpServletResponse) servletResponse); + actualResponse = + new RumHttpServletResponseWrapper(this, (HttpServletResponse) servletResponse); servletRequest.setAttribute(DD_RUM_INJECTED, actualResponse); } return super.startAsync(servletRequest, actualResponse); diff --git a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletResponseWrapper.java b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletResponseWrapper.java index a5defc56dfe..a1dc5bcf54d 100644 --- a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletResponseWrapper.java +++ b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletResponseWrapper.java @@ -7,17 +7,21 @@ import java.io.PrintWriter; import java.lang.invoke.MethodHandle; import java.nio.charset.Charset; +import javax.servlet.ServletContext; import javax.servlet.ServletOutputStream; import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponseWrapper; public class RumHttpServletResponseWrapper extends HttpServletResponseWrapper { private final RumInjector rumInjector; + private final String servletVersion; private WrappedServletOutputStream outputStream; private PrintWriter printWriter; private InjectingPipeWriter wrappedPipeWriter; private boolean shouldInject = true; + private String contentEncoding = null; private static final MethodHandle SET_CONTENT_LENGTH_LONG = getMh("setContentLengthLong"); @@ -34,9 +38,19 @@ private static void sneakyThrow(Throwable e) throws E { throw (E) e; } - public RumHttpServletResponseWrapper(HttpServletResponse response) { + public RumHttpServletResponseWrapper(HttpServletRequest request, HttpServletResponse response) { super(response); this.rumInjector = RumInjector.get(); + + String version = "3"; + ServletContext servletContext = request.getServletContext(); + if (servletContext != null) { + try { + version = String.valueOf(servletContext.getEffectiveMajorVersion()); + } catch (Exception e) { + } + } + this.servletVersion = version; } @Override @@ -45,18 +59,30 @@ public ServletOutputStream getOutputStream() throws IOException { return outputStream; } if (!shouldInject) { + RumInjector.getTelemetryCollector().onInjectionSkipped(servletVersion); return super.getOutputStream(); } - String encoding = getCharacterEncoding(); - if (encoding == null) { - encoding = Charset.defaultCharset().name(); + try { + String encoding = getCharacterEncoding(); + if (encoding == null) { + encoding = Charset.defaultCharset().name(); + } + outputStream = + new WrappedServletOutputStream( + super.getOutputStream(), + rumInjector.getMarkerBytes(encoding), + rumInjector.getSnippetBytes(encoding), + this::onInjected, + bytes -> + RumInjector.getTelemetryCollector() + .onInjectionResponseSize(servletVersion, bytes), + milliseconds -> + RumInjector.getTelemetryCollector() + .onInjectionTime(servletVersion, milliseconds)); + } catch (Exception e) { + RumInjector.getTelemetryCollector().onInjectionFailed(servletVersion, contentEncoding); + throw e; } - outputStream = - new WrappedServletOutputStream( - super.getOutputStream(), - rumInjector.getMarkerBytes(encoding), - rumInjector.getSnippetBytes(encoding), - this::onInjected); return outputStream; } @@ -67,19 +93,51 @@ public PrintWriter getWriter() throws IOException { return printWriter; } if (!shouldInject) { + RumInjector.getTelemetryCollector().onInjectionSkipped(servletVersion); return super.getWriter(); } - wrappedPipeWriter = - new InjectingPipeWriter( - super.getWriter(), - rumInjector.getMarkerChars(), - rumInjector.getSnippetChars(), - this::onInjected); - printWriter = new PrintWriter(wrappedPipeWriter); + try { + wrappedPipeWriter = + new InjectingPipeWriter( + super.getWriter(), + rumInjector.getMarkerChars(), + rumInjector.getSnippetChars(), + this::onInjected, + bytes -> + RumInjector.getTelemetryCollector() + .onInjectionResponseSize(servletVersion, bytes), + milliseconds -> + RumInjector.getTelemetryCollector() + .onInjectionTime(servletVersion, milliseconds)); + printWriter = new PrintWriter(wrappedPipeWriter); + } catch (Exception e) { + RumInjector.getTelemetryCollector().onInjectionFailed(servletVersion, contentEncoding); + throw e; + } return printWriter; } + @Override + public void setHeader(String name, String value) { + checkForContentSecurityPolicy(name); + super.setHeader(name, value); + } + + @Override + public void addHeader(String name, String value) { + checkForContentSecurityPolicy(name); + super.addHeader(name, value); + } + + private void checkForContentSecurityPolicy(String name) { + if (name != null) { + if (name.startsWith("Content-Security-Policy")) { + RumInjector.getTelemetryCollector().onContentSecurityPolicyDetected(servletVersion); + } + } + } + @Override public void setContentLength(int len) { // don't set it since we don't know if we will inject @@ -99,6 +157,14 @@ public void setContentLengthLong(long len) { } } + @Override + public void setCharacterEncoding(String charset) { + if (charset != null) { + this.contentEncoding = charset; + } + super.setCharacterEncoding(charset); + } + @Override public void reset() { this.outputStream = null; @@ -117,6 +183,7 @@ public void resetBuffer() { } public void onInjected() { + RumInjector.getTelemetryCollector().onInjectionSucceed(servletVersion); try { setHeader("x-datadog-rum-injected", "1"); } catch (Throwable ignored) { diff --git a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Advice.java b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Advice.java index a06cbda8c59..9a8a55f159d 100644 --- a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Advice.java +++ b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Advice.java @@ -50,7 +50,8 @@ public static boolean onEnter( if (maybeRumWrapper instanceof RumHttpServletResponseWrapper) { rumServletWrapper = (RumHttpServletResponseWrapper) maybeRumWrapper; } else { - rumServletWrapper = new RumHttpServletResponseWrapper((HttpServletResponse) response); + rumServletWrapper = + new RumHttpServletResponseWrapper(httpServletRequest, (HttpServletResponse) response); httpServletRequest.setAttribute(DD_RUM_INJECTED, rumServletWrapper); response = rumServletWrapper; request = new RumHttpServletRequestWrapper(httpServletRequest, rumServletWrapper); diff --git a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/WrappedServletOutputStream.java b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/WrappedServletOutputStream.java index 109a55491d8..c4b575836ff 100644 --- a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/WrappedServletOutputStream.java +++ b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/WrappedServletOutputStream.java @@ -4,6 +4,7 @@ import datadog.trace.util.MethodHandles; import java.io.IOException; import java.lang.invoke.MethodHandle; +import java.util.function.LongConsumer; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; @@ -29,8 +30,15 @@ private static void sneakyThrow(Throwable e) throws E { } public WrappedServletOutputStream( - ServletOutputStream delegate, byte[] marker, byte[] contentToInject, Runnable onInjected) { - this.filtered = new InjectingPipeOutputStream(delegate, marker, contentToInject, onInjected); + ServletOutputStream delegate, + byte[] marker, + byte[] contentToInject, + Runnable onInjected, + LongConsumer onBytesWritten, + LongConsumer onInjectionTime) { + this.filtered = + new InjectingPipeOutputStream( + delegate, marker, contentToInject, onInjected, onBytesWritten, onInjectionTime); this.delegate = delegate; } diff --git a/dd-java-agent/instrumentation/servlet/request-3/src/test/groovy/RumHttpServletResponseWrapperTest.groovy b/dd-java-agent/instrumentation/servlet/request-3/src/test/groovy/RumHttpServletResponseWrapperTest.groovy new file mode 100644 index 00000000000..f34fa560ed6 --- /dev/null +++ b/dd-java-agent/instrumentation/servlet/request-3/src/test/groovy/RumHttpServletResponseWrapperTest.groovy @@ -0,0 +1,265 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.rum.RumInjector +import datadog.trace.api.rum.RumTelemetryCollector +import datadog.trace.bootstrap.instrumentation.buffer.InjectingPipeOutputStream +import datadog.trace.bootstrap.instrumentation.buffer.InjectingPipeWriter +import datadog.trace.instrumentation.servlet3.RumHttpServletResponseWrapper +import datadog.trace.instrumentation.servlet3.WrappedServletOutputStream +import spock.lang.Subject + +import java.util.function.LongConsumer +import javax.servlet.ServletContext +import javax.servlet.http.HttpServletRequest +import javax.servlet.http.HttpServletResponse + +class RumHttpServletResponseWrapperTest extends AgentTestRunner { + private static final String SERVLET_VERSION = "3" + + def mockRequest = Mock(HttpServletRequest) + def mockResponse = Mock(HttpServletResponse) + def mockServletContext = Mock(ServletContext) + def mockTelemetryCollector = Mock(RumTelemetryCollector) + + @Subject + RumHttpServletResponseWrapper wrapper + + void setup() { + mockRequest.getServletContext() >> mockServletContext + mockServletContext.getEffectiveMajorVersion() >> Integer.parseInt(SERVLET_VERSION) + wrapper = new RumHttpServletResponseWrapper(mockRequest, mockResponse) + RumInjector.setTelemetryCollector(mockTelemetryCollector) + } + + void cleanup() { + RumInjector.setTelemetryCollector(RumTelemetryCollector.NO_OP) + } + + void 'onInjected calls telemetry collector onInjectionSucceed'() { + when: + wrapper.onInjected() + + then: + 1 * mockTelemetryCollector.onInjectionSucceed(SERVLET_VERSION) + } + + void 'getOutputStream with non-HTML content reports skipped'() { + setup: + wrapper.setContentType("text/plain") + + when: + wrapper.getOutputStream() + + then: + 1 * mockTelemetryCollector.onInjectionSkipped(SERVLET_VERSION) + 1 * mockResponse.getOutputStream() + } + + void 'getWriter with non-HTML content reports skipped'() { + setup: + wrapper.setContentType("text/plain") + + when: + wrapper.getWriter() + + then: + 1 * mockTelemetryCollector.onInjectionSkipped(SERVLET_VERSION) + 1 * mockResponse.getWriter() + } + + void 'getOutputStream exception reports failure'() { + setup: + wrapper.setContentType("text/html") + mockResponse.getOutputStream() >> { throw new IOException("stream error") } + + when: + try { + wrapper.getOutputStream() + } catch (IOException ignored) {} + + then: + 1 * mockTelemetryCollector.onInjectionFailed(SERVLET_VERSION, null) + } + + void 'getWriter exception reports failure'() { + setup: + wrapper.setContentType("text/html") + mockResponse.getWriter() >> { throw new IOException("writer error") } + + when: + try { + wrapper.getWriter() + } catch (IOException ignored) {} + + then: + 1 * mockTelemetryCollector.onInjectionFailed(SERVLET_VERSION, null) + } + + void 'setHeader with Content-Security-Policy reports CSP detected'() { + when: + wrapper.setHeader("Content-Security-Policy", "test") + + then: + 1 * mockTelemetryCollector.onContentSecurityPolicyDetected(SERVLET_VERSION) + 1 * mockResponse.setHeader("Content-Security-Policy", "test") + } + + void 'addHeader with Content-Security-Policy reports CSP detected'() { + when: + wrapper.addHeader("Content-Security-Policy", "test") + + then: + 1 * mockTelemetryCollector.onContentSecurityPolicyDetected(SERVLET_VERSION) + 1 * mockResponse.addHeader("Content-Security-Policy", "test") + } + + void 'setHeader with non-CSP header does not report CSP detected'() { + when: + wrapper.setHeader("X-Content-Security-Policy", "test") + + then: + 0 * mockTelemetryCollector.onContentSecurityPolicyDetected(SERVLET_VERSION) + 1 * mockResponse.setHeader("X-Content-Security-Policy", "test") + } + + void 'addHeader with non-CSP header does not report CSP detected'() { + when: + wrapper.addHeader("X-Content-Security-Policy", "test") + + then: + 0 * mockTelemetryCollector.onContentSecurityPolicyDetected(SERVLET_VERSION) + 1 * mockResponse.addHeader("X-Content-Security-Policy", "test") + } + + void 'setCharacterEncoding reports the content-encoding tag with value when injection fails'() { + setup: + wrapper.setContentType("text/html") + wrapper.setCharacterEncoding("UTF-8") + mockResponse.getOutputStream() >> { throw new IOException("stream error") } + + when: + try { + wrapper.getOutputStream() + } catch (IOException ignored) {} + + then: + 1 * mockTelemetryCollector.onInjectionFailed(SERVLET_VERSION, "UTF-8") + } + + void 'setCharacterEncoding reports the content-encoding tag with null when injection fails'() { + setup: + wrapper.setContentType("text/html") + wrapper.setCharacterEncoding(null) + mockResponse.getOutputStream() >> { throw new IOException("stream error") } + + when: + try { + wrapper.getOutputStream() + } catch (IOException ignored) {} + + then: + 1 * mockTelemetryCollector.onInjectionFailed(SERVLET_VERSION, null) + } + + // Callback is created in the RumHttpServletResponseWrapper and passed to InjectingPipeOutputStream via WrappedServletOutputStream. + // When the stream is closed, the callback is called with the number of bytes written to the stream and the time taken to write the injection content. + void 'response sizes are reported to the telemetry collector via the WrappedServletOutputStream callback'() { + setup: + def downstream = Mock(javax.servlet.ServletOutputStream) + def marker = "".getBytes("UTF-8") + def contentToInject = "".getBytes("UTF-8") + def onBytesWritten = { bytes -> + mockTelemetryCollector.onInjectionResponseSize(SERVLET_VERSION, bytes) + } + def wrappedStream = new WrappedServletOutputStream( + downstream, marker, contentToInject, null, onBytesWritten, null) + def testBytes = "test content" + + when: + wrappedStream.write(testBytes[0..5].getBytes("UTF-8")) + wrappedStream.write(testBytes[6..-1].getBytes("UTF-8")) + wrappedStream.close() + + then: + 1 * mockTelemetryCollector.onInjectionResponseSize(SERVLET_VERSION, testBytes.length()) + } + + void 'response sizes are reported by the InjectingPipeOutputStream callback'() { + setup: + def downstream = Mock(java.io.OutputStream) + def marker = "".getBytes("UTF-8") + def contentToInject = "".getBytes("UTF-8") + def onBytesWritten = Mock(LongConsumer) + def stream = new InjectingPipeOutputStream( + downstream, marker, contentToInject, null, onBytesWritten, null) + def testBytes = "test content" + + when: + stream.write(testBytes[0..5].getBytes("UTF-8")) + stream.write(testBytes[6..-1].getBytes("UTF-8")) + stream.close() + + then: + 1 * onBytesWritten.accept(testBytes.length()) + } + + void 'response sizes are reported by the InjectingPipeWriter callback'() { + setup: + def downstream = Mock(java.io.Writer) + def marker = "".toCharArray() + def contentToInject = "".toCharArray() + def onBytesWritten = Mock(LongConsumer) + def writer = new InjectingPipeWriter( + downstream, marker, contentToInject, null, onBytesWritten, null) + def testBytes = "test content" + + when: + writer.write(testBytes[0..5].toCharArray()) + writer.write(testBytes[6..-1].toCharArray()) + writer.close() + + then: + 1 * onBytesWritten.accept(testBytes.length()) + } + + void 'injection timing is reported by the InjectingPipeOutputStream callback'() { + setup: + def downstream = Mock(java.io.OutputStream) { + write(_) >> { args -> + Thread.sleep(1) // simulate slow write + } + } + def marker = "".getBytes("UTF-8") + def contentToInject = "".getBytes("UTF-8") + def onInjectionTime = Mock(LongConsumer) + def stream = new InjectingPipeOutputStream( + downstream, marker, contentToInject, null, null, onInjectionTime) + + when: + stream.write("content".getBytes("UTF-8")) + stream.close() + + then: + 1 * onInjectionTime.accept({ it > 0 }) + } + + void 'injection timing is reported by the InjectingPipeWriter callback'() { + setup: + def downstream = Mock(java.io.Writer) { + write(_) >> { args -> + Thread.sleep(1) // simulate slow write + } + } + def marker = "".toCharArray() + def contentToInject = "".toCharArray() + def onInjectionTime = Mock(LongConsumer) + def writer = new InjectingPipeWriter( + downstream, marker, contentToInject, null, null, onInjectionTime) + + when: + writer.write("content".toCharArray()) + writer.close() + + then: + 1 * onInjectionTime.accept({ it > 0 }) + } +} diff --git a/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/JakartaServletInstrumentation.java b/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/JakartaServletInstrumentation.java index d3630cc1fd8..3d0013786ee 100644 --- a/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/JakartaServletInstrumentation.java +++ b/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/JakartaServletInstrumentation.java @@ -84,7 +84,9 @@ public static AgentSpan before( if (maybeRumWrapper instanceof RumHttpServletResponseWrapper) { rumServletWrapper = (RumHttpServletResponseWrapper) maybeRumWrapper; } else { - rumServletWrapper = new RumHttpServletResponseWrapper((HttpServletResponse) response); + rumServletWrapper = + new RumHttpServletResponseWrapper( + httpServletRequest, (HttpServletResponse) response); httpServletRequest.setAttribute(DD_RUM_INJECTED, rumServletWrapper); response = rumServletWrapper; request = new RumHttpServletRequestWrapper(httpServletRequest, rumServletWrapper); diff --git a/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/RumHttpServletRequestWrapper.java b/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/RumHttpServletRequestWrapper.java index c2a05680488..7bba1a13c10 100644 --- a/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/RumHttpServletRequestWrapper.java +++ b/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/RumHttpServletRequestWrapper.java @@ -37,7 +37,8 @@ public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse se ServletResponse actualResponse = servletResponse; // rewrap it if (servletResponse instanceof HttpServletResponse) { - actualResponse = new RumHttpServletResponseWrapper((HttpServletResponse) servletResponse); + actualResponse = + new RumHttpServletResponseWrapper(this, (HttpServletResponse) servletResponse); servletRequest.setAttribute(DD_RUM_INJECTED, actualResponse); } return super.startAsync(servletRequest, actualResponse); diff --git a/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/RumHttpServletResponseWrapper.java b/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/RumHttpServletResponseWrapper.java index 4b91afd3890..b242ba07837 100644 --- a/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/RumHttpServletResponseWrapper.java +++ b/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/RumHttpServletResponseWrapper.java @@ -2,7 +2,9 @@ import datadog.trace.api.rum.RumInjector; import datadog.trace.bootstrap.instrumentation.buffer.InjectingPipeWriter; +import jakarta.servlet.ServletContext; import jakarta.servlet.ServletOutputStream; +import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponseWrapper; import java.io.IOException; @@ -11,14 +13,26 @@ public class RumHttpServletResponseWrapper extends HttpServletResponseWrapper { private final RumInjector rumInjector; + private final String servletVersion; private WrappedServletOutputStream outputStream; private InjectingPipeWriter wrappedPipeWriter; private PrintWriter printWriter; private boolean shouldInject = true; + private String contentEncoding = null; - public RumHttpServletResponseWrapper(HttpServletResponse response) { + public RumHttpServletResponseWrapper(HttpServletRequest request, HttpServletResponse response) { super(response); this.rumInjector = RumInjector.get(); + + String version = "5"; + ServletContext servletContext = request.getServletContext(); + if (servletContext != null) { + try { + version = String.valueOf(servletContext.getEffectiveMajorVersion()); + } catch (Exception e) { + } + } + this.servletVersion = version; } @Override @@ -27,18 +41,30 @@ public ServletOutputStream getOutputStream() throws IOException { return outputStream; } if (!shouldInject) { + RumInjector.getTelemetryCollector().onInjectionSkipped(servletVersion); return super.getOutputStream(); } - String encoding = getCharacterEncoding(); - if (encoding == null) { - encoding = Charset.defaultCharset().name(); + try { + String encoding = getCharacterEncoding(); + if (encoding == null) { + encoding = Charset.defaultCharset().name(); + } + outputStream = + new WrappedServletOutputStream( + super.getOutputStream(), + rumInjector.getMarkerBytes(encoding), + rumInjector.getSnippetBytes(encoding), + this::onInjected, + bytes -> + RumInjector.getTelemetryCollector() + .onInjectionResponseSize(servletVersion, bytes), + milliseconds -> + RumInjector.getTelemetryCollector() + .onInjectionTime(servletVersion, milliseconds)); + } catch (Exception e) { + RumInjector.getTelemetryCollector().onInjectionFailed(servletVersion, contentEncoding); + throw e; } - outputStream = - new WrappedServletOutputStream( - super.getOutputStream(), - rumInjector.getMarkerBytes(encoding), - rumInjector.getSnippetBytes(encoding), - this::onInjected); return outputStream; } @@ -48,19 +74,51 @@ public PrintWriter getWriter() throws IOException { return printWriter; } if (!shouldInject) { + RumInjector.getTelemetryCollector().onInjectionSkipped(servletVersion); return super.getWriter(); } - wrappedPipeWriter = - new InjectingPipeWriter( - super.getWriter(), - rumInjector.getMarkerChars(), - rumInjector.getSnippetChars(), - this::onInjected); - printWriter = new PrintWriter(wrappedPipeWriter); + try { + wrappedPipeWriter = + new InjectingPipeWriter( + super.getWriter(), + rumInjector.getMarkerChars(), + rumInjector.getSnippetChars(), + this::onInjected, + bytes -> + RumInjector.getTelemetryCollector() + .onInjectionResponseSize(servletVersion, bytes), + milliseconds -> + RumInjector.getTelemetryCollector() + .onInjectionTime(servletVersion, milliseconds)); + printWriter = new PrintWriter(wrappedPipeWriter); + } catch (Exception e) { + RumInjector.getTelemetryCollector().onInjectionFailed(servletVersion, contentEncoding); + throw e; + } return printWriter; } + @Override + public void setHeader(String name, String value) { + checkForContentSecurityPolicy(name); + super.setHeader(name, value); + } + + @Override + public void addHeader(String name, String value) { + checkForContentSecurityPolicy(name); + super.addHeader(name, value); + } + + private void checkForContentSecurityPolicy(String name) { + if (name != null) { + if (name.startsWith("Content-Security-Policy")) { + RumInjector.getTelemetryCollector().onContentSecurityPolicyDetected(servletVersion); + } + } + } + @Override public void setContentLength(int len) { // don't set it since we don't know if we will inject @@ -76,6 +134,14 @@ public void setContentLengthLong(long len) { } } + @Override + public void setCharacterEncoding(String charset) { + if (charset != null) { + this.contentEncoding = charset; + } + super.setCharacterEncoding(charset); + } + @Override public void reset() { this.outputStream = null; @@ -94,6 +160,7 @@ public void resetBuffer() { } public void onInjected() { + RumInjector.getTelemetryCollector().onInjectionSucceed(servletVersion); try { setHeader("x-datadog-rum-injected", "1"); } catch (Throwable ignored) { diff --git a/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/WrappedServletOutputStream.java b/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/WrappedServletOutputStream.java index db956377708..fe2a87774ac 100644 --- a/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/WrappedServletOutputStream.java +++ b/dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/WrappedServletOutputStream.java @@ -4,14 +4,22 @@ import jakarta.servlet.ServletOutputStream; import jakarta.servlet.WriteListener; import java.io.IOException; +import java.util.function.LongConsumer; public class WrappedServletOutputStream extends ServletOutputStream { private final InjectingPipeOutputStream filtered; private final ServletOutputStream delegate; public WrappedServletOutputStream( - ServletOutputStream delegate, byte[] marker, byte[] contentToInject, Runnable onInjected) { - this.filtered = new InjectingPipeOutputStream(delegate, marker, contentToInject, onInjected); + ServletOutputStream delegate, + byte[] marker, + byte[] contentToInject, + Runnable onInjected, + LongConsumer onBytesWritten, + LongConsumer onInjectionTime) { + this.filtered = + new InjectingPipeOutputStream( + delegate, marker, contentToInject, onInjected, onBytesWritten, onInjectionTime); this.delegate = delegate; } diff --git a/dd-java-agent/instrumentation/servlet/request-5/src/test/groovy/RumHttpServletResponseWrapperTest.groovy b/dd-java-agent/instrumentation/servlet/request-5/src/test/groovy/RumHttpServletResponseWrapperTest.groovy new file mode 100644 index 00000000000..78dbd697c0d --- /dev/null +++ b/dd-java-agent/instrumentation/servlet/request-5/src/test/groovy/RumHttpServletResponseWrapperTest.groovy @@ -0,0 +1,265 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.rum.RumInjector +import datadog.trace.api.rum.RumTelemetryCollector +import datadog.trace.bootstrap.instrumentation.buffer.InjectingPipeOutputStream +import datadog.trace.bootstrap.instrumentation.buffer.InjectingPipeWriter +import datadog.trace.instrumentation.servlet5.RumHttpServletResponseWrapper +import datadog.trace.instrumentation.servlet5.WrappedServletOutputStream +import spock.lang.Subject + +import java.util.function.LongConsumer +import jakarta.servlet.ServletContext +import jakarta.servlet.http.HttpServletRequest +import jakarta.servlet.http.HttpServletResponse + +class RumHttpServletResponseWrapperTest extends AgentTestRunner { + private static final String SERVLET_VERSION = "5" + + def mockRequest = Mock(HttpServletRequest) + def mockResponse = Mock(HttpServletResponse) + def mockServletContext = Mock(ServletContext) + def mockTelemetryCollector = Mock(RumTelemetryCollector) + + @Subject + RumHttpServletResponseWrapper wrapper + + void setup() { + mockRequest.getServletContext() >> mockServletContext + mockServletContext.getEffectiveMajorVersion() >> Integer.parseInt(SERVLET_VERSION) + wrapper = new RumHttpServletResponseWrapper(mockRequest, mockResponse) + RumInjector.setTelemetryCollector(mockTelemetryCollector) + } + + void cleanup() { + RumInjector.setTelemetryCollector(RumTelemetryCollector.NO_OP) + } + + void 'onInjected calls telemetry collector onInjectionSucceed'() { + when: + wrapper.onInjected() + + then: + 1 * mockTelemetryCollector.onInjectionSucceed(SERVLET_VERSION) + } + + void 'getOutputStream with non-HTML content reports skipped'() { + setup: + wrapper.setContentType("text/plain") + + when: + wrapper.getOutputStream() + + then: + 1 * mockTelemetryCollector.onInjectionSkipped(SERVLET_VERSION) + 1 * mockResponse.getOutputStream() + } + + void 'getWriter with non-HTML content reports skipped'() { + setup: + wrapper.setContentType("text/plain") + + when: + wrapper.getWriter() + + then: + 1 * mockTelemetryCollector.onInjectionSkipped(SERVLET_VERSION) + 1 * mockResponse.getWriter() + } + + void 'getOutputStream exception reports failure'() { + setup: + wrapper.setContentType("text/html") + mockResponse.getOutputStream() >> { throw new IOException("stream error") } + + when: + try { + wrapper.getOutputStream() + } catch (IOException ignored) {} + + then: + 1 * mockTelemetryCollector.onInjectionFailed(SERVLET_VERSION, null) + } + + void 'getWriter exception reports failure'() { + setup: + wrapper.setContentType("text/html") + mockResponse.getWriter() >> { throw new IOException("writer error") } + + when: + try { + wrapper.getWriter() + } catch (IOException ignored) {} + + then: + 1 * mockTelemetryCollector.onInjectionFailed(SERVLET_VERSION, null) + } + + void 'setHeader with Content-Security-Policy reports CSP detected'() { + when: + wrapper.setHeader("Content-Security-Policy", "test") + + then: + 1 * mockTelemetryCollector.onContentSecurityPolicyDetected(SERVLET_VERSION) + 1 * mockResponse.setHeader("Content-Security-Policy", "test") + } + + void 'addHeader with Content-Security-Policy reports CSP detected'() { + when: + wrapper.addHeader("Content-Security-Policy", "test") + + then: + 1 * mockTelemetryCollector.onContentSecurityPolicyDetected(SERVLET_VERSION) + 1 * mockResponse.addHeader("Content-Security-Policy", "test") + } + + void 'setHeader with non-CSP header does not report CSP detected'() { + when: + wrapper.setHeader("X-Content-Security-Policy", "test") + + then: + 0 * mockTelemetryCollector.onContentSecurityPolicyDetected(SERVLET_VERSION) + 1 * mockResponse.setHeader("X-Content-Security-Policy", "test") + } + + void 'addHeader with non-CSP header does not report CSP detected'() { + when: + wrapper.addHeader("X-Content-Security-Policy", "test") + + then: + 0 * mockTelemetryCollector.onContentSecurityPolicyDetected(SERVLET_VERSION) + 1 * mockResponse.addHeader("X-Content-Security-Policy", "test") + } + + void 'setCharacterEncoding reports the content-encoding tag with value when injection fails'() { + setup: + wrapper.setContentType("text/html") + wrapper.setCharacterEncoding("UTF-8") + mockResponse.getOutputStream() >> { throw new IOException("stream error") } + + when: + try { + wrapper.getOutputStream() + } catch (IOException ignored) {} + + then: + 1 * mockTelemetryCollector.onInjectionFailed(SERVLET_VERSION, "UTF-8") + } + + void 'setCharacterEncoding reports the content-encoding tag with null when injection fails'() { + setup: + wrapper.setContentType("text/html") + wrapper.setCharacterEncoding(null) + mockResponse.getOutputStream() >> { throw new IOException("stream error") } + + when: + try { + wrapper.getOutputStream() + } catch (IOException ignored) {} + + then: + 1 * mockTelemetryCollector.onInjectionFailed(SERVLET_VERSION, null) + } + + // Callback is created in the RumHttpServletResponseWrapper and passed to InjectingPipeOutputStream via WrappedServletOutputStream. + // When the stream is closed, the callback is called with the number of bytes written to the stream and the time taken to write the injection content. + void 'response sizes are reported to the telemetry collector via the WrappedServletOutputStream callback'() { + setup: + def downstream = Mock(jakarta.servlet.ServletOutputStream) + def marker = "".getBytes("UTF-8") + def contentToInject = "".getBytes("UTF-8") + def onBytesWritten = { bytes -> + mockTelemetryCollector.onInjectionResponseSize(SERVLET_VERSION, bytes) + } + def wrappedStream = new WrappedServletOutputStream( + downstream, marker, contentToInject, null, onBytesWritten, null) + def testBytes = "test content" + + when: + wrappedStream.write(testBytes[0..5].getBytes("UTF-8")) + wrappedStream.write(testBytes[6..-1].getBytes("UTF-8")) + wrappedStream.close() + + then: + 1 * mockTelemetryCollector.onInjectionResponseSize(SERVLET_VERSION, testBytes.length()) + } + + void 'response sizes are reported by the InjectingPipeOutputStream callback'() { + setup: + def downstream = Mock(java.io.OutputStream) + def marker = "".getBytes("UTF-8") + def contentToInject = "".getBytes("UTF-8") + def onBytesWritten = Mock(LongConsumer) + def stream = new InjectingPipeOutputStream( + downstream, marker, contentToInject, null, onBytesWritten, null) + def testBytes = "test content" + + when: + stream.write(testBytes[0..5].getBytes("UTF-8")) + stream.write(testBytes[6..-1].getBytes("UTF-8")) + stream.close() + + then: + 1 * onBytesWritten.accept(testBytes.length()) + } + + void 'response sizes are reported by the InjectingPipeWriter callback'() { + setup: + def downstream = Mock(java.io.Writer) + def marker = "".toCharArray() + def contentToInject = "".toCharArray() + def onBytesWritten = Mock(LongConsumer) + def writer = new InjectingPipeWriter( + downstream, marker, contentToInject, null, onBytesWritten, null) + def testBytes = "test content" + + when: + writer.write(testBytes[0..5].toCharArray()) + writer.write(testBytes[6..-1].toCharArray()) + writer.close() + + then: + 1 * onBytesWritten.accept(testBytes.length()) + } + + void 'injection timing is reported by the InjectingPipeOutputStream callback'() { + setup: + def downstream = Mock(java.io.OutputStream) { + write(_) >> { args -> + Thread.sleep(1) // simulate slow write + } + } + def marker = "".getBytes("UTF-8") + def contentToInject = "".getBytes("UTF-8") + def onInjectionTime = Mock(LongConsumer) + def stream = new InjectingPipeOutputStream( + downstream, marker, contentToInject, null, null, onInjectionTime) + + when: + stream.write("content".getBytes("UTF-8")) + stream.close() + + then: + 1 * onInjectionTime.accept({ it > 0 }) + } + + void 'injection timing is reported by the InjectingPipeWriter callback'() { + setup: + def downstream = Mock(java.io.Writer) { + write(_) >> { args -> + Thread.sleep(1) // simulate slow write + } + } + def marker = "".toCharArray() + def contentToInject = "".toCharArray() + def onInjectionTime = Mock(LongConsumer) + def writer = new InjectingPipeWriter( + downstream, marker, contentToInject, null, null, onInjectionTime) + + when: + writer.write("content".toCharArray()) + writer.close() + + then: + 1 * onInjectionTime.accept({ it > 0 }) + } +} diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy index fba553fe199..8bd05e8147a 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy @@ -2236,14 +2236,28 @@ abstract class HttpServerTest extends WithHttpServer { def "test rum injection in head for mime #mime"() { setup: assumeTrue(testRumInjection()) + def telemetryCollector = RumInjector.getTelemetryCollector() def request = new Request.Builder().url(server.address().resolve("gimme-$mime").toURL()) .get().build() + when: def response = client.newCall(request).execute() + def responseBody = response.body().string() + def finalSummary = telemetryCollector.summary() + then: assert response.code() == 200 - assert response.body().string().contains(new String(RumInjector.get().getSnippetBytes("UTF-8"), "UTF-8")) == expected + assert responseBody.contains(new String(RumInjector.get().getSnippetBytes("UTF-8"), "UTF-8")) == expected assert response.header("x-datadog-rum-injected") == (expected ? "1" : null) + + // Check a few telemetry metrics + if (expected) { + assert finalSummary.contains("injectionSucceed=") + assert responseBody.length() > 0 + } else { + assert finalSummary.contains("injectionSkipped=") + } + where: mime | expected "html" | true diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index ad35455d6c4..4c81a579f83 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -30,6 +30,7 @@ import datadog.trace.api.DynamicConfig; import datadog.trace.api.EndpointTracker; import datadog.trace.api.IdGenerationStrategy; +import datadog.trace.api.InstrumenterConfig; import datadog.trace.api.StatsDClient; import datadog.trace.api.TagMap; import datadog.trace.api.TraceConfig; @@ -49,6 +50,7 @@ import datadog.trace.api.metrics.SpanMetricRegistry; import datadog.trace.api.naming.SpanNaming; import datadog.trace.api.remoteconfig.ServiceNameCollector; +import datadog.trace.api.rum.RumInjector; import datadog.trace.api.sampling.PrioritySampling; import datadog.trace.api.scopemanager.ScopeListener; import datadog.trace.api.time.SystemTimeSource; @@ -703,6 +705,7 @@ private CoreTracer( config.isHealthMetricsEnabled() ? new MonitoringImpl(this.statsDClient, 10, SECONDS) : Monitoring.DISABLED; + this.healthMetrics = healthMetrics != null ? healthMetrics @@ -710,6 +713,12 @@ private CoreTracer( ? new TracerHealthMetrics(this.statsDClient) : HealthMetrics.NO_OP); this.healthMetrics.start(); + + // Start RUM injector telemetry + if (InstrumenterConfig.get().isRumEnabled()) { + RumInjector.enableTelemetry(this.statsDClient); + } + performanceMonitoring = config.isPerfMetricsEnabled() ? new MonitoringImpl(this.statsDClient, 10, SECONDS) @@ -1256,6 +1265,7 @@ public void close() { tracingConfigPoller.stop(); pendingTraceBuffer.close(); writer.close(); + RumInjector.shutdownTelemetry(); statsDClient.close(); metricsAggregator.close(); dataStreamsMonitoring.close(); diff --git a/internal-api/build.gradle.kts b/internal-api/build.gradle.kts index eb25ebd9ac8..6b29c32d3a7 100644 --- a/internal-api/build.gradle.kts +++ b/internal-api/build.gradle.kts @@ -250,6 +250,8 @@ val excludedClassesBranchCoverage by extra( "datadog.trace.api.env.CapturedEnvironment.ProcessInfo", "datadog.trace.util.TempLocationManager", "datadog.trace.util.TempLocationManager.*", + // Branches depend on RUM injector state that cannot be reliably controlled in unit tests + "datadog.trace.api.rum.RumInjectorMetrics", ) ) diff --git a/internal-api/src/main/java/datadog/trace/api/rum/RumInjector.java b/internal-api/src/main/java/datadog/trace/api/rum/RumInjector.java index 5501c80e62c..343c9450aa6 100644 --- a/internal-api/src/main/java/datadog/trace/api/rum/RumInjector.java +++ b/internal-api/src/main/java/datadog/trace/api/rum/RumInjector.java @@ -29,6 +29,8 @@ public final class RumInjector { private final DDCache markerCache; private final Function snippetBytes; + private static volatile RumTelemetryCollector telemetryCollector = RumTelemetryCollector.NO_OP; + RumInjector(Config config, InstrumenterConfig instrumenterConfig) { boolean rumEnabled = instrumenterConfig.isRumEnabled(); RumInjectorConfig injectorConfig = config.getRumInjectorConfig(); @@ -122,4 +124,46 @@ public byte[] getMarkerBytes(String encoding) { } return this.markerCache.computeIfAbsent(encoding, MARKER_BYTES); } + + /** + * Starts telemetry collection and reports metrics via StatsDClient. + * + * @param statsDClient The StatsDClient to report metrics to. + */ + public static void enableTelemetry(datadog.trace.api.StatsDClient statsDClient) { + if (statsDClient != null) { + RumInjectorMetrics metrics = new RumInjectorMetrics(statsDClient); + telemetryCollector = metrics; + + if (INSTANCE.isEnabled()) { + telemetryCollector.onInitializationSucceed(); + } + } else { + telemetryCollector = RumTelemetryCollector.NO_OP; + } + } + + /** Shuts down telemetry collection and resets the telemetry collector to NO_OP. */ + public static void shutdownTelemetry() { + telemetryCollector.close(); + telemetryCollector = RumTelemetryCollector.NO_OP; + } + + /** + * Sets the telemetry collector. This is used for testing purposes only. + * + * @param collector The telemetry collector to set or {@code null} to reset to NO_OP. + */ + public static void setTelemetryCollector(RumTelemetryCollector collector) { + telemetryCollector = collector != null ? collector : RumTelemetryCollector.NO_OP; + } + + /** + * Gets the telemetry collector. + * + * @return The telemetry collector used to report telemetry. + */ + public static RumTelemetryCollector getTelemetryCollector() { + return telemetryCollector; + } } diff --git a/internal-api/src/main/java/datadog/trace/api/rum/RumInjectorMetrics.java b/internal-api/src/main/java/datadog/trace/api/rum/RumInjectorMetrics.java new file mode 100644 index 00000000000..f97742af3d2 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/rum/RumInjectorMetrics.java @@ -0,0 +1,200 @@ +package datadog.trace.api.rum; + +import datadog.trace.api.Config; +import datadog.trace.api.StatsDClient; +import datadog.trace.api.cache.DDCache; +import datadog.trace.api.cache.DDCaches; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This class implements the RumTelemetryCollector interface, which is used to collect telemetry + * from the RumInjector. Metrics are then reported via StatsDClient with tagging. + * + * @see common + * metrics and tags + */ +public class RumInjectorMetrics implements RumTelemetryCollector { + + private final AtomicLong injectionSucceed = new AtomicLong(); + private final AtomicLong injectionFailed = new AtomicLong(); + private final AtomicLong injectionSkipped = new AtomicLong(); + private final AtomicLong contentSecurityPolicyDetected = new AtomicLong(); + private final AtomicLong initializationSucceed = new AtomicLong(); + + private final StatsDClient statsd; + + private final String applicationId; + private final String remoteConfigUsed; + + // Cache dependent on servlet version and content encoding + private final DDCache succeedTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache skippedTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache cspTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache responseTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache timeTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache failedTagsCache = DDCaches.newFixedSizeCache(16); + + private static final String[] INIT_TAGS = + new String[] {"integration_name:servlet", "integration_version:N/A"}; + + public RumInjectorMetrics(final StatsDClient statsd) { + this.statsd = statsd; + + // Get RUM config values (applicationId and remoteConfigUsed) for tagging + RumInjector rumInjector = RumInjector.get(); + RumInjectorConfig injectorConfig = Config.get().getRumInjectorConfig(); + if (rumInjector.isEnabled() && injectorConfig != null) { + this.applicationId = injectorConfig.applicationId; + this.remoteConfigUsed = injectorConfig.remoteConfigurationId != null ? "true" : "false"; + } else { + this.applicationId = "unknown"; + this.remoteConfigUsed = "false"; + } + } + + @Override + public void onInjectionSucceed(String servletVersion) { + injectionSucceed.incrementAndGet(); + + String[] tags = + succeedTagsCache.computeIfAbsent( + servletVersion, + version -> + new String[] { + "application_id:" + applicationId, + "integration_name:servlet", + "integration_version:" + version, + "remote_config_used:" + remoteConfigUsed + }); + + statsd.count("rum.injection.succeed", 1, tags); + } + + @Override + public void onInjectionFailed(String servletVersion, String contentEncoding) { + injectionFailed.incrementAndGet(); + + String cacheKey = servletVersion + ":" + contentEncoding; + String[] tags = + failedTagsCache.computeIfAbsent( + cacheKey, + key -> { + if (contentEncoding != null) { + return new String[] { + "application_id:" + applicationId, + "content_encoding:" + contentEncoding, + "integration_name:servlet", + "integration_version:" + servletVersion, + "reason:failed_to_return_response_wrapper", + "remote_config_used:" + remoteConfigUsed + }; + } else { + return new String[] { + "application_id:" + applicationId, + "integration_name:servlet", + "integration_version:" + servletVersion, + "reason:failed_to_return_response_wrapper", + "remote_config_used:" + remoteConfigUsed + }; + } + }); + + statsd.count("rum.injection.failed", 1, tags); + } + + @Override + public void onInjectionSkipped(String servletVersion) { + injectionSkipped.incrementAndGet(); + + String[] tags = + skippedTagsCache.computeIfAbsent( + servletVersion, + version -> + new String[] { + "application_id:" + applicationId, + "integration_name:servlet", + "integration_version:" + version, + "reason:should_not_inject", + "remote_config_used:" + remoteConfigUsed + }); + + statsd.count("rum.injection.skipped", 1, tags); + } + + @Override + public void onInitializationSucceed() { + initializationSucceed.incrementAndGet(); + statsd.count("rum.injection.initialization.succeed", 1, INIT_TAGS); + } + + @Override + public void onContentSecurityPolicyDetected(String servletVersion) { + contentSecurityPolicyDetected.incrementAndGet(); + + String[] tags = + cspTagsCache.computeIfAbsent( + servletVersion, + version -> + new String[] { + "integration_name:servlet", + "integration_version:" + version, + "kind:header", + "reason:csp_header_found", + "status:seen" + }); + statsd.count("rum.injection.content_security_policy", 1, tags); + } + + @Override + public void onInjectionResponseSize(String servletVersion, long bytes) { + String[] tags = + responseTagsCache.computeIfAbsent( + servletVersion, + version -> + new String[] { + "integration_name:servlet", + "integration_version:" + version, + "response_kind:header" + }); + statsd.distribution("rum.injection.response.bytes", bytes, tags); + } + + @Override + public void onInjectionTime(String servletVersion, long milliseconds) { + String[] tags = + timeTagsCache.computeIfAbsent( + servletVersion, + version -> new String[] {"integration_name:servlet", "integration_version:" + version}); + statsd.distribution("rum.injection.ms", milliseconds, tags); + } + + @Override + public void close() { + injectionSucceed.set(0); + injectionFailed.set(0); + injectionSkipped.set(0); + contentSecurityPolicyDetected.set(0); + initializationSucceed.set(0); + + succeedTagsCache.clear(); + skippedTagsCache.clear(); + cspTagsCache.clear(); + responseTagsCache.clear(); + timeTagsCache.clear(); + failedTagsCache.clear(); + } + + public String summary() { + return "\ninitializationSucceed=" + + initializationSucceed.get() + + "\ninjectionSucceed=" + + injectionSucceed.get() + + "\ninjectionFailed=" + + injectionFailed.get() + + "\ninjectionSkipped=" + + injectionSkipped.get() + + "\ncontentSecurityPolicyDetected=" + + contentSecurityPolicyDetected.get(); + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/rum/RumTelemetryCollector.java b/internal-api/src/main/java/datadog/trace/api/rum/RumTelemetryCollector.java new file mode 100644 index 00000000000..74638bdffec --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/rum/RumTelemetryCollector.java @@ -0,0 +1,96 @@ +package datadog.trace.api.rum; + +/** + * Collect RUM injection telemetry from the RumInjector This is implemented by the + * RumInjectorMetrics class + */ +public interface RumTelemetryCollector { + + RumTelemetryCollector NO_OP = + new RumTelemetryCollector() { + @Override + public void onInjectionSucceed(String integrationVersion) {} + + @Override + public void onInjectionFailed(String integrationVersion, String contentEncoding) {} + + @Override + public void onInjectionSkipped(String integrationVersion) {} + + @Override + public void onInitializationSucceed() {} + + @Override + public void onContentSecurityPolicyDetected(String integrationVersion) {} + + @Override + public void onInjectionResponseSize(String integrationVersion, long bytes) {} + + @Override + public void onInjectionTime(String integrationVersion, long milliseconds) {} + + @Override + public void close() {} + + @Override + public String summary() { + return ""; + } + }; + + /** + * Reports successful RUM injection. + * + * @param integrationVersion The version of the integration that was injected. + */ + void onInjectionSucceed(String integrationVersion); + + /** + * Reports failed RUM injection. + * + * @param integrationVersion The version of the integration that was injected. + * @param contentEncoding The content encoding of the response that was injected. + */ + void onInjectionFailed(String integrationVersion, String contentEncoding); + + /** + * Reports skipped RUM injection. + * + * @param integrationVersion The version of the integration that was injected. + */ + void onInjectionSkipped(String integrationVersion); + + /** Reports successful RUM injector initialization. */ + void onInitializationSucceed(); + + /** + * Reports content security policy detected in the response header to be injected. + * + * @param integrationVersion The version of the integration that was injected. + */ + void onContentSecurityPolicyDetected(String integrationVersion); + + /** + * Reports the size of the response before injection. + * + * @param integrationVersion The version of the integration that was injected. + * @param bytes The size of the response before injection. + */ + void onInjectionResponseSize(String integrationVersion, long bytes); + + /** + * Reports the time taken to inject the RUM SDK. + * + * @param integrationVersion The version of the integration that was injected. + * @param milliseconds The time taken to inject the RUM SDK. + */ + void onInjectionTime(String integrationVersion, long milliseconds); + + /** Closes the telemetry collector. */ + default void close() {} + + /** Returns a human-readable summary of the telemetry collected. */ + default String summary() { + return ""; + } +} diff --git a/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorMetricsTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorMetricsTest.groovy new file mode 100644 index 00000000000..eb7ba338bc0 --- /dev/null +++ b/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorMetricsTest.groovy @@ -0,0 +1,242 @@ +package datadog.trace.api.rum + +import datadog.trace.api.StatsDClient +import spock.lang.Specification +import spock.lang.Subject + +class RumInjectorMetricsTest extends Specification { + def statsD = Mock(StatsDClient) + + @Subject + def metrics = new RumInjectorMetrics(statsD) + + void assertTags(String[] args, String... expectedTags) { + expectedTags.each { expectedTag -> + assert args.contains(expectedTag), "Expected tag '$expectedTag' not found in tags: ${args as List}" + } + } + + // Note: application_id and remote_config_used tags need dynamic runtime values that depend on + // the RUM configuration state, so we do not test them here. + def "test onInjectionSucceed"() { + when: + metrics.onInjectionSucceed("3") + metrics.onInjectionSucceed("5") + metrics.onInjectionSucceed("6") + + then: + 1 * statsD.count('rum.injection.succeed', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:3") + } + 1 * statsD.count('rum.injection.succeed', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:5") + } + 1 * statsD.count('rum.injection.succeed', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:6") + } + 0 * _ + } + + def "test onInjectionFailed"() { + when: + metrics.onInjectionFailed("3", "gzip") + metrics.onInjectionFailed("5", null) + metrics.onInjectionFailed("6", "gzip") + + then: + 1 * statsD.count('rum.injection.failed', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "content_encoding:gzip", "integration_name:servlet", "integration_version:3", "reason:failed_to_return_response_wrapper") + } + 1 * statsD.count('rum.injection.failed', 1, _) >> { args -> + def tags = args[2] as String[] + assert !tags.any { it.startsWith("content_encoding:") } + assertTags(tags, "integration_name:servlet", "integration_version:5", "reason:failed_to_return_response_wrapper") + } + 1 * statsD.count('rum.injection.failed', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "content_encoding:gzip", "integration_name:servlet", "integration_version:6", "reason:failed_to_return_response_wrapper") + } + 0 * _ + } + + def "test onInjectionSkipped"() { + when: + metrics.onInjectionSkipped("3") + metrics.onInjectionSkipped("5") + metrics.onInjectionSkipped("6") + + then: + 1 * statsD.count('rum.injection.skipped', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:3", "reason:should_not_inject") + } + 1 * statsD.count('rum.injection.skipped', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:5", "reason:should_not_inject") + } + 1 * statsD.count('rum.injection.skipped', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:6", "reason:should_not_inject") + } + 0 * _ + } + + def "test onContentSecurityPolicyDetected"() { + when: + metrics.onContentSecurityPolicyDetected("3") + metrics.onContentSecurityPolicyDetected("5") + metrics.onContentSecurityPolicyDetected("6") + + then: + 1 * statsD.count('rum.injection.content_security_policy', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:3", "kind:header", "reason:csp_header_found", "status:seen") + } + 1 * statsD.count('rum.injection.content_security_policy', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:5", "kind:header", "reason:csp_header_found", "status:seen") + } + 1 * statsD.count('rum.injection.content_security_policy', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:6", "kind:header", "reason:csp_header_found", "status:seen") + } + 0 * _ + } + + def "test onInitializationSucceed"() { + when: + metrics.onInitializationSucceed() + + then: + 1 * statsD.count('rum.injection.initialization.succeed', 1, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:N/A") + } + 0 * _ + } + + def "test onInjectionResponseSize with multiple sizes"() { + when: + metrics.onInjectionResponseSize("3", 256) + metrics.onInjectionResponseSize("5", 512) + metrics.onInjectionResponseSize("6", 1024) + + then: + 1 * statsD.distribution('rum.injection.response.bytes', 256, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:3", "response_kind:header") + } + 1 * statsD.distribution('rum.injection.response.bytes', 512, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:5", "response_kind:header") + } + 1 * statsD.distribution('rum.injection.response.bytes', 1024, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:6", "response_kind:header") + } + 0 * _ + } + + def "test onInjectionTime with multiple durations"() { + when: + metrics.onInjectionTime("5", 5L) + metrics.onInjectionTime("3", 10L) + metrics.onInjectionTime("6", 15L) + + then: + 1 * statsD.distribution('rum.injection.ms', 5L, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:5") + } + 1 * statsD.distribution('rum.injection.ms', 10L, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:3") + } + 1 * statsD.distribution('rum.injection.ms', 15L, _) >> { args -> + def tags = args[2] as String[] + assertTags(tags, "integration_name:servlet", "integration_version:6") + } + 0 * _ + } + + def "test summary with multiple events in different order"() { + when: + metrics.onInitializationSucceed() + metrics.onContentSecurityPolicyDetected("3") + metrics.onInjectionSkipped("5") + metrics.onInjectionFailed("3", "gzip") + metrics.onInjectionSucceed("3") + metrics.onInjectionFailed("6", null) + metrics.onInjectionSucceed("6") + metrics.onInjectionSkipped("3") + metrics.onContentSecurityPolicyDetected("6") + metrics.onInjectionResponseSize("3", 256) + metrics.onInjectionTime("5", 5L) + def summary = metrics.summary() + + then: + summary.contains("initializationSucceed=1") + summary.contains("injectionSucceed=2") + summary.contains("injectionFailed=2") + summary.contains("injectionSkipped=2") + summary.contains("contentSecurityPolicyDetected=2") + 1 * statsD.count('rum.injection.initialization.succeed', 1, _) + 2 * statsD.count('rum.injection.succeed', 1, _) + 2 * statsD.count('rum.injection.failed', 1, _) + 2 * statsD.count('rum.injection.skipped', 1, _) + 2 * statsD.count('rum.injection.content_security_policy', 1, _) + 1 * statsD.distribution('rum.injection.response.bytes', 256, _) + 1 * statsD.distribution('rum.injection.ms', 5L, _) + 0 * _ + } + + def "test metrics start at zero in summary"() { + when: + def summary = metrics.summary() + + then: + summary.contains("initializationSucceed=0") + summary.contains("injectionSucceed=0") + summary.contains("injectionFailed=0") + summary.contains("injectionSkipped=0") + summary.contains("contentSecurityPolicyDetected=0") + 0 * _ + } + + def "test close resets counters in summary"() { + when: + metrics.onInitializationSucceed() + metrics.onInjectionSucceed("3") + metrics.onInjectionFailed("3", "gzip") + metrics.onInjectionSkipped("3") + metrics.onContentSecurityPolicyDetected("3") + + def summaryBeforeClose = metrics.summary() + metrics.close() + def summaryAfterClose = metrics.summary() + + then: + summaryBeforeClose.contains("initializationSucceed=1") + summaryBeforeClose.contains("injectionSucceed=1") + summaryBeforeClose.contains("injectionFailed=1") + summaryBeforeClose.contains("injectionSkipped=1") + summaryBeforeClose.contains("contentSecurityPolicyDetected=1") + + summaryAfterClose.contains("initializationSucceed=0") + summaryAfterClose.contains("injectionSucceed=0") + summaryAfterClose.contains("injectionFailed=0") + summaryAfterClose.contains("injectionSkipped=0") + summaryAfterClose.contains("contentSecurityPolicyDetected=0") + + 1 * statsD.count('rum.injection.initialization.succeed', 1, _) + 1 * statsD.count('rum.injection.succeed', 1, _) + 1 * statsD.count('rum.injection.failed', 1, _) + 1 * statsD.count('rum.injection.skipped', 1, _) + 1 * statsD.count('rum.injection.content_security_policy', 1, _) + 0 * _ + } +} diff --git a/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorTest.groovy index 98988e40242..1b2fd4783fb 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorTest.groovy @@ -65,4 +65,164 @@ class RumInjectorTest extends DDSpecification { injector.getSnippetChars() != null injector.getMarkerChars() != null } + + void 'set telemetry collector'() { + setup: + def telemetryCollector = mock(RumTelemetryCollector) + + when: + RumInjector.setTelemetryCollector(telemetryCollector) + + then: + RumInjector.getTelemetryCollector() == telemetryCollector + + cleanup: + RumInjector.setTelemetryCollector(RumTelemetryCollector.NO_OP) + } + + void 'return NO_OP when telemetry collector is not set'() { + when: + RumInjector.setTelemetryCollector(null) + + then: + RumInjector.getTelemetryCollector() == RumTelemetryCollector.NO_OP + } + + void 'enable telemetry with StatsDClient'() { + when: + RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) + + then: + RumInjector.getTelemetryCollector() instanceof datadog.trace.api.rum.RumInjectorMetrics + + cleanup: + RumInjector.shutdownTelemetry() + } + + void 'enabling telemetry with a null StatsDClient sets the telemetry collector to NO_OP'() { + when: + RumInjector.enableTelemetry(null) + + then: + RumInjector.getTelemetryCollector() == RumTelemetryCollector.NO_OP + } + + void 'shutdown telemetry'() { + setup: + RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) + + when: + RumInjector.shutdownTelemetry() + + then: + RumInjector.getTelemetryCollector() == RumTelemetryCollector.NO_OP + } + + void 'initialize rum injector'() { + when: + RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) + def telemetryCollector = RumInjector.getTelemetryCollector() + telemetryCollector.onInitializationSucceed() + def summary = telemetryCollector.summary() + + then: + summary.contains("initializationSucceed=1") + + cleanup: + RumInjector.shutdownTelemetry() + } + + void 'telemetry integration works end-to-end'() { + when: + RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) + + def telemetryCollector = RumInjector.getTelemetryCollector() + telemetryCollector.onInjectionSucceed("3") + telemetryCollector.onInjectionFailed("3", "gzip") + telemetryCollector.onInjectionSkipped("3") + telemetryCollector.onContentSecurityPolicyDetected("3") + telemetryCollector.onInjectionResponseSize("3", 256) + telemetryCollector.onInjectionTime("3", 5L) + + def summary = telemetryCollector.summary() + + then: + summary.contains("injectionSucceed=1") + summary.contains("injectionFailed=1") + summary.contains("injectionSkipped=1") + summary.contains("contentSecurityPolicyDetected=1") + + cleanup: + RumInjector.shutdownTelemetry() + } + + void 'response size telemetry does not throw an exception'() { + setup: + def mockStatsDClient = mock(datadog.trace.api.StatsDClient) + + when: + RumInjector.enableTelemetry(mockStatsDClient) + + def telemetryCollector = RumInjector.getTelemetryCollector() + telemetryCollector.onInjectionResponseSize("3", 256) + telemetryCollector.onInjectionResponseSize("3", 512) + telemetryCollector.onInjectionResponseSize("5", 2048) + + then: + noExceptionThrown() + + cleanup: + RumInjector.shutdownTelemetry() + } + + void 'injection time telemetry does not throw an exception'() { + setup: + def mockStatsDClient = mock(datadog.trace.api.StatsDClient) + + when: + RumInjector.enableTelemetry(mockStatsDClient) + + def telemetryCollector = RumInjector.getTelemetryCollector() + telemetryCollector.onInjectionTime("5", 5L) + telemetryCollector.onInjectionTime("5", 10L) + telemetryCollector.onInjectionTime("3", 20L) + + then: + noExceptionThrown() + + cleanup: + RumInjector.shutdownTelemetry() + } + + void 'concurrent telemetry calls return an accurate summary'() { + setup: + RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) + def telemetryCollector = RumInjector.getTelemetryCollector() + def threads = [] + + when: + // simulate multiple threads calling telemetry methods + (1..50).each { i -> + threads << Thread.start { + telemetryCollector.onInjectionSucceed("3") + telemetryCollector.onInjectionFailed("3", "gzip") + telemetryCollector.onInjectionSkipped("3") + telemetryCollector.onContentSecurityPolicyDetected("3") + telemetryCollector.onInjectionResponseSize("3", 256) + telemetryCollector.onInjectionTime("3", 5L) + } + } + threads*.join() + + def summary = telemetryCollector.summary() + + then: + summary.contains("injectionSucceed=50") + summary.contains("injectionFailed=50") + summary.contains("injectionSkipped=50") + summary.contains("contentSecurityPolicyDetected=50") + + cleanup: + RumInjector.shutdownTelemetry() + } } diff --git a/internal-api/src/test/groovy/datadog/trace/api/rum/RumTelemetryCollectorTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/rum/RumTelemetryCollectorTest.groovy new file mode 100644 index 00000000000..19c423635a7 --- /dev/null +++ b/internal-api/src/test/groovy/datadog/trace/api/rum/RumTelemetryCollectorTest.groovy @@ -0,0 +1,106 @@ +package datadog.trace.api.rum + +import spock.lang.Specification + +class RumTelemetryCollectorTest extends Specification { + + def "test default NO_OP does not throw exception"() { + when: + RumTelemetryCollector.NO_OP.onInjectionSucceed("3") + RumTelemetryCollector.NO_OP.onInjectionSucceed("5") + RumTelemetryCollector.NO_OP.onInjectionFailed("3", "gzip") + RumTelemetryCollector.NO_OP.onInjectionFailed("5", null) + RumTelemetryCollector.NO_OP.onInjectionSkipped("3") + RumTelemetryCollector.NO_OP.onInjectionSkipped("5") + RumTelemetryCollector.NO_OP.onInitializationSucceed() + RumTelemetryCollector.NO_OP.onContentSecurityPolicyDetected("3") + RumTelemetryCollector.NO_OP.onContentSecurityPolicyDetected("5") + RumTelemetryCollector.NO_OP.onInjectionResponseSize("3", 256L) + RumTelemetryCollector.NO_OP.onInjectionResponseSize("5", 512L) + RumTelemetryCollector.NO_OP.onInjectionTime("3", 10L) + RumTelemetryCollector.NO_OP.onInjectionTime("5", 20L) + RumTelemetryCollector.NO_OP.close() + + then: + noExceptionThrown() + } + + def "test default NO_OP summary returns an empty string"() { + when: + def summary = RumTelemetryCollector.NO_OP.summary() + + then: + summary == "" + } + + def "test default NO_OP close method does not throw exception"() { + when: + RumTelemetryCollector.NO_OP.close() + + then: + noExceptionThrown() + } + + def "test defining a custom implementation does not throw exception"() { + setup: + def customCollector = new RumTelemetryCollector() { + @Override + void onInjectionSucceed(String integrationVersion) { + } + + @Override + void onInjectionFailed(String integrationVersion, String contentEncoding) { + } + + @Override + void onInjectionSkipped(String integrationVersion) { + } + + @Override + void onInitializationSucceed() { + } + + @Override + void onContentSecurityPolicyDetected(String integrationVersion) { + } + + @Override + void onInjectionResponseSize(String integrationVersion, long bytes) { + } + + @Override + void onInjectionTime(String integrationVersion, long milliseconds) { + } + } + + when: + customCollector.close() + def summary = customCollector.summary() + + then: + noExceptionThrown() + summary == "" + } + + def "test multiple close calls do not throw exception"() { + when: + RumTelemetryCollector.NO_OP.close() + RumTelemetryCollector.NO_OP.close() + RumTelemetryCollector.NO_OP.close() + + then: + noExceptionThrown() + } + + def "test multiple summary calls return the same empty string"() { + when: + def summary1 = RumTelemetryCollector.NO_OP.summary() + def summary2 = RumTelemetryCollector.NO_OP.summary() + def summary3 = RumTelemetryCollector.NO_OP.summary() + + then: + summary1 == "" + summary1 == summary2 + summary2 == summary3 + } +}