KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector#10367
showuon wants to merge 4 commits into apache:trunk
Conversation
change (1): remove unused canRevoke
This is not used anywhere else, so delete it.
change (2): compute the current worker assignment excluding deletions and duplicated assignments. If, after excluding deletions and duplicated assignments, there are still workers with an assignment higher than totalTasksWeHave / totalWorkers, we still need to revoke more tasks.
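A minimal sketch of what change (2) describes — drop deleted and duplicated tasks from each worker's view, then flag workers still above the floor average. All names (`withoutDuplication`, `overloaded`, the worker/task labels) are illustrative assumptions, not the actual Connect code:

```java
import java.util.*;

public class DedupSketch {
    // Keep each task only once (first owner wins) and skip deleted tasks.
    static Map<String, Set<String>> withoutDuplication(Map<String, List<String>> assignment,
                                                       Set<String> deleted) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        Set<String> seen = new HashSet<>();
        for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
            Set<String> kept = new LinkedHashSet<>();
            for (String task : e.getValue()) {
                if (!deleted.contains(task) && seen.add(task)) {
                    kept.add(task);
                }
            }
            result.put(e.getKey(), kept);
        }
        return result;
    }

    // Workers whose remaining load exceeds totalTasks / totalWorkers still need revocation.
    static List<String> overloaded(Map<String, Set<String>> assignment, int totalTasks) {
        int avg = totalTasks / assignment.size();
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : assignment.entrySet()) {
            if (e.getValue().size() > avg) out.add(e.getKey());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> current = new LinkedHashMap<>();
        current.put("worker1", Arrays.asList("T0-0", "T0-1", "T0-2", "T0-2")); // duplicated T0-2
        current.put("worker2", Arrays.asList("T0-3", "T1-0"));                 // T1-0 was deleted
        Map<String, Set<String>> clean = withoutDuplication(current, Collections.singleton("T1-0"));
        // 4 configured tasks across 2 workers: average is 2, so worker1 (3 tasks) is over.
        System.out.println(overloaded(clean, 4));
    }
}
```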
change (3): pass configured (the total tasks we have) and currentWorkerAssignmentWithoutDuplication into the performTaskRevocation method.
Can you clarify why this change is necessary? I ran the new testTaskAssignmentWhenWorkerJoinAfterRevocation test case with and without it, and although it fails without this change, it looks like that's more due to frail testing logic with the assertAssignment method than an actual bug in the rebalancing logic here. If I remove the assertAssignment calls but manually check on the distribution of C/T across the cluster after the fifth rebalance, everything is balanced.
I've also produced a test case that fails with this change but succeeds without it:
@Test
public void testNewWorkerAndNewTasksInSameRound() {
    doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());

    // Start with 40 tasks
    configState = clusterConfigState(offset, 1, 40);
    when(coordinator.configSnapshot()).thenReturn(configState);

    // Start with three workers
    memberConfigs = memberConfigs(leader, offset, 0, 2);
    expectGeneration();
    assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
    ++rebalanceNum;
    returnedAssignments = assignmentsCapture.getValue();
    assertDelay(0, returnedAssignments);
    expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
    assertNoReassignments(memberConfigs, expectedMemberConfigs);
    applyAssignments(returnedAssignments);
    memberConfigs = memberConfigs(leader, offset, assignments);

    // Add 2 tasks
    configState = clusterConfigState(offset, 1, 42);
    when(coordinator.configSnapshot()).thenReturn(configState);

    // Add a worker
    memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
    expectGeneration();
    assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
    ++rebalanceNum;
    returnedAssignments = assignmentsCapture.getValue();
    assertDelay(0, returnedAssignments);
    expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
    assertNoReassignments(memberConfigs, expectedMemberConfigs);
    applyAssignments(returnedAssignments);
    memberConfigs = memberConfigs(leader, offset, assignments);

    // Rebalance once more as a follow-up to task revocation
    expectGeneration();
    assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
    ++rebalanceNum;
    returnedAssignments = assignmentsCapture.getValue();
    assertDelay(0, returnedAssignments);
    expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
    assertNoReassignments(memberConfigs, expectedMemberConfigs);
    applyAssignments(returnedAssignments);
    memberConfigs = memberConfigs(leader, offset, assignments);

    assertBalancedAssignments(memberConfigs);

    verify(coordinator, times(rebalanceNum)).configSnapshot();
    verify(coordinator, times(rebalanceNum)).leaderState(any());
    verify(coordinator, times(2 * rebalanceNum)).generationId();
    verify(coordinator, times(rebalanceNum)).memberId();
    verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
}
private void assertBalancedAssignments(Map<String, ExtendedWorkerState> existingAssignments) {
    List<Integer> connectorCounts = existingAssignments.values().stream()
            .map(e -> e.assignment().connectors().size())
            .sorted()
            .collect(Collectors.toList());
    List<Integer> taskCounts = existingAssignments.values().stream()
            .map(e -> e.assignment().tasks().size())
            .sorted()
            .collect(Collectors.toList());
    int minConnectors = connectorCounts.get(0);
    int maxConnectors = connectorCounts.get(connectorCounts.size() - 1);
    int minTasks = taskCounts.get(0);
    int maxTasks = taskCounts.get(taskCounts.size() - 1);
    assertTrue(
        "Assignments are imbalanced. The spread of connectors across each worker is: " + connectorCounts,
        maxConnectors - minConnectors <= 1
    );
    assertTrue(
        "Assignments are imbalanced. The spread of tasks across each worker is: " + taskCounts,
        maxTasks - minTasks <= 1
    );
}
As the comment says, this else block means scheduledRebalance == 0, so we don't need to log the scheduledRebalance and now values.
This revocation is unnecessary because we revoked connector1 and all of connector1's tasks in the previous round, so when entering this round, the assignment is:
W1: connectors: [C0], tasks: [T0-0, T0-1, T0-2, T0-3]
W2: connectors: [], tasks: []
We can just assign connector1 and 4 of its tasks to W2, and complete the rebalance.
However, before my change, in the performTaskRevocation method, we used activeAssignment as the total tasks, so we'd get the average number of tasks each worker can have = 4 (total active tasks) / 2 (total workers) = 2, then revoke 2 tasks, and then assign them to the 2 workers in the next round.
After my change, the avg number of tasks each worker can have will be: 8 (total tasks) / 2 (total workers) = 4, so no tasks will be revoked.
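The before/after arithmetic in this comment can be sketched as follows. The method name and scenario numbers come straight from the comment above; the helper itself is a hypothetical simplification, not the assignor's actual code:

```java
public class RevocationAvgSketch {
    // How many tasks a worker must give up, given the total used for the
    // average: floor(totalTasks / workerCount) is the target load per worker.
    static int tasksToRevoke(int totalTasks, int workerCount, int workerLoad) {
        int avg = totalTasks / workerCount;
        return Math.max(0, workerLoad - avg);
    }

    public static void main(String[] args) {
        // Scenario from the comment: W1 holds 4 tasks, 2 workers in the group.
        // Before the change, only the 4 *active* tasks are counted: avg = 2,
        // so 2 tasks get revoked unnecessarily.
        System.out.println(tasksToRevoke(4, 2, 4)); // 2
        // After the change, the 8 *configured* tasks are counted: avg = 4,
        // so nothing is revoked and the revoked-last-round tasks can be assigned.
        System.out.println(tasksToRevoke(8, 2, 4)); // 0
    }
}
```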
Same as the above comments. Before my change, we revoke 8 tasks in the 1st round and then 2 tasks in the next round. After my change, we revoke 10 tasks in total in 1 round.
Collection<WorkerLoad> completeWorkerAssignmentWithoutDuplication) {
    int totalConnectorsNum = allConnectorsAndTasks.connectors().size();
    int totalTasksNum = allConnectorsAndTasks.tasks().size();
    Collection<WorkerLoad> existingWorkers = completeWorkerAssignmentWithoutDuplication.stream()
Use allConnectorsAndTasks (configured) to compute the total connectors/tasks number, and use the completeWorkerAssignmentWithoutDuplication to compute the existing workers. So it'll always compute the correct expected connector/task number for each worker
change (4): improve test readability by adding the final assignment for each phase of the rebalance. It'll let other devs/users better understand how the tests go and how the algorithm works.
@kkonstantine @rhauch @ramesh-muthusamy , could you help review this PR to fix the uneven distribution in the incremental cooperative assignor in Connect? Thanks.
…arding multiple worker joins during consecutive rebalance 2. Extended IncrementalCooperativeAssignor to IncrementalCooperativeAPMAssignor. Added appropriate protocols etc. to be able to use the new assignor 3. Currently it refers to the same code as assignment logic is yet to be changed
@kkonstantine , could you please check this PR when available? Thank you.
@kkonstantine , could you check this PR? Or should I find someone else to review this PR, since it's been 3 months? Thanks.
@kkonstantine , I suddenly found this is a V3.0 blocker bug. Could you help take a look? Thanks.
@kkonstantine , call for review and comments. Thanks.
Thanks @showuon, and sorry that this hasn't gotten more attention sooner.
This is my first time going through this part of the code base in detail and it's taken a bit longer than expected to get up to speed here. I've left some comments on specific parts of the changes here, and I have a few other general thoughts that have come up while getting acquainted with this logic that aren't directly related to your PR but that I think might be worth discussing since we're in the neighborhood. If you'd prefer to keep things as focused as possible feel free to ignore :)
- Might it make sense to change the order of events so that we assign new connectors and tasks first, as evenly as possible, and only perform a revocation afterward if still necessary? In practice I don't think this will make a difference very often (it would require the number of workers and the set of currently-configured C/T in a cluster to change in the same round of rebalance, I think), but it may provide benefit in clusters with frequent churn. Covered by KAFKA-13764.
- If the number of tasks assigned to a worker decreases to the point where the cluster becomes imbalanced, will we ever revoke tasks from other workers in order to assign them to that worker and balance the cluster? It looks like performTaskRevocation only does anything if the number of workers in the cluster has changed; should we consider updating or refining that logic? I can imagine a case where there are W workers in a cluster and C connectors running in that cluster, each with W tasks. If the assignment of tasks across that cluster is in perfect round-robin fashion, then for each connector, its final task will be running on the same worker (worker w); if each of those connectors is then reconfigured to use W-1 tasks, that would lead to worker w now having C fewer tasks running on it, which could lead to an imbalanced cluster. Of course this particular example may be too specific to be practical, but the general concept of being able to respond to changes in connector size over time while preserving balanced allocation seems worth considering. Covered by KAFKA-13764.
- It looks like we're explicitly revoking deleted C/T during rebalance, and that this is the only mechanism by which workers in the cluster learn that they should stop running those deleted C/T. This seems a little strange, and potentially unsafe. If a worker misses a rebalance and then rejoins the group without having already revoked its C/T, it appears that there's no check in place right now to revoke C/T from that worker that have been deleted in the meantime, since the set of deleted C/T is derived by taking the diff of the previous assignment made by the leader and the set of C/T in the latest view of the config topic. The information about which C/T should be running across the entire cluster is already consistently available to every worker in the cluster after a rebalance, by distributing an offset in the config topic that every worker should have read up to--should we add a check in the DistributedHerder class to pick up on connector deletions from the config topic and apply them directly? I also think there's a potential case where a worker might lose contact with the config topic for long enough that a connector deletion (which is recorded by a tombstone message in the config topic) ends up being missed by the worker, if topic compaction takes place and that tombstone (and all preceding records with the same key) is dropped from the config topic before the worker is able to resume reading from it. EDIT: It looks like there actually is logic in the DistributedHerder to ensure that every C/T it's running after a rebalance is still present in its view of the config topic, which may make this redundant (see KAFKA-13631 for more detail). We don't necessarily have to do this instead of explicitly revoking deleted C/T during rebalance; it'd probably be safer to just do both, if we decide to add this check at all, to avoid increasing the risk of a regression. However, this check may still not be sufficient to catch dropped tombstones from the config topic.
- I had a pretty hard time reading and understanding the new test case you introduced, although I think you wrote it as clearly and concisely as possible while following the patterns of existing IncrementalCooperativeAssignorTest test cases. The comments you added certainly helped, but there's also a ton of duplicated code that could be simplified into testing utility functions, and some variable names that are at best misleading and at worst inaccurate. Since you've taken the initiative to improve the readability of these tests in this PR with your comments, what do you think about refactoring some of the testing logic as well? I have a local draft that I'd be happy to share if you'd be interested in adding it directly to this PR, or if you think it's worth pursuing but out of scope, I can file a Jira and separate PR. Covered by KAFKA-13764.
- I think there's a bug in this line; shouldn't we be combining the two maps (like we do here) instead of potentially overwriting the contents of one with the other via Map::putAll? Obviously this isn't caused by your change but it's simple enough it felt worth pointing out. Covered by KAFKA-13764.
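The W-workers/C-connectors scenario from the second point above can be simulated in a few lines. This is a toy model of the hypothetical round-robin layout described in the comment (the layout formula is an assumption for illustration, not the assignor's actual placement code):

```java
import java.util.Arrays;

public class ShrinkSketch {
    // Round-robin layout assumption: connector c's task t runs on worker (c * W + t) % W.
    static int[] roundRobinLoad(int W, int C) {
        int[] load = new int[W];
        for (int c = 0; c < C; c++)
            for (int t = 0; t < W; t++)
                load[(c * W + t) % W]++;
        return load;
    }

    // Every connector is reconfigured from W tasks down to W - 1: its final
    // task disappears, and under this layout that task always lived on the
    // same worker, which therefore loses C tasks at once.
    static int[] afterShrink(int W, int C) {
        int[] load = roundRobinLoad(W, C);
        for (int c = 0; c < C; c++)
            load[(c * W + (W - 1)) % W]--;
        return load;
    }

    public static void main(String[] args) {
        // W = 3 workers, C = 4 connectors with 3 tasks each.
        System.out.println(Arrays.toString(roundRobinLoad(3, 4))); // [4, 4, 4]
        System.out.println(Arrays.toString(afterShrink(3, 4)));    // [4, 4, 0]
    }
}
```

The cluster goes from perfectly balanced to one idle worker, yet no worker joined or left, so a revocation keyed only on membership changes never fires.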
* @param completeWorkerAssignment
* @return
* @param allConnectorsAndTasks all the connectors and tasks we need to distribute
* @param completeWorkerAssignmentWithoutDuplication current workers assignment without duplication
I think the existing variable name is fine; we can definitely update the Javadoc to clarify that this assignment should exclude duplicated and to-be-deleted C/T, but something this long is a little hard to read.
// W1: assignedTasks:[], assignedTasks:[],
// revokedConnectors:[], revokedTasks:[T0-3]
// W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1]
// revokedConnectors:[] revokedTasks:[]
// W3: assignedTasks:[], assignedTasks:[T1-2, T1-3]
// revokedConnectors:[] revokedTasks:[]
Suggested change:
// W1: assignedConnectors:[], assignedTasks:[],
// revokedConnectors:[], revokedTasks:[T0-3]
// W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1]
// revokedConnectors:[] revokedTasks:[]
// W3: assignedConnectors:[], assignedTasks:[T1-2, T1-3]
// revokedConnectors:[] revokedTasks:[]
// W1: assignedTasks:[], assignedTasks:[],
// revokedConnectors:[], revokedTasks:[T0-2]
// W2: assignedTasks:[], assignedTasks:[]
// revokedConnectors:[] revokedTasks:[]
// W3: assignedTasks:[], assignedTasks:[]
// revokedConnectors:[] revokedTasks:[]
// W4: assignedTasks:[], assignedTasks:[T0-3]
// revokedConnectors:[] revokedTasks:[]
Suggested change:
// W1: assignedConnectors:[], assignedTasks:[],
// revokedConnectors:[], revokedTasks:[T0-2]
// W2: assignedConnectors:[], assignedTasks:[]
// revokedConnectors:[] revokedTasks:[]
// W3: assignedConnectors:[], assignedTasks:[]
// revokedConnectors:[] revokedTasks:[]
// W4: assignedConnectors:[], assignedTasks:[T0-3]
// revokedConnectors:[] revokedTasks:[]
// W1: assignedTasks:[], assignedTasks:[],
// revokedConnectors:[], revokedTasks:[]
// W2: assignedTasks:[], assignedTasks:[]
// revokedConnectors:[] revokedTasks:[]
// W3: assignedTasks:[], assignedTasks:[]
// revokedConnectors:[] revokedTasks:[]
// W4: assignedTasks:[], assignedTasks:[T0-2]
// revokedConnectors:[] revokedTasks:[]
Suggested change:
// W1: assignedConnectors:[], assignedTasks:[],
// revokedConnectors:[], revokedTasks:[]
// W2: assignedConnectors:[], assignedTasks:[]
// revokedConnectors:[] revokedTasks:[]
// W3: assignedConnectors:[], assignedTasks:[]
// revokedConnectors:[] revokedTasks:[]
// W4: assignedConnectors:[], assignedTasks:[T0-2]
// revokedConnectors:[] revokedTasks:[]
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3");
This assertion style is useful for straightforward test cases but I wonder if we might want something more granular that allows us to assert how many C/T were assigned/revoked from individual workers (instead of across the entire cluster) for cases like this? Or, if that's difficult because of non-deterministic behavior caused by things like Java collections with undefined iteration order, could we at least have something that asserts how many workers should have a given total count of C/T in the cluster (e.g., "assert that 3 workers have 2 connectors assigned to them and 4 tasks, and that 1 worker has 1 connector assigned to it and 3 tasks") or how many workers were assigned/revoked a given number of C/T during the rebalance (e.g., "assert that 2 workers were assigned 2 tasks and revoked 3 tasks, and that 1 worker was assigned 0 tasks and revoked 4 tasks")?
The comments are useful for illustrating what the expectations are on that front, but they aren't testable and so there's no guarantee that they're actually correct. And in fact, after running this through a debugger, I was seeing the correct number of C/T being allocated/revoked during each round, but the actual C/T names (i.e., T0-0 vs T1-1) were different from what's described in the comments.
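One way to make the suggestion above concrete: group workers by their (connectors, tasks) counts, so a test can assert "3 workers have 2 connectors and 4 tasks; 1 worker has 1 connector and 3 tasks" without depending on which worker got which. This is a hypothetical helper sketch (names and the `int[]` stand-in for a worker's assignment are assumptions), not existing test code:

```java
import java.util.*;
import java.util.stream.Collectors;

public class DistributionSketch {
    // Map each (connectorCount, taskCount) pair to the number of workers carrying it.
    static Map<List<Integer>, Long> distribution(Map<String, int[]> counts) {
        return counts.values().stream()
                .map(c -> Arrays.asList(c[0], c[1]))    // [connectors, tasks]
                .collect(Collectors.groupingBy(c -> c, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, int[]> cluster = new HashMap<>();
        cluster.put("worker1", new int[] {2, 4});
        cluster.put("worker2", new int[] {2, 4});
        cluster.put("worker3", new int[] {2, 4});
        cluster.put("worker4", new int[] {1, 3});
        Map<List<Integer>, Long> dist = distribution(cluster);
        // 3 workers carry (2 connectors, 4 tasks); 1 worker carries (1, 3).
        System.out.println(dist.get(Arrays.asList(2, 4))); // 3
        System.out.println(dist.get(Arrays.asList(1, 3))); // 1
    }
}
```

The same shape works for asserting per-rebalance deltas (assigned/revoked counts) instead of absolute loads.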
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 1, 0, 0, "worker1", "worker2", "worker3", "worker4");
By this point, we should be completely balanced, but there's no explicit testing logic to verify that. What do you think about adding a utility method like assertBalancedAssignments and then invoking it here (and possibly other places in this test suite)?
private void assertBalancedAssignments(Map<String, ExtendedWorkerState> existingAssignments) {
    List<Integer> connectorCounts = existingAssignments.values().stream()
            .map(e -> e.assignment().connectors().size())
            .sorted()
            .collect(Collectors.toList());
    List<Integer> taskCounts = existingAssignments.values().stream()
            .map(e -> e.assignment().tasks().size())
            .sorted()
            .collect(Collectors.toList());
    int minConnectors = connectorCounts.get(0);
    int maxConnectors = connectorCounts.get(connectorCounts.size() - 1);
    int minTasks = taskCounts.get(0);
    int maxTasks = taskCounts.get(taskCounts.size() - 1);
    assertTrue(
        "Assignments are imbalanced. The spread of connectors across each worker is: " + connectorCounts,
        maxConnectors - minConnectors <= 1
    );
    assertTrue(
        "Assignments are imbalanced. The spread of tasks across each worker is: " + taskCounts,
        maxTasks - minTasks <= 1
    );
}

It might also make these tests easier to write and modify (if we need to tweak rebalancing logic again in the future) if we used this type of method instead of the existing assertAssignment one, since in many cases all that really matters is that we achieve a balanced allocation after a specific series of rebalances, instead of exactly how many C/T were assigned/revoked in the interim.
@C0urante , thanks for your comments. TBH, I need some time to revisit the code (since it was a long time ago...), and will answer your comments later. Thank you.
Thanks @showuon. In that case, I can file separate issues for a lot of the comments I've made here, and we can try to keep this PR as focused as possible for the sake of moving forward.
@C0urante , sure, please file separate issues for the other comments. And thanks for the comments. However, I'm still concerned that @kkonstantine doesn't like the current solution, and would like to have another proposal as mentioned here. So, I think we still need to get his approval before we can continue. WDYT? cc @kkonstantine , we need your suggestions here, please!
@kkonstantine , sorry to keep pinging you, but we need your advice before we can continue. Thanks.
@showuon FYI, I've just opened #12019, which should address KAFKA-12495 and some other issues with rebalancing, but without using consecutive revocations.
Great! I'll take a look next week. Also cc @kkonstantine . Thanks.
@showuon @C0urante @kkonstantine What is the status of this PR? As far as I understand, this PR might resolve KAFKA-8391, KAFKA-12283 and KAFKA-12495. Is this correct? Those tickets block the 3.2.0 release.
@cadonna , that's correct (only KAFKA-8391 is not 100% sure). So far, we are waiting for @kkonstantine 's comments about the better solution for this issue, since he was concerned about the current solution and has a better solution for it (commented here).
Hi, it very probably also resolves KAFKA-10413.
Thanks for working on this fix @showuon. Apologies for taking so long to return here.

My main concern is related to the proposed change to apply consecutive rebalances that perform revocations. The current incremental cooperative rebalancing algorithm uses two consecutive rebalances in order to move tasks between workers: one rebalance during which revocations happen, and one during which the revoked tasks are reassigned. Although this is clearly not an atomic process (as this issue also demonstrates), I find that it's a good property to maintain and reason about. Allowing consecutive revocations that happen immediately when an imbalance is detected might mean that the workers overreact to external circumstances that caused an imbalance between the initial calculation of task assignments in the revocation rebalance and the subsequent rebalance for the assignment of revoked tasks. Such circumstances might have to do with rolling upgrades, scaling a cluster up or down, or simply temporary instability. We were first able to reproduce this issue in integration tests with the test that is currently disabled.

My main thought was that, instead of risking shuffling tasks too aggressively within a short period of time and opening the door to bugs that would make workers oscillate between imbalanced task assignments continuously and in a tight loop, we could use the existing mechanism of scheduling delayed rebalances to program workers to perform a pair of rebalances (revocation + reassignment) soon after an imbalance is detected. Regarding when an imbalance is detected, the good news is that the leader worker sending the assignment during the second rebalance of a pair of rebalances knows that it will send an imbalanced assignment (there's no code to detect that right now, but it can easily be added just before the assignment is sent).

The idea here would be to send this assignment anyway, but also schedule a follow-up rebalance that will have the opportunity to balance tasks soon with our standard pair of rebalances, which works dependably as long as no workers are added or removed between the two rebalances. We can discuss what a good setting for the delay is. One obvious possibility is to reuse the existing property; adding another config just for that seems unwarranted. To shield ourselves from infinite such rebalances, the leader should also keep track of how many such attempts have been made and stop attempting to balance out tasks after a certain number of tries. Of course, every other normal rebalance should reset both this counter and possibly the delay. I'd be interested to hear what you think of this approach, which is quite similar to what you have demonstrated already but potentially less risky in terms of changes in the assignor logic and how aggressively the leader attempts to fix an imbalance.
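The bookkeeping this proposal describes — schedule a delayed follow-up when an imbalanced assignment is about to be sent, cap the attempts, reset on a normal rebalance — could look roughly like this. Every name here is hypothetical; this is a sketch of the proposal, not code from the assignor:

```java
public class DelayedRebalanceSketch {
    static final int MAX_BALANCING_ATTEMPTS = 5; // cap to avoid an endless rebalance loop
    int balancingAttempts = 0;

    // Called by the leader once an assignment has been computed. Returns the
    // delay (ms) before a scheduled follow-up rebalance, 0 when no follow-up
    // is needed, or -1 when we give up rebalancing toward balance.
    long onAssignmentComputed(boolean imbalanced, long configuredDelayMs) {
        if (!imbalanced) {
            balancingAttempts = 0;       // a normal, balanced rebalance resets the counter
            return 0;
        }
        if (++balancingAttempts > MAX_BALANCING_ATTEMPTS) {
            return -1;                   // stop attempting to balance out tasks
        }
        return configuredDelayMs;        // reuse the existing delay property
    }

    public static void main(String[] args) {
        DelayedRebalanceSketch leader = new DelayedRebalanceSketch();
        for (int i = 0; i < 5; i++)
            System.out.println(leader.onAssignmentComputed(true, 1000));  // 1000 each time
        System.out.println(leader.onAssignmentComputed(true, 1000));     // -1: cap reached
        System.out.println(leader.onAssignmentComputed(false, 1000));    // 0: counter reset
    }
}
```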
@kkonstantine Thank you for your thoughts! From a 3.2.0 release perspective, your proposal seems to be a change that we should postpone to a later release, since feature freeze and code freeze have passed. Or is this a regression? If it is a regression, is there a quick intermediate fix that we can include in 3.2.0 to unblock the release? If it is not a regression, I would propose to link the corresponding tickets to each other and then move them to the next release, ideally with an assignee.
My comment is in the Jira ticket. Thanks.
jira: https://issues.apache.org/jira/browse/KAFKA-12495
Allow consecutive revocations in the incremental cooperative assignor in Connect, to fix the issue that when new members join right after a revocation round, it causes uneven distribution (please check the Jira for a better understanding). What I did:
1. Removed the unused canRevoke variable, since we now allow consecutive revoking (as long as delay == 0).
2. Added currentWorkerAssignmentWithoutDuplication, to remove duplicated connectors/tasks from currentWorkerAssignment, so that we can use it in the performTaskRevocation method to compute whether we need to revoke more connectors/tasks in this round, by checking whether the remaining assignments on each worker exceed totalTasks/totalWorkers.
3. Passed configured (the total tasks we have) into performTaskRevocation instead of activeAssignment, so that we can compute the correct expected max assignment number (totalSize / workerSize) instead of activeTotalSize / workerSize; activeTotalSize doesn't include the newAssignments, which causes the wrong computation and uneven rebalance, or requires more rounds of revoking rebalances.

With changes (2) and (3), we can still make sure the revocation is always correct, whether this is a consecutive revocation or we have duplicated assignments.