From 00a4a336f41536eb404ddc01dff943bbc2d1ce21 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Sat, 20 Mar 2021 10:23:50 +0800 Subject: [PATCH 1/2] KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector --- .../IncrementalCooperativeAssignor.java | 88 ++++++------- .../IncrementalCooperativeAssignorTest.java | 118 ++++++++++++++---- 2 files changed, 135 insertions(+), 71 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 2cfaf1d3eb6ca..462996251d118 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -61,7 +61,6 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor { private final int maxDelay; private ConnectorsAndTasks previousAssignment; private ConnectorsAndTasks previousRevocation; - private boolean canRevoke; // visible for testing protected final Set candidateWorkersForReassignment; protected long scheduledRebalance; @@ -75,7 +74,6 @@ public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxD this.maxDelay = maxDelay; this.previousAssignment = ConnectorsAndTasks.EMPTY; this.previousRevocation = new ConnectorsAndTasks.Builder().build(); - this.canRevoke = true; this.scheduledRebalance = 0; this.candidateWorkersForReassignment = new LinkedHashSet<>(); this.delay = 0; @@ -196,24 +194,20 @@ protected Map performTaskAssignment(String leaderId, long ma // appropriately and be ready to re-apply revocation of tasks if (!previousRevocation.isEmpty()) { if (previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c)) - || previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) { + || previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) { previousAssignment = activeAssignments; - canRevoke = true; } previousRevocation.connectors().clear(); previousRevocation.tasks().clear(); } + + // Derived set: The set of deleted connectors-and-tasks is a derived set from the set // difference of previous - configured ConnectorsAndTasks deleted = diff(previousAssignment, configured); log.debug("Deleted assignments: {}", deleted); - // Derived set: The set of remaining active connectors-and-tasks is a derived set from the - // set difference of active - deleted - ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted); - log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive); - // Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from // the set difference of previous - active - deleted ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted); @@ -248,6 +242,10 @@ protected Map performTaskAssignment(String leaderId, long ma toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); + List currentWorkerAssignmentWithoutDuplication = + removeDuplicated(workerAssignment(memberConfigs, deleted), toRevoke); + log.debug("Complete (excluding deletions and remove duplicated assignments) worker assignments: {}", currentWorkerAssignmentWithoutDuplication); + // Recompute the complete assignment excluding the deleted connectors-and-tasks completeWorkerAssignment = workerAssignment(memberConfigs, deleted); connectorAssignments = @@ -258,17 +256,12 @@ protected Map performTaskAssignment(String leaderId, long ma handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs); // Do not revoke resources for re-assignment while a delayed rebalance is active - // Also we do not revoke in two consecutive rebalances by the same leader - canRevoke = delay == 0 && canRevoke; - - // Compute the connectors-and-tasks to be revoked for load balancing without taking into - // account the deleted ones. - log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay); - if (canRevoke) { + log.debug("Can leader revoke tasks in this assignment? (delay: {})", delay); + if (delay == 0) { + // Compute the connectors-and-tasks to be revoked for load balancing without taking into + // account the deleted ones. Map toExplicitlyRevoke = - performTaskRevocation(activeAssignments, currentWorkerAssignment); - - log.debug("Connector and task to revoke assignments: {}", toRevoke); + performTaskRevocation(configured, currentWorkerAssignmentWithoutDuplication); toExplicitlyRevoke.forEach( (worker, assignment) -> { @@ -279,9 +272,8 @@ protected Map performTaskAssignment(String leaderId, long ma existing.tasks().addAll(assignment.tasks()); } ); - canRevoke = toExplicitlyRevoke.size() == 0; - } else { - canRevoke = delay == 0; + + log.debug("Connector and task to revoke assignments: {}", toRevoke); } assignConnectors(completeWorkerAssignment, newSubmissions.connectors()); @@ -314,6 +306,15 @@ protected Map performTaskAssignment(String leaderId, long ma return serializeAssignments(assignments); } + private List removeDuplicated(List currentWorkerAssignmentWithoutDuplication, + Map toRevoke) { + for (WorkerLoad wl: currentWorkerAssignmentWithoutDuplication) { + wl.connectors().removeAll(toRevoke.getOrDefault(wl.worker(), ConnectorsAndTasks.EMPTY).connectors()); + wl.tasks().removeAll(toRevoke.getOrDefault(wl.worker(), ConnectorsAndTasks.EMPTY).tasks()); + } + return currentWorkerAssignmentWithoutDuplication; + } + private Map computeDeleted(ConnectorsAndTasks deleted, Map> connectorAssignments, Map> taskAssignments) { @@ -451,7 +452,7 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, } if (!candidateWorkerLoad.isEmpty()) { - log.debug("Assigning lost tasks to {} candidate workers: {}", + log.debug("Assigning lost tasks to {} candidate workers: {}", candidateWorkerLoad.size(), candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(","))); Iterator candidateWorkerIterator = candidateWorkerLoad.iterator(); @@ -489,12 +490,10 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, log.debug("Delayed rebalance in progress. Task reassignment is postponed. New computed rebalance delay: {}", delay); } else { // This means scheduledRebalance == 0 - // We could also also extract the current minimum delay from the group, to make - // independent of consecutive leader failures, but this optimization is skipped - // at the moment + // We could also extract the current minimum delay from the group, to make + // independent of consecutive leader failures, but this optimization is skipped at the moment delay = maxDelay; - log.debug("Resetting rebalance delay to the max: {}. scheduledRebalance: {} now: {} diff scheduledRebalance - now: {}", - delay, scheduledRebalance, now, scheduledRebalance - now); + log.debug("Resetting rebalance delay to the max: {}.", delay); } scheduledRebalance = now + delay; } @@ -532,23 +531,23 @@ private List pickCandidateWorkerForReassignment(List com * each existing worker. The revoked tasks, once assigned to the new workers will maintain * a balanced load among the group. * - * @param activeAssignments - * @param completeWorkerAssignment - * @return + * @param allConnectorsAndTasks all the connectors and tasks we need to distribute + * @param completeWorkerAssignmentWithoutDuplication current workers assignment without duplication + * @return the connectors/tasks that needed to be revoked on each worker, if no revoke is required, will return empty map */ - private Map performTaskRevocation(ConnectorsAndTasks activeAssignments, - Collection completeWorkerAssignment) { - int totalActiveConnectorsNum = activeAssignments.connectors().size(); - int totalActiveTasksNum = activeAssignments.tasks().size(); - Collection existingWorkers = completeWorkerAssignment.stream() + private Map performTaskRevocation(ConnectorsAndTasks allConnectorsAndTasks, + Collection completeWorkerAssignmentWithoutDuplication) { + int totalConnectorsNum = allConnectorsAndTasks.connectors().size(); + int totalTasksNum = allConnectorsAndTasks.tasks().size(); + Collection existingWorkers = completeWorkerAssignmentWithoutDuplication.stream() .filter(wl -> wl.size() > 0) .collect(Collectors.toList()); int existingWorkersNum = existingWorkers.size(); - int totalWorkersNum = completeWorkerAssignment.size(); + int totalWorkersNum = completeWorkerAssignmentWithoutDuplication.size(); int newWorkersNum = totalWorkersNum - existingWorkersNum; if (log.isDebugEnabled()) { - completeWorkerAssignment.forEach(wl -> log.debug( + completeWorkerAssignmentWithoutDuplication.forEach(wl -> log.debug( "Per worker current load size; worker: {} connectors: {} tasks: {}", wl.worker(), wl.connectorsSize(), wl.tasksSize())); } @@ -570,21 +569,22 @@ private Map performTaskRevocation(ConnectorsAndTasks existingWorkersNum, newWorkersNum, totalWorkersNum); // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0 - log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum); - int floorConnectors = totalActiveConnectorsNum / totalWorkersNum; - int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1); + log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalConnectorsNum / existingWorkersNum); + int floorConnectors = totalConnectorsNum / totalWorkersNum; + int ceilConnectors = floorConnectors + ((totalConnectorsNum % totalWorkersNum == 0) ? 0 : 1); log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors); - log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum); - int floorTasks = totalActiveTasksNum / totalWorkersNum; - int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1); + log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalTasksNum / existingWorkersNum); + int floorTasks = totalTasksNum / totalWorkersNum; + int ceilTasks = floorTasks + ((totalTasksNum % totalWorkersNum == 0) ? 0 : 1); log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks); int numToRevoke; for (WorkerLoad existing : existingWorkers) { Iterator connectors = existing.connectors().iterator(); numToRevoke = existing.connectorsSize() - ceilConnectors; + log.debug("Connectors on worker {} is higher than ceiling, so revoking {} Connectors", existing, numToRevoke); for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) { ConnectorsAndTasks resources = revoking.computeIfAbsent( existing.worker(), diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index 0fe153132eb93..3968ddf588c33 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -180,6 +180,92 @@ public void testTaskAssignmentWhenWorkerJoins() { verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId(); } + @Test + public void testTaskAssignmentWhenWorkerJoinAfterRevocation() { + when(coordinator.configSnapshot()).thenReturn(configState); + doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); + + // First assignment with 1 worker and 2 connectors configured but not yet assigned + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(2, 8, 0, 0, "worker1"); + + // Second assignment with a second worker joining and all connectors running on previous worker + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 0, 1, 4, "worker1", "worker2"); + + // Third assignment after revocations, and a third worker joining + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null)); + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3"); + + // Forth assignment after revocations, and a forth worker joining + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + memberConfigs.put("worker4", new ExtendedWorkerState(leaderUrl, offset, null)); + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 1, 0, 1, "worker1", "worker2", "worker3", "worker4"); + + // Fifth assignment after revocations + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 1, 0, 0, "worker1", "worker2", "worker3", "worker4"); + + + // A sixth rebalance should not change assignments + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + expectGeneration(); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 0, 0, 0, "worker1", "worker2", "worker3", "worker4"); + + verify(coordinator, times(rebalanceNum)).configSnapshot(); + verify(coordinator, times(rebalanceNum)).leaderState(any()); + verify(coordinator, times(2 * rebalanceNum)).generationId(); + verify(coordinator, times(rebalanceNum)).memberId(); + verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId(); + } + @Test public void testTaskAssignmentWhenWorkerLeavesPermanently() { // Customize assignor for this test case @@ -1133,20 +1219,9 @@ public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() { assertDelay(0, returnedAssignments); expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); assertNoReassignments(memberConfigs, expectedMemberConfigs); - assertAssignment(1, 4, 0, 2, "worker1", "worker2"); - - // fourth rebalance after revocations - applyAssignments(returnedAssignments); - memberConfigs = memberConfigs(leader, offset, assignments); - assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); - ++rebalanceNum; - returnedAssignments = assignmentsCapture.getValue(); - assertDelay(0, returnedAssignments); - expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); - assertNoReassignments(memberConfigs, expectedMemberConfigs); - assertAssignment(0, 2, 0, 0, "worker1", "worker2"); + assertAssignment(1, 4, 0, 0, "worker1", "worker2"); - // Fifth rebalance should not change assignments + // Fourth rebalance should not change assignments applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -1195,20 +1270,9 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() assertDelay(0, returnedAssignments); expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); assertNoReassignments(memberConfigs, expectedMemberConfigs); - assertAssignment(0, 0, 2, 8, "worker1", "worker2"); - - // Third assignment after revocations - applyAssignments(returnedAssignments); - memberConfigs = memberConfigs(leader, offset, assignments); - assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); - ++rebalanceNum; - returnedAssignments = assignmentsCapture.getValue(); - assertDelay(0, returnedAssignments); - expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); - assertNoReassignments(memberConfigs, expectedMemberConfigs); - assertAssignment(0, 0, 0, 2, "worker1", "worker2"); + assertAssignment(0, 0, 2, 10, "worker1", "worker2"); - // fourth rebalance after revocations + // Third rebalance after revocations applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -1219,7 +1283,7 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() assertNoReassignments(memberConfigs, expectedMemberConfigs); assertAssignment(0, 2, 0, 0, "worker1", "worker2"); - // Fifth rebalance should not change assignments + // Fourth rebalance should not change assignments applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); From f9ba6da73b580e52117f7285c9209d6500dce45f Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 22 Mar 2021 15:48:29 +0800 Subject: [PATCH 2/2] KAFKA-12495: add assignment result in each phase for tests --- .../IncrementalCooperativeAssignorTest.java | 387 +++++++++++++++++- 1 file changed, 385 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index 3968ddf588c33..f1a9b09d0e7fb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -127,6 +127,14 @@ public void testTaskAssignmentWhenWorkerJoins() { doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 1 worker and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: + // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3] expectGeneration(); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); ++rebalanceNum; @@ -137,6 +145,16 @@ public void testTaskAssignmentWhenWorkerJoins() { assertAssignment(2, 8, 0, 0, "worker1"); // Second assignment with a second worker joining and all connectors running on previous worker + // + // assignment after this phase: + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3] + // W2: assignedConnectors:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[], tasks:[] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); @@ -150,6 +168,16 @@ public void testTaskAssignmentWhenWorkerJoins() { assertAssignment(0, 0, 1, 4, "worker1", "worker2"); // Third assignment after revocations + // + // assignment after this phase: + // W1: assignedTasks:[], assignedTasks:[], + // revokedConnectors:[], revokedTasks:[] + // W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); expectGeneration(); @@ -186,6 +214,14 @@ public void testTaskAssignmentWhenWorkerJoinAfterRevocation() { doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 1 worker and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: + // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3] expectGeneration(); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); ++rebalanceNum; @@ -196,6 +232,16 @@ public void testTaskAssignmentWhenWorkerJoinAfterRevocation() { assertAssignment(2, 8, 0, 0, "worker1"); // Second assignment with a second worker joining and all connectors running on previous worker + // + // assignment after this phase: + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3] + // W2: assignedConnectors:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[], tasks:[] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); @@ -209,6 +255,19 @@ public void testTaskAssignmentWhenWorkerJoinAfterRevocation() { assertAssignment(0, 0, 1, 4, "worker1", "worker2"); // Third assignment after revocations, and a third worker joining + // + // assignment after this phase: + // W1: assignedTasks:[], assignedTasks:[], + // revokedConnectors:[], revokedTasks:[T0-3] + // W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1] + // revokedConnectors:[] revokedTasks:[] + // W3: assignedTasks:[], assignedTasks:[T1-2, T1-3] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2] + // W2: connectors:[C1], tasks:[T1-0, T1-1] + // W3: connectors:[], tasks:[T1-2, T1-3] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null)); @@ -222,6 +281,22 @@ public void testTaskAssignmentWhenWorkerJoinAfterRevocation() { assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3"); // Forth assignment after revocations, and a forth worker joining + // + // assignment after this phase: + // W1: assignedTasks:[], assignedTasks:[], + // revokedConnectors:[], revokedTasks:[T0-2] + // W2: assignedTasks:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // W3: assignedTasks:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // W4: assignedTasks:[], assignedTasks:[T0-3] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1] + // W2: connectors:[C1], tasks:[T1-0, T1-1] + // W3: connectors:[], tasks:[T1-2, T1-3] + // W4: connectors:[], tasks:[T0-3] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); memberConfigs.put("worker4", new ExtendedWorkerState(leaderUrl, offset, null)); @@ -235,6 +310,22 @@ public void testTaskAssignmentWhenWorkerJoinAfterRevocation() { assertAssignment(0, 1, 0, 1, "worker1", "worker2", "worker3", "worker4"); // Fifth assignment after revocations + // + // assignment after this phase: + // W1: assignedTasks:[], assignedTasks:[], + // revokedConnectors:[], revokedTasks:[] + // W2: assignedTasks:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // W3: assignedTasks:[], assignedTasks:[] + // revokedConnectors:[] revokedTasks:[] + // W4: assignedTasks:[], assignedTasks:[T0-2] + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1] + // W2: connectors:[C1], tasks:[T1-0, T1-1] + // W3: connectors:[], tasks:[T1-2, T1-3] + // W4: connectors:[], tasks:[T0-3, T0-2] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); expectGeneration(); @@ -276,6 +367,17 @@ public void testTaskAssignmentWhenWorkerLeavesPermanently() { doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 2 workers and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[C0], assignedTasks:[T0-0, T0-1, T0-2, T0-3], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); expectGeneration(); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -319,6 +421,13 @@ public void testTaskAssignmentWhenWorkerLeavesPermanently() { time.sleep(rebalanceDelay / 2 + 1); // Fourth assignment after delay expired + // + // assignment after this phase: + // W1: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); expectGeneration(); @@ -347,6 +456,17 @@ public void testTaskAssignmentWhenWorkerBounces() { doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 2 workers and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[C0], assignedTasks:[T0-0, T0-1, T0-2, T0-3], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); expectGeneration(); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -407,6 +527,16 @@ public void testTaskAssignmentWhenWorkerBounces() { // Fifth assignment with the same two workers. The delay has expired, so the lost // assignments ought to be assigned to the worker that has appeared as returned. + // + // assignment after this phase: + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); expectGeneration(); @@ -435,6 +565,20 @@ public void testTaskAssignmentWhenLeaderLeavesPermanently() { doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 3 workers and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[C0], assignedTasks:[T0-0, T0-1, T0-2], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2], + // revokedConnectors:[] revokedTasks:[] + // W3: assignedConnectors:[], assignedTasks:[T0-3, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2] + // W3: connectors:[], tasks:[T0-3, T1-3] memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null)); expectGeneration(); @@ -449,6 +593,16 @@ public void testTaskAssignmentWhenLeaderLeavesPermanently() { // Second assignment with two workers remaining in the group. The worker that left the // group was the leader. The new leader has no previous assignments and is not tracking a // delay upon a leader's exit + // + // assignment after this phase: (suppose W2 is the leader) + // W2: assignedConnectors:[], assignedTasks:[T0-0], + // revokedConnectors:[] revokedTasks:[] + // W3: assignedConnectors:[C0], assignedTasks:[T0-1, T0-2], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T0-0] + // W3: connectors:[C0], tasks:[T0-1, T0-2, T0-3, T1-3] applyAssignments(returnedAssignments); assignments.remove("worker1"); leader = "worker2"; @@ -468,7 +622,7 @@ public void testTaskAssignmentWhenLeaderLeavesPermanently() { assertNoReassignments(memberConfigs, expectedMemberConfigs); assertAssignment(1, 3, 0, 0, "worker2", "worker3"); - // Third (incidental) assignment with still only one worker in the group. + // Third (incidental) assignment with still only 2 workers in the group. applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); expectGeneration(); @@ -496,6 +650,20 @@ public void testTaskAssignmentWhenLeaderBounces() { doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 3 workers and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[C0], assignedTasks:[T0-0, T0-1, T0-2], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2], + // revokedConnectors:[] revokedTasks:[] + // W3: assignedConnectors:[], assignedTasks:[T0-3, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2] + // W3: connectors:[], tasks:[T0-3, T1-3] memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null)); expectGeneration(); @@ -510,6 +678,16 @@ public void testTaskAssignmentWhenLeaderBounces() { // Second assignment with two workers remaining in the group. The worker that left the // group was the leader. The new leader has no previous assignments and is not tracking a // delay upon a leader's exit + // + // assignment after this phase: (suppose W2 is the leader) + // W2: assignedConnectors:[], assignedTasks:[T0-0], + // revokedConnectors:[] revokedTasks:[] + // W3: assignedConnectors:[C0], assignedTasks:[T0-1, T0-2], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T0-0] + // W3: connectors:[C0], tasks:[T0-1, T0-2, T0-3, T1-3] applyAssignments(returnedAssignments); assignments.remove("worker1"); leader = "worker2"; @@ -532,6 +710,19 @@ public void testTaskAssignmentWhenLeaderBounces() { // Third assignment with the previous leader returning as a follower. In this case, the // arrival of the previous leader is treated as an arrival of a new worker. Reassignment // happens immediately, first with a revocation + // + // assignment after this phase: (suppose W2 is the leader) + // W2: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T0-0] + // W3: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T0-2] + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2] + // W3: connectors:[C0], tasks:[T0-1, T0-3, T1-3] + // W1: connectors:[], tasks:[] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); memberConfigs.put("worker1", new ExtendedWorkerState(leaderUrl, offset, null)); @@ -544,6 +735,19 @@ public void testTaskAssignmentWhenLeaderBounces() { assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3"); // Fourth assignment after revocations + // + // assignment after this phase: (suppose W2 is the leader) + // W2: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // W3: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // W1: assignedConnectors:[], assignedTasks:[T0-0, T0-2], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2] + // W3: connectors:[C0], tasks:[T0-1, T0-3, T1-3] + // W1: connectors:[], tasks:[T0-0, T0-2] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); expectGeneration(); @@ -573,6 +777,17 @@ public void testTaskAssignmentWhenFirstAssignmentAttemptFails() { .when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 2 workers and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[C0], assignedTasks:[T0-0, T0-1, T0-2, T0-3], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase due to failure: + // W1: connectors:[], tasks:[] + // W2: connectors:[], tasks:[] memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); try { expectGeneration(); @@ -591,6 +806,16 @@ public void testTaskAssignmentWhenFirstAssignmentAttemptFails() { // Second assignment happens with members returning the same assignments (memberConfigs) // as the first time. The assignor detects that the number of members did not change and // avoids the rebalance delay, treating the lost assignments as new assignments. + // + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[C0], assignedTasks:[T0-0, T0-1, T0-2, T0-3], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); expectGeneration(); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -618,6 +843,17 @@ public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() { doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 2 workers and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[C0], assignedTasks:[T0-0, T0-1, T0-2, T0-3], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); expectGeneration(); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -634,6 +870,19 @@ public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() { // Second assignment triggered by a third worker joining. The computed assignment should // revoke tasks from the existing group. But the assignment won't be correctly delivered. + // + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T0-3] + // W2: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T1-3] + // W3: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase due to failure: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] + // W3: connectors:[], tasks:[] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null)); @@ -653,6 +902,19 @@ public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() { // Third assignment happens with members returning the same assignments (memberConfigs) // as the first time. + // + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T0-3] + // W2: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T1-3] + // W3: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2] + // W3: connectors:[], tasks:[] doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); expectGeneration(); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -681,6 +943,17 @@ public void testTaskAssignmentWhenSubsequentAssignmentAttemptFailsOutsideTheAssi doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 2 workers and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[C0], assignedTasks:[T0-0, T0-1, T0-2, T0-3], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); expectGeneration(); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -694,6 +967,19 @@ public void testTaskAssignmentWhenSubsequentAssignmentAttemptFailsOutsideTheAssi // Second assignment triggered by a third worker joining. The computed assignment should // revoke tasks from the existing group. But the assignment won't be correctly delivered // and sync group with fail on the leader worker. + // + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T0-3] + // W2: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T1-3] + // W3: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase due to failure: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] + // W3: connectors:[], tasks:[] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null)); @@ -712,6 +998,19 @@ public void testTaskAssignmentWhenSubsequentAssignmentAttemptFailsOutsideTheAssi // Third assignment happens with members returning the same assignments (memberConfigs) // as the first time. + // + // assignment after this phase: (suppose W1 is the leader) + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T0-3] + // W2: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T1-3] + // W3: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase due to failure: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2] + // W3: connectors:[], tasks:[] when(coordinator.lastCompletedGenerationId()).thenReturn(assignor.previousGenerationId - 1); doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); expectGeneration(); @@ -736,7 +1035,18 @@ public void testTaskAssignmentWhenConnectorsAreDeleted() { when(coordinator.configSnapshot()).thenReturn(configState); doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); - // First assignment with 1 worker and 2 connectors configured but not yet assigned + // First assignment with 2 worker and 3 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: + // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C2], assignedTasks:[T1-2, T1-3, T2-0, T2-1, T2-2, T2-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1] + // W2: connectors:[C2], tasks:[T1-2, T1-3, T2-0, T2-1, T2-2, T2-3] memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null)); expectGeneration(); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -748,6 +1058,16 @@ public void testTaskAssignmentWhenConnectorsAreDeleted() { assertAssignment(3, 12, 0, 0, "worker1", "worker2"); // Second assignment with an updated config state that reflects removal of a connector + // + // assignment after this phase: + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[C1] revokedTasks:[T1-0, T1-1] + // W2: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[T1-2, T1-3] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C2], tasks:[T2-0, T2-1, T2-2, T2-3] configState = clusterConfigState(offset + 1, 2, 4); when(coordinator.configSnapshot()).thenReturn(configState); applyAssignments(returnedAssignments); @@ -1187,6 +1507,14 @@ public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() { doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 1 worker and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: + // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3] assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); ++rebalanceNum; returnedAssignments = assignmentsCapture.getValue(); @@ -1196,6 +1524,19 @@ public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() { assertAssignment(2, 8, 0, 0, "worker1"); // Second assignment with a second worker with duplicate assignment joining and all connectors running on previous worker + // + // W1 joined with assignment: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3] + // W2 joined with assignment: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] + // + // assignment after this phase: + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[C1] revokedTasks:[T1-0, T1-1, T1-2, T1-3] + // W2: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[C1] revokedTasks:[T1-0, T1-1, T1-2, T1-3] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[], tasks:[] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); ExtendedAssignment duplicatedWorkerAssignment = newExpandableAssignment(); @@ -1211,6 +1552,16 @@ public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() { assertAssignment(0, 0, 2, 8, "worker1", "worker2"); // Third assignment after revocations + // + // assignment after this phase: + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3] + // W2: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); @@ -1245,6 +1596,14 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); // First assignment with 1 worker and 2 connectors configured but not yet assigned + // + // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same + // assignment after this phase: + // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3] assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); ++rebalanceNum; returnedAssignments = assignmentsCapture.getValue(); @@ -1258,6 +1617,20 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() when(coordinator.configSnapshot()).thenReturn(configState); // Second assignment with a second worker with duplicate assignment joining and the duplicated assignment is deleted at the same time + // + // W1 joined with assignment: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3] + // W2 joined with assignment: connectors:[C1], tasks:[T1-0, T1-1, T1-2, T1-3] + // Connector "C1" is deleted + // + // assignment after this phase: + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[C1] revokedTasks:[T1-0, T1-1, T1-2, T1-3, T0-2, T0-3] + // W2: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[C1] revokedTasks:[T1-0, T1-1, T1-2, T1-3] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1] + // W2: connectors:[], tasks:[] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); ExtendedAssignment duplicatedWorkerAssignment = newExpandableAssignment(); @@ -1273,6 +1646,16 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() assertAssignment(0, 0, 2, 10, "worker1", "worker2"); // Third rebalance after revocations + // + // assignment after this phase: + // W1: assignedConnectors:[], assignedTasks:[], + // revokedConnectors:[] revokedTasks:[] + // W2: assignedConnectors:[], assignedTasks:[T0-2, T0-3], + // revokedConnectors:[] revokedTasks:[] + // + // Final distribution after this phase: + // W1: connectors:[C0], tasks:[T0-0, T0-1] + // W2: connectors:[], tasks:[T0-2, T0-3] applyAssignments(returnedAssignments); memberConfigs = memberConfigs(leader, offset, assignments); assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);