From 0985473342bdb30e3f3d8004c72a997819b6d60e Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 28 Apr 2020 21:39:47 -0500 Subject: [PATCH 1/2] KAFKA-9832: fix attempt to commit non-running tasks --- .../processor/internals/StreamThread.java | 1 + .../processor/internals/StreamThreadTest.java | 59 +++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 1493cdd9bf457..188f0585c2470 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -574,6 +574,7 @@ void runLoop() { taskManager.tasks() .values() .stream() + .filter(t -> t.state() == Task.State.RUNNING) .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id())) .collect(Collectors.toSet()) ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index cd8002f538d00..ff632ebd427e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1920,7 +1920,9 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { mkEntry(taskId1, emptySet()) ); + expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes(); expect(task1.id()).andReturn(taskId1).anyTimes(); + expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes(); expect(task2.id()).andReturn(taskId2).anyTimes(); expect(taskManager.tasks()).andReturn(mkMap( @@ -1961,6 +1963,63 @@ void runOnce() { verify(taskManager); } + @Test + public void shouldNotCommitNonRunningNonCorruptedTasksOnTaskCorruptedException() { + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + final Consumer consumer = mock(Consumer.class); + final Task task1 = mock(Task.class); + final Task task2 = mock(Task.class); + final TaskId taskId1 = new TaskId(0, 0); + final TaskId taskId2 = new TaskId(0, 2); + + final Map> corruptedTasksWithChangelogs = mkMap( + mkEntry(taskId1, emptySet()) + ); + + expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes(); + expect(task1.id()).andReturn(taskId1).anyTimes(); + expect(task2.state()).andReturn(Task.State.CREATED).anyTimes(); + expect(task2.id()).andReturn(taskId2).anyTimes(); + + expect(taskManager.tasks()).andReturn(mkMap( + mkEntry(taskId1, task1), + mkEntry(taskId2, task2) + )).anyTimes(); + // expect not to try and commit task2, even though it's not corrupted, because it's not running. + expect(taskManager.commit(emptySet())).andReturn(0); + + EasyMock.replay(task1, task2, taskManager); + + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST); + final StreamThread thread = new StreamThread( + mockTime, + config, + null, + consumer, + consumer, + null, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + CLIENT_ID, + new LogContext(""), + new AtomicInteger(), + new AtomicLong(Long.MAX_VALUE) + ) { + @Override + void runOnce() { + setState(State.PENDING_SHUTDOWN); + throw new TaskCorruptedException(corruptedTasksWithChangelogs); + } + }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + + thread.setState(StreamThread.State.STARTING); + thread.runLoop(); + + verify(taskManager); + } + @Test public void shouldLogAndRecordSkippedRecordsForInvalidTimestampsWithBuiltInMetricsVersion0100To24() { shouldLogAndRecordSkippedRecordsForInvalidTimestamps(StreamsConfig.METRICS_0100_TO_24); From 7429fcfe3a5793037f3cc0fd30d3e908cf7ce11b Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 28 Apr 2020 21:46:12 -0500 Subject: [PATCH 2/2] also commit restoring tasks --- .../apache/kafka/streams/processor/internals/StreamThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 188f0585c2470..649b5125f1b7a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -574,7 +574,7 @@ void runLoop() { taskManager.tasks() .values() .stream() - .filter(t -> t.state() == Task.State.RUNNING) + .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id())) .collect(Collectors.toSet()) );