
KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects #13185

Closed
gharris1727 wants to merge 20 commits into apache:trunk from gharris1727:kafka-14670-wrap-connectors

Conversation

Contributor

@gharris1727 gharris1727 commented Feb 1, 2023

Jira
This is the first part of the above ticket, applied only to SinkConnector and SourceConnector plugins.
Additional PRs will cover the other plugins, as the refactor was too large to reasonably review at once.

Design decisions:

  1. The IsolatedPlugin<P> class will be a common superclass for all plugin wrappers.
  2. The IsolatedPlugin superclass provides utility methods for subclasses to manage swapping the ThreadContextClassLoader for each call in a way that has minimal boilerplate.
  3. The Isolated* classes are intended to only be constructed within the plugin isolation infrastructure, and will all have package-local constructors.
  4. Testing runtime code that uses wrapped plugins will require mocking the wrappers, or instantiating a real Plugins class.
  5. Subclasses should define public methods which match the plugin class they are wrapping without being an explicit subclass. These methods should be marked with throws Exception to remind callers that they may throw arbitrary exceptions.
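
The loader-swap utility described in points 1, 2, and 5 could look roughly like the following sketch. This is illustrative only, not the actual patch: `FakeConnector` and the method names are stand-ins, and while the real design uses package-local constructors (point 3), they are public here so the example is self-contained.

```java
// Illustrative sketch of an IsolatedPlugin<P> superclass that swaps the
// thread context classloader around each delegated call (not the real patch).
public class IsolatedPluginSketch {

    public static class IsolatedPlugin<P> {
        private final P delegate;
        private final ClassLoader pluginLoader;

        // The real design uses package-local constructors; public here
        // only so this sketch is easy to run.
        public IsolatedPlugin(P delegate, ClassLoader pluginLoader) {
            this.delegate = delegate;
            this.pluginLoader = pluginLoader;
        }

        protected P delegate() {
            return delegate;
        }

        // Utility for subclasses: install the plugin's classloader for the
        // duration of one call, restoring the caller's loader afterwards.
        protected <V> V isolate(java.util.concurrent.Callable<V> call) throws Exception {
            Thread thread = Thread.currentThread();
            ClassLoader saved = thread.getContextClassLoader();
            thread.setContextClassLoader(pluginLoader);
            try {
                return call.call();
            } finally {
                thread.setContextClassLoader(saved); // always restore, even on failure
            }
        }
    }

    // A wrapper mirrors the plugin's public methods without subclassing it,
    // and declares throws Exception as a reminder to callers (point 5).
    public static class IsolatedConnector extends IsolatedPlugin<FakeConnector> {
        public IsolatedConnector(FakeConnector delegate, ClassLoader loader) {
            super(delegate, loader);
        }

        public String version() throws Exception {
            return isolate(() -> delegate().version());
        }
    }

    // Stand-in for a real SinkConnector/SourceConnector plugin.
    public static class FakeConnector {
        public String version() {
            return "1.0";
        }
    }
}
```

With this shape, every delegated call is guaranteed to run under the plugin's classloader, and the boilerplate per wrapped method is a single `isolate(...)` lambda.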

Open questions/issues:

  1. The hashCode, equals, and toString methods do not have throws Exception as the Object class does not have these throws clauses. That means that calling code cannot be forced to handle exceptions from these methods. For toString, the exception message is provided in place of the toString result, and the hashCode and equals are wholly decoupled from the underlying hashCode and equals implementations.
  2. The wrapper method signatures throw Exception rather than Throwable. The distinction is that the Java Language considers Exceptions reasonable for an application to catch, while Throwables are not. I wasn't sure whether the Connect runtime should be forced to handle errors like OutOfMemoryError, LinkageError, etc., or just let them propagate and kill the calling thread.
  3. These wrappers do not enforce that the methods are not called on the herder thread, because I didn't come up with an elegant way to do so.
  4. I did a first-pass at propagating and handling the exceptions thrown by the connector classes, but I don't know if they are reasonable. Now that the exceptions are checked, the code enforces that exceptions are handled, but it is still up to us to determine the proper way to handle the exceptions.
  5. This PR does not remove existing loaderSwap calls that are currently ensuring isolation. Those can be moved/removed after all of this refactor lands, as it may still be necessary for the other plugins.
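
As an illustration of the first open question above, the toString handling might look something like this sketch (hypothetical names; not the actual patch):

```java
// Sketch: toString cannot declare checked exceptions, so the wrapper
// substitutes the exception message when the delegate's toString throws.
// hashCode and equals use Object identity, fully decoupled from the
// delegate's (potentially throwing) implementations.
public class ToStringSketch {

    public static class Wrapper {
        private final Object delegate;

        public Wrapper(Object delegate) {
            this.delegate = delegate;
        }

        @Override
        public String toString() {
            try {
                return delegate.toString();
            } catch (Throwable t) {
                // The exception message is provided in place of the result.
                return "Plugin threw in toString: " + t.getMessage();
            }
        }
        // No hashCode/equals overrides: Object identity semantics apply.
    }

    // A plugin whose toString misbehaves.
    public static class ThrowingPlugin {
        @Override
        public String toString() {
            throw new IllegalStateException("boom");
        }
    }
}
```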

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Contributor

@C0urante C0urante left a comment


Thanks @gharris1727. This is a great improvement that makes several common bugs much harder to write and I'm excited to see it land so that we can stop worrying about what's running on the herder thread, doing our due diligence around the context classloader, etc.

I've only taken a look at the functional changes and haven't reviewed changes to tests yet. I hope to do a full pass sometime this week.

Comment on lines +251 to +255
try {
    updateConnectorTasks(connName);
} catch (Exception e) {
    log.error("Unable to generate task configs for {}", connName, e);
}
Contributor

@C0urante C0urante Feb 28, 2023


This is a change in behavior too, right? We no longer throw in ConnectorContext::requestTaskReconfiguration if we encounter any errors.

This also seems reasonable (it aligns the behavior across standalone and distributed modes), but it does have consequences for the REST API, where restarting a connector no longer fails if we're unable to generate task configs for it (which is currently the case for both distributed and standalone modes).

Contributor Author


Yes this is a change in behavior.

There is precedent for throwing ConnectException from ConnectorContext::requestTaskReconfiguration, so perhaps wrapping this in a ConnectException and propagating it would be a better behavior. I can move this to HerderConnectorContext, except it would only be effective for the standalone herder.

We can also see this as an opportunity to improve the StandaloneHerder by handling reconfigurations asynchronously and retrying them in the background, rather than 500'ing the REST API or dropping the failure silently.
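
The wrap-and-propagate option mentioned above could be sketched as follows. The `ConnectException` here is a local stand-in for Kafka's `org.apache.kafka.connect.errors.ConnectException`, and the surrounding method and interface are illustrative, not the real herder code:

```java
// Sketch of wrap-and-propagate: rethrow task-config generation failures
// as an unchecked ConnectException so callers of requestTaskReconfiguration
// see the failure instead of a silently logged error.
public class ReconfigSketch {

    // Local stand-in for org.apache.kafka.connect.errors.ConnectException.
    public static class ConnectException extends RuntimeException {
        public ConnectException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    // Illustrative slice of the herder's task-config generation.
    public interface TaskConfigGenerator {
        void updateConnectorTasks(String connName) throws Exception;
    }

    public static void requestTaskReconfiguration(TaskConfigGenerator herder, String connName) {
        try {
            herder.updateConnectorTasks(connName);
        } catch (Exception e) {
            // Propagate instead of logging and dropping the failure.
            throw new ConnectException("Unable to generate task configs for " + connName, e);
        }
    }
}
```

Throwing rather than logging gives the REST layer the chance to turn the failure into a 500 response instead of dropping it.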

Contributor Author


Perhaps we can also consider this a failure of the signature of Herder::requestTaskReconfiguration. The DistributedHerder makes this asynchronous, but provides no future or callback to confirm the progress of the request.
Arguably StandaloneHerder is implementing the function signature correctly as a request that either succeeds or fails.

It also makes me think that a connector which repeatedly calls requestTaskReconfiguration (and then always fails in generateTaskConfigs) could spam the herder with retried restart requests. This is such a messy situation that the old function signatures hid from us :)

Contributor


Okay, a lot to unpack here!

The more I think about it, the more I like the existing behavior for handling failures in task config generation. We automatically retry in distributed mode in order to absorb the risk of writing to the config topic or issuing a REST request to the leader, but since neither of those take place in standalone mode, it's fine to just throw the exception back to the caller (either a connector invoking ConnectorContext::requestTaskReconfiguration, or a REST API call to restart the connector) since the likeliest cause is a failed call to Connector::taskConfigs and automatic retries are less likely to be useful.

I think we should basically just preserve existing behavior here, with the one exception of fixing how we handle failed calls to requestTaskReconfiguration that occur during a call to restartConnector. Right now we don't handle any of those and, IIUC, just cause the REST request to time out after 90 seconds. Instead of timing out, we should return a 500 response in that case.

Contributor


I don't think it's especially likely for connectors to continually invoke requestTaskReconfiguration given the automatic retry logic in distributed mode, and as of #13276, the impact of ongoing retries for that operation is drastically reduced.

github-actions Bot commented Sep 7, 2023

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale Stale PRs label Sep 7, 2023
@gharris1727 gharris1727 removed the stale Stale PRs label Jan 12, 2024
@github-actions

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale Stale PRs label Dec 27, 2024
@github-actions

This PR has been closed since it has not had any activity in 120 days. If you feel like this
was a mistake, or you would like to continue working on it, please feel free to re-open the
PR and ask for a review.

@github-actions github-actions Bot added the closed-stale PRs that were closed due to inactivity label Jan 27, 2025
@github-actions github-actions Bot closed this Jan 27, 2025

Labels

closed-stale (PRs that were closed due to inactivity), connect, stale (Stale PRs)
