Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private void onProviderEvent(
case PROVIDER_ERROR:
if (providerEventDetails != null
&& providerEventDetails.getErrorCode() == ErrorCode.PROVIDER_FATAL) {
onFatal();
onFatal(providerEventDetails);
break;
}

Expand Down Expand Up @@ -283,15 +283,12 @@ private void onError() {
}
}

private void onFatal() {
private void onFatal(ProviderEventDetails providerEventDetails) {
if (errorTask != null && !errorTask.isCancelled()) {
errorTask.cancel(false);
}
this.syncResources.setFatal(true);

this.emitProviderError(ProviderEventDetails.builder()
.errorCode(ErrorCode.PROVIDER_FATAL)
.build());
this.syncResources.fatalError(providerEventDetails);
this.emitProviderError(providerEventDetails);

shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.ImmutableContext;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.exceptions.FatalError;
import dev.openfeature.sdk.exceptions.GeneralError;
import lombok.Getter;
Expand All @@ -15,10 +16,10 @@
@Getter
class FlagdProviderSyncResources {
@Setter
private volatile ProviderEvent previousEvent = null;
private volatile ProviderEvent previousEvent;

@Setter
private volatile boolean isFatal;
private volatile ProviderEventDetails fatalProviderEventDetails;

private volatile EvaluationContext enrichedContext = new ImmutableContext();
private volatile boolean isInitialized;
Expand All @@ -39,7 +40,6 @@ public synchronized boolean initialize() {
return false;
}
this.isInitialized = true;
this.isFatal = false;
this.notifyAll();
return true;
}
Expand Down Expand Up @@ -68,7 +68,7 @@ public synchronized boolean initialize() {
public void waitForInitialization(long deadline) {
long start = System.currentTimeMillis();
long end = start + deadline;
while (!isInitialized && !isShutDown) {
while (!isInitialized && !isShutDown && !isFatal) {
long now = System.currentTimeMillis();
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
if (now >= end) {
Expand All @@ -77,7 +77,7 @@ public void waitForInitialization(long deadline) {
}
long remaining = end - now;
synchronized (this) {
if (isShutDown) {
if (isShutDown || isFatal) {
break;
}
if (isInitialized) { // might have changed in the meantime
Expand All @@ -91,12 +91,16 @@ public void waitForInitialization(long deadline) {
}
}
}
if (isShutDown) {
String msg = "Already shut down due to previous error.";
if (isFatal) {
throw new FatalError(msg);
if (isFatal) {
var fatalEvent = fatalProviderEventDetails;
if (fatalEvent != null) {
Comment thread
chrfwow marked this conversation as resolved.
throw new FatalError("Initialization failed due to a fatal error: " + fatalEvent.getMessage());
} else {
throw new FatalError("Initialization failed due to a fatal error.");
}
throw new GeneralError(msg);
}
Comment on lines +94 to +101
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The FatalError exception is being thrown without propagating the ErrorCode from fatalProviderEventDetails. The test initAfterFatalPropagatesErrorEvent asserts that the ErrorCode should be PROVIDER_FATAL, but the current FatalError constructor only takes a message. This likely defaults to the GENERAL error code in the underlying OpenFeatureError, which would cause the test to fail.

To ensure the correct error code is propagated, you should use a constructor that accepts an ErrorCode. Assuming FatalError has been updated in the SDK to include a constructor that takes an ErrorCode, you should use it to pass the error code from the event. This will ensure the error code is correctly propagated as intended.

Suggested change
if (isFatal) {
var fatalEvent = fatalProviderEventDetails;
if (fatalEvent != null) {
throw new FatalError("Initialization failed due to a fatal error: " + fatalEvent.getMessage());
} else {
throw new FatalError("Initialization failed due to a fatal error.");
}
}
if (isFatal) {
var fatalEvent = fatalProviderEventDetails;
if (fatalEvent != null && fatalEvent.getErrorCode() != null) {
throw new GeneralError(fatalEvent.getErrorCode(), "Initialization failed due to a fatal error: " + fatalEvent.getMessage());
} else {
throw new FatalError("Initialization failed due to a fatal error.");
}
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that the user only invokes fatalError(...) only with a fatal error. I do not plan to add any checks to that matter, because there is nothing to do in this case.
Therefore I will ignore this comment

if (isShutDown) {
throw new GeneralError("Already shut down due to previous error.");
}
}

Expand All @@ -107,4 +111,10 @@ public synchronized void shutdown() {
isShutDown = true;
this.notifyAll();
}

public synchronized void fatalError(ProviderEventDetails providerEventDetails) {
isFatal = true;
fatalProviderEventDetails = providerEventDetails;
this.notifyAll();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.sdk.ErrorCode;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.exceptions.FatalError;
import dev.openfeature.sdk.exceptions.GeneralError;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -115,7 +118,6 @@ void callingInitialize_wakesUpWaitingThread() throws InterruptedException {
void callingShutdownWithPreviousNonFatal_wakesUpWaitingThread_WithGeneralException() throws InterruptedException {
final AtomicBoolean isWaiting = new AtomicBoolean();
final AtomicBoolean successfulTest = new AtomicBoolean();
flagdProviderSyncResources.setFatal(false);

Thread waitingThread = new Thread(() -> {
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -147,7 +149,7 @@ void callingShutdownWithPreviousNonFatal_wakesUpWaitingThread_WithGeneralExcepti
void callingShutdownWithPreviousFatal_wakesUpWaitingThread_WithFatalException() throws InterruptedException {
final AtomicBoolean isWaiting = new AtomicBoolean();
final AtomicBoolean successfulTest = new AtomicBoolean();
flagdProviderSyncResources.setFatal(true);
flagdProviderSyncResources.fatalError(null);

Thread waitingThread = new Thread(() -> {
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -184,4 +186,63 @@ void waitForInitializationAfterCallingInitialize_returnsInstantly() {
// do not use MAX_TIME_TOLERANCE here, this should happen faster than that
Assertions.assertTrue(start + 1 >= end);
}

@Timeout(2)
@Test
void fatalHasPrecedenceOverInitAndShutdown() {
flagdProviderSyncResources.fatalError(null);
flagdProviderSyncResources.initialize();
flagdProviderSyncResources.shutdown();

Assertions.assertThrows(FatalError.class, () -> flagdProviderSyncResources.waitForInitialization(10000));
}

@Timeout(2)
@Test
void fatalAbortsInit() throws InterruptedException {
final AtomicBoolean isWaiting = new AtomicBoolean();
final AtomicLong waitTime = new AtomicLong(Long.MAX_VALUE);
final AtomicReference<Exception> fatalException = new AtomicReference<>();

Thread waitingThread = new Thread(() -> {
long start = System.currentTimeMillis();
isWaiting.set(true);
try {
flagdProviderSyncResources.waitForInitialization(10000);
} catch (Exception e) {
fatalException.set(e);
}
long end = System.currentTimeMillis();
long duration = end - start;
waitTime.set(duration);
});
waitingThread.start();

while (!isWaiting.get()) {
Thread.yield();
}

Thread.sleep(MAX_TIME_TOLERANCE); // waitingThread should have started waiting in the meantime

var fatalEvent = ProviderEventDetails.builder()
.errorCode(ErrorCode.PROVIDER_FATAL)
.message("Some message")
.build();
flagdProviderSyncResources.fatalError(fatalEvent);

waitingThread.join();

var wait = MAX_TIME_TOLERANCE * 3;

Assertions.assertTrue(
waitTime.get() < wait,
() -> "Wakeup should be almost instant, but took " + waitTime.get()
+ " ms, which is more than the max of"
+ wait + " ms");
Assertions.assertNotNull(fatalException.get());
Assertions.assertInstanceOf(FatalError.class, fatalException.get());
Assertions.assertEquals(
"Initialization failed due to a fatal error: " + fatalEvent.getMessage(),
fatalException.get().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import dev.openfeature.flagd.grpc.evaluation.Evaluation.ResolveStringResponse;
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceBlockingStub;
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceStub;
import dev.openfeature.sdk.ErrorCode;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.FlagEvaluationDetails;
import dev.openfeature.sdk.FlagValueType;
Expand All @@ -48,6 +49,7 @@
import dev.openfeature.sdk.Reason;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
import dev.openfeature.sdk.exceptions.FatalError;
import dev.openfeature.sdk.internal.TriConsumer;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand All @@ -60,8 +62,10 @@
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;
Expand Down Expand Up @@ -676,6 +680,38 @@ void updatesSyncMetadataWithCallback() throws Exception {
}
}

@Test
void initAfterFatalPropagatesErrorEvent() {
// given
final var ctx = new ImmutableContext();
final var metadata = new MutableStructure();

// mock a resolver
final var onEvent = new AtomicReference<TriConsumer<ProviderEvent, ProviderEventDetails, Structure>>();
try (var mockResolver = mockConstruction(
InProcessResolver.class,
(mock, context) -> onEvent.set((TriConsumer<ProviderEvent, ProviderEventDetails, Structure>)
context.arguments().get(1)))) {

FlagdProvider provider = new FlagdProvider(FlagdOptions.builder()
.resolverType(Config.Resolver.IN_PROCESS)
.build());

onEvent.get()
.accept(
ProviderEvent.PROVIDER_ERROR,
ProviderEventDetails.builder()
.message("msg")
.errorCode(ErrorCode.PROVIDER_FATAL)
.build(),
metadata);

var error = Assertions.assertThrows(FatalError.class, () -> provider.initialize(ctx));
Assertions.assertEquals("Initialization failed due to a fatal error: msg", error.getMessage());
Assertions.assertEquals(ErrorCode.PROVIDER_FATAL, error.getErrorCode());
}
}

// test helper
// create provider with given grpc provider and state supplier
private FlagdProvider createProvider(ChannelConnector connector, ServiceBlockingStub mockBlockingStub) {
Expand Down
Loading