Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ void handleLostAll() {
if (task.isActive()) {
closeTaskDirty(task);
iterator.remove();

try {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
} catch (final RuntimeException e) {
Expand Down Expand Up @@ -516,6 +517,10 @@ public Map<TaskId, Long> getTaskOffsetSums() {
* assigned the task as a result of the rebalance). This method should be idempotent.
*/
private void tryToLockAllNonEmptyTaskDirectories() {
// Always clear the set at the beginning as we're always dealing with the
// current set of actually-locked tasks.
lockedTaskDirectories.clear();

for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) {
try {
final TaskId id = TaskId.parse(dir.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ public Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
}

@Test
public void shouldCloseActiveTasksWhenHandlingLostTasks() {
public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false);

Expand All @@ -457,6 +457,17 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() {
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();

makeTaskFolders(taskId00.toString(), taskId01.toString());
expectLockObtainedFor(taskId00, taskId01);

// The second attempt will return empty tasks.
makeTaskFolders();
expectLockObtainedFor();
replay(stateDirectory);

taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(), Matchers.is(mkSet(taskId00, taskId01)));

// `handleLostAll`
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
expectLastCall();
Expand All @@ -473,6 +484,13 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() {
assertThat(task01.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), is(singletonMap(taskId01, task01)));

// The locked task map will not be cleared.
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));

taskManager.handleRebalanceStart(emptySet());

assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
}

@Test
Expand Down