From 093e806564917632906c197bec454a6e9e66180c Mon Sep 17 00:00:00 2001 From: Mohammed Abdessetar Elyagoubi Date: Fri, 10 Apr 2026 12:16:54 +0100 Subject: [PATCH 1/2] Bound JdkHttpSender thread pool to prevent DoS via unbounded thread creation The default executor used Integer.MAX_VALUE max threads with a SynchronousQueue, allowing thousands of threads under burst load. Cap at max(availableProcessors, 5) with CallerRunsPolicy for backpressure, and await termination on shutdown so in-flight requests complete before the HttpClient is closed. --- .../sender/jdk/internal/JdkHttpSender.java | 43 +++++++++++-------- .../jdk/internal/JdkHttpSenderTest.java | 33 ++++++++++++++ 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 2ffb296de9c..ea7c4c52556 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; @@ -133,7 +134,7 @@ public final class JdkHttpSender implements HttpSender { private static ExecutorService newExecutor() { return new ThreadPoolExecutor( 0, - Integer.MAX_VALUE, + Math.max(Runtime.getRuntime().availableProcessors(), 5), 60, TimeUnit.SECONDS, new SynchronousQueue<>(), @@ -157,24 +158,28 @@ private static HttpClient configureClient( @Override public void send( MessageWriter messageWriter, Consumer onResponse, Consumer onError) { - CompletableFuture unused = - CompletableFuture.supplyAsync( - () -> { - try { - return sendInternal(messageWriter); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, - executorService) - .whenComplete( - (httpResponse, throwable) -> { - if (throwable != null) { - onError.accept(throwable); - return; - } - onResponse.accept(httpResponse); - }); + try { + CompletableFuture unused = + CompletableFuture.supplyAsync( + () -> { + try { + return sendInternal(messageWriter); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, + executorService) + .whenComplete( + (httpResponse, throwable) -> { + if (throwable != null) { + onError.accept(throwable); + return; + } + onResponse.accept(httpResponse); + }); + } catch (RejectedExecutionException e) { + onError.accept(e); + } } // Visible for testing diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java index 24f764f3857..85e843d8d1c 100644 --- a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -27,6 +27,7 @@ import java.net.http.HttpConnectTimeoutException; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; import org.assertj.core.api.InstanceOfAssertFactories; @@ -166,6 +167,38 @@ void sendInternal_NonRetryableException() throws IOException, InterruptedExcepti verify(mockHttpClient, times(1)).send(any(), any()); } + @Test + void defaultExecutor_isBounded() { + JdkHttpSender defaultSender = + new JdkHttpSender( + URI.create("http://localhost"), + "text/plain", + null, + Duration.ofNanos(1), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + null, + Long.MAX_VALUE); + + try { + int expectedMax = Math.max(Runtime.getRuntime().availableProcessors(), 5); + assertThat(defaultSender) + .extracting( + "executorService", as(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))) + .satisfies( + executor -> { + assertThat(executor.getMaximumPoolSize()).isEqualTo(expectedMax); + assertThat(executor.getRejectedExecutionHandler()) + .isInstanceOf(ThreadPoolExecutor.AbortPolicy.class); + }); + } finally { + defaultSender.shutdown(); + } + } + @Test void connectTimeout() { sender = From d4146be5074df24fc9fccdf80a90ad28a731bf29 Mon Sep 17 00:00:00 2001 From: abdessattar23 Date: Thu, 16 Apr 2026 09:57:25 +0100 Subject: [PATCH 2/2] Add test coverage for JdkHttpSender.send() callback paths --- .../jdk/internal/JdkHttpSenderTest.java | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java index 85e843d8d1c..e329266b80d 100644 --- a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -9,15 +9,20 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.HttpResponse; import io.opentelemetry.sdk.common.export.MessageWriter; import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Method; import java.net.ConnectException; @@ -25,10 +30,15 @@ import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpConnectTimeoutException; +import java.net.http.HttpHeaders; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLException; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.BeforeEach; @@ -222,6 +232,124 @@ void connectTimeout() { assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10))); } + @SuppressWarnings("unchecked") + @Test + void send_successfulResponse_callsOnResponse() throws Exception { + java.net.http.HttpResponse mockJdkResponse = + mock(java.net.http.HttpResponse.class); + when(mockJdkResponse.statusCode()).thenReturn(200); + when(mockJdkResponse.body()).thenReturn(new ByteArrayInputStream(new byte[0])); + when(mockJdkResponse.headers()) + .thenReturn(HttpHeaders.of(Collections.emptyMap(), (a, b) -> true)); + doReturn(mockJdkResponse).when(mockHttpClient).send(any(), any()); + + JdkHttpSender testSender = + new JdkHttpSender( + mockHttpClient, + URI.create("http://localhost"), + "text/plain", + null, + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + Long.MAX_VALUE); + + try { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + testSender.send( + new NoOpRequestBodyWriter(), + response -> { + responseRef.set(response); + latch.countDown(); + }, + error -> { + errorRef.set(error); + latch.countDown(); + }); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(responseRef.get()).isNotNull(); + assertThat(responseRef.get().getStatusCode()).isEqualTo(200); + assertThat(errorRef.get()).isNull(); + } finally { + testSender.shutdown(); + } + } + + @Test + void send_ioException_callsOnError() throws Exception { + doThrow(new IOException("send failed")).when(mockHttpClient).send(any(), any()); + + JdkHttpSender testSender = + new JdkHttpSender( + mockHttpClient, + URI.create("http://localhost"), + "text/plain", + null, + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + Long.MAX_VALUE); + + try { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + testSender.send( + new NoOpRequestBodyWriter(), + response -> { + responseRef.set(response); + latch.countDown(); + }, + error -> { + errorRef.set(error); + latch.countDown(); + }); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(errorRef.get()).isNotNull(); + assertThat(errorRef.get()).hasRootCauseInstanceOf(IOException.class); + assertThat(errorRef.get()).hasRootCauseMessage("send failed"); + assertThat(responseRef.get()).isNull(); + } finally { + testSender.shutdown(); + } + } + + @Test + void send_rejectedExecution_callsOnError() { + ThreadPoolExecutor executor = + new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); + executor.shutdown(); + + JdkHttpSender testSender = + new JdkHttpSender( + mockHttpClient, + URI.create("http://localhost"), + "text/plain", + null, + Duration.ofSeconds(10), + Collections::emptyMap, + null, + executor, + Long.MAX_VALUE); + + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + testSender.send(new NoOpRequestBodyWriter(), responseRef::set, errorRef::set); + + assertThat(errorRef.get()).isNotNull(); + assertThat(errorRef.get()).isInstanceOf(RejectedExecutionException.class); + assertThat(responseRef.get()).isNull(); + } + private static class NoOpRequestBodyWriter implements MessageWriter { @Override public void writeMessage(OutputStream output) {}