When StreamThread#runLoop handles a TaskCorruptedException, only pass the
non-corrupted tasks that are in state RUNNING or RESTORING on to the commit.
Adds a test in which a non-corrupted task in state CREATED must not be committed.

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1493cdd9bf457..649b5125f1b7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -574,6 +574,7 @@ void runLoop() {
                         taskManager.tasks()
                                    .values()
                                    .stream()
+                                   .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
                                    .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
                                    .collect(Collectors.toSet())
                     );
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cd8002f538d00..ff632ebd427e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1920,7 +1920,9 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
             mkEntry(taskId1, emptySet())
         );
 
+        expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task1.id()).andReturn(taskId1).anyTimes();
+        expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task2.id()).andReturn(taskId2).anyTimes();
 
         expect(taskManager.tasks()).andReturn(mkMap(
@@ -1961,6 +1963,63 @@ void runOnce() {
         verify(taskManager);
     }
 
+    @Test
+    public void shouldNotCommitNonRunningNonCorruptedTasksOnTaskCorruptedException() {
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final Task task1 = mock(Task.class);
+        final Task task2 = mock(Task.class);
+        final TaskId taskId1 = new TaskId(0, 0);
+        final TaskId taskId2 = new TaskId(0, 2);
+
+        final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
+            mkEntry(taskId1, emptySet())
+        );
+
+        expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
+        expect(task1.id()).andReturn(taskId1).anyTimes();
+        expect(task2.state()).andReturn(Task.State.CREATED).anyTimes();
+        expect(task2.id()).andReturn(taskId2).anyTimes();
+
+        expect(taskManager.tasks()).andReturn(mkMap(
+            mkEntry(taskId1, task1),
+            mkEntry(taskId2, task2)
+        )).anyTimes();
+        // expect not to try and commit task2, even though it's not corrupted, because it's not running.
+        expect(taskManager.commit(emptySet())).andReturn(0);
+
+        EasyMock.replay(task1, task2, taskManager);
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            null,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE)
+        ) {
+            @Override
+            void runOnce() {
+                setState(State.PENDING_SHUTDOWN);
+                throw new TaskCorruptedException(corruptedTasksWithChangelogs);
+            }
+        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.runLoop();
+
+        verify(taskManager);
+    }
+
     @Test
     public void shouldLogAndRecordSkippedRecordsForInvalidTimestampsWithBuiltInMetricsVersion0100To24() {
         shouldLogAndRecordSkippedRecordsForInvalidTimestamps(StreamsConfig.METRICS_0100_TO_24);