From a93456487b30270e82221312d23f6007d31ec9f0 Mon Sep 17 00:00:00 2001
From: Baodi Shi
Date: Fri, 14 Jul 2023 17:58:51 +0800
Subject: [PATCH 1/6] Create pip-281: Add notifyError method on PushSource

---
 pip/pip-281.md | 100 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 100 insertions(+)
 create mode 100644 pip/pip-281.md

diff --git a/pip/pip-281.md b/pip/pip-281.md
new file mode 100644
index 0000000000000..867087a0ceaa3
--- /dev/null
+++ b/pip/pip-281.md
@@ -0,0 +1,100 @@
+# Title: [io] Add notifyError method on PushSource
+
+## Motivation
+
+In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted
+
+On io source connector, we provide [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class for users to use, and users can extend this class to quickly implement the push message model.
+It overrides the `read` method and provides the `consume` method for the user to call.
+
+However, if the source connector that extends from the class,
+it cannot notify the function framework if it encounters an exception while consuming data internally,
+in other words, the function call `source.read()` never triggers an exception and never exits the process.
+
+
+## Goals
+
+Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws it.
+```java
+
+    public Record read() throws Exception {
+        Record record = queue.take();
+        if (record instanceof ErrorNotifierRecord) {
+            throw ((ErrorNotifierRecord) record).getException();
+        }
+        return record;
+    }
+
+
+    /**
+     * Allows the source to notify errors asynchronously.
+     * @param ex
+     */
+    public void notifyError(Exception ex) {
+        consume(new ErrorNotifierRecord(ex));
+    }
+}
+```
+
+Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java)
+
+
+BTW: This is a very simple change and is forward compatible. Sorry, I didn't notice that this change requires PIP before, so the related PRs have been merged.
+- https://github.com/apache/pulsar/pull/20791
+
+If this PIP vote does not pass, I revert this PR after that.
+
+### In Scope
+
+Use this method, Like all current source connectors that extends the PushSource, process exit can be implemented. Such as:
+- [KafkaSourceConnector](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java)
+- [CanalSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java#L43)
+- [MongoSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java#L59)
+- etc.
+
+### Out of Scope
+None
+
+## Design & Implementation Details
+
+- Abstract BatchPushSource logic to AbstractPushSource.
+- Let PushSource to extends AbstractPushSource to extend a new method(notifyError).
+
+Please refer this PR: https://github.com/apache/pulsar/pull/20791
+
+## Note
+None
+
+
+## Concrete Example
+
+### BEFORE
+- Not possible
+
+### AFTER
+
+```java
+public class PushSourceTest {
+
+    PushSource testBatchSource = new PushSource() {
+        @Override
+        public void open(Map config, SourceContext context) throws Exception {
+
+        }
+
+        @Override
+        public void close() throws Exception {
+
+        }
+    };
+
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
+    public void testNotifyErrors() throws Exception {
+        testBatchSource.notifyError(new RuntimeException("test exception"));
+        testBatchSource.readNext();
+    }
+}
+```
+
+## Links
+None

From 680143eb969a3543748c256f336fcf3cf729ff22 Mon Sep 17 00:00:00 2001
From: Baodi Shi
Date: Sun, 16 Jul 2023 20:16:34 +0800
Subject: [PATCH 2/6] Add compatibility section.

---
 pip/pip-281.md | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/pip/pip-281.md b/pip/pip-281.md
index 867087a0ceaa3..19f2eb5787523 100644
--- a/pip/pip-281.md
+++ b/pip/pip-281.md
@@ -44,6 +44,15 @@ BTW: This is a very simple change and is forward compatible. Sorry, I didn't not
 
 If this PIP vote does not pass, I revert this PR after that.
 
+
+### Compatibility
+
+This PIP is to provide a method for users to use, not an new interface.
+
+- So it is forward compatible
+- However, connector using this method are not backward compatible.
+For example, the Kafka source connector compiles with version 3.1(include this pip) Pulsar dependencies and uses the `notifyError` method, and if it switches back to version 3.0(exclude this pip) Pulsar compilation, it will encounter compilation errors.
+
 ### In Scope
 
 Use this method, Like all current source connectors that extends the PushSource, process exit can be implemented. Such as:

From c6f18345c794cf08d5f8abb4f261bb4572c31526 Mon Sep 17 00:00:00 2001
From: Baodi Shi
Date: Mon, 17 Jul 2023 17:35:41 +0800
Subject: [PATCH 3/6] Remove redundant content.
---
 pip/pip-281.md | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/pip/pip-281.md b/pip/pip-281.md
index 19f2eb5787523..3833e013400a6 100644
--- a/pip/pip-281.md
+++ b/pip/pip-281.md
@@ -39,12 +39,6 @@ Add `notifyError` method on PushSource, This method can receive an exception and
 Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java)
 
 
-BTW: This is a very simple change and is forward compatible. Sorry, I didn't notice that this change requires PIP before, so the related PRs have been merged.
-- https://github.com/apache/pulsar/pull/20791
-
-If this PIP vote does not pass, I revert this PR after that.
-
-
 ### Compatibility
 
 This PIP is to provide a method for users to use, not an new interface.

From 546039d116f67dceb3a578b99fd66c12b07d5d0f Mon Sep 17 00:00:00 2001
From: Baodi Shi
Date: Wed, 2 Aug 2023 16:14:34 +0800
Subject: [PATCH 4/6] Fix code reviews.

---
 pip/pip-281.md | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/pip/pip-281.md b/pip/pip-281.md
index 3833e013400a6..dc6100192fba5 100644
--- a/pip/pip-281.md
+++ b/pip/pip-281.md
@@ -2,19 +2,18 @@
 
 ## Motivation
 
-In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted
-
-On io source connector, we provide [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class for users to use, and users can extend this class to quickly implement the push message model.
+In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw an exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted,
+you can use the [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class and extend it to quickly implement the push message model.
 It overrides the `read` method and provides the `consume` method for the user to call.
 
-However, if the source connector that extends from the class,
+However, if the source connector extends from the class,
 it cannot notify the function framework if it encounters an exception while consuming data internally,
 in other words, the function call `source.read()` never triggers an exception and never exits the process.
 
 
 ## Goals
 
-Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws it.
+Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws exception.
 ```java
 
     public Record read() throws Exception {
@@ -41,15 +40,16 @@ Just like the implementation of the current [BatchPushSource](https://github.com
 
 ### Compatibility
 
-This PIP is to provide a method for users to use, not an new interface.
+This PIP is to provide a method for users rather than introducing a new interface.
 
 - So it is forward compatible
 - However, connector using this method are not backward compatible.
-For example, the Kafka source connector compiles with version 3.1(include this pip) Pulsar dependencies and uses the `notifyError` method, and if it switches back to version 3.0(exclude this pip) Pulsar compilation, it will encounter compilation errors.
+For example, the currently Kafka source connector compiles with version 3.1(include this pip) Pulsar dependencies and uses the `notifyError` method,
+if it switches back to version 3.0(exclude this pip) Pulsar to compile, it will encounter compile errors.
 
 ### In Scope
 
-Use this method, Like all current source connectors that extends the PushSource, process exit can be implemented. Such as:
+Use this method, current source connectors can extends the `PushSource`, and use `notifyError` method to throw exception. Such as:
 - [KafkaSourceConnector](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java)
 - [CanalSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java#L43)
 - [MongoSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java#L59)

From d1c59620bee61f4506c89ef47001c84999e27a30 Mon Sep 17 00:00:00 2001
From: Baodi Shi
Date: Sat, 5 Aug 2023 09:32:45 +0800
Subject: [PATCH 5/6] Clear sentence.

---
 pip/pip-281.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pip/pip-281.md b/pip/pip-281.md
index dc6100192fba5..9b2b6cc73f1d3 100644
--- a/pip/pip-281.md
+++ b/pip/pip-281.md
@@ -43,13 +43,13 @@ Just like the implementation of the current [BatchPushSource](https://github.com
 This PIP is to provide a method for users rather than introducing a new interface.
 
 - So it is forward compatible
-- However, connector using this method are not backward compatible.
-For example, the currently Kafka source connector compiles with version 3.1(include this pip) Pulsar dependencies and uses the `notifyError` method,
-if it switches back to version 3.0(exclude this pip) Pulsar to compile, it will encounter compile errors.
+- However, connector using this method are not backward compatible.
+For example, If `Kafka source` connector depends on version v3.0(include this pip feature) of pulsar-io and uses the `notifyError` method,
+when it switches back to depends on 3.0(not include this pip feature) version of pulsar-io to compile will encounter compile errors.
 
 ### In Scope
 
-Use this method, current source connectors can extends the `PushSource`, and use `notifyError` method to throw exception. Such as:
+After this PIP, the source connectors can extends the `PushSource`, and use `notifyError` method to throw exception. Such as:
 - [KafkaSourceConnector](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java)
 - [CanalSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java#L43)
 - [MongoSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java#L59)

From 6406894ead0f246b9a94680314e8406fee1f7cb5 Mon Sep 17 00:00:00 2001
From: Baodi Shi
Date: Tue, 8 Aug 2023 23:21:51 +0800
Subject: [PATCH 6/6] Update pip/pip-281.md

Co-authored-by: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com>
---
 pip/pip-281.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pip/pip-281.md b/pip/pip-281.md
index 9b2b6cc73f1d3..7bcf420c3d64a 100644
--- a/pip/pip-281.md
+++ b/pip/pip-281.md
@@ -43,9 +43,9 @@ Just like the implementation of the current [BatchPushSource](https://github.com
 This PIP is to provide a method for users rather than introducing a new interface.
 
 - So it is forward compatible
-- However, connector using this method are not backward compatible.
-For example, If `Kafka source` connector depends on version v3.0(include this pip feature) of pulsar-io and uses the `notifyError` method,
-when it switches back to depends on 3.0(not include this pip feature) version of pulsar-io to compile will encounter compile errors.
+- However, connectors using this method are not backward compatible.
+For example, If a Kafka source connector built upon pulsar-io v3.1 (including features introduced in this PIP) and uses the `notifyError` method,
+when it switches back to pulsar-io v3.0 (excluding features introduced in this PIP), it will encounter errors during compilation.
 
 ### In Scope
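
Reviewer note on the series: the error-through-the-queue idea described under "Goals" can be sketched without any Pulsar dependency. The block below is only an illustration under stated assumptions. `NotifyErrorSketch`, `SimpleRecord`, `ErrorRecord`, `MiniPushSource` and `runDemo` are hypothetical names, not Pulsar classes, and the unbounded `LinkedBlockingQueue` is a simplification rather than a claim about how `PushSource` sizes its queue.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, dependency-free stand-in for the pattern described in this PIP.
// None of these types are Pulsar classes; they only mirror the idea.
public class NotifyErrorSketch {

    /** Minimal stand-in for a record that carries a value. */
    static class SimpleRecord {
        private final String value;

        SimpleRecord(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }
    }

    /** Marker record that carries an exception through the same queue as data. */
    static class ErrorRecord extends SimpleRecord {
        private final Exception exception;

        ErrorRecord(Exception exception) {
            super(null);
            this.exception = exception;
        }

        Exception getException() {
            return exception;
        }
    }

    /** Push-style source: producers call consume/notifyError, the framework calls read. */
    static class MiniPushSource {
        // Unbounded queue (an assumption made for brevity), so add() does not block.
        private final LinkedBlockingQueue<SimpleRecord> queue = new LinkedBlockingQueue<>();

        void consume(SimpleRecord record) {
            queue.add(record);
        }

        // Enqueue the error so it is surfaced in order, after earlier records.
        void notifyError(Exception ex) {
            consume(new ErrorRecord(ex));
        }

        // Blocks until a record is available; rethrows a queued error to the caller.
        SimpleRecord read() throws Exception {
            SimpleRecord record = queue.take();
            if (record instanceof ErrorRecord) {
                throw ((ErrorRecord) record).getException();
            }
            return record;
        }
    }

    /** Reads one normal record, then hits the queued error on the second read. */
    public static String runDemo() {
        MiniPushSource source = new MiniPushSource();
        source.consume(new SimpleRecord("hello"));
        source.notifyError(new RuntimeException("test exception"));

        StringBuilder out = new StringBuilder();
        try {
            out.append(source.read().getValue());
            source.read(); // throws the queued RuntimeException
            out.append(" | no error");
        } catch (Exception e) {
            out.append(" | ").append(e.getMessage());
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "hello | test exception"
    }
}
```

Because the error travels through the same queue as the data, records enqueued before it are still delivered first, and the caller of `read()` sees the exception on its own thread, which is what lets the framework react to a failure raised inside the connector.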