KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters#14309
Conversation
7eea31d to
452d38b
Compare
gharris1727
left a comment
There was a problem hiding this comment.
When looking through the validateConverterConfig method, I realized that I've seen the same implementation beats before: in the EnrichablePlugin class. It uses a different style of abstraction (subclasses + abstract methods instead of lambdas) but I think it solves a similar problem.
This makes the key/value/header converters follow more of the producer/consumer/admin config validation style, rather than the transformation/predicate style. There is an extra layer of indirection (the use of aliases and prefixes) but there is also some overlap.
I wonder if we can unify these approaches, and maybe even use the "enrich" patten for producer/consumer/admin instead of the "merge" style.
There was a problem hiding this comment.
🤨
is there some reason why this can't be put in a finally block? The unchecked closable should be for exception-throwing operations that should have their exceptions suppressed, but maybeCloseQuietly should never throw an exception.
There was a problem hiding this comment.
It felt slightly more readable to use a try-with-resources block than a finally block (especially since we don't have any catch blocks). You're correct that Utils::maybeCloseQuietly doesn't throw any checked exceptions, but I had to provide a left-hand type (that extended the AutoCloseable interface) to prove that to the compiler, which ruled out AutoCloseable and Closeable since both of those interface's close methods throw checked exceptions.
Also, regarding this comment:
The unchecked closable should be for exception-throwing operations that should have their exceptions suppressed, but maybeCloseQuietly should never throw an exception.
Is this correct? The interface was introduced in #8618, where it was used on the left-hand side of a try-with-resources assignment that captured a method reference which did not throw a checked exception.
There was a problem hiding this comment.
It felt slightly more readable to use a try-with-resources block than a finally block (especially since we don't have any catch blocks).
I suppose there's some subjectivity involved here, since I found the UncheckedClosable and explicit lambda to be a bit hard to read initially, but understood after some inspection. Using try-finally without any catch clauses is a pretty normal arrangement, and I think more developers would be used to it as compared to using a lambda along with our special UncheckedClosable.
AFAIU the try-with-resources construct was added to help with cleaning up AutoClosable resources which can throw exceptions both during opening and closing, where it becomes tedious to set up the finally to perform the cleanup correctly. In this specific situation, the newInstance (open) errors are handled by a separate try, and the close errors are handled by closeQuietly, so none of the value-add of the try-with-resources is apparent.
I See what you mean though, as we do have exceptions from open and close, and we have somewhat tedious error handling surrounding them. But since the objects we're instantiating are "sometimes AutoClosable", the try-with-resources type checking is going to get in the way.
Using try-with-resources to handle open and close together, you could have a wrapper class MaybeClosable<T> implements UncheckedClosable, Supplier<T> along with a method static <T> MaybeClosable<T> quiet(T, String) that you would call like this:
try (MaybeClosable<Converter> wrapper = MaybeClosable.quiet(Utils.newInstance(...), "converter (for validation)") {
Converter converter = wrapper.get();
// do stuff
} catch (RuntimeException e) {
// exceptions from newInstance and do stuff
}
// exceptions from close are logged instead of propagated/suppressed
I think that would type-check but I haven't tried it out myself. Everything is just a rough suggestion, so please iterate on the names or ergonomics if you like the idea.
Without the closeQuietly semantics, it would look like this:
try (MaybeClosable<Converter> wrapper = MaybeClosable.propagate(Utils.newInstance(...)) {
Converter converter = wrapper.get();
// do stuff
} catch (RuntimeException e) {
// exceptions from newInstance and do stuff
}
// exceptions from close are not checked, but propagated or suppressed.
Is this correct? The interface was introduced in #8618, where it was used on the left-hand side of a try-with-resources assignment that captured a method reference which did not throw a checked exception.
When I said "exception throwing operation" I didn't mean "method that throws a checked exception", because I was thinking about how methods can throw RuntimeExceptions whether or not they have checked exceptions in the signature. I probably should have said "method that throws unchecked exceptions" to be unambiguous. Yes this PR and the linked PR both did not have checked exceptions, but they differ because one throws RuntimeExceptions and one does not.
In that PR I used try-with-resources because I wanted the propagate-or-suppress logic built into the try-with-resources for free, instead of the shadowing behavior that a normal finally clause has. If I just wanted to log exceptions in the finally operation, I think I would have used a Utils.closeQuietly call in the finally instead. Recently in #14277 i did a bare log in the finally instead of try-with-resources, as I didn't need the propagate-or-suppress logic. Maybe I should have used closeQuietly, but I didn't think of that at the time for whatever reason.
Most of my friction with the as-implemented version is that you're using the UncheckedClosable and all of the readability costs, without getting the benefit of the propagate-or-suppress logic.
There was a problem hiding this comment.
Thanks for an exhaustive analysis, I now see the error of my ways 🙏
I've pushed a new commit migrating the Utils::closeQuietly call to a finally block. The MaybeCloseable idea is fascinating but I don't think the additional cognitive burden is worth the small bump in ergonomics for this change. I do think it might be worth it to apply it across the whole code base (sort of like how #13185 forces the whole code base to be aware of plugin-thrown exceptions, at least for Connector instances).
I honestly find the If the stylistic suggestions are not blockers for review, but blockers for merging, do you think we could establish the ideal user-facing behavior here and then use a separate PR for a refactoring? I can target this branch with that PR (which would allow review to take place on it without having to merge this one to trunk), or target trunk (if we feel comfortable merging this without blocking on a refactor). And of course, if the stylistic suggestions are blockers for review, let me know and I can take a stab at that without doing anything fancy 😄 |
452d38b to
6781085
Compare
Sure, I'll buy that. I'm fine with migrating away from EnrichablePlugin to something else as long as it is a common abstraction. My concern here was just that we were adding a distinct third style of validating configurations when there appeared to be a lot of common functionality that could be shared.
I'm fine with reviewing this as-is and merging to trunk, and then refactoring the other two strategies in a follow-up. I think using lambdas is more appropriate than anonymous classes which are constructed for just one method call and then discarded. |
There was a problem hiding this comment.
I believe this is the only thing which makes this a converter-specific function. All of the other logic appears generic across all plugin types. It could be replaced with a Consumer<Map<String, String>> function which mutates the config before it is passed to the plugin, or a Function<Map<String, String>, Map<String, String>> if you wanted to be pure.
Then all of the variable names can just become plugin instead of converter.
There was a problem hiding this comment.
I like leaving room for a potential future refactor that uses this method for more than just converters, but there is one more part here that's converter-specific: we permit converters to return null ConfigDef objects from their config methods. We don't allow, e.g., SMTs or predicates to do the same.
To save time on potential future refactoring, I've renamed every parameter and variable in this method to use "plugin" instead of "converter, but I've kept the method name itself (validateConverterConfig) the same, and have retained some converter-specific language in log messages. I've also used a Map<String, String> defaultProperties instead of the functional programming approach in order to be a little more concise with existing caller code; hope this isn't too controversial. Let me know if this strikes the right balance!
I've also added a Javadoc to this method with examples for most parameters, since the parameter names only helped so much and even I was having trouble recalling how this method worked after a few months without reading it.
Co-authored-by: Yash Mayya <yash.mayya@gmail.com>
… common logic into Utils::ensureConcrete
…ableClassValidator::ensureValid
6781085 to
756729e
Compare
… connector header, key, and value converters
…onfigDefFromConfigProvidingClass
756729e to
5b33be2
Compare
|
@gharris1727 similar to #14304 - apologies for the delay, I've addressed all outstanding concerns, ready for another round when you have time 👍 |
|
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. |
|
Hi @gharris1727 sorry for the delay. I've resolved the merge conflicts; let me know what you think if you have a moment. |
|
@gharris1727 I've resolved the merge conflicts again; can you please take a look when you get a chance? |
gharris1727
left a comment
There was a problem hiding this comment.
Thanks for the improvement @C0urante, it's a long time coming :)
I think your approach to add unexpected Exceptions to the class parameter is a good idea. Since this is exercising some new plugin behaviors there is a very reasonable chance for some exceptions coming from the plugin code. Having those included in the validation flow seems like the most reasonable approach.
| ConfigInfos headerConverterConfigInfos = validateHeaderConverterConfig(connectorProps, validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG)); | ||
| ConfigInfos keyConverterConfigInfos = validateKeyConverterConfig(connectorProps, validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG)); | ||
| ConfigInfos valueConverterConfigInfos = validateValueConverterConfig(connectorProps, validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG)); |
There was a problem hiding this comment.
nit: these constants are provided here, and also hard-coded inside these functions
There was a problem hiding this comment.
Yeah, the alternative was to pass in the entire validatedConnectorConfig and let the various validateXxxConverterConfig methods pull out the relevant ConfigValue field. But this seemed strange considering those methods only require a single ConfigValue object.
|
|
||
| try { | ||
| ConfigDef configDef; | ||
| try { |
There was a problem hiding this comment.
Do you want to add a stage for this call into the plugin?
There was a problem hiding this comment.
Good call, added several.
| Set<String> groups = new LinkedHashSet<>(); | ||
| List<ConfigInfo> configInfos = new ArrayList<>(); | ||
|
|
||
| if (configValues == null) { |
There was a problem hiding this comment.
I think this null check is only relevant when the value is coming from the overridePolicy.validate, in validateConverterConfig, I think the ConfigDef#validate call will always be non-null.
There was a problem hiding this comment.
ConfigDef::validate is non-final, and plugin instances may return a subclass from their config methods that possibly returns null.
I acknowledge that this is extremely unlikely, but it seems like this null guard is the best way to handle that scenario as opposed to, e.g., throwing an error and causing a 500 response to be returned. Thoughts?
|
|
||
| T pluginInstance; | ||
| try { | ||
| pluginInstance = Utils.newInstance(pluginClass, pluginInterface); |
There was a problem hiding this comment.
This is where the connector validation used the tempConnectors cache to re-use the connector objects.
Personally i'm fine with per-call instantiation, but thought it would be worth mentioning.
There was a problem hiding this comment.
Good point, and definitely worth calling out. I suspect this won't be a resource bottleneck and GC will be sufficient to clean up these instances before they eat up too much memory, but if not then we can certainly look into caching these plugin instances.
| List<ConfigValue> configValues; | ||
| try { | ||
| configValues = configDef.validate(pluginConfig); | ||
| } catch (RuntimeException e) { |
There was a problem hiding this comment.
| } catch (RuntimeException e) { | |
| } catch (Exception e) { |
There was a problem hiding this comment.
I don't understand this suggestion. When could a checked exception be thrown?
There was a problem hiding this comment.
For this and the Utils.newInstance comment i was thinking about preparing for #13185 but I suppose these can be addressed in that PR.
There was a problem hiding this comment.
Got it--yeah, I'd prefer to leave as-is to reduce FUD in case #13185 doesn't get merged soon.
| ConfigDef configDef; | ||
| try { | ||
| configDef = configDefAccessor.apply(pluginInstance); | ||
| } catch (RuntimeException e) { |
There was a problem hiding this comment.
| } catch (RuntimeException e) { | |
| } catch (Exception e) { |
| T pluginInstance; | ||
| try { | ||
| pluginInstance = Utils.newInstance(pluginClass, pluginInterface); | ||
| } catch (ClassNotFoundException | RuntimeException e) { |
There was a problem hiding this comment.
| } catch (ClassNotFoundException | RuntimeException e) { | |
| } catch (Exception e) { |
| configInfos.add(new ConfigInfo(configKeyInfo, configValueInfo)); | ||
| } | ||
| return new ConfigInfos(connectorClass.toString(), errorCount, new ArrayList<>(groups), configInfoList); | ||
| return new ConfigInfos("", errorCount, new ArrayList<>(groups), configInfos); |
There was a problem hiding this comment.
nit: i don't like this empty string, but i see that it has no overall effect. I wonder if the validator can just return List and compute errorCount/groups at the end.
This doesn't have to be done in this PR.
There was a problem hiding this comment.
Yeah, it's a little weird with the empty string here. Hopefully it's fine for now but if we continue augmenting and refactoring this class I agree that it might be worth changing.
|
Thanks @gharris1727, can you give this another round? |
gharris1727
left a comment
There was a problem hiding this comment.
I only had nits, some of my comments in the last review don't need to be addressed.
| import java.util.EnumSet; | ||
| import java.util.Map.Entry; | ||
| import java.util.SortedSet; | ||
| import java.util.TreeSet; |
There was a problem hiding this comment.
I think this should be reverted, we don't touch anything else in this file.
| List<ConfigValue> configValues; | ||
| try { | ||
| configValues = configDef.validate(pluginConfig); | ||
| } catch (RuntimeException e) { |
There was a problem hiding this comment.
For this and the Utils.newInstance comment i was thinking about preparing for #13185 but I suppose these can be addressed in that PR.
|
Thanks @gharris1727. I've reverted the changes to |
… for connector header, key, and value converters (apache#14309) Reviewers: Greg Harris <greg.harris@aiven.io>
Jira 1, Jira 2
Depends on #14304
Although the
ConverterandHeaderConverterinterfaces both include methods to provide aConfigDefobject, we don't make use of them during preflight configuration validation. This makes Kafka Connect harder to use with, among other things, programmatic UIs that perform automatic validation as soon as users enter in new key/value pairs. Additionally, it causes errors in configuration to surface at runtime (after rebalances have taken place and while tasks are being instantiated), which is less convenient for all users.This PR uses these
ConfigDefobjects for preflight validation if the user specifies a custom key, value, and/or header converter class in their connector config. If a key, value, or header converter returns a nullConfigDef, we simply log a warning. Although this is technically disallowed by theConverterandHeaderConverterAPI, because we have not been enforcing that requirement up til now, we permit that case in order to not break existing setups.Committer Checklist (excluded from commit message)