
KAFKA-6145: KIP-441: Fix assignor config passthough#8716

Merged
vvcephei merged 2 commits into apache:trunk from vvcephei:kafka-6145-one-shot-balance on May 27, 2020

Conversation

@vvcephei (Contributor)

Also fixes a system test by configuring the HighAvailabilityTaskAssignor (HATA) to perform a one-shot balanced assignment.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vvcephei (Contributor, Author) left a comment:

Hey @cadonna and @ableegoldman , can you take a look at these fixes? I'm going to add a couple more tests, but I'd like to get your early feedback if you have time.

Comment on lines 1151 to 1153
Contributor Author:

This is where we forgot to copy over the configs.

@ableegoldman (Member) commented on May 22, 2020:

Can we add a note to AssignorConfiguration pointing to this for any new assignor-related configs that get added?

Member:

Do we automatically pass through the internal configs? I notice we don't copy over the task assignor class, or the new assignment listener callback I added for the integration tests. But both of them seem to get through.

Member:

I know it's deprecated, but I think the PartitionGrouper config is missing as well.

Contributor Author:

Yes, we copy over the other internal configs in org.apache.kafka.streams.processor.internals.StreamThread#create

I'll add your listener to the config test. I'm not sure about PartitionGrouper.

Contributor Author:

Turns out PartitionGrouper does not get copied over. I'll create a Jira to track this, so that we don't have to get sidetracked in this PR with the question of what we should do about this deprecated config.


Member:

> we automatically pass through any config that isn't registered

I have to say, this seems totally backwards to me. So basically we just happen to pass in any number of configs that we may or may not need, but will split out specific configs that we do need unless explicitly told to include them? I understand the custom configs motivation, but then why not just pass through all the configs?

What if I wanted to access the value of one of my registered Streams configs in my plugged-in component? I'd have to add the same value a second time, but with an unregistered config name. Huh?
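
To make the behavior under discussion concrete, here is a minimal, self-contained sketch. This is not Kafka's actual code: the class, the method, and the registered config names are all hypothetical, chosen only to illustrate how a config layer that forwards unregistered keys (but consumes registered ones) behaves.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration: keys registered in the config definition are
// parsed into typed fields and do NOT flow into the downstream (consumer)
// configs unless explicitly copied; unregistered keys fall through
// untouched as "originals".
public class ConfigPassthroughSketch {

    // Pretend these are the registered Streams config names.
    static final Set<String> REGISTERED =
        Set.of("max.warmup.replicas", "acceptable.recovery.lag");

    // Builds the map handed to the consumer: only unregistered keys survive.
    static Map<String, String> consumerConfigs(final Map<String, String> originals) {
        final Map<String, String> passedThrough = new HashMap<>();
        for (final Map.Entry<String, String> entry : originals.entrySet()) {
            if (!REGISTERED.contains(entry.getKey())) {
                // Unregistered (e.g. internal or custom) keys pass through.
                passedThrough.put(entry.getKey(), entry.getValue());
            }
            // Registered keys are consumed here; forgetting to copy the new
            // assignor configs over explicitly is the kind of bug this PR fixes.
        }
        return passedThrough;
    }
}
```

Under this model, an unregistered internal key survives into the consumer configs while a registered one silently disappears, which matches the surprise described above.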

Contributor:

Regarding KAFKA-10046, in current trunk we already have some logic that assumes the default partition grouper is always used, so I'd suggest we just bite the bullet and remove it in 2.6.

Contributor Author:

Thanks, @guozhangwang

@ableegoldman , it seems backwards to me also.

Comment on lines 362 to 369
@vvcephei (Contributor, Author) commented on May 22, 2020:

I added this constraint to mirror the constraint we already apply in StreamsConfig. It's not critical, but I was disappointed that I had written a bunch of tests that included a technically invalid configuration.

Member:

Should we add the same for all the other configs?

Member:

IMO, we should check the limits for all configs. However, I am not sure if we should check probingRebalanceIntervalMs to be >= 60 * 1000L (as we do in StreamsConfig) or just >= 0.

Contributor Author:

We could; to be fair, the config parser already checks the limits, so it's not necessary for production code.

It only comes up when we manually create these internal objects for tests. I added this particular bound specifically because I had previously passed in what I thought was a dummy value, which turned out to be a misconfiguration that actually affected the behavior I was testing.

Offhand, it doesn't seem like the same thing would happen with probingRebalanceIntervalMs, so it doesn't seem like the check here would have the same benefit; WDYT?

Contributor Author:

Nevermind; I refactored the class to use the same config validation for passed-in arguments.
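
As a sketch of what such argument validation can look like: the class and field names below are illustrative, not Kafka's actual internals, but the bounds mirror the ones discussed above (at least one warmup replica, and a probing interval of at least one minute, as in StreamsConfig).

```java
// Illustrative value holder that rejects out-of-range arguments up front,
// so tests that construct it directly cannot silently use an invalid
// configuration. Names are hypothetical.
public class AssignmentConfigsSketch {
    final int maxWarmupReplicas;
    final long probingRebalanceIntervalMs;

    AssignmentConfigsSketch(final int maxWarmupReplicas,
                            final long probingRebalanceIntervalMs) {
        if (maxWarmupReplicas < 1) {
            throw new IllegalArgumentException(
                "maxWarmupReplicas must be at least 1, got " + maxWarmupReplicas);
        }
        if (probingRebalanceIntervalMs < 60 * 1000L) {
            throw new IllegalArgumentException(
                "probingRebalanceIntervalMs must be at least one minute, got "
                    + probingRebalanceIntervalMs);
        }
        this.maxWarmupReplicas = maxWarmupReplicas;
        this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
    }
}
```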

Contributor Author:

I found this useful while debugging the system test.

Contributor Author:

Since we can't have zero warmups, we don't need this condition (which I added in #8696).

Contributor Author:

Fixing a double space we were printing when there was a followup. (It would print "with  followup", with two spaces.)

Comment on lines 59 to 63
Contributor Author:

Expanding De Morgan's law at @cadonna's request (which I also appreciated).
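
For reference, the two equivalent forms look like this (the variable names are illustrative, not the actual ones in the diff):

```java
// De Morgan's law: !(a && b) == !a || !b. The expanded form spells out
// each failing condition individually, which can read more clearly in review.
public class DeMorganSketch {
    static boolean compact(final boolean caughtUp, final boolean active) {
        return !(caughtUp && active);
    }

    static boolean expanded(final boolean caughtUp, final boolean active) {
        return !caughtUp || !active;
    }
}
```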

Contributor Author:

The diff is misaligned. I removed shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups and added shouldSkipWarmupsWhenAcceptableLagIsMax.

Contributor Author:

All these tests erroneously set "max warmups" to zero.

Comment thread on tests/kafkatest/services/streams.py (outdated)
Comment on lines 47 to 49
Contributor Author:

It was handy to be able to see the used config file while debugging.

Comment thread on tests/kafkatest/services/streams.py (outdated)
Comment on lines 471 to 473
Contributor Author:

Added this configuration to fix the flaky StreamsOptimizedTest.test_upgrade_optimized_topology.

@vvcephei (Contributor, Author):

FWIW:

================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.7.7
session_id:       2020-05-22--007
run time:         29 minutes 9.205 seconds
tests run:        10
passed:           10
failed:           0
ignored:          0
================================================================================
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   2 minutes 32.957 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   2 minutes 31.512 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   2 minutes 56.126 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   2 minutes 55.289 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   2 minutes 52.812 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   3 minutes 4.072 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   3 minutes 3.525 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   3 minutes 4.718 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   3 minutes 3.213 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
status:     PASS
run time:   3 minutes 4.576 seconds
--------------------------------------------------------------------------------

@vvcephei (Contributor, Author):

Previously, it would always fail for me within the first one to three tests, the first one more often than not.

Comment on lines 43 to 68
Contributor Author:

Added this test, and verified that it does indeed fail on trunk for the expected reason that the new configs were ignored and defaults were substituted instead.

Member:

Fair enough. I don't think it was meant as a cost-saving thing, just to make it easier to understand when something did or did not have caught-up clients. If you find this logic easier to follow, go for it.

Member:

Nice catch! One nit is that "active" alone is not sufficient for being considered caught up. Can we rename the active condition to running or activeRunning, etc.?



Member:

Should we move the handful of AssignorConfiguration related tests from StreamsPartitionAssignorTest to here?

Member:

Yes, please!

Contributor Author:

Ah, unfortunately, that test relies on mocking package-private fields from another package. I'll just leave it alone for now.

Member:

Trying to avoid piling even more unrelated questions onto the thread below, but there's another config that would need to be passed in: the admin client timeout.
That said, can we just remove it? It only gets the timeout the admin is configured with anyway.

@vvcephei (Contributor, Author) commented on May 26, 2020:

Ack, I'll take a look at it.

Contributor Author:

It does wind up "surviving" into the config that we use in the assignor, but I'm not sure if it's on purpose or just lucky.

I'll postpone any existential questions, and just add it to the regression test.

Contributor Author:

Ah, it was also tested here: org.apache.kafka.streams.StreamsConfigTest#consumerConfigShouldContainAdminClientConfigsForRetriesAndRetryBackOffMsWithAdminPrefix

Member:

I'll open a separate PR to remove the extra timeout config

@cadonna (Member) left a comment:

Thank you for the PR, @vvcephei!

Here is my feedback!

Comment on lines 1151 to 1153
Member:

req: Please add verifications to StreamsConfigTest#consumerConfigMustContainStreamPartitionAssignorConfig()


Member:

Nice catch indeed! I agree with @ableegoldman about the renaming. I am in favour of activeRunning or activeAndRunning.


@vvcephei (Contributor, Author):

Thanks for the reviews, @ableegoldman and @cadonna ! I believe I've addressed all your feedback.

@vvcephei (Contributor, Author):

This Java 14 failure is strange:

00:06:34.561 > Task :streams:compileTestJava
00:06:34.561 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk14-scala2.13/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java:495: error: incompatible types: inference variable K#1 has incompatible bounds
00:06:34.561             verify(
00:06:34.562                   ^
00:06:34.562     equality constraints: V#1,V#1,V#1,V#1,V#1,V#1,V#1,K#2,K#3,String,K#1,K#1,K#1,K#1,K#1,K#1
00:06:34.562     lower bounds: Long,String
00:06:34.562   where K#1,V#1,K#2,V#2,K#3,V#3 are type-variables:
00:06:34.562     K#1 extends Object declared in class KeyValueTimestamp
00:06:34.562     V#1 extends Object declared in class KeyValueTimestamp
00:06:34.562     K#2 extends Object declared in method <K#2,V#2>verify(List<TestRecord<K#2,V#2>>,List<KeyValueTimestamp<K#2,V#2>>)
00:06:34.711     V#2 extends Object declared in method <K#2,V#2>verify(List<TestRecord<K#2,V#2>>,List<KeyValueTimestamp<K#2,V#2>>)
00:06:34.711     K#3 extends Object declared in method <K#3,V#3>drainProducerRecords(TopologyTestDriver,String,Deserializer<K#3>,Deserializer<V#3>)
00:06:34.711     V#3 extends Object declared in method <K#3,V#3>drainProducerRecords(TopologyTestDriver,String,Deserializer<K#3>,Deserializer<V#3>)
00:06:40.558 1 error

I'm going to try again, and see what we see.

@vvcephei (Contributor, Author):

Retest this please

@vvcephei (Contributor, Author):

Retest this please

Member:

I think this comment got lost in a discussion thread, but can we add a note to AssignorConfiguration pointing out that any Streams configs added here will need to be explicitly passed through? It seems like it's too easy to fall into this same trap again.

Contributor Author:

Ah, right. Sure thing!

@vvcephei (Contributor, Author):

The test failures were the result of extra tests that were added on trunk after this branch was created. The branch builder merges with trunk before running the tests. I'm rebasing and fixing the tests.

@vvcephei (Contributor, Author):

The only failures were:
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

@ableegoldman (Member) left a comment:

One comment on the TaskAssignorIntegrationTest, but feel free to merge as-is.

public static final String TIME = "__time__";

// This is settable in the main Streams config, but it's a private API for testing
public static final String ASSIGNMENT_LISTENER = "__asignment.listener__";
Member:

🤦‍♀️


mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
Member:

Should we also validate that the task assignor gets passed through? We could even pass in a custom assignor and use that to verify the correct assignor configs got passed through.

Of course, reflection black magic-ry is just more fun 🙂

Contributor Author:

Good idea; I'm not sure why I didn't think to do this already. With my newfound understanding of this config translation logic, I'm confident that it gets copied over right now, because it's not a registered config, but a regression test would be nice.

I'll quickly follow up with a separate PR so that I can merge this one.

Member:

Sounds good. Thanks for the fix!

@vvcephei vvcephei merged commit 2cff1fa into apache:trunk May 27, 2020
@vvcephei vvcephei deleted the kafka-6145-one-shot-balance branch May 27, 2020 18:50
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020

Labels

kip (Requires or implements a KIP), streams

Projects

None yet


5 participants