diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 0a1f47e139e6c..012ff20f8b980 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -184,7 +184,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask, final ProcessorStateManager stateManager = standbyTask.stateMgr; final LogContext logContext = getLogContext(standbyTask.id); - standbyTask.closeAndRecycleState(); + standbyTask.closeCleanAndRecycleState(); stateManager.transitionTaskType(TaskType.ACTIVE, logContext); return createActiveTask( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index ffd09f14059c8..1aa68bc721f2e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -189,7 +189,7 @@ public void closeDirty() { } @Override - public void closeAndRecycleState() { + public void closeCleanAndRecycleState() { streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); if (state() == State.SUSPENDED) { stateMgr.recycle(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index 443db8e8dd94a..b5f2b74f3f453 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -112,7 +112,7 @@ StandbyTask createStandbyTaskFromActive(final StreamTask streamTask, final InternalProcessorContext context = streamTask.processorContext(); final ProcessorStateManager stateManager = streamTask.stateMgr; - streamTask.closeAndRecycleState(); + streamTask.closeCleanAndRecycleState(); stateManager.transitionTaskType(TaskType.STANDBY, getLogContext(streamTask.id())); return createStandbyTask( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6e8bf40ab79c6..4b27436023b4b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -463,6 +463,7 @@ private Map extractPartitionTimes() { @Override public void closeClean() { + validateClean(); streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); close(true); log.info("Closed clean"); @@ -482,7 +483,8 @@ public void update(final Set topicPartitions, final Map task.closeAndRecycleState()); // CREATED + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED task.initializeIfNeeded(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING task.suspend(); - task.closeAndRecycleState(); // SUSPENDED + task.closeCleanAndRecycleState(); // SUSPENDED // Currently, there are no metrics registered for standby tasks. // This is a regression test so that, if we add some, we will be sure to deregister them. diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index c6ffe74e6b8ce..960747047cfc4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; @@ -1752,7 +1753,7 @@ public void shouldUnregisterMetricsInCloseDirty() { } @Test - public void shouldUnregisterMetricsInCloseAndRecycle() { + public void shouldUnregisterMetricsInCloseCleanAndRecycleState() { EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes(); EasyMock.replay(stateManager, recordCollector); @@ -1761,7 +1762,7 @@ public void shouldUnregisterMetricsInCloseAndRecycle() { task.suspend(); assertThat(getTaskMetrics(), not(empty())); - task.closeAndRecycleState(); + task.closeCleanAndRecycleState(); assertThat(getTaskMetrics(), empty()); } @@ -1798,6 +1799,32 @@ public void shouldUpdatePartitions() { assertThat(task.inputPartitions(), equalTo(newPartitions)); } + @Test + public void shouldThrowIfCleanClosingDirtyTask() { + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); + task.process(0L); + assertTrue(task.commitNeeded()); + + assertThrows(TaskMigratedException.class, () -> task.closeClean()); + } + + @Test + public void shouldThrowIfRecyclingDirtyTask() { + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); + task.process(0L); + assertTrue(task.commitNeeded()); + + assertThrows(TaskMigratedException.class, () -> task.closeCleanAndRecycleState()); + } + @Test public void shouldOnlyRecycleSuspendedTasks() { stateManager.recycle(); @@ -1805,16 +1832,16 @@ public void shouldOnlyRecycleSuspendedTasks() { EasyMock.replay(stateManager, recordCollector); task = createStatefulTask(createConfig(false, "100"), true); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED task.initializeIfNeeded(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RESTORING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING task.completeRestoration(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING task.suspend(); - task.closeAndRecycleState(); // SUSPENDED + task.closeCleanAndRecycleState(); // SUSPENDED EasyMock.verify(stateManager, recordCollector); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index fcfbb1f455672..23166d14d084b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2723,7 +2723,7 @@ public void closeDirty() { } @Override - public void closeAndRecycleState() { + public void closeCleanAndRecycleState() { transitionTo(State.CLOSED); }