From 0cd0f122ccb988e6153d59fa1f1c610d3cdbf5c4 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 7 Sep 2023 11:37:01 +0800 Subject: [PATCH 1/7] [improve][io] PIP-297: Support terminating Function & Connector with the fatal exception --- .../pulsar/functions/api/BaseContext.java | 6 + .../functions/instance/ContextImpl.java | 12 +- .../instance/JavaInstanceRunnable.java | 14 +- .../functions/instance/ContextImplTest.java | 32 +++- .../instance/JavaInstanceRunnableTest.java | 154 ++++++++++++++++++ 5 files changed, 209 insertions(+), 9 deletions(-) diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java index 25874c595d9d4..185031fa29d88 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java @@ -217,4 +217,10 @@ default ClientBuilder getPulsarClientBuilder() { throw new UnsupportedOperationException("not implemented"); } + /** + * Terminate the function instance with a fatal exception. + * + * @param t the fatal exception to be raised + */ + void fatal(Throwable t); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index d03f57e97205c..6664a00510e56 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -137,12 +137,14 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable private final Function.FunctionDetails.ComponentType componentType; + private final java.util.function.Consumer fatalHandler; + public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager, - StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder) - throws PulsarClientException { + StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder, + java.util.function.Consumer fatalHandler) { this.config = config; this.logger = logger; this.clientBuilder = clientBuilder; @@ -150,6 +152,7 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, this.pulsarAdmin = pulsarAdmin; this.topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader()); this.statsManager = statsManager; + this.fatalHandler = fatalHandler; this.producerBuilder = (ProducerBuilderImpl) client.newProducer().blockIfQueueFull(true).enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); @@ -534,6 +537,11 @@ public ClientBuilder getPulsarClientBuilder() { return clientBuilder; } + @Override + public void fatal(Throwable t) { + fatalHandler.accept(t); + } + private Producer getProducer(String topicName, Schema schema) throws PulsarClientException { Producer producer; if (tlPublishProducers != null) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 691e547256a7e..71bad3dc71a73 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -282,9 +282,14 @@ private synchronized void setup() throws Exception { ContextImpl setupContext() throws PulsarClientException { Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); + Thread currentThread = Thread.currentThread(); + Consumer fatalHandler = throwable -> { + this.deathException = throwable; + currentThread.interrupt(); + }; return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager, - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, fatalHandler); } public interface AsyncResultConsumer { @@ -342,6 +347,13 @@ public void run() { } } } catch (Throwable t) { + if (t instanceof InterruptedException && deathException != null) { + log.info("Encountered fatal exception: ", deathException); + if (stats != null) { + stats.incrSysExceptions(deathException); + } + return; + } log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId( instanceConfig.getFunctionDetails().getTenant(), instanceConfig.getFunctionDetails().getNamespace(), diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index e0ebb52da7490..659c83877e747 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; @@ -117,7 +118,7 @@ public void setup() throws PulsarClientException { client, new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, t -> {}); context.setCurrentMessageContext((Record) () -> null); } @@ -231,7 +232,7 @@ public void testGetPulsarAdminWithExposePulsarAdminDisabled() throws PulsarClien new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, t -> {}); context.getPulsarAdmin(); } @@ -245,7 +246,7 @@ public void testUnsupportedExtendedSinkContext() throws PulsarClientException { new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, t -> {}); try { context.seek("z", 0, Mockito.mock(MessageId.class)); Assert.fail("Expected exception"); @@ -276,7 +277,7 @@ public void testExtendedSinkContext() throws PulsarClientException { new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, t -> {}); Consumer mockConsumer = Mockito.mock(Consumer.class); when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString()); context.setInputConsumers(Lists.newArrayList(mockConsumer)); @@ -308,7 +309,7 @@ public void testGetConsumer() throws PulsarClientException { new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, t -> {}); Consumer mockConsumer = Mockito.mock(Consumer.class); when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString()); context.setInputConsumers(Lists.newArrayList(mockConsumer)); @@ -332,7 +333,7 @@ public void testGetConsumerMultiTopic() throws PulsarClientException { new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), - pulsarAdmin, clientBuilder); + pulsarAdmin, clientBuilder, t -> {}); ConsumerImpl consumer1 = Mockito.mock(ConsumerImpl.class); when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString()); ConsumerImpl consumer2 = Mockito.mock(ConsumerImpl.class); @@ -435,4 +436,23 @@ public Map getProperties() { assertEquals(record.getProperties().get("prop-key"), "prop-value"); assertNull(record.getValue()); } + + @Test + public void testFatal() { + Throwable fatalException = new Exception("test-fatal-exception"); + AtomicBoolean fatalInvoked = new AtomicBoolean(false); + context = new ContextImpl( + config, + logger, + client, + new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), + new String[0], + FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), + pulsarAdmin, clientBuilder, t -> { + assertEquals(t, fatalException); + fatalInvoked.set(true); + }); + context.fatal(fatalException); + assertTrue(fatalInvoked.get()); + } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 0ba1d24ba74fe..30a35efca169f 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -27,15 +27,21 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeSet; import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; import lombok.Setter; import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -48,6 +54,11 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.Source; +import org.apache.pulsar.io.core.SourceContext; +import org.awaitility.Awaitility; import org.jetbrains.annotations.NotNull; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -91,6 +102,23 @@ private JavaInstanceRunnable createRunnable(FunctionDetails functionDetails) thr return javaInstanceRunnable; } + private JavaInstanceRunnable createRunnable(org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec, + String functionClassName, SinkSpec sinkSpec) + throws PulsarClientException { + ClientBuilder clientBuilder = mock(ClientBuilder.class); + when(clientBuilder.build()).thenReturn(null); + FunctionDetails functionDetails = FunctionDetails.newBuilder() + .setSource(sourceSpec) + .setClassName(functionClassName) + .setSink(sinkSpec) + .build(); + InstanceConfig config = createInstanceConfig(functionDetails); + config.setClusterName("test-cluster"); + return new JavaInstanceRunnable( + config, clientBuilder, PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(), null, null, + null, null, null, Thread.currentThread().getContextClassLoader(), null); + } + private Method makeAccessible(JavaInstanceRunnable javaInstanceRunnable) throws Exception { Method method = javaInstanceRunnable.getClass().getDeclaredMethod("setupSerDe", Class[].class, ClassLoader.class); @@ -333,4 +361,130 @@ public void testBeanPropertiesReader() throws Exception { .getBeanProperties(ConnectorTestConfig2.class); Assert.assertEquals(new TreeSet<>(beanProperties), new TreeSet<>(Arrays.asList("field1", "withGetter"))); } + + public static class TestSourceConnector implements Source { + + private LinkedBlockingQueue> queue; + private SourceContext context; + + public void pushRecord(Record record) throws Exception { + queue.put(record); + } + + @Override + public void open(Map config, SourceContext sourceContext) throws Exception { + context = sourceContext; + queue = new LinkedBlockingQueue<>(); + } + + @Override + public Record read() throws Exception { + return queue.take(); + } + + @Override + public void close() throws Exception { + + } + + public void fatalConnector() { + context.fatal(new Exception(FailComponentType.FAIL_SOURCE.toString())); + } + } + + public static class TestFunction implements Function> { + @Override + public CompletableFuture process(String input, Context context) throws Exception { + return CompletableFuture.completedFuture(input).thenApply((value) -> { + if (FailComponentType.FAIL_FUNC.toString().equals(value)) { + context.fatal(new Exception(FailComponentType.FAIL_FUNC.toString())); + return null; + } else { + return value; + } + }); + } + } + + public static class TestSinkConnector implements Sink { + SinkContext context; + + @Override + public void open(Map config, SinkContext sinkContext) throws Exception { + this.context = sinkContext; + } + + @Override + public void write(Record record) throws Exception { + new Thread(() -> { + if (FailComponentType.FAIL_SINK.toString().equals(record.getValue())) { + context.fatal(new Exception(FailComponentType.FAIL_SINK.toString())); + } + }).start(); + } + + @Override + public void close() throws Exception { + + } + } + + private Object getPrivateField(JavaInstanceRunnable javaInstanceRunnable, String fieldName) + throws NoSuchFieldException, IllegalAccessException { + Field field = JavaInstanceRunnable.class.getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(javaInstanceRunnable); + } + + public enum FailComponentType { + FAIL_SOURCE, + FAIL_FUNC, + FAIL_SINK + } + + @DataProvider(name = "failComponentType") + public Object[][] failType() { + return new Object[][]{{FailComponentType.FAIL_SOURCE}, {FailComponentType.FAIL_FUNC}, + {FailComponentType.FAIL_SINK}}; + } + + @Test(dataProvider = "failComponentType") + public void testFatalTheInstance(FailComponentType failComponentType) throws Exception { + JavaInstanceRunnable javaInstanceRunnable = createRunnable( + org.apache.pulsar.functions.proto.Function.SourceSpec.newBuilder() + .setClassName(TestSourceConnector.class.getName()).build(), + TestFunction.class.getName(), + SinkSpec.newBuilder().setClassName(TestSinkConnector.class.getName()).build() + ); + + Thread fnThread = new Thread(javaInstanceRunnable); + fnThread.start(); + + // Wait for the setup to complete + AtomicReference source = new AtomicReference<>(); + Awaitility.await() + .pollInterval(Duration.ofMillis(200)) + .atMost(Duration.ofSeconds(10)) + .ignoreExceptions().untilAsserted(() -> { + TestSourceConnector sourceConnector = (TestSourceConnector) getPrivateField(javaInstanceRunnable, + "source"); + Assert.assertNotNull(sourceConnector); + source.set(sourceConnector); + }); + + if (failComponentType == FailComponentType.FAIL_SOURCE) { + source.get().fatalConnector(); + } else { + source.get().pushRecord(failComponentType::toString); + } + + Awaitility.await() + .pollInterval(Duration.ofMillis(200)) + .atMost(Duration.ofSeconds(10)) + .ignoreExceptions().untilAsserted(() -> { + Throwable deathException = (Throwable) getPrivateField(javaInstanceRunnable, "deathException"); + Assert.assertNotNull(deathException); + Assert.assertEquals(deathException.getMessage(), failComponentType.toString()); + }); + } } From 63cb469ecef5767cf41d18d4ff8f220fba2608a2 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 7 Sep 2023 16:53:52 +0800 Subject: [PATCH 2/7] Refine the test --- .../instance/JavaInstanceRunnable.java | 27 ++++++++++--------- .../instance/JavaInstanceRunnableTest.java | 23 ++++++++++------ 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 71bad3dc71a73..d51538bfa12b8 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -132,7 +132,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private JavaInstance javaInstance; @Getter - private Throwable deathException; + private volatile Throwable deathException; // function stats private ComponentStatsManager stats; @@ -345,23 +345,24 @@ public void run() { // process the synchronous results handleResult(currentRecord, result); } + + if (deathException != null) { + throw deathException; + } } } catch (Throwable t) { - if (t instanceof InterruptedException && deathException != null) { + if (deathException != null) { log.info("Encountered fatal exception: ", deathException); - if (stats != null) { - stats.incrSysExceptions(deathException); - } - return; + } else { + log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId( + instanceConfig.getFunctionDetails().getTenant(), + instanceConfig.getFunctionDetails().getNamespace(), + instanceConfig.getFunctionDetails().getName(), + instanceConfig.getInstanceId()), t); + deathException = t; } - log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId( - instanceConfig.getFunctionDetails().getTenant(), - instanceConfig.getFunctionDetails().getNamespace(), - instanceConfig.getFunctionDetails().getName(), - instanceConfig.getInstanceId()), t); - deathException = t; if (stats != null) { - stats.incrSysExceptions(t); + stats.incrSysExceptions(deathException); } } finally { log.info("Closing instance"); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 30a35efca169f..2d97a128661d7 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -395,14 +395,15 @@ public void fatalConnector() { public static class TestFunction implements Function> { @Override public CompletableFuture process(String input, Context context) throws Exception { - return CompletableFuture.completedFuture(input).thenApply((value) -> { - if (FailComponentType.FAIL_FUNC.toString().equals(value)) { + CompletableFuture future = new CompletableFuture<>(); + new Thread(() -> { + if (FailComponentType.FAIL_FUNC.toString().equals(input)) { context.fatal(new Exception(FailComponentType.FAIL_FUNC.toString())); - return null; } else { - return value; + future.complete(input); } - }); + }).start(); + return future; } } @@ -472,19 +473,25 @@ public void testFatalTheInstance(FailComponentType failComponentType) throws Exc source.set(sourceConnector); }); + // Fail the connector or function if (failComponentType == FailComponentType.FAIL_SOURCE) { source.get().fatalConnector(); } else { source.get().pushRecord(failComponentType::toString); } + // Assert that the instance is terminated with the fatal exception Awaitility.await() .pollInterval(Duration.ofMillis(200)) .atMost(Duration.ofSeconds(10)) .ignoreExceptions().untilAsserted(() -> { - Throwable deathException = (Throwable) getPrivateField(javaInstanceRunnable, "deathException"); - Assert.assertNotNull(deathException); - Assert.assertEquals(deathException.getMessage(), failComponentType.toString()); + Assert.assertNotNull(javaInstanceRunnable.getDeathException()); + Assert.assertEquals(javaInstanceRunnable.getDeathException().getMessage(), + failComponentType.toString()); + + // Assert the java instance is closed + Assert.assertFalse(fnThread.isAlive()); + Assert.assertFalse((boolean) getPrivateField(javaInstanceRunnable, "isInitialized")); }); } } From 9c269f052c1ec2fd60b4e018db09dad495cd7b14 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 8 Sep 2023 09:27:34 +0800 Subject: [PATCH 3/7] Optimize imports and log message --- .../pulsar/functions/instance/JavaInstanceRunnable.java | 2 +- .../functions/instance/JavaInstanceRunnableTest.java | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index d51538bfa12b8..7a404cd9330f3 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -352,7 +352,7 @@ public void run() { } } catch (Throwable t) { if (deathException != null) { - log.info("Encountered fatal exception: ", deathException); + log.info("Fatal exception occurred in the instance", deathException); } else { log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId( instanceConfig.getFunctionDetails().getTenant(), diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 2d97a128661d7..131d96572a5e3 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -24,7 +24,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; - +import com.fasterxml.jackson.annotation.JsonIgnore; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; @@ -32,8 +32,6 @@ import java.util.List; import java.util.Map; import java.util.TreeSet; - -import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; @@ -52,6 +50,7 @@ import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; import org.apache.pulsar.io.core.Sink; @@ -102,7 +101,7 @@ private JavaInstanceRunnable createRunnable(FunctionDetails functionDetails) thr return javaInstanceRunnable; } - private JavaInstanceRunnable createRunnable(org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec, + private JavaInstanceRunnable createRunnable(SourceSpec sourceSpec, String functionClassName, SinkSpec sinkSpec) throws PulsarClientException { ClientBuilder clientBuilder = mock(ClientBuilder.class); @@ -452,7 +451,7 @@ public Object[][] failType() { @Test(dataProvider = "failComponentType") public void testFatalTheInstance(FailComponentType failComponentType) throws Exception { JavaInstanceRunnable javaInstanceRunnable = createRunnable( - org.apache.pulsar.functions.proto.Function.SourceSpec.newBuilder() + SourceSpec.newBuilder() .setClassName(TestSourceConnector.class.getName()).build(), TestFunction.class.getName(), SinkSpec.newBuilder().setClassName(TestSinkConnector.class.getName()).build() From 98a28b4402bddea7113a6d3c747eb986cd0b85ae Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 8 Sep 2023 09:32:47 +0800 Subject: [PATCH 4/7] Optimize tests --- .../pulsar/functions/instance/JavaInstanceRunnableTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 131d96572a5e3..134e77a3b58a2 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -113,9 +113,9 @@ private JavaInstanceRunnable createRunnable(SourceSpec sourceSpec, .build(); InstanceConfig config = createInstanceConfig(functionDetails); config.setClusterName("test-cluster"); - return new JavaInstanceRunnable( - config, clientBuilder, PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(), null, null, - null, null, null, Thread.currentThread().getContextClassLoader(), null); + return new JavaInstanceRunnable(config, clientBuilder, + PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build(), null, null, null, null, null, + Thread.currentThread().getContextClassLoader(), null); } private Method makeAccessible(JavaInstanceRunnable javaInstanceRunnable) throws Exception { From 18fca73922c0a1d9b09b2685ccb282fa5bcb57b1 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 8 Sep 2023 11:10:53 +0800 Subject: [PATCH 5/7] Fix tests --- .../pulsar/io/common/IOConfigUtilsTest.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java index 52afac1a5ac0c..fd291a8a3c9b3 100644 --- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java +++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java @@ -259,12 +259,12 @@ public ByteBuffer getState(String key) { public CompletableFuture getStateAsync(String key) { return null; } - + @Override public void deleteState(String key) { - + } - + @Override public CompletableFuture deleteStateAsync(String key) { return null; @@ -284,6 +284,11 @@ public ConsumerBuilder newConsumerBuilder(Schema schema) throws Pulsar public PulsarClient getPulsarClient() { return null; } + + @Override + public void fatal(Throwable t) { + + } } @Test @@ -449,12 +454,12 @@ public ByteBuffer getState(String key) { public CompletableFuture getStateAsync(String key) { return null; } - + @Override public void deleteState(String key) { - + } - + @Override public CompletableFuture deleteStateAsync(String key) { return null; @@ -464,6 +469,11 @@ public CompletableFuture deleteStateAsync(String key) { public PulsarClient getPulsarClient() { return null; } + + @Override + public void fatal(Throwable t) { + + } } @Test From 3dc7a37252f74a1d358f78fecc2c131a7861333d Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 11 Sep 2023 11:08:02 +0800 Subject: [PATCH 6/7] Fix tests --- .../pulsar/io/kafka/sink/KafkaAbstractSinkTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java index d59cdb1d9b63d..9537b6576b44e 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java @@ -164,12 +164,12 @@ public ByteBuffer getState(String key) { public CompletableFuture getStateAsync(String key) { return null; } - + @Override public void deleteState(String key) { - + } - + @Override public CompletableFuture deleteStateAsync(String key) { return null; @@ -179,6 +179,11 @@ public CompletableFuture deleteStateAsync(String key) { public PulsarClient getPulsarClient() { return null; } + + @Override + public void fatal(Throwable t) { + + } }; ThrowingRunnable openAndClose = ()->{ try { From 17aa607f0c6b6c99fe673b08ea3169527b5dbcd9 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 13 Sep 2023 19:43:16 +0800 Subject: [PATCH 7/7] Add comments for the corner case and fix the fatal exception log --- .../functions/instance/JavaInstanceRunnable.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 7a404cd9330f3..b3850cbb53dac 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -347,12 +347,23 @@ public void run() { } if (deathException != null) { + // Ideally the current java instance thread will be interrupted when the deathException is set. + // But if the CompletableFuture returned by the Pulsar Function is completed exceptionally(the + // function has invoked the fatal method) before being put into the JavaInstance + // .pendingAsyncRequests, the interrupted exception may be thrown when putting this future to + // JavaInstance.pendingAsyncRequests. The interrupted exception would be caught by the JavaInstance + // and be skipped. + // Therefore, we need to handle this case by checking the deathException here and rethrow it. throw deathException; } } } catch (Throwable t) { if (deathException != null) { - log.info("Fatal exception occurred in the instance", deathException); + log.error("[{}] Fatal exception occurred in the instance", FunctionCommon.getFullyQualifiedInstanceId( + instanceConfig.getFunctionDetails().getTenant(), + instanceConfig.getFunctionDetails().getNamespace(), + instanceConfig.getFunctionDetails().getName(), + instanceConfig.getInstanceId()), deathException); } else { log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId( instanceConfig.getFunctionDetails().getTenant(),