Skip to content

KAFKA-6145: KIP-441: Add TaskAssignor class config#8541

Merged
vvcephei merged 12 commits intoapache:trunkfrom
vvcephei:kafka-6145-task-assignor-flag
Apr 28, 2020
Merged

KAFKA-6145: KIP-441: Add TaskAssignor class config#8541
vvcephei merged 12 commits intoapache:trunkfrom
vvcephei:kafka-6145-task-assignor-flag

Conversation

@vvcephei
Copy link
Copy Markdown
Contributor

  • add a config to set the TaskAssignor
  • set the default assignor to HighAvailabilityTaskAssignor
  • fix broken tests

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

* add a config to set the TaskAssignor
* set the default assignor to HighAvailabilityTaskAssignor
* fix broken tests
Copy link
Copy Markdown
Contributor Author

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @ableegoldman @cadonna ,

Do you mind taking a look at this? The scope wound up creeping a bit, since I decided to go ahead and make the integration tests pass while swapping in the new assignor (since there were so few that were failing).

If it's too big to review, I can try to pull out most of the test fixes into a preliminary PR, since they generally take the form of removing assumptions.

Comment thread build.gradle Outdated
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Necessary because the test name that JUnit generates for the parameterized StreamsPartitionAssignorTest is slightly too long. I have no way to shorten it because the thing that pushes it over is the fact that there are two package names in the parameterized method name, and there's no control over the format of the test name itself. So, I decided just to truncate the file name instead, which is almost certainly still unique for pretty much any test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only alternative I can think of is to parameterize the "short name" of the TaskAssignor, which seems kind of wacky.

Also, worth noting the impact of truncation is nothing if the file name is still unique. If the name is shared between two tests, then the impact is still nothing if both tests pass. The only observable effect is that if one or both tests fail, their logs would get combined. It seems like we can afford just to defer this problem until it happens, if ever.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been wanting this for a while, so I just decided to add it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: Please add unit tests for this method

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pointless unless we evaluate it inside the lambda.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify everyone's roles, I added a new assignor whose only behavior is to return all previously owned tasks, and then assign any unowned tasks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this and other tests from StreamsPartitionAssignorTest that had been guarded to only actually run when parameterized with "high availability"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the AndHighAvailabiltiyEnabled suffix from the test name? And/or just generally shorten it if you have a better idea

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is why I moved these methods to their own test, so that this class can focus on verifying behavior that is invariant with respect to the parameterized assignor.

Copy link
Copy Markdown
Contributor Author

@vvcephei vvcephei Apr 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a bunch of changes to this test, because it was pretty brittle with respect to changes in the HighAvailabilityTaskAssignor. For context, this is the second time I've touched the assignment code since we introduced the HATA, and it's the second time I've had to deal with irrelevant test failures in this class.

First, I replaced the ClientState mocks with "real" ClientStates, constructed to represent the desired scenario for each test. Mocks are really more appropriate for isolating a component from external components (like mocking a remote service). Mocking data types leads to verifying that a specific set of queries happens against the data type, which is likely to break any time the logic under test changes in any way. Another problem with data-type mocks is that they can violate the invariants of the data type itself. For example, you can mock a list that both isEmpty and contains items. In our case, we threw NPEs in the assignor that could never happen in production when the mocked assigned/standby tasks didn't agree with the assigned tasks or the stateful assigned tasks weren't mocked to agree with the lags. Now, we just construct a ClientState for each client, representing the desired scenario and make assertions on the resulting assignment.

Second, the tests as written rely heavily on shared mutable fields inserted into shared mutable collections to build the assignor. This can be a good way to minimize the text inside the test method, which lets readers focus on the proper logic of the test itself. However, it makes it harder to understand the full context of a test, and it also raises the possibility of tests polluting each others' environments. Since in this particular case, localizing all the setup code is about as compact as factoring it out, I went ahead and minimized the shared fields, and eliminated the mutability, the tests are self-contained.


assertFalse(taskAssignor.previousAssignmentIsValid());
@Test
public void shouldComputeNewAssignmentIfThereAreUnassignedActiveTasks() {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These first tests are really the only ones to change. They're still asserting the same basic fact, but previousAssignmentIsValid is now an internal method, so we instead make assertions about the black-box semantics of the assignor, instead of a specific "visible for testing" method.

private final Map<UUID, ClientState> clients = new TreeMap<>();

@Test
public void shouldViolateBalanceToPreserveActiveTaskStickiness() {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I added the new assignor, I added a regression test to verify its most important special function. Most of its validity is verified by the parameterized StreamsPartitionAssignorTest.

@vvcephei
Copy link
Copy Markdown
Contributor Author

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the first few comments, will finish up going through the testing code later. But thanks for the PR!

public final class AssignorConfiguration {
public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = "internal.high.availability.enabled";
private final boolean highAvailabilityEnabled;
public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put this with the other Streams internal configs? And/or follow the pattern of prefix+suffixing with __ ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah, good idea.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I moved it to org.apache.kafka.streams.StreamsConfig.InternalConfig#INTERNAL_TASK_ASSIGNOR_CLASS. I made an ad-hoc decision not to add the underscores, though, because this config is different than the other internal configs. I added comments to InternalConfig to explain the difference.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm all for useful logging 👍

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the improvement, this feels a lot nicer

@vvcephei
Copy link
Copy Markdown
Contributor Author

@ableegoldman
Copy link
Copy Markdown
Member

29 system tests failed

Expected, but still upsetting 🙀

@vvcephei
Copy link
Copy Markdown
Contributor Author

29 system tests failed

Expected, but still upsetting scream_cat

I should have said "130 system tests passed"

Copy link
Copy Markdown
Contributor Author

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @ableegoldman , do you mind making another pass? I'm going to kick off the system tests again, but I've run the failing ones locally, and they pass now.

Comment on lines +152 to +154
// this test relies on the second instance getting the standby, so we specify
// an assignor with this contract.
props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, PriorTaskAssignor.class.getName());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a TODO. I'm planning to leave the test like this. (Just opening the floor for objections)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to take advantage of the now-pluggable assignor and write a test utility assignor that allows you to specify the assignment you want and the validate the inputs that you get. Obviously beyond the scope of the current PR, just a thought I had. Using the PriorTaskAssignor seems like the next best thing so 👍

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I had a similar thought, just ran out of motivation after debugging the integration tests.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very understandable

Comment on lines +482 to +483
def set_config(self, key, value):
self.extra_properties[key] = value
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added this as a general mechanism in a couple of places to pass specific configs into Streams, so we don't have to make new constructors for every different parameterization.

Comment on lines +571 to +572
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
properties['internal.task.assignor.class'] = "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These will become follow-on tasks to fix each test. Thankfully, there aren't many.

# Start test harness
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This accounted for most of the test failures, and it's already fixed on trunk.

Comment on lines +147 to +151
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(
extra_configs=",application.id=shutdown_with_broker_down" +
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one already had a different mechanism to add more configs, so I just left it alone.

@vvcephei
Copy link
Copy Markdown
Contributor Author

@vvcephei
Copy link
Copy Markdown
Contributor Author

Looks like the Jenkins agents are all saturated. I'll try again later to give them some breathing room.

@vvcephei
Copy link
Copy Markdown
Contributor Author

retest this please

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vvcephei Thank you for the PR!

Here my feedback

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: Please add unit tests for this method

vvcephei and others added 2 commits April 27, 2020 16:26
…nals/assignment/StickyTaskAssignorTest.java

Co-Authored-By: Bruno Cadonna <bruno@confluent.io>
…nals/assignment/TaskAssignorConvergenceTest.java

Co-Authored-By: Bruno Cadonna <bruno@confluent.io>
@vvcephei
Copy link
Copy Markdown
Contributor Author

Copy link
Copy Markdown
Contributor Author

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one extra change besides the code review in the last revision.

Comment on lines +26 to +35
/**
* A special task assignor implementation to be used as a fallback in case the
* configured assignor couldn't be invoked.
*
* Specifically, this assignor must:
* 1. ignore the task lags in the ClientState map
* 2. always return true, indicating that a follow-up rebalance is needed
*/
public class FallbackPriorTaskAssignor implements TaskAssignor {
private final StickyTaskAssignor delegate;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the PriorTaskAssignor and added a Javadoc to make its role clear.

Note that "PriorTaskAssignor" would be an appropriate behavioral name, except that it also always returns "true", and that it must ignore the lags, which is what makes it a "fallback" assignor here.

final boolean followupRebalanceNeeded = taskAssignor.assign();
final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);

final boolean followupRebalanceNeeded = taskAssignor.assign(clientStates,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably rename this to probingRebalanceRequired or so on, see comment on FallbackPriorTaskAssignor

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, seems legit.

*
* Specifically, this assignor must:
* 1. ignore the task lags in the ClientState map
* 2. always return true, indicating that a follow-up rebalance is needed
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning true here will schedule a followup rebalance at the probing interval, but we also schedule a followup rebalance immediately before instantiating this assignor (line 735). Is this intentional? IIUC your proposal was to trigger a followup rebalance right away, which we do by means of the assignment error code.

Of course, this is in memory so if the instance crashes and restarts we lose this information. I think we should actually avoid using the REBALANCE_NEEDED error code inside the assign method, and only allow. it during onAssignment. If we know that a followup rebalance is needed during assign we should just encode the nextScheduledRebalance with the current time

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is a good point. Actually, I overlooked line 735. I'll remove that one.

My proposal actually was to just wait for the probing rebalance interval in case the lag computation failed. It seems like this should be ok, since Streams will still make progress in the mean time, and it avoids the pathological case where we could just constantly rebalance if the end-offsets API is down for some reason.

clientStates = singletonMap(UUID_1, client1);
createTaskAssignor();

assertFalse(taskAssignor.previousAssignmentIsValid());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: I think, you can now restrict access to previousAssignmentIsValid() to private.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just remove it completely 😉

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have a follow-on PR that touches this method, I'll leave it alone and just proceed to merge. We should consider both of these options in the follow-on.

Thanks!

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks John!

@vvcephei vvcephei merged commit 5bb3415 into apache:trunk Apr 28, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Apr 30, 2020
…/master`

* apache-github/trunk: (45 commits)
  MINOR: Fix broken JMX link in docs by adding missing starting double quote (apache#8587)
  KAFKA-9652: Fix throttle metric in RequestChannel and request log due to KIP-219 (apache#8567)
  KAFKA-9922: Update demo instructions in examples README (apache#8559)
  KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (apache#8442)
  KAFKA-9875: Make integration tests more resilient (apache#8578)
  KAFKA-9932: Don't load configs from ZK when the log has already been loaded (apache#8582)
  KAFKA-9925: decorate pseudo-topics with app id (apache#8574)
  KAFKA-9832: fix attempt to commit non-running tasks (apache#8580)
  KAFKA-9127: don't create StreamThreads for global-only topology (apache#8540)
  MINOR: add support for kafka 2.4 and 2.5 to downgrade test
  KAFKA-9176: Retry on getting local stores from KafkaStreams (apache#8568)
  KAFKA-9823: Follow-up, check state for handling commit error response (apache#8548)
  KAFKA-6145: KIP-441: Add TaskAssignor class config (apache#8541)
  MINOR: Fix partition numbering from 0 to P-1 instead of P in docs (apache#8572)
  KAFKA-9921: disable caching on stores configured to retain duplicates (apache#8564)
  Minor: remove redundant check in auto preferred leader election (apache#8566)
  MINOR: Update the link to the Raft paper in docs (apache#8560)
  MINOR: Fix typos in config properties in MM2 test (apache#8561)
  MINOR: Improve producer test BufferPoolTest#testCloseNotifyWaiters. (apache#7982)
  MINOR: document how to escape json parameters to ducktape tests (apache#8546)
  ...
ijuma added a commit to confluentinc/kafka that referenced this pull request Apr 30, 2020
There was a minor conflict in gradle.properties because the default
Scala version changed upstream to Scala 2.13. I kept the upstream
change.

Related to this, I have updated Jenkinsfile to compile and validate
with Scala 2.12 in a separate stage so that we ensure we maintain
compatibility. Unlike Apache Kafka, we only run the tests with the
default Scala version, which is now 2.13.

* apache-github/trunk: (45 commits)
MINOR: Fix broken JMX link in docs by adding missing starting double
quote (apache#8587)
KAFKA-9652: Fix throttle metric in RequestChannel and request log due
to KIP-219 (apache#8567)
  KAFKA-9922: Update demo instructions in examples README (apache#8559)
KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses
(apache#8442)
  KAFKA-9875: Make integration tests more resilient (apache#8578)
KAFKA-9932: Don't load configs from ZK when the log has already been
loaded (apache#8582)
  KAFKA-9925: decorate pseudo-topics with app id (apache#8574)
  KAFKA-9832: fix attempt to commit non-running tasks (apache#8580)
KAFKA-9127: don't create StreamThreads for global-only topology
(apache#8540)
  MINOR: add support for kafka 2.4 and 2.5 to downgrade test
  KAFKA-9176: Retry on getting local stores from KafkaStreams (apache#8568)
KAFKA-9823: Follow-up, check state for handling commit error response
(apache#8548)
  KAFKA-6145: KIP-441: Add TaskAssignor class config (apache#8541)
MINOR: Fix partition numbering from 0 to P-1 instead of P in docs
(apache#8572)
KAFKA-9921: disable caching on stores configured to retain duplicates
(apache#8564)
Minor: remove redundant check in auto preferred leader election
(apache#8566)
  MINOR: Update the link to the Raft paper in docs (apache#8560)
  MINOR: Fix typos in config properties in MM2 test (apache#8561)
MINOR: Improve producer test BufferPoolTest#testCloseNotifyWaiters.
(apache#7982)
MINOR: document how to escape json parameters to ducktape tests
(apache#8546)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants