From d2f26677f150951d9a6ae3d09ed6a058b936c050 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 14 May 2020 10:21:55 -0700 Subject: [PATCH 1/7] Handle task migrated inside corruption path --- .../processor/internals/StreamThread.java | 41 +++++++----- .../processor/internals/StreamThreadTest.java | 64 +++++++++++++++++++ 2 files changed, 88 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d4079f1228f33..fb73a6656d0ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -555,28 +555,35 @@ void runLoop() { } catch (final TaskCorruptedException e) { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + "Will close the task as dirty and re-create and bootstrap from scratch.", e); - - taskManager.commit( - taskManager.tasks() - .values() - .stream() - .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) - .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id())) - .collect(Collectors.toSet()) - ); - taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); + try { + taskManager.commit( + taskManager.tasks() + .values() + .stream() + .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) + .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id())) + .collect(Collectors.toSet()) + ); + taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); + } catch (final TaskMigratedException taskMigrated) { + handleTaskMigrated(taskMigrated); + } } catch (final TaskMigratedException e) { - log.warn("Detected that the thread is being fenced. " + - "This implies that this thread missed a rebalance and dropped out of the consumer group. " + - "Will close out all assigned tasks and rejoin the consumer group.", e); - - taskManager.handleLostAll(); - mainConsumer.unsubscribe(); - subscribeConsumer(); + handleTaskMigrated(e); } } } + private void handleTaskMigrated(final TaskMigratedException e) { + log.warn("Detected that the thread is being fenced. " + + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + + "Will close out all assigned tasks and rejoin the consumer group.", e); + + taskManager.handleLostAll(); + mainConsumer.unsubscribe(); + subscribeConsumer(); + } + private void subscribeConsumer() { if (builder.usesPatternSubscription()) { mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 5542067ffa5ef..1643e03bd9c08 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -107,6 +107,7 @@ import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; @@ -1932,6 +1933,69 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { )).anyTimes(); expect(taskManager.commit(singleton(task2))).andReturn(0); + taskManager.handleCorruption(singletonMap(taskId1, emptySet())); + expectLastCall(); + + EasyMock.replay(task1, task2, taskManager); + + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST); + final StreamThread thread = new StreamThread( + mockTime, + config, + null, + consumer, + consumer, + null, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + CLIENT_ID, + new LogContext(""), + new AtomicInteger(), + new AtomicLong(Long.MAX_VALUE) + ) { + @Override + void runOnce() { + setState(State.PENDING_SHUTDOWN); + throw new TaskCorruptedException(corruptedTasksWithChangelogs); + } + }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + + thread.setState(StreamThread.State.STARTING); + thread.runLoop(); + + verify(taskManager); + } + + @Test + public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() { + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + final Consumer consumer = mock(Consumer.class); + final Task task1 = mock(Task.class); + final Task task2 = mock(Task.class); + final TaskId taskId1 = new TaskId(0, 0); + final TaskId taskId2 = new TaskId(0, 2); + + final Map> corruptedTasksWithChangelogs = mkMap( + mkEntry(taskId1, emptySet()) + ); + + expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes(); + expect(task1.id()).andReturn(taskId1).anyTimes(); + expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes(); + expect(task2.id()).andReturn(taskId2).anyTimes(); + + expect(taskManager.tasks()).andReturn(mkMap( + mkEntry(taskId1, task1), + mkEntry(taskId2, task2) + )).anyTimes(); + expect(taskManager.commit(singleton(task2))).andThrow(new TaskMigratedException("Task migrated", + new RuntimeException("non-corrupted task migrated"))); + + taskManager.handleLostAll(); + expectLastCall(); + EasyMock.replay(task1, task2, taskManager); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST); From d9b38a6c67b075e3a03a6c314aa8b422bbc59297 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Sun, 17 May 2020 11:59:12 -0700 Subject: [PATCH 2/7] locked task directory --- .../streams/processor/internals/TaskManager.java | 2 ++ .../streams/processor/internals/TaskManagerTest.java | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 28945f784a4b5..fa96338306e51 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -464,6 +464,8 @@ void handleLostAll() { if (task.isActive()) { closeTaskDirty(task); iterator.remove(); + lockedTaskDirectories.remove(task.id()); + try { activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()); } catch (final RuntimeException e) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 941636e7bd114..920d4cb046f6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -446,7 +446,7 @@ public Map committableOffsetsAndMetadata() { } @Test - public void shouldCloseActiveTasksWhenHandlingLostTasks() { + public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false); @@ -457,6 +457,13 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() { topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString()); expectLastCall().anyTimes(); + makeTaskFolders(taskId00.toString(), taskId01.toString()); + expectLockObtainedFor(taskId00, taskId01); + replay(stateDirectory); + + taskManager.handleRebalanceStart(emptySet()); + assertThat(taskManager.lockedTaskDirectories(), Matchers.is(mkSet(taskId00, taskId01))); + // `handleLostAll` activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00); expectLastCall(); @@ -473,6 +480,8 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() { assertThat(task01.state(), is(Task.State.RUNNING)); assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), is(singletonMap(taskId01, task01))); + + assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } @Test From 48a2594dc802ead063f58bec3fa9ff5596661de0 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Mon, 18 May 2020 20:15:54 -0700 Subject: [PATCH 3/7] address comments --- .../streams/processor/internals/TaskManager.java | 5 ++++- .../streams/processor/internals/TaskManagerTest.java | 11 ++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index fa96338306e51..c697dc1d97fb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -464,7 +464,6 @@ void handleLostAll() { if (task.isActive()) { closeTaskDirty(task); iterator.remove(); - lockedTaskDirectories.remove(task.id()); try { activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()); @@ -518,6 +517,10 @@ public Map getTaskOffsetSums() { * assigned the task as a result of the rebalance). This method should be idempotent. */ private void tryToLockAllNonEmptyTaskDirectories() { + // Always clear the set at the beginning as we're always dealing with the + // current set of actually-locked tasks. + lockedTaskDirectories.clear(); + for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) { try { final TaskId id = TaskId.parse(dir.getName()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 920d4cb046f6d..f8c7deea18f28 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -459,6 +459,10 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { makeTaskFolders(taskId00.toString(), taskId01.toString()); expectLockObtainedFor(taskId00, taskId01); + + // The second attempt will return empty tasks. + makeTaskFolders(); + expectLockObtainedFor(); replay(stateDirectory); taskManager.handleRebalanceStart(emptySet()); @@ -481,7 +485,12 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), is(singletonMap(taskId01, task01))); - assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); + // The locked task map will not be cleared. + assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01))); + + taskManager.handleRebalanceStart(emptySet()); + + assertThat(taskManager.lockedTaskDirectories(), is(emptySet())); } @Test From a27de557a86fcd877d7640ce86a39fd313d6758e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 19 May 2020 13:40:02 -0700 Subject: [PATCH 4/7] log warn --- .../kafka/streams/processor/internals/TaskManager.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index c697dc1d97fb4..cfd865fd352c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -567,6 +567,11 @@ private void releaseLockedUnassignedTaskDirectories() { if (fatalException != null) { throw fatalException; } + + if (!lockedTaskDirectories.isEmpty()) { + log.warn("The following tasks {} are no longer owned by the thread while trying to release their directory " + + "tasks. It could potentially because those tasks are closed due to errors during the rebalance.", lockedTaskDirectories); + } } private long sumOfChangelogOffsets(final TaskId id, final Map changelogOffsets) { From a90e1345f100c3fdf0772cf713caab745f5867b5 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 19 May 2020 13:42:31 -0700 Subject: [PATCH 5/7] typo --- .../apache/kafka/streams/processor/internals/TaskManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index cfd865fd352c6..4d17cff4e7bb5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -570,7 +570,7 @@ private void releaseLockedUnassignedTaskDirectories() { if (!lockedTaskDirectories.isEmpty()) { log.warn("The following tasks {} are no longer owned by the thread while trying to release their directory " + - "tasks. It could potentially because those tasks are closed due to errors during the rebalance.", lockedTaskDirectories); + "locks. It could potentially because those tasks are closed due to errors during the rebalance.", lockedTaskDirectories); } } From 01a5c556c8f93e8daf091cb60880538cfd3f2c8f Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 19 May 2020 13:44:04 -0700 Subject: [PATCH 6/7] reword a bit, change warn to info --- .../apache/kafka/streams/processor/internals/TaskManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 4d17cff4e7bb5..2216274406e20 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -569,8 +569,8 @@ private void releaseLockedUnassignedTaskDirectories() { } if (!lockedTaskDirectories.isEmpty()) { - log.warn("The following tasks {} are no longer owned by the thread while trying to release their directory " + - "locks. It could potentially because those tasks are closed due to errors during the rebalance.", lockedTaskDirectories); + log.info("The following tasks {} are no longer owned by the thread while trying to release their directory " + + "locks. They are closed likely due to errors during the rebalance.", lockedTaskDirectories); } } From 32975420101c4a3f994c00f79aded2883211441a Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 19 May 2020 13:59:00 -0700 Subject: [PATCH 7/7] revert --- .../kafka/streams/processor/internals/TaskManager.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 2216274406e20..c697dc1d97fb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -567,11 +567,6 @@ private void releaseLockedUnassignedTaskDirectories() { if (fatalException != null) { throw fatalException; } - - if (!lockedTaskDirectories.isEmpty()) { - log.info("The following tasks {} are no longer owned by the thread while trying to release their directory " + - "locks. They are closed likely due to errors during the rebalance.", lockedTaskDirectories); - } } private long sumOfChangelogOffsets(final TaskId id, final Map changelogOffsets) {