KAFKA-9673: Filter and Conditional SMTs#8699
Conversation
|
Still need to review test coverage, but @kkonstantine, @mimaison, @bbejeck you might want to give it an initial pass. |
|
@C0urante you might also want to take a look. |
C0urante
left a comment
Looking pretty good @tombentley! Thanks for the ping.
I was able to get through all of the non-test files except for ConnectorConfig; will try to return and make another pass by the end of the week to cover the rest.
Also, do you think it might be worth it to write some integration tests for this stuff? I'm thinking at the very least it'd be cool to verify that Predicates work end-to-end for both source and sink connectors, and since we'd need to pick some SMTs to test the functionality of the Predicate plugin with, it might be a convenient excuse to also end-to-end test the new Filter SMT introduced here.
Same comment here as with Filter; probably want to return a non-null ConfigDef here.
This is related to the issue discussed about configure().
We could return an empty ConfigDef here, but that would be a lie, and could ultimately lead to some other error if someone tried to use it with configure().
We can't invent a ConfigDef schema for this because the PredicatedTransformer would need to know about the Transformation it was going to be wrapping, but it can't know that before it's been configured with at least the Transformation's ConfigDef, and it can't be configured before config() has been called. So we have a chicken-and-egg problem. Something (ConnectorConfig) must have a priori knowledge of either PredicatedTransformer's ConfigDef, or know how to configure it without needing to call config() at all.
Since PredicatedTransformer is a purely internal class which will never be directly exposed to Connect users, we're not obliged to stick to the contract of config() and configure(). That is, both PredicatedTransformer.config and PredicatedTransformer.configure can throw when called, since we know no one else can call them and we know ConnectorConfig never will.
Hmmm... I think there might be some awkwardness here with trying to make PredicatedTransformer implement the Transformation interface. Could we replace every Transformation in the TransformationChain's transformation list with a PredicatedTransformer and, if there are no predicates configured for a transform by the user, make the default behavior for the PredicatedTransformer class to blindly apply its transformation?
This would solve a few problems:
- No risk of users trying to actually use a PredicatedTransformer in a connector config, which they may try to do if we don't add logic to prevent it from being picked up during plugin path scanning on startup and logged as an SMT plugin
- No need to implement methods that aren't used
- One code path instead of two for application of transformations
- More flexibility in instantiation and, possibly, the ability to encapsulate some of the ConfigDef generation logic in a separate class from ConnectorConfig (haven't looked into the specifics of this yet, so it may not actually be feasible or that elegant)
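A minimal sketch of that proposal, using simplified stand-in interfaces rather than the real org.apache.kafka.connect.transforms API (the Transformation and Predicate types below are hypothetical local definitions): every transformation in the chain is wrapped, and a wrapper with no predicate configured just applies its delegate unconditionally.

```java
// Simplified stand-ins for the Connect Transformation and Predicate types;
// the real API operates on ConnectRecord and also has config()/configure()/close().
interface Transformation<R> {
    R apply(R record);
}

interface Predicate<R> {
    boolean test(R record);
}

// Internal wrapper: never exposed as a plugin, so it need not honor the
// full Transformation contract.
class PredicatedTransformer<R> implements Transformation<R> {
    private final Transformation<R> delegate;
    private final Predicate<R> predicate; // null means "no predicate configured"
    private final boolean negate;

    PredicatedTransformer(Transformation<R> delegate, Predicate<R> predicate, boolean negate) {
        this.delegate = delegate;
        this.predicate = predicate;
        this.negate = negate;
    }

    @Override
    public R apply(R record) {
        // Default behavior with no predicate: blindly apply the transformation,
        // so the chain has one code path whether or not predicates are in use.
        if (predicate == null || predicate.test(record) != negate) {
            return delegate.apply(record);
        }
        return record; // predicate says "don't transform": pass through unchanged
    }
}
```

TransformationChain would then hold only PredicatedTransformer instances, which also sidesteps the config()/configure() chicken-and-egg issue discussed above.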
Do we need to remove these properties here, or can we just read them? Removing might cause issues with SMTs that have config properties with these names; would leaving them in be likely to cause issues as well?
The compatibility section of the KIP says that if a connector already has these configs then they'll be masked by the new implicit configs. If we don't remove them here then we'd be passing the KIP-585 configs to a connector which had its own semantics for those config keys, which would be incorrect.
Hmmm... wish we'd caught that earlier. Seems safer to just leave the properties in, but unless we want to call for a re-vote and an extension on the KIP deadline, I guess we'll have to keep this as-is.
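For reference, the masking behavior being debated could be sketched like this (a hypothetical helper, not the actual ConnectorConfig code): the framework consumes the implicit KIP-585 keys and strips them before the remaining properties reach the SMT's own configure().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the KIP-585 masking rule: "predicate" and
// "negate" are reserved at the transformation level, so any same-named keys a
// legacy SMT happened to define are consumed by the framework and never
// passed through to the transformation.
class TransformConfigSplitter {
    static Map<String, String> stripImplicitKeys(Map<String, String> transformProps) {
        Map<String, String> remaining = new HashMap<>(transformProps);
        remaining.remove("predicate");
        remaining.remove("negate");
        return remaining;
    }
}
```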
|
@C0urante thanks for the review, some excellent points there! I think an integration test is a great idea, which I'll work on next. I've addressed all your other comments. |
kkonstantine
left a comment
Thanks for the PR @tombentley. This looks in the right direction.
I managed to do a first pass. Comments are mostly minor.
Still, we'll need to check that the new package and interfaces are correctly included or excluded in PluginUtils to make these classes compatible with classloading isolation.
* Add Predicate interface
* Add Filter SMT
* Add the predicate implementations defined in the KIP.
* Create abstraction in ConnectorConfig for configuring Transformations and Connectors with the "alias prefix" mechanism
* Add tests and fix existing tests.
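The core semantics of the two new plugin types can be sketched with simplified stand-in types (the real Predicate and Filter operate on ConnectRecord): a predicate decides whether an SMT applies, and Filter drops a record by mapping it to null, which is how an SMT tells the Connect runtime to discard a record.

```java
// Simplified stand-in for the new Predicate plugin interface.
interface RecordPredicate<R> {
    boolean test(R record);
}

// Sketch of the Filter SMT combined with a predicate: when the predicate
// matches, the record is mapped to null and therefore dropped; otherwise it
// passes through untouched.
class PredicatedFilter<R> {
    private final RecordPredicate<R> predicate;

    PredicatedFilter(RecordPredicate<R> predicate) {
        this.predicate = predicate;
    }

    R apply(R record) {
        return predicate.test(record) ? null : record;
    }
}
```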
9d0ec29 to
e76a0b4
Compare
|
Rebased to resolve a conflict. @kkonstantine I've addressed those first comments, thanks! Still some work to do on the integration test (it's not passing when run via Gradle). |
|
ok to test |
rhauch
left a comment
Thanks, @tombentley. I made a first pass, and for the most part this looks good. A few comments/questions below. I'll keep reviewing in more detail.
.build();

// start the clusters
connect.start();
Should we wait until all brokers and Connect workers are available, via something like:
connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers did not start in time.");
connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Worker did not start in time.");
connectorHandle.expectedCommits(numFooRecords);

// start a sink connector
connect.configureConnector(CONNECTOR_NAME, props);
This is an asynchronous method, and it's likely the connector will not be started and running before the test proceeds to the next statements. This can lead to very flaky tests.
We could instead wait until the connector is actually running, using something like:
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
"Connector tasks did not start in time.");
String value = simpleConfig.getString(PATTERN_CONFIG);
try {
    result = Pattern.compile(value);
} catch (PatternSyntaxException e) {
    throw new ConfigException(PATTERN_CONFIG, value, "entry must be a Java-compatible regular expression: " + e.getMessage());
}
Can we ever get to line 64? The constructor of the config (line 58) should fail if the pattern validator fails to ensure the pattern is a valid regex, which means that if we make it past 58 then line 62 will never fail.
Am I missing something?
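To make the point concrete, here is a stripped-down sketch (illustrative names, not the real Connect classes) of why the later compile can't fail once a construction-time validator has run:

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Illustrative config object: the pattern string is validated when the config
// is constructed, so a second Pattern.compile of the same string later on
// cannot throw.
class ValidatedRegexConfig {
    private final String pattern;

    ValidatedRegexConfig(String pattern) {
        try {
            Pattern.compile(pattern); // validator runs here, at construction time
        } catch (PatternSyntaxException e) {
            throw new IllegalArgumentException(
                    "entry must be a Java-compatible regular expression: " + e.getMessage());
        }
        this.pattern = pattern;
    }

    Pattern compiled() {
        return Pattern.compile(pattern); // safe: already proven valid above
    }
}
```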
Co-authored-by: Randall Hauch <rhauch@gmail.com>
rhauch
left a comment
Got through the test code, and found a few more suggestions.
    }
}

@Ignore("Is this really an error. There's no actual need for the predicates config (unlike transforms where it defines the order).")
It'd be good to have the test reflect the current behavior.
try {
    new ConnectorConfig(MOCK_PLUGINS, props);
    fail();
} catch (ConfigException e) {
    assertTrue(e.getMessage().contains("Value must be at least 80"));
}
We've moved to using assertThrows here, which would look something like:
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Value must be at least 42"));
|
ok to test |
…d more validation to TopicNameMatches
|
Added several commits that fixed the failing unit and integration tests, and addressed several of my other comments. |
|
ok to test |
|
retest this please |
1 similar comment
|
retest this please |
|
Builds were all aborted: |
|
retest this please |
rhauch
left a comment
Thanks, @tombentley! LGTM, pending a green build (which are currently waiting for executors).
|
ok to test |
|
Jenkins jobs were never associated with this PR (see INFRA-20344), but here are the jobs for the latest commit: |
|
retest this please |
kkonstantine
left a comment
LGTM, pending a green build
Nice work @tombentley !
|
@rhauch @kkonstantine thanks very much for making those fixes for me, I appreciate the time that must've taken. I've made one more trivial correction to the integration test. |
|
Thanks, @tombentley. Merging with 3 green builds that Jenkins did not tie back to this PR. |
* 'trunk' of github.com:apache/kafka: (36 commits)
  Remove redundant `containsKey` call in KafkaProducer (apache#8761)
  KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723)
  KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749)
  KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238)
  KAFKA-9501: convert between active and standby without closing stores (apache#8248)
  KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739)
  MINOR: Log the reason for coordinator discovery failure (apache#8747)
  KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705)
  MINOR: remove unnecessary timeout for admin request (apache#8738)
  MINOR: Relax Percentiles test (apache#8748)
  MINOR: regression test for task assignor config (apache#8743)
  MINOR: Update documentation.html to refer to 2.6 (apache#8745)
  MINOR: Update documentation.html to refer to 2.5 (apache#8744)
  KAFKA-9673: Filter and Conditional SMTs (apache#8699)
  KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (apache#8720)
  KAFKA-10052: Harden assertion of topic settings in Connect integration tests (apache#8735)
  MINOR: Slight MetadataCache tweaks to avoid unnecessary work (apache#8728)
  KAFKA-9802; Increase transaction timeout in system tests to reduce flakiness (apache#8736)
  KAFKA-10050: kafka_log4j_appender.py fixed for JDK11 (apache#8731)
  KAFKA-9146: Add option to force delete active members in StreamsResetter (apache#8589)
  ...

# Conflicts:
# core/src/main/scala/kafka/log/Log.scala