KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations #8654

Merged
rhauch merged 8 commits into apache:trunk from
rhauch:kafka-9931-kip-605
May 23, 2020

Conversation

@rhauch
Contributor

@rhauch rhauch commented May 12, 2020

KIP-605 has passed.

Expanded the allowed values for the internal topics’ replication factor and partitions from positive values to also include -1 to signify that the broker defaults should be used.

The Kafka storage classes were already constructing a NewTopic object (always with a replication factor and partitions) and sending it to Kafka when required. This change will avoid setting the replication factor and/or number of partitions on this NewTopic if the worker configuration uses -1 for the corresponding configuration value.
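
As a rough illustration of the -1 convention described above, the sketch below maps a configured value to an `Optional` that is empty when the broker default should apply. The class and method names (`BrokerDefaultSketch`, `partitionsFor`, `replicationFactorFor`) are hypothetical and are not taken from this PR. It only assumes that the caller can build a topic request from optional values, for example through the `NewTopic` constructor in kafka-clients 2.4 and later that accepts `Optional<Integer>` and `Optional<Short>`.

```java
import java.util.Optional;

// Hypothetical helper, not code from this PR: translates the worker's
// configured values into "explicit value" or "use the broker default".
class BrokerDefaultSketch {
    // Sentinel value that the worker configuration accepts for "broker default".
    static final int USE_BROKER_DEFAULT = -1;

    // Returns an empty Optional for -1, otherwise the configured partition count.
    static Optional<Integer> partitionsFor(int configured) {
        validate(configured);
        if (configured == USE_BROKER_DEFAULT) {
            return Optional.empty();
        }
        return Optional.of(configured);
    }

    // Returns an empty Optional for -1, otherwise the configured replication factor.
    static Optional<Short> replicationFactorFor(short configured) {
        validate(configured);
        if (configured == USE_BROKER_DEFAULT) {
            return Optional.empty();
        }
        return Optional.of(configured);
    }

    // Accepts only -1 or a positive value; rejects 0 and other negatives.
    private static void validate(int value) {
        if (value != USE_BROKER_DEFAULT && value < 1) {
            throw new IllegalArgumentException(
                "Value must be -1 or a positive number, got " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(partitionsFor(25));
        System.out.println(replicationFactorFor((short) -1));
    }
}
```

An empty `Optional` here stands for "leave the field unset in the create-topic request", so the broker applies its own defaults.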

Quite a few new tests were added to verify that the TopicAdmin utility class is correctly using the AdminClient, that the DistributedConfig validators for these configurations are correct, and that the DistributedConfig is correctly assembling the configuration properties that define the topic settings, which are now accessed by the three Kafka storage objects before they create the topics.

Also added support for additional topic settings used when creating the config, status, and offset internal topics.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@rhauch rhauch requested a review from kkonstantine May 12, 2020 14:19
@rhauch
Contributor Author

rhauch commented May 12, 2020

This includes the commit from #8653, which corrects the replication factor properties in one of the MirrorMaker2 integration tests, which should have been:

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

but instead were

config.storage.topic.replication.factor=1
offset.storage.topic.replication.factor=1
status.storage.topic.replication.factor=1

Note the extra topic. in the property name. Because these didn't match the expected *.storage.replication.factor properties, the new changes considered topic.replication.factor to be a topic setting that was passed via the NewTopic object and AdminClient to the broker with the topic creation request. However, that failed with the following exception:

[2020-05-11 15:12:01,518] ERROR [Worker clientId=connect-1, groupId=backup-mm2] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:297)
org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'mm2-offsets.backup.internal'
        at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:309)
        at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:105)
        at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:128)
        at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:115)
        at org.apache.kafka.connect.runtime.Worker.start(Worker.java:186)
        at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:123)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:284)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: topic.replication.factor
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:274)
        ... 11 more
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: topic.replication.factor

IOW, this change causes a Connect distributed worker or MirrorMaker2 to fail upon startup where it didn't prior to this change if/when the configuration contains extra properties that begin with any of the following prefixes and where the unprefixed configuration does not match a valid topic setting in the Kafka broker being used:

  • offset.storage.*
  • config.storage.*
  • status.storage.*
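
To make the failure mode above concrete, here is a minimal sketch of prefix-based extraction, assuming the worker strips the prefix and treats every remaining key it does not recognize as a topic setting. The class name `TopicSettingsSketch`, the method `topicSettings`, and the `WORKER_KEYS` set are hypothetical and only illustrate the idea; this is not the `DistributedConfig` code from this PR.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration of prefix-based topic setting extraction.
class TopicSettingsSketch {
    // Assumed suffixes that the worker consumes itself rather than
    // forwarding to the broker as topic-level settings.
    private static final Set<String> WORKER_KEYS =
        new HashSet<>(Arrays.asList("topic", "replication.factor", "partitions"));

    // Collects every property under the prefix whose suffix is not a
    // worker-level key; the result is what would be sent as topic settings.
    static Map<String, String> topicSettings(Map<String, String> workerProps, String prefix) {
        Map<String, String> settings = new TreeMap<>();
        for (Map.Entry<String, String> entry : workerProps.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(prefix)) {
                continue;
            }
            String suffix = key.substring(prefix.length());
            if (!WORKER_KEYS.contains(suffix)) {
                settings.put(suffix, entry.getValue());
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> props = new TreeMap<>();
        props.put("offset.storage.topic", "mm2-offsets.backup.internal");
        props.put("offset.storage.replication.factor", "1");
        // Misnamed property: the extra "topic." makes it look like a topic setting.
        props.put("offset.storage.topic.replication.factor", "1");
        System.out.println(topicSettings(props, "offset.storage."));
    }
}
```

Under these assumptions, the misnamed `offset.storage.topic.replication.factor` property survives extraction as `topic.replication.factor`, which is the name the broker rejects in the stack trace above.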

@rhauch rhauch added the connect label May 13, 2020
@rhauch
Contributor Author

rhauch commented May 14, 2020

BTW, I've improved the error message when Connect's TopicAdmin fails to create a topic because of an unknown topic setting:

Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:297)
org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 'mm2-offsets.backup.internal': Unknown topic config name: topic.replication.factor

Here's what this looks like in the log message just before the herder exits:

[2020-05-14 09:36:22,348] ERROR [Worker clientId=connect-2, groupId=backup-mm2] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:297)
org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 'mm2-offsets.backup.internal': Unknown topic config name: topic.replication.factor
	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:305)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:105)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:128)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:115)
	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:186)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:123)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:284)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: topic.replication.factor
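
One way to arrive at a message of this shape is to unwrap the `ExecutionException` thrown by the future and put the cause's message into the outer exception text. The sketch below shows that pattern with standard library classes only; `CreateTopicErrorSketch` and `describeFailure` are hypothetical names, and the actual `TopicAdmin` change may be structured differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration of surfacing the root cause in the error message.
class CreateTopicErrorSketch {
    // Builds a message that names the topic(s) and includes the underlying
    // cause's message instead of only the ExecutionException wrapper.
    static String describeFailure(String topicNames, Throwable error) {
        Throwable cause = error;
        if (error instanceof ExecutionException && error.getCause() != null) {
            cause = error.getCause();
        }
        return "Unable to create topic(s) '" + topicNames + "': " + cause.getMessage();
    }

    public static void main(String[] args) {
        // Stand-in for a failed topic creation future.
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(
            new IllegalStateException("Unknown topic config name: topic.replication.factor"));
        try {
            future.get();
        } catch (ExecutionException e) {
            System.out.println(describeFailure("mm2-offsets.backup.internal", e));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```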

@rhauch
Contributor Author

rhauch commented May 14, 2020

I've added some integration tests for creating the internal topic, including verifying some existing functionality w/r/t the replication factor and number of partitions with various Kafka cluster sizes.

Contributor

@kkonstantine kkonstantine left a comment

Thanks for the PR @rhauch
This looks pretty good already!

I left a few minor comments after a first pass.

Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java Outdated
Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java Outdated
Comment thread connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java Outdated
@kkonstantine
Contributor

fyi, builds fail on checkstyle.

@rhauch
Contributor Author

rhauch commented May 20, 2020

Thanks for the review, @kkonstantine. I think I've incorporated all of your suggestions.

@rhauch
Contributor Author

rhauch commented May 20, 2020

ok to test

Contributor

@kkonstantine kkonstantine left a comment

I think it's ready.
Added a few optional comments that caught my eye on the last pass, but otherwise
LGTM

@kkonstantine
Contributor

retest this please

rhauch added 8 commits May 21, 2020 13:06
…r distributed worker internal topics

Expanded the allowed values for the internal topics’ replication factor and partitions from positive values to also include -1 to signify that the broker defaults should be used.

The Kafka storage classes were already constructing a `NewTopic` object (always with a replication factor and partitions) and sending it to Kafka when required. This change will avoid setting the replication factor and/or number of partitions on this `NewTopic` if the worker configuration uses -1 for the corresponding configuration value.

Quite a few new tests were added to verify that the `TopicAdmin` utility class is correctly using the AdminClient, and that the `DistributedConfig` validators for these configurations are correct.
…istributed config

Added support for additional topic settings used when creating the config, status, and offset internal topics.
@rhauch rhauch force-pushed the kafka-9931-kip-605 branch from 74801cb to 4e2d988 Compare May 21, 2020 19:04
@rhauch
Contributor Author

rhauch commented May 21, 2020

Incorporated @kkonstantine's more recent suggestions, and further changed TopicAdmin to use existing TopicConfig constants rather than string literals.

Rebased to pick up the changes from #8653 rather than incorporating the same commit in this PR.

@rhauch
Contributor Author

rhauch commented May 21, 2020

FYI: previous builds had no failures related to Connect.

Contributor

@kkonstantine kkonstantine left a comment

Thanks @rhauch
LGTM, expecting that the current build won't have related failures as well.

@rhauch
Contributor Author

rhauch commented May 22, 2020

retest this please

@rhauch
Contributor Author

rhauch commented May 23, 2020

All Connect unit and integration tests passed in all of the builds, but each of the builds failed due to a few flaky integration tests in either Core or Streams.

@rhauch rhauch merged commit 981ef51 into apache:trunk May 23, 2020
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request May 24, 2020
* 'trunk' of github.com:apache/kafka:
  KAFKA-9888: Copy connector configs before passing to REST extensions (apache#8511)
  KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations (apache#8654)
  KAFKA-6145: Add unit tests for assignments of only stateless tasks (apache#8713)
  MINOR: Fix join group request timeout lower bound (apache#8702)
  MINOR: Improve security documentation for Kafka Streams apache#8710
  KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (apache#8696)
  KAFKA-10003: Mark KStream.through() as deprecated and update Scala API (apache#8679)