Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
private final Time time;
private final int maxDelay;
private ConnectorsAndTasks previousAssignment;
private final ConnectorsAndTasks previousRevocation;
private boolean canRevoke;
Copy link
Copy Markdown
Member Author

@showuon showuon Mar 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change (1): remove unused canRevoke

private ConnectorsAndTasks previousRevocation;
// visible for testing
protected final Set<String> candidateWorkersForReassignment;
protected long scheduledRebalance;
Expand All @@ -75,7 +74,6 @@ public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxD
this.maxDelay = maxDelay;
this.previousAssignment = ConnectorsAndTasks.EMPTY;
this.previousRevocation = new ConnectorsAndTasks.Builder().build();
this.canRevoke = true;
this.scheduledRebalance = 0;
this.candidateWorkersForReassignment = new LinkedHashSet<>();
this.delay = 0;
Expand Down Expand Up @@ -196,24 +194,20 @@ protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long ma
// appropriately and be ready to re-apply revocation of tasks
if (!previousRevocation.isEmpty()) {
if (previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c))
|| previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
|| previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
previousAssignment = activeAssignments;
canRevoke = true;
}
previousRevocation.connectors().clear();
previousRevocation.tasks().clear();
}



// Derived set: The set of deleted connectors-and-tasks is a derived set from the set
// difference of previous - configured
ConnectorsAndTasks deleted = diff(previousAssignment, configured);
log.debug("Deleted assignments: {}", deleted);

// Derived set: The set of remaining active connectors-and-tasks is a derived set from the
// set difference of active - deleted
ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used anywhere else, so delete it.


// Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
// the set difference of previous - active - deleted
ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted);
Expand Down Expand Up @@ -248,6 +242,10 @@ protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long ma
toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments));
log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke);

List<WorkerLoad> currentWorkerAssignmentWithoutDuplication =
removeDuplicated(workerAssignment(memberConfigs, deleted), toRevoke);
log.debug("Complete (excluding deletions and remove duplicated assignments) worker assignments: {}", currentWorkerAssignmentWithoutDuplication);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change (2): compute the current worker assignment excluding deletions and duplicated assignments. If after excluding deletion and duplicated assignments, there are still workers have assignment higher than totalTasksWeHave / totalWorkers, we still need to revoke more tasks.


// Recompute the complete assignment excluding the deleted connectors-and-tasks
completeWorkerAssignment = workerAssignment(memberConfigs, deleted);
connectorAssignments =
Expand All @@ -258,17 +256,12 @@ protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long ma
handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs);

// Do not revoke resources for re-assignment while a delayed rebalance is active
// Also we do not revoke in two consecutive rebalances by the same leader
canRevoke = delay == 0 && canRevoke;

// Compute the connectors-and-tasks to be revoked for load balancing without taking into
// account the deleted ones.
log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
if (canRevoke) {
log.debug("Can leader revoke tasks in this assignment? (delay: {})", delay);
if (delay == 0) {
// Compute the connectors-and-tasks to be revoked for load balancing without taking into
// account the deleted ones.
Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
performTaskRevocation(activeAssignments, currentWorkerAssignment);

log.debug("Connector and task to revoke assignments: {}", toRevoke);
performTaskRevocation(configured, currentWorkerAssignmentWithoutDuplication);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change (3): pass configured (total tasks we have), and currentWorkerAssignmentWithoutDuplication into performTaskRevocation method.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify why this change is necessary? I ran the new testTaskAssignmentWhenWorkerJoinAfterRevocation test case with and without it, and although it fails without this change, it looks like that's more due to frail testing logic with the assertAssignment method than an actual bug in the rebalancing logic here. If I remove the assertAssignment calls but manually check on the distribution of C/T across the cluster after the fifth rebalance, everything is balanced.

I've also produced a test case that fails with this change but succeeds without it:

    @Test
    public void testNewWorkerAndNewTasksInSameRound() {
        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Start with 40 tasks
        configState = clusterConfigState(offset, 1, 40);
        when(coordinator.configSnapshot()).thenReturn(configState);

        // Start with three workers
        memberConfigs = memberConfigs(leader, offset, 0, 2);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);

        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        // Add 2 tasks
        configState = clusterConfigState(offset, 1, 42);
        when(coordinator.configSnapshot()).thenReturn(configState);
        // Add a worker
        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);

        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        // Rebalance once more as a follow-up to task revocation
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);

        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        assertBalancedAssignments(memberConfigs);

        verify(coordinator, times(rebalanceNum)).configSnapshot();
        verify(coordinator, times(rebalanceNum)).leaderState(any());
        verify(coordinator, times(2 * rebalanceNum)).generationId();
        verify(coordinator, times(rebalanceNum)).memberId();
        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
    }

    private void assertBalancedAssignments(Map<String, ExtendedWorkerState> existingAssignments) {
        List<Integer> connectorCounts = existingAssignments.values().stream()
                .map(e -> e.assignment().connectors().size())
                .sorted()
                .collect(Collectors.toList());
        List<Integer> taskCounts = existingAssignments.values().stream()
                .map(e -> e.assignment().tasks().size())
                .sorted()
                .collect(Collectors.toList());

        int minConnectors = connectorCounts.get(0);
        int maxConnectors = connectorCounts.get(connectorCounts.size() - 1);

        int minTasks = taskCounts.get(0);
        int maxTasks = taskCounts.get(taskCounts.size() - 1);

        assertTrue(
                "Assignments are imbalanced. The spread of connectors across each worker is: " + connectorCounts,
                maxConnectors - minConnectors <= 1
        );
        assertTrue(
                "Assignments are imbalanced. The spread of tasks across each worker is: " + taskCounts,
                maxTasks - minTasks <= 1
        );
    }


toExplicitlyRevoke.forEach(
(worker, assignment) -> {
Expand All @@ -279,9 +272,8 @@ protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long ma
existing.tasks().addAll(assignment.tasks());
}
);
canRevoke = toExplicitlyRevoke.size() == 0;
} else {
canRevoke = delay == 0;

log.debug("Connector and task to revoke assignments: {}", toRevoke);
}

assignConnectors(completeWorkerAssignment, newSubmissions.connectors());
Expand Down Expand Up @@ -314,6 +306,15 @@ protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long ma
return serializeAssignments(assignments);
}

private List<WorkerLoad> removeDuplicated(List<WorkerLoad> currentWorkerAssignmentWithoutDuplication,
Map<String, ConnectorsAndTasks> toRevoke) {
for (WorkerLoad wl: currentWorkerAssignmentWithoutDuplication) {
wl.connectors().removeAll(toRevoke.getOrDefault(wl.worker(), ConnectorsAndTasks.EMPTY).connectors());
wl.tasks().removeAll(toRevoke.getOrDefault(wl.worker(), ConnectorsAndTasks.EMPTY).tasks());
}
return currentWorkerAssignmentWithoutDuplication;
}

private Map<String, ConnectorsAndTasks> computeDeleted(ConnectorsAndTasks deleted,
Map<String, Collection<String>> connectorAssignments,
Map<String, Collection<ConnectorTaskId>> taskAssignments) {
Expand Down Expand Up @@ -451,7 +452,7 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
}

if (!candidateWorkerLoad.isEmpty()) {
log.debug("Assigning lost tasks to {} candidate workers: {}",
log.debug("Assigning lost tasks to {} candidate workers: {}",
candidateWorkerLoad.size(),
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
Iterator<WorkerLoad> candidateWorkerIterator = candidateWorkerLoad.iterator();
Expand Down Expand Up @@ -489,12 +490,10 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
log.debug("Delayed rebalance in progress. Task reassignment is postponed. New computed rebalance delay: {}", delay);
} else {
// This means scheduledRebalance == 0
// We could also also extract the current minimum delay from the group, to make
// independent of consecutive leader failures, but this optimization is skipped
// at the moment
// We could also extract the current minimum delay from the group, to make
// independent of consecutive leader failures, but this optimization is skipped at the moment
delay = maxDelay;
log.debug("Resetting rebalance delay to the max: {}. scheduledRebalance: {} now: {} diff scheduledRebalance - now: {}",
delay, scheduledRebalance, now, scheduledRebalance - now);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the comment said, this else block means scheduledRebalance == 0, so we don't need to log the scheduledRebalance and now value

log.debug("Resetting rebalance delay to the max: {}.", delay);
}
scheduledRebalance = now + delay;
}
Expand Down Expand Up @@ -532,23 +531,23 @@ private List<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> com
* each existing worker. The revoked tasks, once assigned to the new workers will maintain
* a balanced load among the group.
*
* @param activeAssignments
* @param completeWorkerAssignment
* @return
* @param allConnectorsAndTasks all the connectors and tasks we need to distribute
* @param completeWorkerAssignmentWithoutDuplication current workers assignment without duplication
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the existing variable name is fine; we can definitely update the Javadoc to clarify that this assignment should exclude duplicated and to-be-deleted C/T, but something this long is a little hard to read.

* @return the connectors/tasks that needed to be revoked on each worker, if no revoke is required, will return empty map
*/
private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks activeAssignments,
Collection<WorkerLoad> completeWorkerAssignment) {
int totalActiveConnectorsNum = activeAssignments.connectors().size();
int totalActiveTasksNum = activeAssignments.tasks().size();
Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream()
private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks allConnectorsAndTasks,
Collection<WorkerLoad> completeWorkerAssignmentWithoutDuplication) {
int totalConnectorsNum = allConnectorsAndTasks.connectors().size();
int totalTasksNum = allConnectorsAndTasks.tasks().size();
Collection<WorkerLoad> existingWorkers = completeWorkerAssignmentWithoutDuplication.stream()
Copy link
Copy Markdown
Member Author

@showuon showuon Mar 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use allConnectorsAndTasks (configured) to compute the total connectors/tasks number, and use the completeWorkerAssignmentWithoutDuplication to compute the existing workers. So it'll always compute the correct expected connector/task number for each worker

.filter(wl -> wl.size() > 0)
.collect(Collectors.toList());
int existingWorkersNum = existingWorkers.size();
int totalWorkersNum = completeWorkerAssignment.size();
int totalWorkersNum = completeWorkerAssignmentWithoutDuplication.size();
int newWorkersNum = totalWorkersNum - existingWorkersNum;

if (log.isDebugEnabled()) {
completeWorkerAssignment.forEach(wl -> log.debug(
completeWorkerAssignmentWithoutDuplication.forEach(wl -> log.debug(
"Per worker current load size; worker: {} connectors: {} tasks: {}",
wl.worker(), wl.connectorsSize(), wl.tasksSize()));
}
Expand All @@ -570,21 +569,22 @@ private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks
existingWorkersNum, newWorkersNum, totalWorkersNum);

// We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalConnectorsNum / existingWorkersNum);
int floorConnectors = totalConnectorsNum / totalWorkersNum;
int ceilConnectors = floorConnectors + ((totalConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);


log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
int floorTasks = totalActiveTasksNum / totalWorkersNum;
int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalTasksNum / existingWorkersNum);
int floorTasks = totalTasksNum / totalWorkersNum;
int ceilTasks = floorTasks + ((totalTasksNum % totalWorkersNum == 0) ? 0 : 1);
log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
int numToRevoke;

for (WorkerLoad existing : existingWorkers) {
Iterator<String> connectors = existing.connectors().iterator();
numToRevoke = existing.connectorsSize() - ceilConnectors;
log.debug("Connectors on worker {} is higher than ceiling, so revoking {} Connectors", existing, numToRevoke);
for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
ConnectorsAndTasks resources = revoking.computeIfAbsent(
existing.worker(),
Expand Down
Loading