Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions pip/pip-281.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Title: [io] Add notifyError method on PushSource

## Motivation

In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw an exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted,
you can use the [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class and extend it to quickly implement the push message model.
It overrides the `read` method and provides the `consume` method for the user to call.

However, if the source connector extends from the class,
it cannot notify the function framework if it encounters an exception while consuming data internally,
in other words, the function call `source.read()` never triggers an exception and never exits the process.


## Goals

Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws exception.
```java

public Record<T> read() throws Exception {
Record<T> record = queue.take();
if (record instanceof ErrorNotifierRecord) {
throw ((ErrorNotifierRecord) record).getException();
}
return record;
}


/**
* Allows the source to notify errors asynchronously.
* @param ex
*/
public void notifyError(Exception ex) {
consume(new ErrorNotifierRecord(ex));
}
}
```

Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java)


### Compatibility

This PIP is to provide a method for users rather than introducing a new interface.

- So it is forward compatible
- However, connectors using this method are not backward compatible.
For example, If a Kafka source connector built upon pulsar-io v3.1 (including features introduced in this PIP) and uses the `notifyError` method,
when it switches back to pulsar-io v3.0 (excluding features introduced in this PIP), it will encounter errors during compilation.

### In Scope

After this PIP, the source connectors can extends the `PushSource`, and use `notifyError` method to throw exception. Such as:
- [KafkaSourceConnector](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java)
- [CanalSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java#L43)
- [MongoSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java#L59)
- etc.

### Out of Scope
None

## Design & Implementation Details

- Abstract BatchPushSource logic to AbstractPushSource.
- Let PushSource to extends AbstractPushSource to extend a new method(notifyError).

Please refer this PR: https://github.com/apache/pulsar/pull/20791

## Note
None


## Concrete Example

### BEFORE
- Not possible

### AFTER

```java
public class PushSourceTest {

PushSource testBatchSource = new PushSource() {
@Override
public void open(Map config, SourceContext context) throws Exception {

}

@Override
public void close() throws Exception {

}
};

@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
public void testNotifyErrors() throws Exception {
testBatchSource.notifyError(new RuntimeException("test exception"));
testBatchSource.readNext();
}
}
```

## Links
None