diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 28945f784a4b5..c697dc1d97fb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -464,6 +464,7 @@ void handleLostAll() { if (task.isActive()) { closeTaskDirty(task); iterator.remove(); + try { activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()); } catch (final RuntimeException e) { @@ -516,6 +517,10 @@ public Map getTaskOffsetSums() { * assigned the task as a result of the rebalance). This method should be idempotent. */ private void tryToLockAllNonEmptyTaskDirectories() { + // Always clear the set at the beginning as we're always dealing with the + // current set of actually-locked tasks. + lockedTaskDirectories.clear(); + for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) { try { final TaskId id = TaskId.parse(dir.getName()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 941636e7bd114..f8c7deea18f28 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -446,7 +446,7 @@ public Map committableOffsetsAndMetadata() { } @Test - public void shouldCloseActiveTasksWhenHandlingLostTasks() { + public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false); @@ -457,6 +457,17 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() { topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString()); expectLastCall().anyTimes(); + makeTaskFolders(taskId00.toString(), taskId01.toString()); + expectLockObtainedFor(taskId00, taskId01); + + // The second attempt will return empty tasks. + makeTaskFolders(); + expectLockObtainedFor(); + replay(stateDirectory); + + taskManager.handleRebalanceStart(emptySet()); + assertThat(taskManager.lockedTaskDirectories(), Matchers.is(mkSet(taskId00, taskId01))); + // `handleLostAll` activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00); expectLastCall(); @@ -473,6 +484,13 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() { assertThat(task01.state(), is(Task.State.RUNNING)); assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), is(singletonMap(taskId01, task01))); + + // The locked task map will not be cleared. + assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01))); + + taskManager.handleRebalanceStart(emptySet()); + + assertThat(taskManager.lockedTaskDirectories(), is(emptySet())); } @Test