diff --git a/pip/pip-281.md b/pip/pip-281.md new file mode 100644 index 0000000000000..7bcf420c3d64a --- /dev/null +++ b/pip/pip-281.md @@ -0,0 +1,103 @@ +# Title: [io] Add notifyError method on PushSource + +## Motivation + +In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw an exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted, +you can use the [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class and extend it to quickly implement the push message model. +It overrides the `read` method and provides the `consume` method for the user to call. + +However, if the source connector extends from the class, +it cannot notify the function framework if it encounters an exception while consuming data internally, +in other words, the function call `source.read()` never triggers an exception and never exits the process. + + +## Goals + +Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws exception. +```java + + public Record read() throws Exception { + Record record = queue.take(); + if (record instanceof ErrorNotifierRecord) { + throw ((ErrorNotifierRecord) record).getException(); + } + return record; + } + + + /** + * Allows the source to notify errors asynchronously. + * @param ex + */ + public void notifyError(Exception ex) { + consume(new ErrorNotifierRecord(ex)); + } +} +``` + +Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java) + + +### Compatibility + +This PIP is to provide a method for users rather than introducing a new interface. + +- So it is forward compatible +- However, connectors using this method are not backward compatible. +For example, If a Kafka source connector built upon pulsar-io v3.1 (including features introduced in this PIP) and uses the `notifyError` method, +when it switches back to pulsar-io v3.0 (excluding features introduced in this PIP), it will encounter errors during compilation. + +### In Scope + +After this PIP, the source connectors can extends the `PushSource`, and use `notifyError` method to throw exception. Such as: +- [KafkaSourceConnector](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java) +- [CanalSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java#L43) +- [MongoSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java#L59) +- etc. + +### Out of Scope +None + +## Design & Implementation Details + +- Abstract BatchPushSource logic to AbstractPushSource. +- Let PushSource to extends AbstractPushSource to extend a new method(notifyError). + +Please refer this PR: https://github.com/apache/pulsar/pull/20791 + +## Note +None + + +## Concrete Example + +### BEFORE +- Not possible + +### AFTER + +```java +public class PushSourceTest { + + PushSource testBatchSource = new PushSource() { + @Override + public void open(Map config, SourceContext context) throws Exception { + + } + + @Override + public void close() throws Exception { + + } + }; + + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception") + public void testNotifyErrors() throws Exception { + testBatchSource.notifyError(new RuntimeException("test exception")); + testBatchSource.readNext(); + } +} +``` + +## Links +None