From c27a0ffb26322a78f14ad4fd205b26abfea5d3f6 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jun 2020 15:42:56 -0700 Subject: [PATCH 1/3] validateClean --- .../processor/internals/AbstractTask.java | 1 + .../internals/ActiveTaskCreator.java | 2 +- .../processor/internals/StandbyTask.java | 2 +- .../internals/StandbyTaskCreator.java | 2 +- .../processor/internals/StreamTask.java | 21 ++++++++++++------- .../streams/processor/internals/Task.java | 2 +- .../processor/internals/StandbyTaskTest.java | 6 +++--- .../processor/internals/StreamTaskTest.java | 10 ++++----- .../processor/internals/TaskManagerTest.java | 2 +- 9 files changed, 27 insertions(+), 21 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 044825a21f607..d7c7bd5b8a26a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -108,4 +108,5 @@ public void update(final Set topicPartitions, final Map extractPartitionTimes() { @Override public void closeClean() { + validateClean(); streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); close(true); log.info("Closed clean"); @@ -482,7 +483,8 @@ public void update(final Set topicPartitions, final Map task.closeAndRecycleState()); // CREATED + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED task.initializeIfNeeded(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING task.suspend(); - task.closeAndRecycleState(); // SUSPENDED + task.closeCleanAndRecycleState(); // SUSPENDED // Currently, there are no metrics registered for standby tasks. // This is a regression test so that, if we add some, we will be sure to deregister them. diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index c6ffe74e6b8ce..01fb2bbafd168 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1761,7 +1761,7 @@ public void shouldUnregisterMetricsInCloseAndRecycle() { task.suspend(); assertThat(getTaskMetrics(), not(empty())); - task.closeAndRecycleState(); + task.closeCleanAndRecycleState(); assertThat(getTaskMetrics(), empty()); } @@ -1805,16 +1805,16 @@ public void shouldOnlyRecycleSuspendedTasks() { EasyMock.replay(stateManager, recordCollector); task = createStatefulTask(createConfig(false, "100"), true); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED task.initializeIfNeeded(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RESTORING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING task.completeRestoration(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING task.suspend(); - task.closeAndRecycleState(); // SUSPENDED + task.closeCleanAndRecycleState(); // SUSPENDED EasyMock.verify(stateManager, recordCollector); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index fcfbb1f455672..23166d14d084b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2723,7 +2723,7 @@ public void closeDirty() { } @Override - public void closeAndRecycleState() { + public void closeCleanAndRecycleState() { transitionTo(State.CLOSED); } From 3c8967f8f336c653a56575d2c29c15db4b16d33a Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jun 2020 15:51:03 -0700 Subject: [PATCH 2/3] add unit tests --- .../processor/internals/AbstractTask.java | 1 - .../processor/internals/StreamTaskTest.java | 25 ++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index d7c7bd5b8a26a..044825a21f607 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -108,5 +108,4 @@ public void update(final Set topicPartitions, final Map task.closeClean()); + } + + @Test + public void shouldThrowIfRecyclingDirtyTask() { + task = createStatefulTask(createConfig(false, "100"), true); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); + task.process(0L); + assertTrue(task.commitNeeded()); + + assertThrows(TaskMigratedException.class, () -> task.closeCleanAndRecycleState()); + } + @Test public void shouldOnlyRecycleSuspendedTasks() { stateManager.recycle(); From fc71861dfcb093d5d430f8e7a3fa8786c1d12fb1 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jun 2020 15:55:21 -0700 Subject: [PATCH 3/3] fix test setup --- .../kafka/streams/processor/internals/StreamTaskTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 1e55afd799e8f..960747047cfc4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1801,7 +1801,9 @@ public void shouldUpdatePartitions() { @Test public void shouldThrowIfCleanClosingDirtyTask() { - task = createStatefulTask(createConfig(false, "100"), true); + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); task.process(0L); @@ -1812,7 +1814,9 @@ public void shouldThrowIfCleanClosingDirtyTask() { @Test public void shouldThrowIfRecyclingDirtyTask() { - task = createStatefulTask(createConfig(false, "100"), true); + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); task.process(0L);