Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -133,7 +134,7 @@ public final class JdkHttpSender implements HttpSender {
private static ExecutorService newExecutor() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
Math.max(Runtime.getRuntime().availableProcessors(), 5),
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Expand All @@ -157,24 +158,28 @@ private static HttpClient configureClient(
@Override
public void send(
MessageWriter messageWriter, Consumer<HttpResponse> onResponse, Consumer<Throwable> onError) {
CompletableFuture<HttpResponse> unused =
CompletableFuture.supplyAsync(
() -> {
try {
return sendInternal(messageWriter);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
},
executorService)
.whenComplete(
(httpResponse, throwable) -> {
if (throwable != null) {
onError.accept(throwable);
return;
}
onResponse.accept(httpResponse);
});
try {
CompletableFuture<HttpResponse> unused =
CompletableFuture.supplyAsync(
() -> {
try {
return sendInternal(messageWriter);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
},
executorService)
.whenComplete(
(httpResponse, throwable) -> {
if (throwable != null) {
onError.accept(throwable);
return;
}
onResponse.accept(httpResponse);
});
} catch (RejectedExecutionException e) {
onError.accept(e);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test coverage for this missing

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh Thanks a lot, maybe now it's able to be merged 🙂

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confused why this is marked resolved - I don't see any test coverage added for it. Maybe you forgot to push a commit?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes m sorry i didn't commit and push, now it's good

}
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,36 @@
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.HttpResponse;
import io.opentelemetry.sdk.common.export.MessageWriter;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpHeaders;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLException;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -166,6 +177,38 @@ void sendInternal_NonRetryableException() throws IOException, InterruptedExcepti
verify(mockHttpClient, times(1)).send(any(), any());
}

@Test
void defaultExecutor_isBounded() {
JdkHttpSender defaultSender =
new JdkHttpSender(
URI.create("http://localhost"),
"text/plain",
null,
Duration.ofNanos(1),
Duration.ofSeconds(10),
Collections::emptyMap,
null,
null,
null,
null,
Long.MAX_VALUE);

try {
int expectedMax = Math.max(Runtime.getRuntime().availableProcessors(), 5);
assertThat(defaultSender)
.extracting(
"executorService", as(InstanceOfAssertFactories.type(ThreadPoolExecutor.class)))
.satisfies(
executor -> {
assertThat(executor.getMaximumPoolSize()).isEqualTo(expectedMax);
assertThat(executor.getRejectedExecutionHandler())
.isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);
});
} finally {
defaultSender.shutdown();
}
}

@Test
void connectTimeout() {
sender =
Expand All @@ -189,6 +232,124 @@ void connectTimeout() {
assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10)));
}

@SuppressWarnings("unchecked")
@Test
void send_successfulResponse_callsOnResponse() throws Exception {
java.net.http.HttpResponse<InputStream> mockJdkResponse =
mock(java.net.http.HttpResponse.class);
when(mockJdkResponse.statusCode()).thenReturn(200);
when(mockJdkResponse.body()).thenReturn(new ByteArrayInputStream(new byte[0]));
when(mockJdkResponse.headers())
.thenReturn(HttpHeaders.of(Collections.emptyMap(), (a, b) -> true));
doReturn(mockJdkResponse).when(mockHttpClient).send(any(), any());

JdkHttpSender testSender =
new JdkHttpSender(
mockHttpClient,
URI.create("http://localhost"),
"text/plain",
null,
Duration.ofSeconds(10),
Collections::emptyMap,
null,
null,
Long.MAX_VALUE);

try {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<HttpResponse> responseRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

testSender.send(
new NoOpRequestBodyWriter(),
response -> {
responseRef.set(response);
latch.countDown();
},
error -> {
errorRef.set(error);
latch.countDown();
});

assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(responseRef.get()).isNotNull();
assertThat(responseRef.get().getStatusCode()).isEqualTo(200);
assertThat(errorRef.get()).isNull();
} finally {
testSender.shutdown();
}
}

@Test
void send_ioException_callsOnError() throws Exception {
doThrow(new IOException("send failed")).when(mockHttpClient).send(any(), any());

JdkHttpSender testSender =
new JdkHttpSender(
mockHttpClient,
URI.create("http://localhost"),
"text/plain",
null,
Duration.ofSeconds(10),
Collections::emptyMap,
null,
null,
Long.MAX_VALUE);

try {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<HttpResponse> responseRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

testSender.send(
new NoOpRequestBodyWriter(),
response -> {
responseRef.set(response);
latch.countDown();
},
error -> {
errorRef.set(error);
latch.countDown();
});

assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(errorRef.get()).isNotNull();
assertThat(errorRef.get()).hasRootCauseInstanceOf(IOException.class);
assertThat(errorRef.get()).hasRootCauseMessage("send failed");
assertThat(responseRef.get()).isNull();
} finally {
testSender.shutdown();
}
}

@Test
void send_rejectedExecution_callsOnError() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
executor.shutdown();

JdkHttpSender testSender =
new JdkHttpSender(
mockHttpClient,
URI.create("http://localhost"),
"text/plain",
null,
Duration.ofSeconds(10),
Collections::emptyMap,
null,
executor,
Long.MAX_VALUE);

AtomicReference<HttpResponse> responseRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

testSender.send(new NoOpRequestBodyWriter(), responseRef::set, errorRef::set);

assertThat(errorRef.get()).isNotNull();
assertThat(errorRef.get()).isInstanceOf(RejectedExecutionException.class);
assertThat(responseRef.get()).isNull();
}

private static class NoOpRequestBodyWriter implements MessageWriter {
@Override
public void writeMessage(OutputStream output) {}
Expand Down
Loading