From c7412f83e29251fa1bc44e7931c8d9fe0c565a79 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Wed, 9 Dec 2020 12:49:55 -0800 Subject: [PATCH 01/20] init commit, one test is still failing --- .../apache/kafka/streams/KafkaStreams.java | 42 +++++------ .../kafka/streams/KafkaStreamsTest.java | 73 +------------------ .../EOSUncleanShutdownIntegrationTest.java | 4 - .../EosBetaUpgradeIntegrationTest.java | 7 +- .../StandbyTaskEOSIntegrationTest.java | 4 - ...caughtExceptionHandlerIntegrationTest.java | 4 +- .../SuppressionIntegrationTest.java | 3 +- ...ingSourceTopicDeletionIntegrationTest.java | 12 ++- 8 files changed, 36 insertions(+), 113 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index ad9cdbeb7001a..3096e6da82692 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -227,7 +227,8 @@ public enum State { RUNNING(1, 2, 3, 5), // 2 PENDING_SHUTDOWN(4), // 3 NOT_RUNNING, // 4 - ERROR(3); // 5 + PENDING_ERROR(6), // 5 + ERROR; // 6 private final Set validTransitions = new HashSet<>(); @@ -469,7 +470,7 @@ private void handleStreamsUncaughtException(final Throwable throwable, log.error("Encountered the following exception during processing " + "and the registered exception handler opted to " + action + "." + " The streams client is going to shut down now. ", throwable); - close(Duration.ZERO); + closeToError(); break; case SHUTDOWN_APPLICATION: if (throwable instanceof Error) { @@ -482,7 +483,7 @@ private void handleStreamsUncaughtException(final Throwable throwable, log.error("Exception in global thread caused the application to attempt to shutdown." + " This action will succeed only if there is at least one StreamThread running on this client." + " Currently there are no running threads so will now close the client."); - close(Duration.ZERO); + closeToError(); } else { processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED)); log.error("Encountered the following exception during processing " + @@ -552,22 +553,6 @@ final class StreamStateListener implements StreamThread.StateListener { this.threadStatesLock = new Object(); } - /** - * If all threads are dead set to ERROR - */ - private void maybeSetError() { - // check if we have at least one thread running - for (final StreamThread.State state : threadState.values()) { - if (state != StreamThread.State.DEAD) { - return; - } - } - - if (setState(State.ERROR)) { - log.error("All stream threads have died. The instance will be in error state and should be closed."); - } - } - /** * If all threads are up, including the global thread, set to RUNNING */ @@ -603,8 +588,6 @@ public synchronized void onChange(final Thread thread, setState(State.REBALANCING); } else if (newState == StreamThread.State.RUNNING) { maybeSetRunning(); - } else if (newState == StreamThread.State.DEAD) { - maybeSetError(); } } else if (thread instanceof GlobalStreamThread) { // global stream thread has different invariants @@ -614,9 +597,8 @@ public synchronized void onChange(final Thread thread, if (newState == GlobalStreamThread.State.RUNNING) { maybeSetRunning(); } else if (newState == GlobalStreamThread.State.DEAD) { - if (setState(State.ERROR)) { - log.error("Global thread has died. The instance will be in error state and should be closed."); - } + log.error("Global thread has died. The streams application or client will now close to ERROR."); + closeToError(); } } } @@ -1204,11 +1186,21 @@ private Thread shutdownHelper(final boolean error) { metrics.close(); if (!error) { setState(State.NOT_RUNNING); + } else { + setState(State.ERROR); } }, "kafka-streams-close-thread"); } private boolean close(final long timeoutMs) { + if (state == State.ERROR) { + log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); + return false; + } + if (state == State.PENDING_ERROR) { + log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped."); + return false; + } if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN // or NOT_RUNNING already; just check that all threads have been stopped @@ -1230,7 +1222,7 @@ private boolean close(final long timeoutMs) { } private void closeToError() { - if (!setState(State.ERROR)) { + if (!setState(State.PENDING_ERROR)) { log.info("Skipping shutdown since we are already in " + state()); } else { final Thread shutdownThread = shutdownHelper(true); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index ddcb138b819c7..eb15cdce5a413 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -428,70 +428,6 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); } - @Test - public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException { - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - streams.setStateListener(streamsStateListener); - - Assert.assertEquals(0, streamsStateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.CREATED, streams.state()); - - streams.start(); - - TestUtils.waitForCondition( - () -> streamsStateListener.numChanges == 2, - "Streams never started."); - Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state()); - - for (final StreamThread thread : streams.threads) { - threadStatelistenerCapture.getValue().onChange( - thread, - StreamThread.State.PARTITIONS_REVOKED, - StreamThread.State.RUNNING); - } - - Assert.assertEquals(3, streamsStateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state()); - - threadStatelistenerCapture.getValue().onChange( - streams.threads.get(NUM_THREADS - 1), - StreamThread.State.PENDING_SHUTDOWN, - StreamThread.State.PARTITIONS_REVOKED); - - threadStatelistenerCapture.getValue().onChange( - streams.threads.get(NUM_THREADS - 1), - StreamThread.State.DEAD, - StreamThread.State.PENDING_SHUTDOWN); - - Assert.assertEquals(3, streamsStateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state()); - - for (final StreamThread thread : streams.threads) { - if (thread != streams.threads.get(NUM_THREADS - 1)) { - threadStatelistenerCapture.getValue().onChange( - thread, - StreamThread.State.PENDING_SHUTDOWN, - StreamThread.State.PARTITIONS_REVOKED); - - threadStatelistenerCapture.getValue().onChange( - thread, - StreamThread.State.DEAD, - StreamThread.State.PENDING_SHUTDOWN); - } - } - - Assert.assertEquals(4, streamsStateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.ERROR, streams.state()); - - streams.close(); - - // the state should not stuck with ERROR, but transit to NOT_RUNNING in the end - TestUtils.waitForCondition( - () -> streamsStateListener.numChanges == 6, - "Streams never closed."); - Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); - } - @Test public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception { final StreamsBuilder builder = getBuilderWithSource(); @@ -535,7 +471,7 @@ public void testStateThreadClose() throws Exception { streams.threads.get(i).join(); } TestUtils.waitForCondition( - () -> streams.state() == KafkaStreams.State.ERROR, + () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never stopped."); } finally { streams.close(); @@ -567,12 +503,12 @@ public void testStateGlobalThreadClose() throws Exception { () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, "Thread never stopped."); globalStreamThread.join(); - assertEquals(streams.state(), KafkaStreams.State.ERROR); + assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR); } finally { streams.close(); } - assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING); + assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR); } @Test @@ -635,8 +571,7 @@ public void shouldNotAddThreadWhenError() { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); final int oldSize = streams.threads.size(); streams.start(); - streamThreadOne.shutdown(); - streamThreadTwo.shutdown(); + globalStreamThread.shutdown(); assertThat(streams.addStreamThread(), equalTo(Optional.empty())); assertThat(streams.threads.size(), equalTo(oldSize)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java index 4cc0268cf8632..dc1875efa260d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -154,9 +153,6 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL, "Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get()); } finally { - TestUtils.waitForCondition(() -> driver.state().equals(State.ERROR), - "Expected ERROR state but driver is on " + driver.state()); - driver.close(); // the state directory should still exist with the empty checkpoint file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 91609a6575b92..782fe7c0b9721 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -113,13 +113,13 @@ public static Collection data() { private static final List> CRASH = Collections.unmodifiableList( Collections.singletonList( - KeyValue.pair(KafkaStreams.State.RUNNING, KafkaStreams.State.ERROR) + KeyValue.pair(KafkaStreams.State.RUNNING, State.PENDING_ERROR) ) ); private static final List> CLOSE_CRASHED = Collections.unmodifiableList( Arrays.asList( - KeyValue.pair(KafkaStreams.State.ERROR, KafkaStreams.State.PENDING_SHUTDOWN), + KeyValue.pair(State.RUNNING, KafkaStreams.State.PENDING_SHUTDOWN), KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING) ) ); @@ -533,8 +533,6 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForStateTransition(stateTransitions2, CRASH); - commitErrorInjectedClient2.set(false); stateTransitions2.clear(); streams2Alpha.close(); @@ -624,7 +622,6 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { verifyUncommitted(expectedUncommittedResult); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForStateTransition(stateTransitions1, CRASH); commitErrorInjectedClient1.set(false); stateTransitions1.clear(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index a8d1663887348..4b771b73ee470 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -174,7 +174,6 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, } @Test - @Deprecated public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception { final String base = TestUtils.tempDirectory(appId).getPath(); @@ -198,9 +197,6 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc final KafkaStreams streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1") ) { // start first instance and wait for processing - streamInstanceOne.setUncaughtExceptionHandler((t, e) -> { }); - streamInstanceTwo.setUncaughtExceptionHandler((t, e) -> { }); - streamInstanceOneRecovery.setUncaughtExceptionHandler((t, e) -> { }); startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30)); IntegrationTestUtils.waitUntilMinRecordsReceived( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index 73f90f8653056..11dfdb907a296 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -129,7 +129,7 @@ public void shouldShutdownThreadUsingOldHandler() throws InterruptedException { // should call the UncaughtExceptionHandler after rebalancing to another thread TestUtils.waitForCondition(() -> counter.get() == 2, DEFAULT_DURATION.toMillis(), "Handler was called 2nd time"); // the stream should now turn into ERROR state after 2 threads are dead - waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); + waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(2)); } @@ -145,7 +145,7 @@ public void shouldShutdownClient() throws InterruptedException { StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); produceMessages(0L, inputTopic, "A"); - waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION); + waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(1)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index a1bb82876154c..8aba3ec38d71f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -526,6 +526,7 @@ private static void produceSynchronously(final String topic, final List !driver.state().isRunningOrRebalancing(), DEFAULT_TIMEOUT, "Streams didn't shut down."); - assertThat(driver.state(), is(KafkaStreams.State.PENDING_SHUTDOWN)); + assertThat(driver.state(), is(KafkaStreams.State.PENDING_ERROR)); + waitForCondition(() -> driver.state() == KafkaStreams.State.ERROR, "finish shutdown"); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java index c2b77bc299cd1..29e75243f6527 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; @@ -68,7 +69,6 @@ public void after() throws InterruptedException { } @Test - @Deprecated //A single thread should no longer die public void shouldThrowErrorAfterSourceTopicDeleted() throws InterruptedException { final StreamsBuilder builder = new StreamsBuilder(); builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String())) @@ -88,11 +88,17 @@ public void shouldThrowErrorAfterSourceTopicDeleted() throws InterruptedExceptio final Topology topology = builder.build(); final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsConfiguration); final AtomicBoolean calledUncaughtExceptionHandler1 = new AtomicBoolean(false); - kafkaStreams1.setUncaughtExceptionHandler((thread, exception) -> calledUncaughtExceptionHandler1.set(true)); + kafkaStreams1.setUncaughtExceptionHandler(exception -> { + calledUncaughtExceptionHandler1.set(true); + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + }); kafkaStreams1.start(); final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, streamsConfiguration); final AtomicBoolean calledUncaughtExceptionHandler2 = new AtomicBoolean(false); - kafkaStreams2.setUncaughtExceptionHandler((thread, exception) -> calledUncaughtExceptionHandler2.set(true)); + kafkaStreams2.setUncaughtExceptionHandler(exception -> { + calledUncaughtExceptionHandler2.set(true); + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + }); kafkaStreams2.start(); TestUtils.waitForCondition( From 42aa534515309ee728e4c5019668673c8be6a364 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 08:23:32 -0800 Subject: [PATCH 02/20] debug --- .../StandbyTaskEOSIntegrationTest.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 4b771b73ee470..8209f6cf91cd9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -38,11 +38,13 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -67,6 +69,7 @@ * task towards a standby task is safe across restarts of the application. */ @RunWith(Parameterized.class) +@Category(IntegrationTest.class) public class StandbyTaskEOSIntegrationTest { private final static long REBALANCE_TIMEOUT = Duration.ofMinutes(2L).toMillis(); @@ -175,6 +178,7 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, @Test public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception { + final long time = System.currentTimeMillis(); final String base = TestUtils.tempDirectory(appId).getPath(); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( @@ -188,7 +192,7 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc IntegerSerializer.class, new Properties() ), - 10L + 10L + time ); try ( @@ -244,7 +248,7 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc IntegerSerializer.class, new Properties() ), - 10L + 10L + time ); waitForCondition( () -> streamInstanceOne.state() == KafkaStreams.State.ERROR, @@ -290,6 +294,16 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc "Could not get key from recovered main store" ); +// waitForCondition( +// () -> streamInstanceOneRecovery.store( +// StoreQueryParameters.fromNameAndType( +// storeName, +// QueryableStoreTypes.keyValueStore() +// ).enableStaleStores() +// ).get(KEY_1) != null, +// "Could not get key from recovered standby store" +// ); + // re-inject poison pill and wait for crash of first instance skipRecord.set(false); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( @@ -303,11 +317,11 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc IntegerSerializer.class, new Properties() ), - 10L + 10L + time ); waitForCondition( () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR, - "Stream instance 1 did not go into error state" + "Stream instance 1 did not go into error state. Is in " + streamInstanceOneRecovery.state() + " state." ); } } From 7c3e014d30b40d7658fe8253d43d7fb3088354e3 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 12:39:42 -0800 Subject: [PATCH 03/20] fix state store wipe --- .../org/apache/kafka/streams/KafkaStreams.java | 8 ++++++-- .../streams/processor/internals/StreamThread.java | 15 +++++++++++---- .../integration/SuppressionIntegrationTest.java | 1 - 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 3096e6da82692..68eb784592fd7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -291,8 +291,12 @@ private boolean setState(final State newState) { } else if (state == State.REBALANCING && newState == State.REBALANCING) { // when the state is already in REBALANCING, it should not transit to REBALANCING again return false; - } else if (state == State.ERROR && newState == State.ERROR) { - // when the state is already in ERROR, it should not transit to ERROR again + } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR)) { + // when the state is already in ERROR, its transition to PENDING_ERROR or ERROR (due to consecutive close calls) + return false; + } else if (state == State.PENDING_ERROR && newState != State.ERROR) { + // when the state is already in PENDING_ERROR, all other transitions than ERROR (due to thread dying) will be + // refused but we do not throw exception here, to allow appropriate error handling return false; } else if (!state.isValidTransition(newState)) { throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + oldState + " to " + newState); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 12c24aac2ba7f..40470cbc015d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -295,6 +295,7 @@ public boolean isRunning() { private java.util.function.Consumer streamsUncaughtExceptionHandler; private Runnable shutdownErrorHook; private AtomicInteger assignmentErrorCode; + private final ProcessingMode processingMode; public static StreamThread create(final InternalTopologyBuilder builder, final StreamsConfig config, @@ -482,7 +483,7 @@ public StreamThread(final Time time, this.shutdownErrorHook = shutdownErrorHook; this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; this.cacheResizer = cacheResizer; - + this.processingMode = processingMode(config); // The following sensors are created here but their references are not stored in this object, since within // this object they are not recorded. The sensors are created here so that the stream threads starts with all @@ -539,8 +540,7 @@ public void run() { } boolean cleanRun = false; try { - runLoop(); - cleanRun = true; + cleanRun = runLoop(); } finally { completeShutdown(cleanRun); } @@ -552,7 +552,7 @@ public void run() { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - void runLoop() { + boolean runLoop() { subscribeConsumer(); // if the thread is still in the middle of a rebalance, we should keep polling @@ -587,11 +587,18 @@ void runLoop() { } failedStreamThreadSensor.record(); this.streamsUncaughtExceptionHandler.accept(e); + if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) { + return false; + } } catch (final Throwable e) { failedStreamThreadSensor.record(); this.streamsUncaughtExceptionHandler.accept(e); + if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) { + return false; + } } } + return true; } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 8aba3ec38d71f..c18d08cc1386a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -526,7 +526,6 @@ private static void produceSynchronously(final String topic, final List !driver.state().isRunningOrRebalancing(), DEFAULT_TIMEOUT, "Streams didn't shut down."); - assertThat(driver.state(), is(KafkaStreams.State.PENDING_ERROR)); waitForCondition(() -> driver.state() == KafkaStreams.State.ERROR, "finish shutdown"); } } From ae8ea329e0302621f1085435d18e4ebcb60750a0 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 13:00:15 -0800 Subject: [PATCH 04/20] undo dep --- .../test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 3 --- .../integration/EOSUncleanShutdownIntegrationTest.java | 5 +++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index eb15cdce5a413..d281f58359d44 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -470,9 +470,6 @@ public void testStateThreadClose() throws Exception { "Thread never stopped."); streams.threads.get(i).join(); } - TestUtils.waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never stopped."); } finally { streams.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java index dc1875efa260d..11300c85f0436 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -98,7 +99,6 @@ public static void setupConfigsAndUtils() { } @Test - @SuppressWarnings("deprecation") public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException { final String appId = "shouldWorkWithUncleanShutdownWipeOutStateStore"; STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); @@ -134,7 +134,6 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE )); final KafkaStreams driver = new KafkaStreams(builder.build(), STREAMS_CONFIG); driver.cleanUp(); - driver.setUncaughtExceptionHandler((t, e) -> { }); driver.start(); final File stateDir = new File(String.join("/", TEST_FOLDER.getRoot().getPath(), appId, "0_0")); @@ -153,6 +152,8 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL, "Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get()); } finally { + TestUtils.waitForCondition(() -> driver.state().equals(State.ERROR), + "Expected ERROR state but driver is on " + driver.state()); driver.close(); // the state directory should still exist with the empty checkpoint file From 9aff9a7564156cbd77fdffc2610a90ac6ef5cd41 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 15:32:27 -0800 Subject: [PATCH 05/20] undo dep --- .../EosBetaUpgradeIntegrationTest.java | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 782fe7c0b9721..5d6d637f7a4a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAssignmentListener; @@ -116,13 +117,6 @@ public static Collection data() { KeyValue.pair(KafkaStreams.State.RUNNING, State.PENDING_ERROR) ) ); - private static final List> CLOSE_CRASHED = - Collections.unmodifiableList( - Arrays.asList( - KeyValue.pair(State.RUNNING, KafkaStreams.State.PENDING_SHUTDOWN), - KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING) - ) - ); @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( @@ -395,11 +389,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { new HashMap<>(committedState) )); verifyUncommitted(expectedUncommittedResult); + waitForStateTransitionContains(stateTransitions1, CRASH); errorInjectedClient1.set(false); stateTransitions1.clear(); streams1Alpha.close(); - waitForStateTransition(stateTransitions1, CLOSE_CRASHED); assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); } @@ -533,10 +527,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); + waitForStateTransitionContains(stateTransitions2, CRASH); + commitErrorInjectedClient2.set(false); stateTransitions2.clear(); streams2Alpha.close(); - waitForStateTransition(stateTransitions2, CLOSE_CRASHED); assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); final List> expectedCommittedResultAfterFailure = @@ -623,10 +618,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); + waitForStateTransitionContains(stateTransitions1, CRASH); + commitErrorInjectedClient1.set(false); stateTransitions1.clear(); streams1Beta.close(); - waitForStateTransition(stateTransitions1, CLOSE_CRASHED); assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); final List> expectedCommittedResultAfterFailure = @@ -744,11 +740,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { new HashMap<>(committedState) )); verifyUncommitted(expectedUncommittedResult); + waitForStateTransitionContains(stateTransitions2, CRASH); errorInjectedClient2.set(false); stateTransitions2.clear(); streams2AlphaTwo.close(); - waitForStateTransition(stateTransitions2, CLOSE_CRASHED); assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); } @@ -853,7 +849,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { } } } - @SuppressWarnings("deprecation") //Thread should no longer die by themselves + private KafkaStreams getKafkaStreams(final String appDir, final String processingGuarantee) { final StreamsBuilder builder = new StreamsBuilder(); @@ -949,7 +945,7 @@ public void close() {} ); final KafkaStreams streams = new KafkaStreams(builder.build(), config, new TestKafkaClientSupplier()); - streams.setUncaughtExceptionHandler((t, e) -> { + streams.setUncaughtExceptionHandler(e -> { if (!injectError) { // we don't expect any exception thrown in stop case e.printStackTrace(System.err); @@ -966,6 +962,7 @@ public void close() {} } exceptionCounts.put(appDir, exceptionCount); } + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; }); return streams; @@ -990,6 +987,17 @@ private void waitForStateTransition(final List> observed, + final List> expected) + throws Exception { + + waitForCondition( + () -> observed.containsAll(expected), + MAX_WAIT_TIME_MS, + () -> "Client did not startup on time. Observers transitions: " + observed + ); + } + private List> prepareData(final long fromInclusive, final long toExclusive, final Long... keys) { From 836e9aeadc1b865452f2f7ed8312b5aaf7db5683 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 15:48:21 -0800 Subject: [PATCH 06/20] quick fixes --- .../integration/StandbyTaskEOSIntegrationTest.java | 10 ---------- ...StreamsUncaughtExceptionHandlerIntegrationTest.java | 2 +- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 8209f6cf91cd9..2506ed7f0314a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -294,16 +294,6 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc "Could not get key from recovered main store" ); -// waitForCondition( -// () -> streamInstanceOneRecovery.store( -// StoreQueryParameters.fromNameAndType( -// storeName, -// QueryableStoreTypes.keyValueStore() -// ).enableStaleStores() -// ).get(KEY_1) != null, -// "Could not get key from recovered standby store" -// ); - // re-inject poison pill and wait for crash of first instance skipRecord.set(false); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index 11dfdb907a296..e922e9d9e4323 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -128,7 +128,7 @@ public void shouldShutdownThreadUsingOldHandler() throws InterruptedException { TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time"); // should call the UncaughtExceptionHandler after rebalancing to another thread TestUtils.waitForCondition(() -> counter.get() == 2, DEFAULT_DURATION.toMillis(), "Handler was called 2nd time"); - // the stream should now turn into ERROR state after 2 threads are dead + // there is no threads running but the client is still in running waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(2)); From 280dab10ec1e0b66af9ebcc7d6c80325cd5c0959 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Fri, 11 Dec 2020 13:24:06 -0800 Subject: [PATCH 07/20] rebase replace thread so global threads close correctly --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +- .../StreamsUncaughtExceptionHandlerIntegrationTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 68eb784592fd7..a0104237c3422 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -444,7 +444,7 @@ private void replaceStreamThread(final Throwable throwable) { log.warn("The global thread cannot be replaced. Reverting to shutting down the client."); log.error("Encountered the following exception during processing " + " The streams client is going to shut down now. ", throwable); - close(Duration.ZERO); + closeToError(); } final StreamThread deadThread = (StreamThread) Thread.currentThread(); threads.remove(deadThread); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index e922e9d9e4323..eed626feee21f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -194,7 +194,7 @@ public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throw StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); produceMessages(0L, inputTopic, "A"); - waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION); + waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(1)); } From 4e0b5659b5dd6a61e45fe47c8401bd6ed6408850 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Wed, 9 Dec 2020 12:49:55 -0800 Subject: [PATCH 08/20] init commit, one test is still failing --- .../test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 3 +++ .../kafka/streams/integration/SuppressionIntegrationTest.java | 1 + 2 files changed, 4 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index d281f58359d44..eb15cdce5a413 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -470,6 +470,9 @@ public void testStateThreadClose() throws Exception { "Thread never stopped."); streams.threads.get(i).join(); } + TestUtils.waitForCondition( + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never stopped."); } finally { streams.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index c18d08cc1386a..8aba3ec38d71f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -526,6 +526,7 @@ private static void produceSynchronously(final String topic, final List !driver.state().isRunningOrRebalancing(), DEFAULT_TIMEOUT, "Streams didn't shut down."); + assertThat(driver.state(), is(KafkaStreams.State.PENDING_ERROR)); waitForCondition(() -> driver.state() == KafkaStreams.State.ERROR, "finish shutdown"); } } From 5f73045da8a02d287bceb4d8b5deee96e96fe03b Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 08:23:32 -0800 Subject: [PATCH 09/20] debug --- .../integration/StandbyTaskEOSIntegrationTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 2506ed7f0314a..8209f6cf91cd9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -294,6 +294,16 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc "Could not get key from recovered main store" ); +// waitForCondition( +// () -> streamInstanceOneRecovery.store( +// StoreQueryParameters.fromNameAndType( +// storeName, +// QueryableStoreTypes.keyValueStore() +// ).enableStaleStores() +// ).get(KEY_1) != null, +// "Could not get key from recovered standby store" +// ); + // re-inject poison pill and wait for crash of first instance skipRecord.set(false); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( From e1b6bb08aafaf73edee5bc420d924b69aab76281 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 12:39:42 -0800 Subject: [PATCH 10/20] fix state store wipe --- .../kafka/streams/integration/SuppressionIntegrationTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 8aba3ec38d71f..c18d08cc1386a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -526,7 +526,6 @@ private static void produceSynchronously(final String topic, final List !driver.state().isRunningOrRebalancing(), DEFAULT_TIMEOUT, "Streams didn't shut down."); - assertThat(driver.state(), is(KafkaStreams.State.PENDING_ERROR)); waitForCondition(() -> driver.state() == KafkaStreams.State.ERROR, "finish shutdown"); } } From 468a0a62d49a51ed71887bb349913fefb64fa55a Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 13:00:15 -0800 Subject: [PATCH 11/20] undo dep --- .../test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index eb15cdce5a413..d281f58359d44 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -470,9 +470,6 @@ public void testStateThreadClose() throws Exception { "Thread never stopped."); streams.threads.get(i).join(); } - TestUtils.waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never stopped."); } finally { streams.close(); } From 960365970af371dcff7cc08dfdc68ea7654ef496 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 15:32:27 -0800 Subject: [PATCH 12/20] undo dep --- .../streams/integration/EOSUncleanShutdownIntegrationTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java index 11300c85f0436..67f0c9056b732 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -153,7 +153,8 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE "Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get()); } finally { TestUtils.waitForCondition(() -> driver.state().equals(State.ERROR), - "Expected ERROR state but driver is on " + driver.state()); + "Expected ERROR state but driver is on " + driver.state()); + driver.close(); // the state directory should still exist with the empty checkpoint file From 0304647545c089b9572700625679a9461fee194b Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 10 Dec 2020 15:48:21 -0800 Subject: [PATCH 13/20] quick fixes --- .../integration/StandbyTaskEOSIntegrationTest.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 8209f6cf91cd9..2506ed7f0314a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -294,16 +294,6 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc "Could not get key from recovered main store" ); -// waitForCondition( -// () -> streamInstanceOneRecovery.store( -// StoreQueryParameters.fromNameAndType( -// storeName, -// QueryableStoreTypes.keyValueStore() -// ).enableStaleStores() -// ).get(KEY_1) != null, -// "Could not get key from recovered standby store" -// ); - // re-inject poison pill and wait for crash of first instance skipRecord.set(false); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( From a0a51d94ac65d4218572185c8c6d1cb44d755344 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Mon, 4 Jan 2021 10:05:49 -0800 Subject: [PATCH 14/20] address comments --- .../apache/kafka/streams/KafkaStreams.java | 22 +++++++++---------- .../kafka/streams/KafkaStreamsTest.java | 15 +++++++++++++ .../SuppressionIntegrationTest.java | 2 +- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index a0104237c3422..3e3d6455e1c4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -202,14 +202,14 @@ public class KafkaStreams implements AutoCloseable { * | | | * | v | * | +------+-------+ +----+-------+ - * +-----> | Pending |<--- | Error (5) | - * | Shutdown (3) | +------------+ - * +------+-------+ - * | - * v - * +------+-------+ - * | Not | - * | Running (4) | + * +-----> | Pending | | Pending | + * | Shutdown (3) | | Error (5) | + * +------+-------+ +-----+------+ + * | | + * v v + * +------+-------+ +-----+--------+ + * | Not | | Error (6) | + * | Running (4) | +--------------+ * +--------------+ * * @@ -217,9 +217,9 @@ public class KafkaStreams implements AutoCloseable { * Note the following: * - RUNNING state will transit to REBALANCING if any of its threads is in PARTITION_REVOKED or PARTITIONS_ASSIGNED state * - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state - * - Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called) + * - Any state except NOT_RUNNING, PENDING_ERROR or ERROR can go to PENDING_SHUTDOWN (whenever close is called) * - Of special importance: If the global stream thread dies, or all stream threads die (or both) then - * the instance will be in the ERROR state. The user will need to close it. + * the instance will be in the ERROR state. The user will not need to close it. */ public enum State { CREATED(1, 3), // 0 @@ -228,7 +228,7 @@ public enum State { PENDING_SHUTDOWN(4), // 3 NOT_RUNNING, // 4 PENDING_ERROR(6), // 5 - ERROR; // 6 + ERROR; // 6 private final Set validTransitions = new HashSet<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index d281f58359d44..6c5f90db6710f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -319,6 +319,17 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin StreamThread.State.PARTITIONS_ASSIGNED); return null; }).anyTimes(); + EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadata( + "newThead", + "DEAD", + "", + "", + Collections.emptySet(), + "", + Collections.emptySet(), + Collections.emptySet() + ) + ).anyTimes(); EasyMock.expect(thread.threadMetadata()).andStubReturn(threadMetadata); thread.waitOnThreadState(StreamThread.State.DEAD); EasyMock.expectLastCall().anyTimes(); @@ -470,6 +481,10 @@ public void testStateThreadClose() throws Exception { "Thread never stopped."); streams.threads.get(i).join(); } + TestUtils.waitForCondition( + () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")), + "Streams never stopped" + ); } finally { streams.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index c18d08cc1386a..6d37a0a392e7d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -526,6 +526,6 @@ private static void produceSynchronously(final String topic, final List !driver.state().isRunningOrRebalancing(), DEFAULT_TIMEOUT, "Streams didn't shut down."); - waitForCondition(() -> driver.state() == KafkaStreams.State.ERROR, "finish shutdown"); + waitForCondition(() -> driver.state() == KafkaStreams.State.ERROR, "Streams didn't transit to ERROR state"); } } From 35241f4494836fc30c01795322a053b0735ccaf7 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Tue, 12 Jan 2021 18:50:58 -0800 Subject: [PATCH 15/20] returned checks --- .../streams/integration/EosBetaUpgradeIntegrationTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 5d6d637f7a4a1..cec518b038451 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -114,7 +114,7 @@ public static Collection data() { private static final List> CRASH = Collections.unmodifiableList( Collections.singletonList( - KeyValue.pair(KafkaStreams.State.RUNNING, State.PENDING_ERROR) + KeyValue.pair(State.PENDING_ERROR, State.ERROR) ) ); @@ -135,7 +135,6 @@ public static Collection data() { "there are too many exceptions thrown, please check standard error log for more info."; private final String storeName = "store"; - private final StableAssignmentListener assignmentListener = new StableAssignmentListener(); private final AtomicBoolean errorInjectedClient1 = new AtomicBoolean(false); @@ -983,12 +982,12 @@ private void waitForStateTransition(final List observed.equals(expected), MAX_WAIT_TIME_MS, - () -> "Client did not have the expected state transition on time. Observers transitions: " + observed + () -> "Client did not startup on time. Observers transitions: " + observed ); } private void waitForStateTransitionContains(final List> observed, - final List> expected) + final List> expected) throws Exception { waitForCondition( From 8078e962ba05d37251d11c0960df223fdfa24cfd Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Tue, 19 Jan 2021 14:08:54 -0800 Subject: [PATCH 16/20] update fail msg --- .../streams/integration/EosBetaUpgradeIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index cec518b038451..7b6ae6a454ddc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -971,7 +971,7 @@ private void waitForRunning(final List !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING), MAX_WAIT_TIME_MS, - () -> "Client did not startup on time. Observers transitions: " + observed + () -> "Client did not have the expected state transition on time. Observers transitions: " + observed ); } @@ -982,7 +982,7 @@ private void waitForStateTransition(final List observed.equals(expected), MAX_WAIT_TIME_MS, - () -> "Client did not startup on time. Observers transitions: " + observed + () -> "Client did not have the expected state transition on time. Observers transitions: " + observed ); } From de99a7c297143372eccfc35bc48752d2bfbd2e8f Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Tue, 19 Jan 2021 14:11:09 -0800 Subject: [PATCH 17/20] update fail msg --- .../streams/integration/EosBetaUpgradeIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 7b6ae6a454ddc..8ca4b9ac9327c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -993,7 +993,7 @@ private void waitForStateTransitionContains(final List observed.containsAll(expected), MAX_WAIT_TIME_MS, - () -> "Client did not startup on time. Observers transitions: " + observed + () -> "Client did not have the expected state transition on time. Observers transitions: " + observed ); } From 93beb2f2437365270c5526946d51e3bb4961dd9c Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 21 Jan 2021 09:45:15 -0800 Subject: [PATCH 18/20] close semantics --- .../java/org/apache/kafka/streams/KafkaStreams.java | 10 ++++++++-- .../org/apache/kafka/streams/KafkaStreamsTest.java | 10 ++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 3e3d6455e1c4f..8e9410f1a5428 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1199,11 +1199,17 @@ private Thread shutdownHelper(final boolean error) { private boolean close(final long timeoutMs) { if (state == State.ERROR) { log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); - return false; + return true; } if (state == State.PENDING_ERROR) { log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped."); - return false; + if (waitOnState(State.ERROR, timeoutMs)) { + log.info("Streams client stopped to ERROR completely"); + return true; + } else { + log.info("Streams client cannot transition to ERROR completely within the timeout"); + return false; + } } if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 6c5f90db6710f..f22b3adf44e97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -515,12 +515,18 @@ public void testStateGlobalThreadClose() throws Exception { () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, "Thread never stopped."); globalStreamThread.join(); - assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR); + TestUtils.waitForCondition( + () -> streams.state() == KafkaStreams.State.PENDING_ERROR, + "Thread never stopped." + ); } finally { streams.close(); } - assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR); + TestUtils.waitForCondition( + () -> streams.state() == KafkaStreams.State.ERROR, + "Thread never stopped." + ); } @Test From 034520223141396471406feb1f3786f91fca822d Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 21 Jan 2021 14:10:59 -0800 Subject: [PATCH 19/20] nits --- .../streams/integration/EosBetaUpgradeIntegrationTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 8ca4b9ac9327c..020423b883734 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -971,7 +971,7 @@ private void waitForRunning(final List !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING), MAX_WAIT_TIME_MS, - () -> "Client did not have the expected state transition on time. Observers transitions: " + observed + () -> "Client did not startup on time. Observers transitions: " + observed ); } @@ -983,17 +983,19 @@ private void waitForStateTransition(final List observed.equals(expected), MAX_WAIT_TIME_MS, () -> "Client did not have the expected state transition on time. Observers transitions: " + observed + + "Expected transitions: " + expected ); } private void waitForStateTransitionContains(final List> observed, - final List> expected) + final List> expected) throws Exception { waitForCondition( () -> observed.containsAll(expected), MAX_WAIT_TIME_MS, () -> "Client did not have the expected state transition on time. Observers transitions: " + observed + + "Expected transitions: " + expected ); } From e235b62f67fd1b2ab2323750f748e28ad87f9a98 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Thu, 21 Jan 2021 14:43:21 -0800 Subject: [PATCH 20/20] extract system time change fix --- .../streams/integration/StandbyTaskEOSIntegrationTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 2506ed7f0314a..f27ddc3712fa8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -178,7 +178,6 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, @Test public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception { - final long time = System.currentTimeMillis(); final String base = TestUtils.tempDirectory(appId).getPath(); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( @@ -192,7 +191,7 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc IntegerSerializer.class, new Properties() ), - 10L + time + 10L ); try ( @@ -248,7 +247,7 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc IntegerSerializer.class, new Properties() ), - 10L + time + 10L ); waitForCondition( () -> streamInstanceOne.state() == KafkaStreams.State.ERROR, @@ -307,7 +306,7 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc IntegerSerializer.class, new Properties() ), - 10L + time + 10L ); waitForCondition( () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR,