Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,10 @@ default ClientBuilder getPulsarClientBuilder() {
throw new UnsupportedOperationException("not implemented");
}

/**
* Terminate the function instance with a fatal exception.
*
* @param t the fatal exception to be raised
*/
void fatal(Throwable t);
Comment thread
shibd marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,22 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable

private final Function.FunctionDetails.ComponentType componentType;

private final java.util.function.Consumer<Throwable> fatalHandler;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder)
throws PulsarClientException {
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder,
java.util.function.Consumer<Throwable> fatalHandler) {
this.config = config;
this.logger = logger;
this.clientBuilder = clientBuilder;
this.client = client;
this.pulsarAdmin = pulsarAdmin;
this.topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader());
this.statsManager = statsManager;
this.fatalHandler = fatalHandler;

this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -534,6 +537,11 @@ public ClientBuilder getPulsarClientBuilder() {
return clientBuilder;
}

@Override
public void fatal(Throwable t) {
fatalHandler.accept(t);
}

private <T> Producer<T> getProducer(String topicName, Schema<T> schema) throws PulsarClientException {
Producer<T> producer;
if (tlPublishProducers != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private JavaInstance javaInstance;
@Getter
private Throwable deathException;
private volatile Throwable deathException;

// function stats
private ComponentStatsManager stats;
Expand Down Expand Up @@ -282,9 +282,14 @@ private synchronized void setup() throws Exception {
ContextImpl setupContext() throws PulsarClientException {
Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
Thread currentThread = Thread.currentThread();
Consumer<Throwable> fatalHandler = throwable -> {
this.deathException = throwable;
currentThread.interrupt();
};
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, fatalHandler);
}

public interface AsyncResultConsumer {
Expand Down Expand Up @@ -340,16 +345,35 @@ public void run() {
// process the synchronous results
handleResult(currentRecord, result);
}

if (deathException != null) {
Comment thread
shibd marked this conversation as resolved.
// Ideally the current java instance thread will be interrupted when the deathException is set.
// But if the CompletableFuture returned by the Pulsar Function is completed exceptionally(the
// function has invoked the fatal method) before being put into the JavaInstance
// .pendingAsyncRequests, the interrupted exception may be thrown when putting this future to
// JavaInstance.pendingAsyncRequests. The interrupted exception would be caught by the JavaInstance
// and be skipped.
// Therefore, we need to handle this case by checking the deathException here and rethrow it.
throw deathException;
}
}
} catch (Throwable t) {
log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId()), t);
deathException = t;
if (deathException != null) {
log.error("[{}] Fatal exception occurred in the instance", FunctionCommon.getFullyQualifiedInstanceId(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId()), deathException);
} else {
log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId()), t);
deathException = t;
}
if (stats != null) {
stats.incrSysExceptions(t);
stats.incrSysExceptions(deathException);
}
} finally {
log.info("Closing instance");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void setup() throws PulsarClientException {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
context.setCurrentMessageContext((Record<String>) () -> null);
}

Expand Down Expand Up @@ -231,7 +232,7 @@ public void testGetPulsarAdminWithExposePulsarAdminDisabled() throws PulsarClien
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
context.getPulsarAdmin();
}

Expand All @@ -245,7 +246,7 @@ public void testUnsupportedExtendedSinkContext() throws PulsarClientException {
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
try {
context.seek("z", 0, Mockito.mock(MessageId.class));
Assert.fail("Expected exception");
Expand Down Expand Up @@ -276,7 +277,7 @@ public void testExtendedSinkContext() throws PulsarClientException {
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
Expand Down Expand Up @@ -308,7 +309,7 @@ public void testGetConsumer() throws PulsarClientException {
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
Expand All @@ -332,7 +333,7 @@ public void testGetConsumerMultiTopic() throws PulsarClientException {
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
ConsumerImpl<?> consumer1 = Mockito.mock(ConsumerImpl.class);
when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString());
ConsumerImpl<?> consumer2 = Mockito.mock(ConsumerImpl.class);
Expand Down Expand Up @@ -435,4 +436,23 @@ public Map<String, String> getProperties() {
assertEquals(record.getProperties().get("prop-key"), "prop-value");
assertNull(record.getValue());
}

@Test
public void testFatal() {
Throwable fatalException = new Exception("test-fatal-exception");
AtomicBoolean fatalInvoked = new AtomicBoolean(false);
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder, t -> {
assertEquals(t, fatalException);
fatalInvoked.set(true);
});
context.fatal(fatalException);
assertTrue(fatalInvoked.get());
}
}
Loading