From 54a7a042637f3efb5ecd9cb15db6566ca7be2973 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 28 Aug 2023 17:40:18 +0800 Subject: [PATCH 1/7] [improve][pip] PIP-297: Support raising exceptions using Function & Connector context --- pip/pip-297.md | 167 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 pip/pip-297.md diff --git a/pip/pip-297.md b/pip/pip-297.md new file mode 100644 index 0000000000000..dad329a963a29 --- /dev/null +++ b/pip/pip-297.md @@ -0,0 +1,167 @@ +# Title: Support raising exceptions using Function & Connector context + +# Background knowledge + +The **Pulsar Function** is a serverless computing framework that runs on top of Pulsar and processes messages. + +The **Pulsar IO Connector** is a framework that allows users to easily integrate Pulsar with external systems, such as +databases, messaging systems, and data pipelines. With Pulsar IO Connector, you can create, deploy, and manage +connectors that read data from or write data to Pulsar topics. There are two types of Pulsar IO Connectors: source and +sink. A **source connector** imports data from another system to Pulsar, while a **sink connector** exports data from +Pulsar to another system. The Pulsar IO Connector is implemented based on the Pulsar Function framework. So in +the following, we treat the connector as a special kind of function. The `function` refers to both function and +connector. + +**Function Instance** is a running instance of a Pulsar IO Connector that interacts with a specific external system or a +Pulsar Function that processes messages from the topic. + +**Function Framework** is a framework for running the Function instance. + +**Function Context** is an interface that provides access to various information and resources for the connector or the +function. The function context is passed to the connector or the function when it is initialized, and then can be used +to interact with the Pulsar system. + +The current implementation of the exception handler: +**Function instance thread**: The function framework initializes a thread for each function instance to handle the +core logic of the function/connector, including consuming messages from the Pulsar topic for the sink connector, +executing the logic of the function, producing messages to the Pulsar topic for the source connector, handling the exception, etc. + +**Exception handling logic**: The function itself can throw exceptions, and this thread will catch the exception and +then close the function. This means that the function will stop working until it is restarted manually or +automatically by the function framework. + +Even though it is not explicitly defined, there are two types of exceptions that could be handled for the function or +the framework. + +- **Recoverable exception**:This is an exception that the function can recover from by itself, such as network + connection issues, persistence issues, etc. The function can catch these exceptions and retry the operation until it + succeeds or reaches a limit. +- **Unrecoverable exception**: This is an exception that the function cannot recover from by itself and needs to notify + the framework to terminate it. These are fatal exceptions that indicate a configuration issue, a logic error, or an + incompatible system. The function framework will catch these exceptions, report them to users, and terminate the + function. + +# Motivation + +The current implementation of the connector and Pulsar Function exception handler cannot handle the unrecoverable +exceptions that are thrown outside of the function instance thread. + +For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If +any unrecoverable exceptions occur in those threads, the function instance thread will not be aware of them and will +not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue +here: https://github.com/apache/pulsar/issues/9464 + +The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from +an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has +been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding +the notifyError method to the PushSource class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this +does not solve the same problem that all source connectors face. + +The problem is same for the Pulsar Function. The user can also process messages asynchronously in a Pulsar Function. If +any fatal exceptions happen in the user's thread, the function framework can't handle the exception +correctly. + +We need a way for the connector and function developers to throw unrecoverable exceptions outside the function instance +thread. The function framework should catch these exceptions and terminate the function accordingly. + +# Goals + +## In Scope + +- This proposal will apply both to the Pulsar Function and the Pulsar Connector. +- Support throwing unrecoverable exceptions outside the function instance thread. +- The function framework should terminate the connector or function when there are any unrecoverable exceptions thrown + asynchronously. + +## Out of Scope + +- The fixes of the exception-raising issue mentioned in the Motivation part for all the connectors are not included in + this PIP. This PIP only provides the feature for the connector developer to raise the exception outside the function + instance thread. The fixes should be in serval different PRs. + +# High Level Design + +Introduce a new method `raiseException` to the context. All the connector implementation code and the function code can +use this context and call the `raiseException` method to raise an unrecoverable exception to the function framework. + +After the connector or function raises the unrecoverable exception, the function instance thread will be interrupted. +The function framework then could catch the exception, log it, and then terminate the function instance. + +# Detailed Design + +## Design & Implementation Details + +This PIP proposes to add a new method`raiseException`to the context `BaseContext`. This method allows the +connector or the function code to report an unrecoverable exception to the function framework. The `SinkContext` +and `SourceContext` +are all inherited from `BaseContext`. Therefore, all the sink connectors and source connectors can invoke this new +method. The pulsar function context class `Context` is also inherited from `BaseContext`. Therefore, the function code +can also invoke this new method. + +In the `raiseException`, the function instance thread will be interrupted. The function instance thread can then catch +the interrupt exception and get the unrecoverable exception. The function framework then logs this exception, +reports to the metrics, and finally terminates the function instance. + +## Public-facing Changes + +### Public API + +Introduce `raiseException` method to the `BaseContext`: + +```java +public interface BaseContext { + /** + * Raise an unrecoverable exception to the function framework. + * + * @param e the exception to be raised + */ + void raiseException(Exception e); +} +``` + +### Binary protocol + +No changes for this part. + +### Configuration + +No changes for this part. + +### CLI + +No changes for this part. + +### Metrics + +No changes for this part. + +# Monitoring + +No changes for this part. + +# Security Considerations + +No security-related changes. +The new method `raiseException` will only take effect on the current function instance. It won't affect other function +instances even they are in the same function worker. + +# Backward & Forward Compatibility + +## Revert + +No operation required. + +## Upgrade + +No operation required. + +# Alternatives + +None. + +# General Notes + +# Links + +* Mailing List discussion thread: +* Mailing List voting thread: From 0a7adf7442b8c7434afe477674e90ef4e25d3a11 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 28 Aug 2023 18:06:58 +0800 Subject: [PATCH 2/7] Update pip --- pip/pip-297.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pip/pip-297.md b/pip/pip-297.md index dad329a963a29..2765a3cfa0e1c 100644 --- a/pip/pip-297.md +++ b/pip/pip-297.md @@ -1,4 +1,4 @@ -# Title: Support raising exceptions using Function & Connector context +# Support raising exceptions using Function & Connector context # Background knowledge @@ -24,7 +24,8 @@ to interact with the Pulsar system. The current implementation of the exception handler: **Function instance thread**: The function framework initializes a thread for each function instance to handle the core logic of the function/connector, including consuming messages from the Pulsar topic for the sink connector, -executing the logic of the function, producing messages to the Pulsar topic for the source connector, handling the exception, etc. +executing the logic of the function, producing messages to the Pulsar topic for the source connector, handling the +exception, etc. **Exception handling logic**: The function itself can throw exceptions, and this thread will catch the exception and then close the function. This means that the function will stop working until it is restarted manually or @@ -33,7 +34,7 @@ automatically by the function framework. Even though it is not explicitly defined, there are two types of exceptions that could be handled for the function or the framework. -- **Recoverable exception**:This is an exception that the function can recover from by itself, such as network +- **Recoverable exception**: This is an exception that the function can recover from by itself, such as network connection issues, persistence issues, etc. The function can catch these exceptions and retry the operation until it succeeds or reaches a limit. - **Unrecoverable exception**: This is an exception that the function cannot recover from by itself and needs to notify From f07495d77a655599182fb3b15eafec4b68983f1b Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 28 Aug 2023 18:18:22 +0800 Subject: [PATCH 3/7] Update title --- pip/pip-297.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pip/pip-297.md b/pip/pip-297.md index 2765a3cfa0e1c..3d8f5c639454d 100644 --- a/pip/pip-297.md +++ b/pip/pip-297.md @@ -1,4 +1,4 @@ -# Support raising exceptions using Function & Connector context +# Title: Support raising exceptions using Function & Connector context # Background knowledge From a654b0e2c922d6a5a0b984835ba1055c5d8d3d88 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 29 Aug 2023 18:04:47 +0800 Subject: [PATCH 4/7] Refine PIP based on the review comments --- pip/pip-297.md | 106 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 73 insertions(+), 33 deletions(-) diff --git a/pip/pip-297.md b/pip/pip-297.md index 3d8f5c639454d..5fedeeb0708de 100644 --- a/pip/pip-297.md +++ b/pip/pip-297.md @@ -21,48 +21,72 @@ Pulsar Function that processes messages from the topic. function. The function context is passed to the connector or the function when it is initialized, and then can be used to interact with the Pulsar system. -The current implementation of the exception handler: +## The current implementation of the exception handler + **Function instance thread**: The function framework initializes a thread for each function instance to handle the core logic of the function/connector, including consuming messages from the Pulsar topic for the sink connector, executing the logic of the function, producing messages to the Pulsar topic for the source connector, handling the -exception, etc. +exception, etc. And let's define the **Connector thread/Function thread** as a thread that is created by the connector +or function itself. **Exception handling logic**: The function itself can throw exceptions, and this thread will catch the exception and then close the function. This means that the function will stop working until it is restarted manually or automatically by the function framework. -Even though it is not explicitly defined, there are two types of exceptions that could be handled for the function or -the framework. +Even though it is not explicitly defined, there are two types of exceptions that should be handled by the function or +the framework: -- **Recoverable exception**: This is an exception that the function can recover from by itself, such as network - connection issues, persistence issues, etc. The function can catch these exceptions and retry the operation until it - succeeds or reaches a limit. -- **Unrecoverable exception**: This is an exception that the function cannot recover from by itself and needs to notify - the framework to terminate it. These are fatal exceptions that indicate a configuration issue, a logic error, or an +- **Fatal exception**: This is an exception that the function cannot recover from by itself and needs to notify the + framework to terminate it. These are fatal exceptions that indicate a configuration issue, a logic error, or an incompatible system. The function framework will catch these exceptions, report them to users, and terminate the function. +- **Non-fatal exception** is an exception that the function instance don't need to be terminated for. It could be + handled by the connector or function itself. Or be thrown by the function. This exception won't cause the function + instance to be terminated. + +### How to handle exceptions thrown from connectors + +All the exceptions thrown form the connector are treated as fatal exceptions. + +If the exception is thrown from the function instance thread, the function framework will catch the exception and +terminate the function instance. + +If the exception is thrown from the connector thread that is created by the connector itself, the function framework +will not be able to catch the exception and terminate the function instance. The connector will hang forever. +The `Motivation` part will talk more about this case. + +If the exception is thrown from the external system, the connector implementation could treat it as a retryable +exception and retry to process the message later, or throw it to indicate it as a fatal exception. + +### How to handle exceptions thrown from functions + +All the exceptions thrown from the pulsar function are treated as non-fatal exceptions. The function framework will +catch the exception and log it. But it will not terminate the function instance. + +There is no way for the function developer to throw a fatal exception to the function framework to terminate the +function instance. # Motivation -The current implementation of the connector and Pulsar Function exception handler cannot handle the unrecoverable -exceptions that are thrown outside of the function instance thread. +The current implementation of the connector and Pulsar Function exception handler cannot handle the fatal exceptions +that are thrown outside the function instance thread. For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If -any unrecoverable exceptions occur in those threads, the function instance thread will not be aware of them and will +any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue here: https://github.com/apache/pulsar/issues/9464 The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding -the notifyError method to the PushSource class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this -does not solve the same problem that all source connectors face. +the notifyError method to the `PushSource` class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this +does not solve the same problem that all source connectors face because not all connectors are implemented based on +the `PushSource` class. -The problem is same for the Pulsar Function. The user can also process messages asynchronously in a Pulsar Function. If -any fatal exceptions happen in the user's thread, the function framework can't handle the exception -correctly. +The problem is same for the Pulsar Function. Currently, the function can't throw fatal exceptions to the function +framework. We need to provide a way for the function developer to implement it. -We need a way for the connector and function developers to throw unrecoverable exceptions outside the function instance +We need a way for the connector and function developers to throw fatal exceptions outside the function instance thread. The function framework should catch these exceptions and terminate the function accordingly. # Goals @@ -70,53 +94,68 @@ thread. The function framework should catch these exceptions and terminate the f ## In Scope - This proposal will apply both to the Pulsar Function and the Pulsar Connector. -- Support throwing unrecoverable exceptions outside the function instance thread. -- The function framework should terminate the connector or function when there are any unrecoverable exceptions thrown +- Support throwing fatal exceptions outside the function instance thread. +- The function framework should terminate the connector or function when there are any fatal exceptions thrown asynchronously. ## Out of Scope - The fixes of the exception-raising issue mentioned in the Motivation part for all the connectors are not included in this PIP. This PIP only provides the feature for the connector developer to raise the exception outside the function - instance thread. The fixes should be in serval different PRs. + instance thread. The fixes should be in several different PRs. # High Level Design -Introduce a new method `raiseException` to the context. All the connector implementation code and the function code can -use this context and call the `raiseException` method to raise an unrecoverable exception to the function framework. +Introduce a new method `raiseFatalException` to the context. All the connector implementation code and the function code +can +use this context and call the `raiseFatalException` method to raise an fatal exception to the function framework. -After the connector or function raises the unrecoverable exception, the function instance thread will be interrupted. +After the connector or function raises the fatal exception, the function instance thread will be interrupted. The function framework then could catch the exception, log it, and then terminate the function instance. # Detailed Design ## Design & Implementation Details -This PIP proposes to add a new method`raiseException`to the context `BaseContext`. This method allows the -connector or the function code to report an unrecoverable exception to the function framework. The `SinkContext` +This PIP proposes to add a new method`raiseFatalException`to the context `BaseContext`. This method allows the +connector or the function code to report a fatal exception to the function framework. The `SinkContext` and `SourceContext` are all inherited from `BaseContext`. Therefore, all the sink connectors and source connectors can invoke this new method. The pulsar function context class `Context` is also inherited from `BaseContext`. Therefore, the function code can also invoke this new method. -In the `raiseException`, the function instance thread will be interrupted. The function instance thread can then catch -the interrupt exception and get the unrecoverable exception. The function framework then logs this exception, +In the `raiseFatalException`, the function instance thread will be interrupted. The function instance thread can then +catch the interrupt exception and get the fatal exception. The function framework then logs this exception, reports to the metrics, and finally terminates the function instance. +Tbe behavior when invoking the `raiseFatalException` method: + +- For the connector thread or function thread: + - Invoke the `raiseFatalException` method + - Send the exception to the function framework. There is a field `deathException` in the + class `JavaInstanceRunnable` + that is used to store the fatal exception. + - Interrupt the function instance thread +- For the function instance thread: + - Catch the interrupt exception + - Get the exception from the function framework + - Report the log and metrics + - Close the function instance + ## Public-facing Changes ### Public API -Introduce `raiseException` method to the `BaseContext`: +Introduce `raiseFatalException` method to the `BaseContext`: ```java public interface BaseContext { /** - * Raise an unrecoverable exception to the function framework. + * Raise a fatal exception to the function framework. The function instance will then be terminated. * - * @param e the exception to be raised + * @param t the fatal exception to be raised */ - void raiseException(Exception e); + void raiseFatalException(Throwable t); } ``` @@ -143,7 +182,8 @@ No changes for this part. # Security Considerations No security-related changes. -The new method `raiseException` will only take effect on the current function instance. It won't affect other function +The new method `raiseFatalException` will only take effect on the current function instance. It won't affect other +function instances even they are in the same function worker. # Backward & Forward Compatibility From c4f7fb100d193519461cc78099ea80a5037603c6 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 4 Sep 2023 15:04:54 +0800 Subject: [PATCH 5/7] Use `fatal` instead of `raiseFatalExceptions` and refine the pip --- pip/pip-297.md | 53 +++++++++++++++++++++++--------------------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/pip/pip-297.md b/pip/pip-297.md index 5fedeeb0708de..d9bbdc4d7b5bc 100644 --- a/pip/pip-297.md +++ b/pip/pip-297.md @@ -1,4 +1,4 @@ -# Title: Support raising exceptions using Function & Connector context +# Title: Support terminating Function & Connector with the fatal exception # Background knowledge @@ -68,8 +68,9 @@ function instance. # Motivation -The current implementation of the connector and Pulsar Function exception handler cannot handle the fatal exceptions -that are thrown outside the function instance thread. +Currently, the connector and function cannot terminate the function instance if there are fatal exceptions thrown +outside the function instance thread. The current implementation of the connector and Pulsar Function exception handler +cannot handle the fatal exceptions that are thrown outside the function instance thread. For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will @@ -93,22 +94,19 @@ thread. The function framework should catch these exceptions and terminate the f ## In Scope +- Support terminating the function instance with fatal exceptions - This proposal will apply both to the Pulsar Function and the Pulsar Connector. -- Support throwing fatal exceptions outside the function instance thread. -- The function framework should terminate the connector or function when there are any fatal exceptions thrown - asynchronously. ## Out of Scope - The fixes of the exception-raising issue mentioned in the Motivation part for all the connectors are not included in - this PIP. This PIP only provides the feature for the connector developer to raise the exception outside the function - instance thread. The fixes should be in several different PRs. + this PIP. This PIP only provides the feature for the connector developer to terminate the function instance. The fixes + should be in several different PRs. # High Level Design -Introduce a new method `raiseFatalException` to the context. All the connector implementation code and the function code -can -use this context and call the `raiseFatalException` method to raise an fatal exception to the function framework. +Introduce a new method `fatal` to the context. All the connector implementation code and the function code +can use this context and call the `fatal` method to terminate the instance while raising a fatal exception. After the connector or function raises the fatal exception, the function instance thread will be interrupted. The function framework then could catch the exception, log it, and then terminate the function instance. @@ -117,24 +115,22 @@ The function framework then could catch the exception, log it, and then terminat ## Design & Implementation Details -This PIP proposes to add a new method`raiseFatalException`to the context `BaseContext`. This method allows the -connector or the function code to report a fatal exception to the function framework. The `SinkContext` -and `SourceContext` -are all inherited from `BaseContext`. Therefore, all the sink connectors and source connectors can invoke this new -method. The pulsar function context class `Context` is also inherited from `BaseContext`. Therefore, the function code -can also invoke this new method. +This PIP proposes to add a new method`fatal`to the context `BaseContext`. This method allows the connector or the +function code to report a fatal exception to the function framework and terminate the instance. The `SinkContext` +and `SourceContext` are all inherited from `BaseContext`. Therefore, all the sink connectors and source connectors can +invoke this new method. The pulsar function context class `Context` is also inherited from `BaseContext`. Therefore, the +function code can also invoke this new method. -In the `raiseFatalException`, the function instance thread will be interrupted. The function instance thread can then +In the `fatal` method, the function instance thread will be interrupted. The function instance thread can then catch the interrupt exception and get the fatal exception. The function framework then logs this exception, reports to the metrics, and finally terminates the function instance. -Tbe behavior when invoking the `raiseFatalException` method: +Tbe behavior when invoking the `fatal` method: - For the connector thread or function thread: - - Invoke the `raiseFatalException` method + - Invoke the `fatal` method - Send the exception to the function framework. There is a field `deathException` in the - class `JavaInstanceRunnable` - that is used to store the fatal exception. + class `JavaInstanceRunnable` that is used to store the fatal exception. - Interrupt the function instance thread - For the function instance thread: - Catch the interrupt exception @@ -146,16 +142,16 @@ Tbe behavior when invoking the `raiseFatalException` method: ### Public API -Introduce `raiseFatalException` method to the `BaseContext`: +Introduce `fatal` method to the `BaseContext`: ```java public interface BaseContext { /** - * Raise a fatal exception to the function framework. The function instance will then be terminated. + * Terminate the function instance with a fatal exception. * * @param t the fatal exception to be raised */ - void raiseFatalException(Throwable t); + void fatal(Throwable t); } ``` @@ -182,9 +178,8 @@ No changes for this part. # Security Considerations No security-related changes. -The new method `raiseFatalException` will only take effect on the current function instance. It won't affect other -function -instances even they are in the same function worker. +The new method `fatal` will only take effect on the current function instance. It won't affect other function instances +even they are in the same function worker. # Backward & Forward Compatibility @@ -204,5 +199,5 @@ None. # Links -* Mailing List discussion thread: +* Mailing List discussion thread: https://lists.apache.org/thread/j59gzzwjp8c48lwv5poddm9qzlp2hol0 * Mailing List voting thread: From 0cc55be2af3a687ad60d74d1674f2370a8a4a8ec Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 4 Sep 2023 15:19:01 +0800 Subject: [PATCH 6/7] Add alternative part --- pip/pip-297.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pip/pip-297.md b/pip/pip-297.md index d9bbdc4d7b5bc..468a0c0715547 100644 --- a/pip/pip-297.md +++ b/pip/pip-297.md @@ -193,7 +193,17 @@ No operation required. # Alternatives -None. +## Using futures to handle results or exceptions returned the connector + +The benefit of this solution is that it makes the use of exception throwing more intuitive to the connector developer. + +But it requires changes to existing interfaces, including `Source` and `Sink`, which can complicate connector +development. And we still need the `fatal` method to handle some cases such as terminating the instance in code outside +of the message processing logic. This alternative solution can't handle this case. + +Meanwhile, the implementation of this solution will also be more complex, involving changes to the core message +processing logic of the function framework. We need to turn the entire message processing logic into an asynchronous +pattern. # General Notes From 8f69d2544b044670f2453da99791a4eef706ac9d Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 4 Sep 2023 18:34:51 +0800 Subject: [PATCH 7/7] Add voting thread --- pip/pip-297.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pip/pip-297.md b/pip/pip-297.md index 468a0c0715547..2985864beed43 100644 --- a/pip/pip-297.md +++ b/pip/pip-297.md @@ -210,4 +210,4 @@ pattern. # Links * Mailing List discussion thread: https://lists.apache.org/thread/j59gzzwjp8c48lwv5poddm9qzlp2hol0 -* Mailing List voting thread: +* Mailing List voting thread: https://lists.apache.org/thread/ggok3c2601mnbdomr65v3pjth3lk6fr8