diff --git a/README.md b/README.md
index 4c6fcc51..8d094969 100644
--- a/README.md
+++ b/README.md
@@ -78,14 +78,14 @@ Add the following Maven dependency in your project's pom.xml file:
+ * - Authentication: Used for retrying authentication requests when the server is unreachable. + * - Polling: Applies to requests that fetch feature flags and target groups periodically. + * - Metrics: Applies to analytics requests for sending metrics data to the server. + * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, + * where the SDK needs to fetch updated flag or group data. + *
+ *
+ * The default value is {@code 10}. + *
+ * Note: This setting does not apply to streaming requests (either the initial connection or + * reconnecting after a disconnection). Streaming requests will always retry indefinitely + * (infinite retries). + *
+ * Example usage: + *
+ * {@code
+ * BaseConfig config = BaseConfig.builder()
+ * .maxRequestRetry(20)
+ * .build();
+ * }
+ *
+ */
+ @Builder.Default private final long maxRequestRetry = DEFAULT_REQUEST_RETRIES;
}
diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java
index bec1ed88..7ef4090d 100644
--- a/src/main/java/io/harness/cf/client/api/InnerClient.java
+++ b/src/main/java/io/harness/cf/client/api/InnerClient.java
@@ -87,7 +87,6 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf
log.info("Starting SDK client with configuration: {}", this.options);
this.connector = connector;
this.connector.setOnUnauthorized(this::onUnauthorized);
-
// initialization
repository =
new StorageRepository(
@@ -96,7 +95,9 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf
authService = new AuthService(this.connector, options.getPollIntervalInSeconds(), this);
pollProcessor =
new PollingProcessor(this.connector, repository, options.getPollIntervalInSeconds(), this);
- metricsProcessor = new MetricsProcessor(this.connector, this.options, this);
+ metricsProcessor =
+ new MetricsProcessor(
+ this.connector, this.options, this, connector.getShouldFlushAnalyticsOnClose());
updateProcessor = new UpdateProcessor(this.connector, this.repository, this);
// start with authentication
@@ -228,7 +229,9 @@ public void onDisconnected(String reason) {
closing,
options.getPollIntervalInSeconds());
log.debug("SSE disconnect detected - asking poller to refresh flags");
- pollProcessor.retrieveAll();
+ if (!closing) {
+ pollProcessor.retrieveAll();
+ }
}
}
@@ -388,6 +391,12 @@ public void processEvaluation(
public void close() {
log.info("Closing the client");
closing = true;
+
+ // Mark the connector as shutting down to stop request retries from taking place. The
+ // connections will eventually
+ // be evicted when the connector is closed, but this ensures that if metrics are flushed when
+ // closed then it won't attempt to retry if the first request fails.
+ connector.setIsShuttingDown();
off();
authService.close();
repository.close();
diff --git a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java
index e1e27590..2958dbb6 100644
--- a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java
+++ b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java
@@ -4,6 +4,7 @@
import static io.harness.cf.client.common.Utils.shutdownExecutorService;
import static java.util.concurrent.TimeUnit.SECONDS;
+import io.harness.cf.Version;
import io.harness.cf.client.common.SdkCodes;
import io.harness.cf.client.common.StringUtils;
import io.harness.cf.client.connector.Connector;
@@ -15,10 +16,7 @@
import io.harness.cf.model.TargetData;
import io.harness.cf.model.Variation;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -113,13 +111,19 @@ public boolean containsKey(K key) {
private final LongAdder metricsSent = new LongAdder();
private final int maxFreqMapSize;
+ private final boolean shouldFlushMetricsOnClose;
+
public MetricsProcessor(
- @NonNull Connector connector, @NonNull BaseConfig config, @NonNull MetricsCallback callback) {
+ @NonNull Connector connector,
+ @NonNull BaseConfig config,
+ @NonNull MetricsCallback callback,
+ boolean shouldFlushMetricsOnClose) {
this.connector = connector;
this.config = config;
this.frequencyMap = new FrequencyMap<>();
this.targetsSeen = ConcurrentHashMap.newKeySet();
this.maxFreqMapSize = clamp(config.getBufferSize(), 2048, MAX_FREQ_MAP_TO_RETAIN);
+ this.shouldFlushMetricsOnClose = shouldFlushMetricsOnClose;
callback.onMetricsReady();
}
@@ -218,7 +222,7 @@ protected Metrics prepareSummaryMetricsBody(Map+ * - Authentication: Used for retrying authentication requests when the server is unreachable. + * - Polling: Applies to requests that fetch feature flags and target groups periodically. + * - Metrics: Applies to analytics requests for sending metrics data to the server. + * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, + * where the SDK needs to fetch updated flag or group data. + *
+ * Note: This setting does not apply to streaming requests (either the initial connection or + * reconnecting after a disconnection). Streaming requests will always retry indefinitely + * (infinite retries). + */ + @Builder.Default private long maxRequestRetry = 10; + + /** + * Indicates whether to flush analytics data when the SDK is closed. + *
+ * When set to {@code true}, any remaining analytics data (such as metrics) + * will be sent to the server before the SDK is fully closed. If {@code false}, + * the data will not be flushed, and any unsent analytics data may be lost. + *
+ * The default value is {@code false}. + *
+ * Note: The flush will attempt to send the data in a single request. + * Any failures during this process will not be retried, and the analytics data + * may be lost. + * + *
Example usage: + *
+ * {@code
+ * HarnessConfig harnessConfig = HarnessConfig.builder()
+ * .flushAnalyticsOnClose(true)
+ * .build();
+ * }
+ *
+ */
+ @Builder.Default private final boolean flushAnalyticsOnClose = false;
+
+ /**
+ * The timeout for flushing analytics on SDK close.
+ * + * This option sets the maximum duration, in milliseconds, the SDK will wait for the + * analytics data to be flushed after the SDK has been closed. If the flush process takes longer + * than this timeout, the request will be canceled, and any remaining data will + * not be sent. This ensures that the SDK does not hang indefinitely during shutdown. + *
+ * The default value is {@code 30000ms} which is the default read timeout for requests made by the SDK + *
+ * Note: This timeout only applies to the flush process that happens when + * {@code flushAnalyticsOnClose} is set to {@code true}. It does not affect other + * requests made by the SDK during normal operation. + * + *
Example usage: + *
+ * {@code
+ *
+ * HarnessConfig harnessConfig = HarnessConfig.builder()
+ * .flushAnalyticsOnClose(true)
+ * // Timeout the analytics flush request in 3000ms (3 seconds)
+ * .flushAnalyticsOnCloseTimeout(3000).build();
+ * .build();
+ * }
+ *
+ */
+ @Builder.Default private final int flushAnalyticsOnCloseTimeout = 30000;
}
diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java
index 2486b837..e869b66f 100644
--- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java
+++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java
@@ -15,6 +15,8 @@
import java.security.cert.X509Certificate;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -33,6 +35,8 @@ public class HarnessConnector implements Connector, AutoCloseable {
private final String apiKey;
private final HarnessConfig options;
+ private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+
private String token;
private String environmentUuid;
private String cluster;
@@ -89,13 +93,19 @@ ApiClient makeApiClient(int retryBackOfDelay) {
.getHttpClient()
.newBuilder()
.addInterceptor(this::reauthInterceptor)
- .addInterceptor(new NewRetryInterceptor(3, retryBackOfDelay))
+ .addInterceptor(
+ new NewRetryInterceptor(
+ options.getMaxRequestRetry(), retryBackOfDelay, isShuttingDown))
.build());
return apiClient;
}
private Response reauthInterceptor(Interceptor.Chain chain) throws IOException {
+ if (isShuttingDown.get()) {
+ return null;
+ }
+
final Request request =
chain.request().newBuilder().addHeader("X-Request-ID", getRequestID()).build();
log.debug("Checking for 403 in interceptor: requesting url {}", request.url().url());
@@ -127,18 +137,39 @@ ApiClient makeMetricsApiClient(int retryBackoffDelay) {
.getHttpClient()
.newBuilder()
.addInterceptor(this::metricsInterceptor)
- .addInterceptor(new NewRetryInterceptor(3, retryBackoffDelay))
+ .addInterceptor(
+ new NewRetryInterceptor(
+ options.getMaxRequestRetry(), retryBackoffDelay, isShuttingDown))
.build());
return apiClient;
}
private Response metricsInterceptor(Interceptor.Chain chain) throws IOException {
- final Request request =
- chain.request().newBuilder().addHeader("X-Request-ID", getRequestID()).build();
- log.debug("metrics interceptor: requesting url {}", request.url().url());
- return chain.proceed(request);
+ Request originalRequest = chain.request();
+
+ // If this is flush when the SDK has been closed, then apply a per request timeout instead
+ // of the okhttp client timeout
+ if (isShuttingDown.get()) {
+ log.debug("SDK is shutting down, applying custom call timeout for flush request");
+
+ Request shutdownRequest =
+ originalRequest.newBuilder().addHeader("X-Request-ID", getRequestID()).build();
+
+ // Apply custom timeouts (e.g., 5 seconds for each timeout type)
+ return chain
+ .withConnectTimeout(options.getFlushAnalyticsOnCloseTimeout(), TimeUnit.MILLISECONDS)
+ .withReadTimeout(options.getFlushAnalyticsOnCloseTimeout(), TimeUnit.MILLISECONDS)
+ .withWriteTimeout(options.getFlushAnalyticsOnCloseTimeout(), TimeUnit.MILLISECONDS)
+ .proceed(shutdownRequest);
+ } else {
+ final Request request =
+ originalRequest.newBuilder().addHeader("X-Request-ID", getRequestID()).build();
+ log.debug("metrics interceptor: requesting url {}", request.url().url());
+
+ return chain.proceed(request);
+ }
}
protected String getRequestID() {
@@ -405,13 +436,15 @@ public Service stream(@NonNull final Updater updater) throws ConnectorException
updater,
Math.max(options.getSseReadTimeout(), 1),
ThreadLocalRandom.current().nextInt(5000, 10000),
- options.getTlsTrustedCAs());
+ options.getTlsTrustedCAs(),
+ isShuttingDown);
return eventSource;
}
@Override
public void close() {
log.debug("closing connector");
+ isShuttingDown.set(true);
api.getApiClient().getHttpClient().connectionPool().evictAll();
log.debug("All apiClient connections evicted");
metricsApi.getApiClient().getHttpClient().connectionPool().evictAll();
@@ -437,6 +470,15 @@ private void setupTls(ApiClient apiClient) {
}
}
+ public void setIsShuttingDown() {
+ this.isShuttingDown.set(true);
+ }
+
+ @Override
+ public boolean getShouldFlushAnalyticsOnClose() {
+ return options.isFlushAnalyticsOnClose();
+ }
+
private static boolean isNullOrEmpty(String string) {
return string == null || string.trim().isEmpty();
}
diff --git a/src/main/java/io/harness/cf/client/connector/LocalConnector.java b/src/main/java/io/harness/cf/client/connector/LocalConnector.java
index 6dd0aa2d..5ce77297 100644
--- a/src/main/java/io/harness/cf/client/connector/LocalConnector.java
+++ b/src/main/java/io/harness/cf/client/connector/LocalConnector.java
@@ -195,6 +195,16 @@ public void close() {
log.debug("LocalConnector closed");
}
+ @Override
+ public boolean getShouldFlushAnalyticsOnClose() {
+ return false;
+ }
+
+ @Override
+ public void setIsShuttingDown() {
+ // No need for local connector as no retries used
+ }
+
private class FileWatcherService implements Service, AutoCloseable {
private final FileWatcher flagWatcher;
private final FileWatcher segmentWatcher;
diff --git a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java
index 0bafbb54..7df56229 100644
--- a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java
+++ b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java
@@ -1,5 +1,7 @@
package io.harness.cf.client.connector;
+import static io.harness.cf.client.api.BaseConfig.DEFAULT_REQUEST_RETRIES;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -8,6 +10,7 @@
import java.util.Date;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@@ -19,16 +22,33 @@ public class NewRetryInterceptor implements Interceptor {
private static final SimpleDateFormat imfDateFormat =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US);
private final long retryBackoffDelay;
- private final long maxTryCount;
+ private final boolean retryForever;
+
+ private final AtomicBoolean isShuttingDown;
+
+ // Use SDK default is not specified
+ private long maxTryCount = DEFAULT_REQUEST_RETRIES;
- public NewRetryInterceptor(long retryBackoffDelay) {
+ public NewRetryInterceptor(long retryBackoffDelay, AtomicBoolean isShuttingDown) {
this.retryBackoffDelay = retryBackoffDelay;
- this.maxTryCount = 5;
+ this.retryForever = false;
+ this.isShuttingDown = isShuttingDown;
}
- public NewRetryInterceptor(long maxTryCount, long retryBackoffDelay) {
+ public NewRetryInterceptor(
+ long maxTryCount, long retryBackoffDelay, AtomicBoolean isShuttingDown) {
this.retryBackoffDelay = retryBackoffDelay;
this.maxTryCount = maxTryCount;
+ this.retryForever = false;
+ this.isShuttingDown = isShuttingDown;
+ }
+
+ // New constructor with retryForever flag
+ public NewRetryInterceptor(
+ long retryBackoffDelay, boolean retryForever, AtomicBoolean isShuttingDown) {
+ this.retryBackoffDelay = retryBackoffDelay;
+ this.retryForever = retryForever;
+ this.isShuttingDown = isShuttingDown;
}
@NotNull
@@ -36,7 +56,7 @@ public NewRetryInterceptor(long maxTryCount, long retryBackoffDelay) {
public Response intercept(@NotNull Chain chain) throws IOException {
int tryCount = 1;
boolean successful;
- boolean limitReached = false;
+ boolean limitReached;
Response response = null;
String msg = "";
do {
@@ -66,29 +86,44 @@ public Response intercept(@NotNull Chain chain) throws IOException {
return response;
}
}
+
if (!successful) {
int retryAfterHeaderValue = getRetryAfterHeaderInSeconds(response);
long backOffDelayMs;
+
if (retryAfterHeaderValue > 0) {
// Use Retry-After header if detected first
log.trace("Retry-After header detected: {} seconds", retryAfterHeaderValue);
backOffDelayMs = retryAfterHeaderValue * 1000L;
} else {
- // Else fallback to a randomized exponential backoff
- backOffDelayMs = retryBackoffDelay * tryCount;
+ // Else fallback to a randomized exponential backoff with a max delay of 1 minute
+ // (60,000ms)
+ backOffDelayMs = Math.min(retryBackoffDelay * tryCount, 60000L);
+ }
+
+ String retryLimitDisplay = retryForever ? "∞" : String.valueOf(maxTryCount);
+ limitReached = !retryForever && tryCount >= maxTryCount;
+
+ if (isShuttingDown.get()) {
+ log.warn(
+ "Request attempt {} to {} was not successful, [{}], SDK is shutting down, no retries will be attempted",
+ tryCount,
+ chain.request().url(),
+ msg);
+ return response; // Exit without further retries
}
- limitReached = tryCount >= maxTryCount;
log.warn(
- "Request attempt {} to {} was not successful, [{}]{}",
+ "Request attempt {} of {} to {} was not successful, [{}]{}",
tryCount,
+ retryLimitDisplay,
chain.request().url(),
msg,
limitReached
- ? ", retry limited reached"
+ ? ", retry limit reached"
: String.format(
Locale.getDefault(),
- ", retrying in %dms (retry-after hdr: %b)",
+ ", retrying in %dms (retry-after hdr: %b)",
backOffDelayMs,
retryAfterHeaderValue > 0));
@@ -97,7 +132,7 @@ public Response intercept(@NotNull Chain chain) throws IOException {
}
}
tryCount++;
- } while (!successful && !limitReached);
+ } while (!successful && (retryForever || tryCount <= maxTryCount) && !isShuttingDown.get());
return response;
}
diff --git a/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java b/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java
index abb1676d..e2b33e94 100644
--- a/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java
+++ b/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java
@@ -50,7 +50,8 @@ void testRegisterEvaluationContention() throws Exception {
BaseConfig.builder()
// .globalTargetEnabled(false)
.build(),
- new DummyMetricsCallback());
+ new DummyMetricsCallback(),
+ false);
metricsProcessor.start();
diff --git a/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java b/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java
index 5e6b3efd..cda8b26a 100644
--- a/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java
+++ b/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java
@@ -1,8 +1,7 @@
package io.harness.cf.client.api;
import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import io.harness.cf.client.connector.Connector;
import io.harness.cf.client.connector.ConnectorException;
@@ -34,11 +33,55 @@ public void setup() {
MockitoAnnotations.openMocks(this);
metricsProcessor =
Mockito.spy(
- new MetricsProcessor(connector, BaseConfig.builder().bufferSize(10_001).build(), this));
+ new MetricsProcessor(
+ connector, BaseConfig.builder().bufferSize(10_001).build(), this, false));
metricsProcessor.reset();
}
+ @Test
+ public void testFlushAnalyticsOnCloseDisabled() throws ConnectorException, InterruptedException {
+ // Arrange
+ Metrics mockMetrics = mock(Metrics.class);
+ doNothing().when(connector).postMetrics(mockMetrics);
+
+ // Act: Push some metrics data and call flush
+ Target target = Target.builder().identifier("target-1").build();
+ Variation variation = Variation.builder().identifier("true").value("true").build();
+ metricsProcessor.pushToQueue(target, "feature-1", variation);
+
+ // Mimic shutdown behavior
+ metricsProcessor.close();
+
+ // Assert: Verify that postMetrics not called during shutdown
+ verify(connector, times(0)).postMetrics(any(Metrics.class));
+ verifyNoMoreInteractions(connector);
+ }
+
+ @Test
+ public void testFlushAnalyticsOnCloseEnabled() throws ConnectorException, InterruptedException {
+ // Arrange
+ metricsProcessor =
+ Mockito.spy(
+ new MetricsProcessor(
+ connector, BaseConfig.builder().bufferSize(10_001).build(), this, true));
+
+ Metrics mockMetrics = mock(Metrics.class);
+ doNothing().when(connector).postMetrics(mockMetrics);
+
+ // Act: Push some metrics data and call flush
+ Target target = Target.builder().identifier("target-1").build();
+ Variation variation = Variation.builder().identifier("true").value("true").build();
+ metricsProcessor.pushToQueue(target, "feature-1", variation);
+
+ // Mimic shutdown behavior
+ metricsProcessor.close();
+
+ // Assert: Verify that postMetrics is called during shutdown
+ verify(connector, times(1)).postMetrics(any(Metrics.class));
+ metricsProcessor.reset();
+ }
+
@Test
public void testPushToQueue() throws InterruptedException {
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(BUFFER_SIZE);
@@ -183,7 +226,7 @@ void shouldPostCorrectMetrics_WhenGlobalTargetEnabledOrDisabled(boolean globalTa
doNothing().when(mockConnector).postMetrics(metricsArgumentCaptor.capture());
final MetricsProcessor processor =
- new MetricsProcessor(mockConnector, mockConfig, Mockito.mock(MetricsCallback.class));
+ new MetricsProcessor(mockConnector, mockConfig, Mockito.mock(MetricsCallback.class), false);
final Target target = Target.builder().identifier("target123").build();
final Variation variation = Variation.builder().identifier("true").value("true").build();
diff --git a/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java b/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java
index 69101a7e..df8086e7 100644
--- a/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java
+++ b/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java
@@ -79,4 +79,12 @@ public int getTotalMetricEvaluations() {
@Override
public void close() {}
+
+ @Override
+ public boolean getShouldFlushAnalyticsOnClose() {
+ return false;
+ }
+
+ @Override
+ public void setIsShuttingDown() {}
}
diff --git a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java
index 9cf33132..8d0e5bd1 100644
--- a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java
+++ b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java
@@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -27,8 +28,11 @@ static class StreamDispatcher extends Dispatcher {
protected MockResponse makeStreamResponse() {
int reqNo = request.getAndIncrement();
- if (reqNo <= 3) {
- // Force a disconnect on the first few attempts
+
+ if (reqNo <= 12) {
+ // Force a disconnect after the default SDK request retry limit of 10, which does not apply
+ // to stream requests which have
+ // no limit on retryable errors
out.printf("ReqNo %d will be disconnected on purpose\n", reqNo);
return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST);
} else {
@@ -61,7 +65,10 @@ protected MockResponse makeStreamResponse() {
int reqNo = request.getAndIncrement();
// Force a disconnect on all requests
out.printf("ReqNo %d will be disconnected on purpose\n", reqNo);
- return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST);
+ // Set a 400 response so that the stream does not retry. This is because since 1.8.0 the
+ // stream
+ // retries forever on retryable errors.
+ return new MockResponse().setResponseCode(400).setBody("{\"status\":\"failed\"}");
}
}
@@ -78,7 +85,8 @@ void shouldNotCallErrorHandlerIfRetryEventuallyReconnectsToStreamEndpoint()
updater,
1,
1,
- null)) {
+ null,
+ new AtomicBoolean(false))) {
eventSource.start();
TimeUnit.SECONDS.sleep(15);
@@ -104,14 +112,15 @@ void shouldRestartPollerIfAllConnectionAttemptsToStreamEndpointFail()
updater,
1,
1,
- null)) {
+ null,
+ new AtomicBoolean(false))) {
eventSource.start();
- TimeUnit.SECONDS.sleep(15);
+ TimeUnit.SECONDS.sleep(3);
}
- // for this test, connection to the /stream endpoint will never succeed.
- // we expect the disconnect handler to be called, connect handler should not be called
+ // for this test, connection to the /stream endpoint will never because of an un-retryable
+ // error. We expect the disconnect handler to be called, connect handler should not be called
assertEquals(0, updater.getConnectCount().get());
assertEquals(0, updater.getFailureCount().get());