
MINOR: ignore unused configuration when ConsumerCoordinator is not constructed #12041

Merged
guozhangwang merged 41 commits into apache:trunk from RivenSun2:minor-ignoreConsumerConfig
Apr 15, 2022

MINOR: ignore unused configuration when ConsumerCoordinator is not constructed#12041
guozhangwang merged 41 commits intoapache:trunkfrom
RivenSun2:minor-ignoreConsumerConfig

Conversation

@RivenSun2
Contributor

Following PR #11940, ignore unused config when ConsumerCoordinator is not constructed

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

RivenSun2 and others added 30 commits September 19, 2021 17:59
… cpu and traffic on the broker side increase sharply

JIRA link : https://issues.apache.org/jira/browse/KAFKA-13310

Author: RivenSun2 <riven.sun@zoom.us>

Reviewers: Luke Chen <showuon@gmail.com>
…sets method

JIRA link : https://issues.apache.org/jira/browse/KAFKA-13310

Author: RivenSun2 <riven.sun@zoom.us>

Reviewers: Luke Chen <showuon@gmail.com>
2. Optimize the import of package

Author: RivenSun2 <riven.sun@zoom.us>

Reviewers: Luke Chen <showuon@gmail.com>
Author: RivenSun2 <riven.sun@zoom.us>

Reviewers: Luke Chen <showuon@gmail.com>
Author: RivenSun2 <riven.sun@zoom.us>

Reviewers: Luke Chen <showuon@gmail.com>
Author: RivenSun2 <riven.sun@zoom.us>

Reviewers: Luke Chen <showuon@gmail.com>
Author: RivenSun2 <riven.sun@zoom.us>

Reviewers: Luke Chen <showuon@gmail.com>
add test Method "testForceMetadataDeleteForPatternSubscriptionDuringRebalance()"

Author: RivenSun2 <riven.sun@zoom.us>

Reviewers: Luke Chen <showuon@gmail.com>
Author: RivenSun2 <riven.sun@zoom.us>

Reviewers: Luke Chen <showuon@gmail.com>
Author: RivenSun2 <riven.sun@zoom.us>
# Conflicts:
#	clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@RivenSun2
Contributor Author

Hi @guozhangwang @showuon,
please help review this PR when you get a chance.
Thanks.

    new ConsumerCoordinator(groupRebalanceConfig,

    if (!groupId.isPresent()) {
        config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
        config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
    }
Contributor

Why the THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED config is related to groupID?

Contributor Author

Because the value of groupId determines whether the ConsumerCoordinator is constructed, and constructing the ConsumerCoordinator is what retrieves the THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED config.

@guozhangwang guozhangwang merged commit cf5e714 into apache:trunk Apr 15, 2022
@guozhangwang
Contributor

LGTM. Merged to trunk.

@C0urante
Contributor

@RivenSun2 Isn't this a correct message to log? These properties are unused if a group ID is supplied, aren't they? That lines up pretty well with the new log message wording of "supplied but are not used yet".

BTW, thanks for cleaning that stuff up into a single message--much nicer to read now!

@RivenSun2
Contributor Author

Hi @C0urante, thank you for your reply. I see what you mean.
What we expect now is that, when a configuration is supplied but not used because of special behavior inside the Kafka client, we can ignore it so that no warning is logged.
For example, the serializer and deserializer configurations are handled this way during KafkaConsumer and KafkaProducer initialization.
Thanks.

@RivenSun2
Contributor Author

In other words, for any configuration (including unknown configs), if neither Kafka nor the user has retrieved it by the time the Kafka client completes initialization, the log message is printed.

Kafka is responsible for retrieving all known configs. For example, the transactional.id configuration is always retrieved, whether or not the user passed in transactional.id:

            String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

            this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
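The "retrieved" vs. "not retrieved" distinction can be sketched with a toy tracker (a simplification for illustration, not Kafka's actual AbstractConfig, though it mirrors the get/ignore/logUnused semantics discussed here):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of unused-config tracking: get() marks a key as used,
// ignore() suppresses the warning for a key, and unused() returns
// what a logUnused()-style check would warn about.
class ConfigTracker {
    private final Map<String, String> values = new HashMap<>();
    private final Set<String> used = new HashSet<>();

    ConfigTracker(Map<String, String> originals) {
        values.putAll(originals);
    }

    String get(String key) {
        used.add(key);               // retrieval marks the key as used
        return values.get(key);
    }

    void ignore(String key) {
        used.add(key);               // explicit opt-out from the warning
    }

    Set<String> unused() {
        Set<String> result = new HashSet<>(values.keySet());
        result.removeAll(used);      // whatever is left would be logged
        return result;
    }
}
```

Under this model, a config like transactional.id is always retrieved via get() by the client itself, while a config skipped for internal reasons (e.g. no group.id, or a deserializer object passed in directly) would be ignore()-d instead.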

@C0urante
Contributor

If Kafka doesn't retrieve a config because it has no effect due to other configs, isn't it still valid to warn the user?

I agree that if there's special internal logic that causes Kafka not to call get for a config even though it's applicable for the given client/scenario, then we shouldn't log a warning. The (de)serializer properties in the consumer and producer fall into this category.

But if we're not calling get because of publicly-documented behavior, a warning could be useful to clear up potential misunderstandings about what the configs the user passed in actually do. Configs related to offset commits, like auto.commit.interval.ms, fall into this category when no group.id is specified.

Not sure which category the internal internal.throw.on.fetch.stable.offset.unsupported property falls into, to be honest, but since it appears to be intended exclusively for Kafka Streams and isn't publicly documented, we should probably keep hiding warning messages that mention it; it likely originates from Kafka Streams and not directly from users.

@C0urante
Contributor

Also worth noting that if we really do want to disable all unused config warnings for all (public) configs that Kafka defines, we could probably do this automatically instead of on a case-by-case basis.

For example, in KafkaConsumer, we could modify the constructor:

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @SuppressWarnings("unchecked")
    KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        try {
            // We don't use some properties in some cases; no need to log a warning to the user for them
            ConsumerConfig.configNames().forEach(config::ignore);
            // ...
        } catch (Throwable t) {
            // ...
        }
    }
}

This would be easier to maintain and reduce noise in the code base related to config wrangling.

@RivenSun2
Contributor Author

Hi @C0urante, thanks for your reply; I understand what you mean.
I also agree with ignoring all of these configs in one place.

But take two KafkaProducer parameters as an example (there are many similar situations in KafkaConsumer): enable.idempotence and transactional.id.

Regardless of whether the user has passed in these two parameters, they will always be used, because Kafka's internal behavior retrieves both configurations to drive some logic.

Our current intention is to call the ignore method uniformly whenever a configuration is not retrieved due to special Kafka behavior.

The logUnused method may work better when users rely on custom plugins and configurations.

WDYT? @guozhangwang

Thanks.

@guozhangwang
Contributor

I agree with @RivenSun2 's rationale here. More specifically I think 1) we are not really trying to just ignore all configs that are not used; instead we just want to 2) not print the unnecessary warn log entry for those configs that are rightfully ignored by the library.

@C0urante
Contributor

@guozhangwang thanks, but it seems like the disagreement here is on what actually constitutes an unnecessary warning message. If someone believes their consumer is auto-committing when it's not, that seems worthy of a warning. Of course, this could be issued elsewhere and with more specificity, but not logging at all is probably a regression in this scenario, not an improvement.

philipnee added a commit to philipnee/kafka that referenced this pull request Apr 22, 2022
revert test commit

test commit

revert test commit

MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (apache#11985)

Because a validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, the corresponding testCase is improved to verify whether the wrong value of compression.type will throw a ConfigException.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

MINOR: Correct Connect docs on connector/task states (apache#11914)

The `DESTROYED` state is represented internally as a tombstone record when running in distributed mode and by the removal of the connector/task from the in-memory status map when running in standalone mode. As a result, it will never appear to users of the REST API, and we should remove mention of it from our docs so that developers creating tooling against the REST API don't write unnecessary logic to account for that state.

Reviewers: Mickael Maison <mickael.maison@gmail.com>

KAFKA-13828; Ensure reasons sent by the consumer are small (apache#12043)

This PR reworks the reasons used in the ConsumerCoordinator to ensure that they remain reasonably short.

Reviewers: Bruno Cadonna <bruno@confluent.io>

KAFKA-13542: Add rebalance reason in Kafka Streams (apache#12018)

Reviewers: Bruno Cadonna <bruno@confluent.io>, David Jacot <djacot@confluent.io>

MINOR: Verify stopReplica if broker epoch not stale (apache#12040)

Verify that ReplicaManager.stopReplica is called if the stop replica
request doesn't result in a stale broker epoch error.

Reviewers: Mickael Maison <mimaison@users.noreply.github.com>

KAFKA-13651; Add audit logging to `StandardAuthorizer` (apache#12031)

This patch adds audit support through the kafka.authorizer.logger logger to StandardAuthorizer. It
follows the same conventions as AclAuthorizer with a similarly formatted log message. When
logIfAllowed is set in the Action, then the log message is at DEBUG level; otherwise, we log at
trace. When logIfDenied is set, then the log message is at INFO level; otherwise, we again log at
TRACE.

Reviewers: Colin P. McCabe <cmccabe@apache.org>

KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode (apache#11910)

In ZK mode, the topic "foo_bar" will conflict with "foo.bar" because of limitations in metric
names. We should implement this in KRaft mode.  This PR also changes TopicCommandIntegrationTest to
support KRaft mode.

Reviewers: Colin P. McCabe <cmccabe@apache.org>

KAFKA-12613: Fix inconsistent validation logic between KafkaConfig and LogConfig (apache#10472)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

KAFKA-13823 Feature flag changes from KIP-778 (apache#12036)

This PR includes the changes to feature flags that were outlined in KIP-778.  Specifically, it
changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds
dry-run to the RPC so the controller can actually attempt the upgrade (rather than the client). It
introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because
FeatureLevelRecord was unused previously, we do not need to introduce a new version.

The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade,
downgrade, and disable sub-commands.  Refer to
[KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more
details on the new command structure.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>

MINOR: Update LICENSE-binary (apache#12051)

Updates the license file.

Reviewer: Bill Bejeck <bbejeck@apache.org>

MINOR: Remove redundant conditional judgments in Selector.clear() (apache#12048)

Condition 'sendFailed' is always 'false' when reached.

Reviewers: Guozhang Wang <wangguoz@gmail.com>

MINOR: ignore unused configuration when ConsumerCoordinator is not constructed (apache#12041)

Following PR apache#11940, ignore unused config when ConsumerCoordinator is not constructed.

Reviewers: Guozhang Wang <wangguoz@gmail.com>

[MINOR] Update upgrade documentation for 3.2 (apache#12055)

Reviewer: Bruno Cadonna <cadonna@apache.org>

MINOR: Move some TopicCommand and ConfigCommand integration tests to unit tests (apache#12024)

Move some TopicCommand and ConfigCommand integration tests to unit tests to speed up the tests

Reviewers: Luke Chen <showuon@gmail.com>

MINOR: Improve the description of principal under different mechanisms of sasl (apache#11947)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

MINOR: update comment in LocalLog.replaceSegments() (apache#12054)

Reviewers: Luke Chen <showuon@gmail.com>

MINOR: Make link in quickstart dynamic (apache#12057)

Reviewer: Matthias J. Sax <mjsax@apache.org>

KAFKA-13769: Explicitly route FK join results to correct partitions (apache#11945)

Prior to this commit FK response sink routed FK results to
SubscriptionResolverJoinProcessorSupplier using the primary key.

There are cases, where this behavior is incorrect. For example,
if KTable key serde differs from the data source serde which might
happen without a key changing operation.

Instead of determining the resolver partition by serializing the PK
this patch includes target partition in SubscriptionWrapper payloads.
Default FK response-sink partitioner extracts the correct partition
from the value and routes the message accordingly.

Reviewers: Matthias J. Sax <matthias@confluent.io>

KAFKA-13807: Fix incrementalAlterConfig and refactor some things (apache#12033)

Ensure that we can set log.flush.interval.ms at the broker or cluster level via
IncrementalAlterConfigs. This was broken by KAFKA-13749, which added log.flush.interval.ms as the
second synonym rather than the first. Add a regression test to DynamicConfigChangeTest.

Create ControllerRequestContext and pass it to every controller API. This gives us a uniform way to
pass through information like the deadline (if there is one) and the Kafka principal which is
making the request (in the future we will want to log this information).

In ControllerApis, enforce a timeout for broker heartbeat requests which is equal to the heartbeat
request interval, to avoid heartbeats piling up on the controller queue. This should have been done
previously, but we overlooked it.

Add a builder for ClusterControlManager and ReplicationControlManager to avoid the need to deal
with a lot of churn (especially in test code) whenever a new constructor parameter gets added for
one of these.

In ControllerConfigurationValidator, create a separate function for when we just want to validate
that a ConfigResource is a valid target for DescribeConfigs. Previously we had been re-using the
validation code for IncrementalAlterConfigs, but this was messy.

Split out the replica placement code into a separate package and reorganize it a bit.

Reviewers: David Arthur <mumrah@gmail.com>

MINOR: Correct spelling errors in KafkaRaftClient (apache#12061)

Correct spelling errors in KafkaRaftClient

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>

MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (apache#12027)

The second validation does not verify the second bounce because the verified producer and the verified consumer are stopped in `self.run_validation`. This means that the second `run_validation` just spit out the same information as the first one. Instead, we should just run the validation at the end.

Reviewers: Jason Gustafson <jason@confluent.io>

KAFKA-8785: fix request timeout by waiting for metadata cache up-to-date (apache#11681)

The reason this test is flaky is a race condition at the beginning of the test: while the brokers are starting up, the adminClient requests broker metadata. If the adminClient gets only partial metadata, the test fails, because in these tests brokers are shut down to test leader election.

Fix this issue by explicitly waiting for the metadata cache to be up-to-date in waitForReadyBrokers, and create the admin client only after waitForReadyBrokers.

Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>, dengziming <dengziming1993@gmail.com>

KAFKA-13832: Fix flaky testAlterAssignment (apache#12060)

In KRaft mode the metadata is not propagated in time, so we should wait for it before making assertions.

Reviewers:  Luke Chen <showuon@gmail.com>

KAFKA-13654: Extend KStream process with new Processor API (apache#11993)

Updates the KStream process API to cover the use cases
of both process and transform, and deprecate the KStream transform API.

Implements KIP-820

Reviewer: John Roesler <vvcephei@apache.org>

KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft (apache#12063)

Fix two bugs related to dynamic broker configs in KRaft. The first bug is that we are calling reloadUpdatedFilesWithoutConfigChange when a topic configuration is changed, but not when a
broker configuration is changed. This is backwards. This function must be called only for broker
configs, and never for topic configs or cluster configs.

The second bug is that there were several configurations such as max.connections which are related
to broker listeners, but which do not involve changing the registered listeners. We should support
these configurations in KRaft. This PR fixes the configuration change validation to support this case.

Reviewers: Jason Gustafson <jason@confluent.io>, Matthew de Detrich <mdedetrich@gmail.com>

KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (apache#12004)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

MINOR: Scala cleanups in core (apache#12058)

Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>

KAFKA-13799: Improve documentation for Kafka zero-copy (apache#12052)

Improve documentation for Kafka zero-copy. Kafka combines pagecache and zero-copy to greatly improve message consumption efficiency. But zero-copy only works in PlaintextTransportLayer.

Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

KAFKA-13785: [5/N][emit final] cache for time ordered window store (apache#12030)

A new cache for RocksDBTimeOrderedWindowStore. We need this because RocksDBTimeOrderedWindowStore's key ordering differs from CachingWindowStore's, which causes issues for MergedSortedCacheWindowStoreIterator.

Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>

KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names (apache#11703)

Reviewers: Guozhang Wang <wangguoz@gmail.com>

HOTFIX: fix broken trunk due to conflicting and overlapping commits (apache#12074)

Reviewers: Victoria Xia <victoria.xia@confluent.io>, David Arthur <mumrah@gmail.com>

MINOR: cleanup for postProcessAndValidateIdempotenceConfigs method (apache#12069)

Reviewers: Luke Chen <showuon@gmail.com>

KAFKA-13834: fix drain batch starving issue (apache#12066)

In the drainBatchesForOneNode method, there's a possibility that some partitions on a node will never get picked. Fix this issue by maintaining a drainIndex for each node.

Reviewers: Luke Chen <showuon@gmail.com>, RivenSun <91005273+RivenSun2@users.noreply.github.com>

MINOR: Improve Gradle Caching and Fix Deprecations (apache#12003)

* Fix UP-TO-DATE check in `create*VersionFile` tasks

`create*VersionFile` tasks explicitly declared output UP-TO-DATE status
 as being false. This change properly sets the inputs to
`create*VersionFile` tasks to the `commitId` and `version` values and
sets `receiptFile` locally rather than in an extra property.

* Enable output caching for `process*Messages` tasks

`process*Messages` tasks did not have output caching enabled. This
change enables that caching, as well as setting a property name and
RELATIVE path sensitivity.

* Fix existing Gradle deprecations

Replaces `JavaExec#main` with `JavaExec#mainClass`

Replaces `Report#destination` with `Report#outputLocation`

Adds a `generator` configuration to projects that need to resolve
the `generator` project (rather than referencing the runtimeClasspath
of the `generator` project from other project contexts).

Reviewers: Mickael Maison <mickael.maison@gmail.com>


MINOR; Fix partition change record noop check (apache#12073)

When LeaderRecoveryState was added to the PartitionChangeRecord, the
check for being a noop was not updated. This commit fixes that and
improves the associated test to avoid this oversight in the future.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>

MINOR: revert back to 60s session timeout for static membership test (apache#11881)

Reviewers: Guozhang Wang <wangguoz@gmail.com>

KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (apache#12075)

This PR fixes a case where we were unable to place on fenced brokers in KRaft mode. Specifically,
if we had a broker registration in the metadata log, but no associated heartbeat, previously the
HeartbeatManager would not track the fenced broker. This PR fixes this by adding this logic to the
metadata log replay path in ClusterControlManager.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
@guozhangwang
Contributor

@C0urante Yes, I agree this is a fair question to debate, and I personally think it could be very subjective --- as you can see in https://issues.apache.org/jira/browse/KAFKA-13689, some people may prefer "for those not-picked configs, since they are not needed, do not bother to log them" while others would say "please let me know". In addition, I don't think it is a regression to not log an ignored config; e.g. in KafkaConsumer we have been ignoring other configs (like KEY_DESERIALIZER_CLASS_CONFIG, when the actual deserializer object is provided) without logging, so this seems to me to be aligned with the existing behavior.

All that being said, if we feel that the general rule of thumb should be "always let me know when some configs are ignored", then we can have this discussion, and if the outcome is yes, we should change the behavior universally.

@C0urante
Contributor

C0urante commented May 3, 2022

Thanks Guozhang. I think the cost of logging warnings in cases like this is fairly low as users can and should adjust their configurations to not use nonsensical properties, and the benefit can be high in the event that a user is confused about client behavior. I do sympathize with concerns that the warning for an unused property may make it seem like the property is unconditionally unrecognized (i.e., not defined by a client at all) instead of conditionally unrecognized (i.e., not used because of other properties).

One alternative could be to use the newly-introduced ConnectUtils::ensureProperty or something similar to it (possibly one that logs a warning if any value for a specific property is given, regardless of whether it matches the default). This way, we could continue logging warnings for cases like these, but make it clear exactly why the property should not be included in the config.

Either way, I think the piecemeal logic introduced in this PR is suboptimal. Dedicating one line for every to-be-ignored property is unnecessary if we want to remove these warnings for all properties defined by a client; in that case, we can use the approach I described earlier, which will be easier to maintain and take up less space.

@guozhangwang
Contributor

Thanks @C0urante for your thoughts. I'd like to clarify one thing: today users can pass in both defined and unknown config values, where the latter may be used by plugin modules (e.g. Kafka Streams's partition assignor). Since config.logUnused() is called at the end of the client constructor, the latter category of configs may not have been retrieved yet at that point, which does not mean they will never be retrieved after the constructor. So the log message "... were supplied but are not used yet." is reasonable: by that time we do not know whether they will ever be used, and we cannot just call ignore on all of them in order to suppress the log.

Now for the former case: generally we expect that by the time config.logUnused() is called, all defined configs have been retrieved. If the client has not retrieved some of them, AND the user has specified values for those configs, it's debatable whether we should let users know as a reminder that they can consider removing those overrides; for configs which are not overridden by users, we would not bother to notify them at all.

If we want to do that, I'd suggest we do it universally, i.e. for all cases, including the previously ignored cases like KEY_DESERIALIZER_CLASS_CONFIG. Maybe you can send out a discussion email in the community to ask for a consensus?
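To make the timing point above concrete, here is a hypothetical sketch (the config name and class are invented for illustration; this is not Kafka code) of a config that is still unused when the constructor-time unused-config check runs, but is retrieved later by a plugin:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration: an unknown config ("my.assignor.custom.flag")
// is still unused at the end of the client constructor, but a plugin
// retrieves it afterwards, so "not used yet" is the accurate wording.
public class LateRetrievalDemo {
    // Returns the keys a logUnused()-style check would report.
    static Set<String> unused(Map<String, String> originals, Set<String> used) {
        Set<String> result = new HashSet<>(originals.keySet());
        result.removeAll(used);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> originals = new HashMap<>();
        originals.put("bootstrap.servers", "localhost:9092");
        originals.put("my.assignor.custom.flag", "true"); // unknown to the client itself

        Set<String> used = new HashSet<>();
        used.add("bootstrap.servers");                    // retrieved during construction

        // End of constructor: this is when the unused-config check would run.
        System.out.println("not used yet: " + unused(originals, used));

        // Later, a plugin (e.g. a custom partition assignor) configures
        // itself from the same originals and retrieves the flag.
        used.add("my.assignor.custom.flag");
        System.out.println("unused after plugin configure(): " + unused(originals, used));
    }
}
```

The sketch shows why calling ignore on every unknown config at construction time would be wrong: the flag is legitimately "not used yet" rather than "never used".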

@C0urante
Contributor

@guozhangwang to be clear, nobody is advocating that we call ignore on everything. I was proposing that we call ignore on everything that's already defined, which is pretty clear if you read the code example I gave. Does that clear things up?

@guozhangwang
Contributor

AH yes, that's clear.

My concern was that it assumes the defined properties are all retrieved in the constructor (since logUnused is called at the end of it). I think that's true for most clients --- at least for the producer and consumer --- but it may not be the case for Streams.

