KAFKA-6145: KIP-441: Enforce Standby Task Stickiness #8696

Merged: vvcephei merged 1 commit into apache:trunk from vvcephei:kafka-6145-standby-stickiness on May 22, 2020

Conversation

@vvcephei (Contributor)

We should treat standbys similarly to active stateful tasks and
re-assign them to instances that are already caught-up on them
while we warm them up on the desired destination, instead of
immediately moving them to the destination.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vvcephei (Contributor Author) left a comment:


Hey @ableegoldman and @cadonna, what do you think about this?


    final boolean probingRebalanceNeeded = assignTaskMovements(
        tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),

    final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
@vvcephei (Contributor Author):

Moved the counter out here because we need to decrement it while assigning both active and standby warmups
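The shared-budget pattern described here can be sketched as follows (hypothetical stand-in code, not the actual Kafka implementation): both assignment phases draw down a single `AtomicInteger` until the warmup budget is exhausted.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class WarmupBudgetSketch {
    // Hypothetical stand-in for an assignment phase: it consumes from the
    // shared warmup budget until it either satisfies its wanted movements
    // or the budget runs out.
    static int assignPhase(final int movementsWanted, final AtomicInteger remainingWarmupReplicas) {
        int assigned = 0;
        while (assigned < movementsWanted && remainingWarmupReplicas.getAndDecrement() > 0) {
            assigned++;
        }
        // Note: the failed getAndDecrement can push the counter below zero;
        // that's harmless here because we only act when it was positive.
        return assigned;
    }

    public static void main(final String[] args) {
        final AtomicInteger budget = new AtomicInteger(3); // e.g. maxWarmupReplicas = 3
        final int activeWarmups = assignPhase(2, budget);  // active phase consumes 2
        final int standbyWarmups = assignPhase(2, budget); // standby phase gets the 1 remaining
        System.out.println(activeWarmups + " " + standbyWarmups); // prints "2 1"
    }
}
```

Because the counter lives outside both phases, the active and standby movement assignments cannot jointly exceed the configured budget.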

Comment on lines +84 to +88
        final int neededStandbyTaskMovements = assignStandbyTaskMovements(
            tasksToCaughtUpClients,
            clientStates,
            remainingWarmupReplicas,
            warmups
@vvcephei (Contributor Author):

The mechanism by which we enforce "stickiness" is by assigning movements after computing the ideal assignment, so if we want standbys as well as actives to be sticky, we need to assign movements for those as well.
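To illustrate the idea (a simplified sketch with hypothetical names, not the actual Kafka code): after the ideal assignment is computed, any task whose ideal owner is not caught up stays on a caught-up client, and the ideal owner only receives it as a warmup.

```java
import java.util.*;

public class StickinessSketch {
    // For each task: if the ideal destination is caught up (or nobody is),
    // it takes the task immediately; otherwise the task stays sticky on a
    // caught-up client and the destination is recorded as a warmup.
    static Map<String, String> enforceStickiness(final Map<String, String> idealOwner,
                                                 final Map<String, Set<String>> caughtUpClients,
                                                 final Map<String, String> warmups) {
        final Map<String, String> finalOwner = new HashMap<>();
        for (final Map.Entry<String, String> entry : idealOwner.entrySet()) {
            final String task = entry.getKey();
            final String destination = entry.getValue();
            final Set<String> caughtUp = caughtUpClients.getOrDefault(task, Collections.emptySet());
            if (caughtUp.isEmpty() || caughtUp.contains(destination)) {
                finalOwner.put(task, destination); // destination can take it now
            } else {
                finalOwner.put(task, caughtUp.iterator().next()); // stay sticky
                warmups.put(task, destination);                   // warm up the destination
            }
        }
        return finalOwner;
    }

    public static void main(final String[] args) {
        final Map<String, String> ideal = new HashMap<>();
        ideal.put("0_0", "clientB"); // balance says B should own task 0_0...
        final Map<String, Set<String>> caughtUp = new HashMap<>();
        caughtUp.put("0_0", new HashSet<>(Arrays.asList("clientA"))); // ...but only A is caught up
        final Map<String, String> warmups = new HashMap<>();
        final Map<String, String> owners = enforceStickiness(ideal, caughtUp, warmups);
        System.out.println(owners.get("0_0") + " warms=" + warmups.get("0_0")); // prints "clientA warms=clientB"
    }
}
```

The same logic has to run for standbys as well as actives, which is what this change adds.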

assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));

// We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
// due to being configured for no warmups.
@vvcephei (Contributor Author):

One might wonder whether we should even allow "max_warmups := 0". I think this is actually ok, as someone might want to completely disable this state shuffling mechanism and instead just be 100% sticky. Also factoring into my thinking is that it's pretty obvious what will happen if you configure "no warmups", so I don't think it's going to hurt someone who didn't actually want to completely disable the warmup mechanism.

@cadonna (Member):

To be clear, according to StreamsConfig, we do NOT allow max.warmup.replicas = 0. It must at least be 1. Or was your statement hypothetical, that it would be OK to allow it? Anyway, I am in favour of keeping the > 0 check here.

@vvcephei (Contributor Author):

Yeah, I've just realized this, too. And upon second consideration, I don't think the warmup=0 really provides a good mechanism for what I was thinking of. I think we'd better leave it as "at least one". Thanks!
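For reference, the setting under discussion is exposed via `StreamsConfig` (a configuration sketch; the surrounding property values are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder brokers
// max.warmup.replicas must be at least 1; StreamsConfig rejects 0,
// so the warmup mechanism cannot be disabled through this setting.
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1);
```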

-    static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
-                                       final Map<UUID, ClientState> clientStates,
-                                       final int maxWarmupReplicas) {
+    static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
@vvcephei (Contributor Author):

I changed this to return an int just because it made stepping through the assignment in the debugger a bit easier to understand. It serves no algorithmic purpose.

return movementsNeeded;
}

static int assignStandbyTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
@vvcephei (Contributor Author):

This algorithm is similar to the active one, but there are also important differences, so I didn't converge them.

Comment on lines +329 to +331
static Matcher<ClientState> hasAssignedTasks(final int taskCount) {
return hasProperty("assignedTasks", ClientState::assignedTaskCount, taskCount);
}
@vvcephei (Contributor Author):

Similar to the other matchers, it just gives us mildly nicer test output.
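The idea behind this kind of property matcher can be sketched without any test framework (hypothetical stand-in code, not Kafka's actual `hasProperty` helper): extract one named property and compare it, so a mismatch reports just that property instead of dumping the whole object.

```java
import java.util.function.Function;

public class PropertyMatcherSketch {
    // Returns null on a match, or a focused failure message naming the
    // mismatched property, mirroring what a Hamcrest-style matcher gives you.
    static <T, V> String checkProperty(final String name,
                                       final Function<T, V> extractor,
                                       final V expected,
                                       final T actual) {
        final V value = extractor.apply(actual);
        return value.equals(expected)
            ? null
            : "expected " + name + "=" + expected + " but was " + value;
    }

    public static void main(final String[] args) {
        final String failure = checkProperty("length", String::length, 5, "hello");
        System.out.println(failure == null ? "match" : failure); // prints "match"
    }
}
```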

);

@Test
public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
@vvcephei (Contributor Author):

First test for stickiness: we should be 100% sticky and also not schedule a probing rebalance when we are configured for no warmups.

}

@Test
public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
@vvcephei (Contributor Author):

Main test case for stickiness: we should be sticky for standbys, and also schedule warmups.

}
        if (!harness.clientStates.isEmpty()) {
-            testForConvergence(harness, configs, numStatefulTasks * 2);
+            testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));
@vvcephei (Contributor Author):

Now that we're warming up standbys also, we need to relax the convergence limit.
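The arithmetic behind the relaxed bound (a sketch of the formula from the diff above, with made-up example numbers): each stateful task now has one active copy plus `numStandbyReplicas` standby copies that may all need to move, with a factor of 2 for slack.

```java
public class ConvergenceBoundSketch {
    // New bound: 2 * (actives + standbys)
    //          = 2 * numStatefulTasks * (1 + numStandbyReplicas)
    static int convergenceLimit(final int numStatefulTasks, final int numStandbyReplicas) {
        return 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas);
    }

    public static void main(final String[] args) {
        System.out.println(convergenceLimit(10, 0)); // prints 20 (same as the old bound)
        System.out.println(convergenceLimit(10, 2)); // prints 60 (standby copies included)
    }
}
```

With zero standby replicas the new bound degenerates to the old `numStatefulTasks * 2`, so existing configurations are unaffected.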

new TreeMap<>(),
new AtomicInteger(maxWarmupReplicas)
),
is(0)
@vvcephei (Contributor Author):

Just accommodating the new method signature, no semantic changes.

@cadonna (Member) left a comment:

Hey @vvcephei, thank you for the PR.

Here is my feedback.

for (final TaskId task : state.standbyTasks()) {
if (warmups.getOrDefault(destination, Collections.emptySet()).contains(task)) {
// this is a warmup, so we won't move it.
} else if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, destination, tasksToCaughtUpClients)) {
@cadonna (Member):

proposal: Could you add a method taskIsNotCaughtUpOnClientAndCaughtUpClientsExist()? Applying De Morgan's law every time I read this code gives me a headache.
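A sketch of what the suggested helper could look like (hypothetical code using String ids in place of Kafka's TaskId/UUID types, not the actual implementation): by De Morgan's law, the negation of "caught up on this client, or no caught-up clients exist" is "not caught up on this client, and caught-up clients do exist".

```java
import java.util.*;

public class DeMorganSketch {
    // !(taskIsCaughtUpOnClient || noCaughtUpClientsExist)
    //   == taskIsNotCaughtUpOnClient && caughtUpClientsExist
    static boolean taskIsNotCaughtUpOnClientAndCaughtUpClientsExist(
            final String task,
            final String client,
            final Map<String, SortedSet<String>> tasksToCaughtUpClients) {
        final SortedSet<String> caughtUp = tasksToCaughtUpClients.get(task);
        final boolean caughtUpClientsExist = caughtUp != null && !caughtUp.isEmpty();
        final boolean caughtUpOnClient = caughtUpClientsExist && caughtUp.contains(client);
        return caughtUpClientsExist && !caughtUpOnClient;
    }

    public static void main(final String[] args) {
        final Map<String, SortedSet<String>> caughtUp = new HashMap<>();
        caughtUp.put("0_0", new TreeSet<>(Arrays.asList("clientA")));
        // Task 0_0 is caught up on A only, so moving it to B would break stickiness:
        System.out.println(taskIsNotCaughtUpOnClientAndCaughtUpClientsExist("0_0", "clientB", caughtUp)); // prints "true"
    }
}
```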

@vvcephei (Contributor Author):

Haha, sure :)


@vvcephei (Contributor Author):

Hey @cadonna, thanks for the feedback! I think I'll go ahead and merge this because I've already fixed the "max warmup" thing in a separate bugfix PR, which builds on this one.

I'll apply your DeMorgan's law suggestion also in the follow-up.

@vvcephei vvcephei merged commit 9fdd877 into apache:trunk May 22, 2020
@vvcephei vvcephei deleted the kafka-6145-standby-stickiness branch May 22, 2020 16:41
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request May 24, 2020
* 'trunk' of github.com:apache/kafka:
  KAFKA-9888: Copy connector configs before passing to REST extensions (apache#8511)
  KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations (apache#8654)
  KAFKA-6145: Add unit tests for assignments of only stateless tasks (apache#8713)
  MINOR: Fix join group request timeout lower bound (apache#8702)
  MINOR: Improve security documentation for Kafka Streams apache#8710
  KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (apache#8696)
  KAFKA-10003: Mark KStream.through() as deprecated and update Scala API (apache#8679)
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020

Labels: kip (Requires or implements a KIP), streams

3 participants