KAFKA-13793: Add validators for configs that lack validators #12010
mimaison merged 61 commits into apache:trunk
… cpu and traffic on the broker side increase sharply JIRA link : https://issues.apache.org/jira/browse/KAFKA-13310 Author: RivenSun2 <riven.sun@zoom.us> Reviewers: Luke Chen <showuon@gmail.com>
…sets method JIRA link : https://issues.apache.org/jira/browse/KAFKA-13310 Author: RivenSun2 <riven.sun@zoom.us> Reviewers: Luke Chen <showuon@gmail.com>
2. Optimize the import of package Author: RivenSun2 <riven.sun@zoom.us> Reviewers: Luke Chen <showuon@gmail.com>
Author: RivenSun2 <riven.sun@zoom.us> Reviewers: Luke Chen <showuon@gmail.com>
add test Method "testForceMetadataDeleteForPatternSubscriptionDuringRebalance()" Author: RivenSun2 <riven.sun@zoom.us> Reviewers: Luke Chen <showuon@gmail.com>
Author: RivenSun2 riven.sun@zoom.us
Conflicts: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
Hi @C0urante @divijvaidya
Hi @showuon @C0urante @divijvaidya
Hi @showuon @guozhangwang
C0urante left a comment
@RivenSun2 it's fine to modify the CommonClientConfigs class without a KIP.
-   else {
-       if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
-           throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG + " configuration for KafkaConsumer must be non-null.");
-   }
+   else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
+       throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-   else {
-       if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
-           throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG + " configuration for KafkaConsumer must be non-null.");
-   }
+   else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
+       throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
    if (keySerializer != null)
        newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
    else {
        if (newConfigs.get(KEY_SERIALIZER_CLASS_CONFIG) == null)
Similar to what Chris Egerton suggested above, these can also be collapsed into else-if.
    if (valueSerializer != null)
        newConfigs.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
    else {
        if (newConfigs.get(VALUE_SERIALIZER_CLASS_CONFIG) == null)
Can you use HashMap#containsKey if checking for the presence of a value or am I misreading something?
Thank you for the suggestion. Here we want to check the case where the key exists but the value is null.
Oh, I see. So this check covers two situations: a) the key is missing and b) the key is present but the value is null, while what I suggested would only cover a). Could you add a comment above the line saying this?
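The difference between the two checks discussed above can be illustrated with a plain HashMap. This is a minimal sketch using only the JDK; the class and method names are hypothetical and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; not part of Kafka.
final class NullValueCheckDemo {

    // True when the key is missing OR when it is present with a null value.
    static boolean isUnset(Map<String, Object> configs, String key) {
        return configs.get(key) == null;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        String key = "key.serializer";

        // a) Key missing: both checks agree that nothing is configured.
        System.out.println(!configs.containsKey(key)); // prints "true"
        System.out.println(isUnset(configs, key));      // prints "true"

        // b) Key present with a null value: containsKey alone misses this case.
        configs.put(key, null);
        System.out.println(!configs.containsKey(key)); // prints "false"
        System.out.println(isUnset(configs, key));      // prints "true"
    }
}
```

A single get(...) == null test therefore handles both situations, which is why a short comment next to the check helps readers who expect containsKey.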
    /************* Authorizer Configuration ***********/
-   .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
+   .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc)
I think Scala allows you to just write ConfigDef.NonNullValidator without the new keyword and the (), but I might be wrong.
This should be fine; I see the same pattern in this file: new RaftConfig.ControllerQuorumVotersValidator()
Can a STRING configuration be null? I wonder if this validator can actually trigger.
    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
    ConfigDef.Type.STRING,
    CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
    in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
This is very interesting. I got thrown off by reading this after seeing in(Utils.enumOptions(SecurityProtocol.class)), because I couldn't understand why we unpack the arguments in one case but not in the other. I suspect this is because in the Java files we don't use the Scala in function?
It actually refers to the ConfigDef.ValidString.in(String... validStrings) method.
Ah, I see. The Scala file imports the in method.
Maybe Scala does not support variable-length parameters, that is, String... param.
Fascinating, I wasn't aware of this. Okay, this makes sense as well.
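For context on the Java side of this exchange: a String... parameter accepts an existing String[] directly, so no unpacking syntax is needed there. Scala does support varargs as well, but passing a sequence or array to one requires the explicit : _* expansion seen in the snippet above. The sketch below uses hypothetical stand-ins (in, enumOptions, Protocol) that only mimic the shape of the calls discussed; they are not the Kafka methods.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; the helpers only imitate the shape of the calls in the thread.
final class VarargsDemo {

    enum Protocol { PLAINTEXT, SSL } // illustrative enum, not Kafka's SecurityProtocol

    // Stand-in for a varargs factory such as in(String... validStrings).
    static List<String> in(String... validStrings) {
        return Arrays.asList(validStrings);
    }

    // Stand-in for a helper that returns the names of all constants of an enum.
    static <E extends Enum<E>> String[] enumOptions(Class<E> enumClass) {
        E[] constants = enumClass.getEnumConstants();
        String[] names = new String[constants.length];
        for (int i = 0; i < constants.length; i++) {
            names[i] = constants[i].name();
        }
        return names;
    }

    public static void main(String[] args) {
        // A String[] is accepted as the varargs argument without any extra syntax.
        System.out.println(in(enumOptions(Protocol.class))); // prints "[PLAINTEXT, SSL]"
        // Individual arguments work as well.
        System.out.println(in("PLAINTEXT", "SSL"));          // prints "[PLAINTEXT, SSL]"
    }
}
```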
C0urante left a comment
LGTM! Thanks @RivenSun2, feel free to address the nits at your discretion.
I haven't taken a close look at any of the broker or Streams changes, but Connect+clients seem solid.
    if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
        if (clientSaslMechanism == null || clientSaslMechanism.trim().isEmpty()) {
            throw new ConfigException(SaslConfigs.SASL_MECHANISM, null, "When the " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG +
                " configuration enables the SASL mechanism, mechanism must be non-null or non-empty string.");
Nit:
-               " configuration enables the SASL mechanism, mechanism must be non-null or non-empty string.");
+               " configuration enables SASL, mechanism must be non-null and non-empty");
    @Override
    protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
        CommonClientConfigs.postValidateSaslMechanismConfig(this);
This is pretty smooth--you've only added one line to an existing method in this class, and we now get intelligent, conditional validation of the SASL mechanism config. Nice job!
    configs.put(SaslConfigs.SASL_MECHANISM, " ");
    ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
    assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));
Nit: I don't think it's worth adding this test case. At this point we're really verifying the behavior of the AbstractConfig and ConfigDef classes and how they automatically trim strings, not any special logic in the AdminClient class itself.
Yes, I agree that this configuration validation is not AdminClient behavior. But I would like to follow ProducerConfigTest and ConsumerConfigTest and let AdminClientConfigTest cover this configuration check as well.
I agree with @C0urante, we don't need to have that test case in all these classes.
Can we also remove it from ConsumerConfig and ProducerConfig?
    SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
    String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
    if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
        if (clientSaslMechanism == null || clientSaslMechanism.trim().isEmpty()) {
Nit: We don't need to trim here, do we?
-       if (clientSaslMechanism == null || clientSaslMechanism.trim().isEmpty()) {
+       if (clientSaslMechanism == null || clientSaslMechanism.isEmpty()) {
Strings containing only spaces are also considered illegal values, as the corresponding test case shows:
    configs.put(SaslConfigs.SASL_MECHANISM, " ");
    ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
Isn't the string automatically trimmed here by the AbstractConfig?
Oh, you are right. The ConfigDef.parseType method already handles this.
Thanks.
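The outcome of this thread, a cross-field check that only applies when the security protocol enables SASL and that relies on the value having been trimmed during parsing, can be sketched as follows. This is an illustration under stated assumptions: the class is hypothetical, the enum is a local copy of the four protocol names, and IllegalArgumentException stands in for Kafka's ConfigException so that the example runs on the JDK alone.

```java
// Hypothetical demo class; not the PR's implementation.
final class SaslMechanismCheckDemo {

    // Local copy of the protocol names, for illustration only.
    enum SecurityProtocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    // Assumes saslMechanism was already trimmed when the config was parsed,
    // so a plain isEmpty() check is enough here.
    static void validate(SecurityProtocol protocol, String saslMechanism) {
        boolean saslEnabled = protocol == SecurityProtocol.SASL_PLAINTEXT
                || protocol == SecurityProtocol.SASL_SSL;
        if (saslEnabled && (saslMechanism == null || saslMechanism.isEmpty())) {
            // IllegalArgumentException is a stand-in for ConfigException.
            throw new IllegalArgumentException(
                "sasl.mechanism: when security.protocol enables SASL, mechanism must be non-null and non-empty");
        }
    }

    public static void main(String[] args) {
        validate(SecurityProtocol.SSL, null);        // no SASL, mechanism is not checked
        validate(SecurityProtocol.SASL_SSL, "PLAIN"); // SASL with a mechanism is accepted
        try {
            validate(SecurityProtocol.SASL_SSL, ""); // SASL without a mechanism is rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```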
Hi @showuon @guozhangwang
Hi @guozhangwang @mimaison
mimaison left a comment
Thanks for the PR, it's a nice cleanup. I left a few comments.
    protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
                                                                     Deserializer<?> keyDeserializer,
                                                                     Deserializer<?> valueDeserializer) {
        // validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
Do we need these extra checks?
If I don't set serdes with trunk, I get an error:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:641)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
AbstractConfig should enforce that each field has a valid value. We only need to perform extra validation when multiple fields must have compatible values.
The situation you describe is the one where the key.deserializer parameter is not set at all.
The changes here deal with two situations:
- As described in JIRA, when the user explicitly sets null, they should get a ConfigException instead of a NullPointerException, e.g. props.put("key.serializer", null);
- To stay compatible with how some users work, as @C0urante mentioned, the user may only pass in the serializer instance when building the KafkaProducer:
I'm wondering if people might be constructing ProducerConfig and ConsumerConfig instances in order to do some kind of validation before storing a configuration somewhere, that's then used later to create a producer/consumer. If so, they might be omitting the (de)serializer properties (or rather, just using null as a placeholder) if the (de)serializer isn't known at that time and they plan on using the constructor that accepts already-instantiated Serializer/Deserializer instances.
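The two situations described above can be sketched with plain JDK types. The helper below is hypothetical and only mirrors the shape of the else-if logic from the diff: an explicitly passed instance takes precedence over a null placeholder, and a null config value without an instance is reported as a configuration error. IllegalArgumentException stands in for ConfigException here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; a simplified stand-in for the append*ToConfig helpers.
final class SerializerConfigDemo {

    static Map<String, Object> appendSerializer(Map<String, Object> configs,
                                                String key,
                                                Object serializerInstance) {
        Map<String, Object> newConfigs = new HashMap<>(configs);
        if (serializerInstance != null)
            // An instance passed to the constructor overrides whatever the config holds.
            newConfigs.put(key, serializerInstance.getClass());
        else if (newConfigs.get(key) == null)
            // Covers both a missing key and an explicit null value.
            throw new IllegalArgumentException(key + " must be non-null.");
        return newConfigs;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("key.serializer", null); // null used as a placeholder

        // The instance fills the placeholder; a String is used here purely for illustration.
        System.out.println(appendSerializer(configs, "key.serializer", "instance").get("key.serializer"));

        try {
            appendSerializer(configs, "key.serializer", null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "key.serializer must be non-null."
        }
    }
}
```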
    "a.bootstrap.servers", "servers-one",
    "b.bootstrap.servers", "servers-two",
-   "security.protocol", "SASL",
+   "security.protocol", "SSL",
We currently don't have checks in any of the *Config classes (this is what this PR is addressing). But when creating an actual client, the value of security.protocol is used, and only at that point do you get an exception if a bad value is set. So MirrorMaker can't run with security.protocol=SASL.
For MirrorMaker, you get:
[2022-05-03 20:32:41,008] ERROR Stopping due to error (org.apache.kafka.connect.mirror.MirrorMaker:313)
org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:545)
at org.apache.kafka.clients.admin.Admin.create(Admin.java:143)
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:52)
at org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:236)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:137)
at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:149)
at org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:300)
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol.SASL
at java.base/java.lang.Enum.valueOf(Enum.java:240)
at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:103)
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:516)
With this PR, you would get:
[2022-05-03 20:38:13,763] ERROR Stopping due to error (org.apache.kafka.connect.mirror.MirrorMaker:313)
org.apache.kafka.common.config.ConfigException: Invalid value SASL for configuration security.protocol: String must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:961)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:499)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:146)
at org.apache.kafka.clients.admin.AdminClientConfig.<init>(AdminClientConfig.java:234)
at org.apache.kafka.clients.admin.Admin.create(Admin.java:143)
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:52)
at org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:236)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:137)
at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:149)
at org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:300)
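The difference between the two traces comes down to when the value is checked: at config parsing time against a list of allowed strings, rather than later when the enum is looked up. A minimal JDK-only sketch of that kind of allowed-values check is shown below. The class and method names are hypothetical, the enum is a local copy of the four protocol names, and IllegalArgumentException stands in for ConfigException; the message format follows the trace above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; imitates an allowed-values validator.
final class ValidStringDemo {

    // Local copy of the protocol names, for illustration only.
    enum SecurityProtocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    static List<String> protocolNames() {
        List<String> names = new ArrayList<>();
        for (SecurityProtocol p : SecurityProtocol.values()) {
            names.add(p.name());
        }
        return names;
    }

    // Rejects any value that is not in the allowed list, before a client is created.
    static void ensureValid(String name, String value, List<String> validStrings) {
        if (!validStrings.contains(value)) {
            throw new IllegalArgumentException("Invalid value " + value + " for configuration " + name
                + ": String must be one of: " + String.join(", ", validStrings));
        }
    }

    public static void main(String[] args) {
        ensureValid("security.protocol", "SSL", protocolNames()); // accepted
        try {
            ensureValid("security.protocol", "SASL", protocolNames());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```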
Adding
Conflicts: core/src/main/scala/kafka/server/KafkaConfig.scala
Hi @mimaison Thank you for your review.
    }

    @Test
    public void testInvalidSaslMechanism() {
This is testing SaslConfigs.SASL_MECHANISM, so would it be better to move this test to SaslConfigsTest?
If we move this test case to SaslConfigsTest, the checkstyle check for SaslConfigsTest will fail:
Disallowed import - org.apache.kafka.clients.CommonClientConfigs. [ImportControl]
You can modify this file to prevent those errors.
Actually, since this is testing logic that is in CommonClientConfigs, maybe it's better to keep this here. Otherwise we'd have to move postValidateSaslMechanismConfig() into SaslConfigs.
    ce = assertThrows(ConfigException.class, () -> new TestConfig(configs));
    assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));

    configs.put(SaslConfigs.SASL_MECHANISM, " ");
Let's also drop this last case. This is testing AbstractConfig and it seems unrelated to the rest of this PR.