diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index ad9cdbeb7001a..8e9410f1a5428 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -202,14 +202,14 @@ public class KafkaStreams implements AutoCloseable { * | | | * | v | * | +------+-------+ +----+-------+ - * +-----> | Pending |<--- | Error (5) | - * | Shutdown (3) | +------------+ - * +------+-------+ - * | - * v - * +------+-------+ - * | Not | - * | Running (4) | + * +-----> | Pending | | Pending | + * | Shutdown (3) | | Error (5) | + * +------+-------+ +-----+------+ + * | | + * v v + * +------+-------+ +-----+--------+ + * | Not | | Error (6) | + * | Running (4) | +--------------+ * +--------------+ * * @@ -217,9 +217,9 @@ public class KafkaStreams implements AutoCloseable { * Note the following: * - RUNNING state will transit to REBALANCING if any of its threads is in PARTITION_REVOKED or PARTITIONS_ASSIGNED state * - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state - * - Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called) + * - Any state except NOT_RUNNING, PENDING_ERROR or ERROR can go to PENDING_SHUTDOWN (whenever close is called) * - Of special importance: If the global stream thread dies, or all stream threads die (or both) then - * the instance will be in the ERROR state. The user will need to close it. + * the instance will be in the ERROR state. The user will not need to close it. */ public enum State { CREATED(1, 3), // 0 @@ -227,7 +227,8 @@ public enum State { RUNNING(1, 2, 3, 5), // 2 PENDING_SHUTDOWN(4), // 3 NOT_RUNNING, // 4 - ERROR(3); // 5 + PENDING_ERROR(6), // 5 + ERROR; // 6 private final Set validTransitions = new HashSet<>(); @@ -290,8 +291,12 @@ private boolean setState(final State newState) { } else if (state == State.REBALANCING && newState == State.REBALANCING) { // when the state is already in REBALANCING, it should not transit to REBALANCING again return false; - } else if (state == State.ERROR && newState == State.ERROR) { - // when the state is already in ERROR, it should not transit to ERROR again + } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR)) { + // when the state is already in ERROR, its transition to PENDING_ERROR or ERROR (due to consecutive close calls) + return false; + } else if (state == State.PENDING_ERROR && newState != State.ERROR) { + // when the state is already in PENDING_ERROR, all other transitions than ERROR (due to thread dying) will be + // refused but we do not throw exception here, to allow appropriate error handling return false; } else if (!state.isValidTransition(newState)) { throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + oldState + " to " + newState); @@ -439,7 +444,7 @@ private void replaceStreamThread(final Throwable throwable) { log.warn("The global thread cannot be replaced. Reverting to shutting down the client."); log.error("Encountered the following exception during processing " + " The streams client is going to shut down now. ", throwable); - close(Duration.ZERO); + closeToError(); } final StreamThread deadThread = (StreamThread) Thread.currentThread(); threads.remove(deadThread); @@ -469,7 +474,7 @@ private void handleStreamsUncaughtException(final Throwable throwable, log.error("Encountered the following exception during processing " + "and the registered exception handler opted to " + action + "." + " The streams client is going to shut down now. ", throwable); - close(Duration.ZERO); + closeToError(); break; case SHUTDOWN_APPLICATION: if (throwable instanceof Error) { @@ -482,7 +487,7 @@ private void handleStreamsUncaughtException(final Throwable throwable, log.error("Exception in global thread caused the application to attempt to shutdown." + " This action will succeed only if there is at least one StreamThread running on this client." + " Currently there are no running threads so will now close the client."); - close(Duration.ZERO); + closeToError(); } else { processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED)); log.error("Encountered the following exception during processing " + @@ -552,22 +557,6 @@ final class StreamStateListener implements StreamThread.StateListener { this.threadStatesLock = new Object(); } - /** - * If all threads are dead set to ERROR - */ - private void maybeSetError() { - // check if we have at least one thread running - for (final StreamThread.State state : threadState.values()) { - if (state != StreamThread.State.DEAD) { - return; - } - } - - if (setState(State.ERROR)) { - log.error("All stream threads have died. The instance will be in error state and should be closed."); - } - } - /** * If all threads are up, including the global thread, set to RUNNING */ @@ -603,8 +592,6 @@ public synchronized void onChange(final Thread thread, setState(State.REBALANCING); } else if (newState == StreamThread.State.RUNNING) { maybeSetRunning(); - } else if (newState == StreamThread.State.DEAD) { - maybeSetError(); } } else if (thread instanceof GlobalStreamThread) { // global stream thread has different invariants @@ -614,9 +601,8 @@ public synchronized void onChange(final Thread thread, if (newState == GlobalStreamThread.State.RUNNING) { maybeSetRunning(); } else if (newState == GlobalStreamThread.State.DEAD) { - if (setState(State.ERROR)) { - log.error("Global thread has died. The instance will be in error state and should be closed."); - } + log.error("Global thread has died. The streams application or client will now close to ERROR."); + closeToError(); } } } @@ -1204,11 +1190,27 @@ private Thread shutdownHelper(final boolean error) { metrics.close(); if (!error) { setState(State.NOT_RUNNING); + } else { + setState(State.ERROR); } }, "kafka-streams-close-thread"); } private boolean close(final long timeoutMs) { + if (state == State.ERROR) { + log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); + return true; + } + if (state == State.PENDING_ERROR) { + log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped."); + if (waitOnState(State.ERROR, timeoutMs)) { + log.info("Streams client stopped to ERROR completely"); + return true; + } else { + log.info("Streams client cannot transition to ERROR completely within the timeout"); + return false; + } + } if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN // or NOT_RUNNING already; just check that all threads have been stopped @@ -1230,7 +1232,7 @@ private boolean close(final long timeoutMs) { } private void closeToError() { - if (!setState(State.ERROR)) { + if (!setState(State.PENDING_ERROR)) { log.info("Skipping shutdown since we are already in " + state()); } else { final Thread shutdownThread = shutdownHelper(true); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 12c24aac2ba7f..40470cbc015d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -295,6 +295,7 @@ public boolean isRunning() { private java.util.function.Consumer streamsUncaughtExceptionHandler; private Runnable shutdownErrorHook; private AtomicInteger assignmentErrorCode; + private final ProcessingMode processingMode; public static StreamThread create(final InternalTopologyBuilder builder, final StreamsConfig config, @@ -482,7 +483,7 @@ public StreamThread(final Time time, this.shutdownErrorHook = shutdownErrorHook; this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; this.cacheResizer = cacheResizer; - + this.processingMode = processingMode(config); // The following sensors are created here but their references are not stored in this object, since within // this object they are not recorded. The sensors are created here so that the stream threads starts with all @@ -539,8 +540,7 @@ public void run() { } boolean cleanRun = false; try { - runLoop(); - cleanRun = true; + cleanRun = runLoop(); } finally { completeShutdown(cleanRun); } @@ -552,7 +552,7 @@ public void run() { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - void runLoop() { + boolean runLoop() { subscribeConsumer(); // if the thread is still in the middle of a rebalance, we should keep polling @@ -587,11 +587,18 @@ void runLoop() { } failedStreamThreadSensor.record(); this.streamsUncaughtExceptionHandler.accept(e); + if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) { + return false; + } } catch (final Throwable e) { failedStreamThreadSensor.record(); this.streamsUncaughtExceptionHandler.accept(e); + if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) { + return false; + } } } + return true; } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index ddcb138b819c7..f22b3adf44e97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -319,6 +319,17 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin StreamThread.State.PARTITIONS_ASSIGNED); return null; }).anyTimes(); + EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadata( + "newThead", + "DEAD", + "", + "", + Collections.emptySet(), + "", + Collections.emptySet(), + Collections.emptySet() + ) + ).anyTimes(); EasyMock.expect(thread.threadMetadata()).andStubReturn(threadMetadata); thread.waitOnThreadState(StreamThread.State.DEAD); EasyMock.expectLastCall().anyTimes(); @@ -428,70 +439,6 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); } - @Test - public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException { - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - streams.setStateListener(streamsStateListener); - - Assert.assertEquals(0, streamsStateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.CREATED, streams.state()); - - streams.start(); - - TestUtils.waitForCondition( - () -> streamsStateListener.numChanges == 2, - "Streams never started."); - Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state()); - - for (final StreamThread thread : streams.threads) { - threadStatelistenerCapture.getValue().onChange( - thread, - StreamThread.State.PARTITIONS_REVOKED, - StreamThread.State.RUNNING); - } - - Assert.assertEquals(3, streamsStateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state()); - - threadStatelistenerCapture.getValue().onChange( - streams.threads.get(NUM_THREADS - 1), - StreamThread.State.PENDING_SHUTDOWN, - StreamThread.State.PARTITIONS_REVOKED); - - threadStatelistenerCapture.getValue().onChange( - streams.threads.get(NUM_THREADS - 1), - StreamThread.State.DEAD, - StreamThread.State.PENDING_SHUTDOWN); - - Assert.assertEquals(3, streamsStateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state()); - - for (final StreamThread thread : streams.threads) { - if (thread != streams.threads.get(NUM_THREADS - 1)) { - threadStatelistenerCapture.getValue().onChange( - thread, - StreamThread.State.PENDING_SHUTDOWN, - StreamThread.State.PARTITIONS_REVOKED); - - threadStatelistenerCapture.getValue().onChange( - thread, - StreamThread.State.DEAD, - StreamThread.State.PENDING_SHUTDOWN); - } - } - - Assert.assertEquals(4, streamsStateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.ERROR, streams.state()); - - streams.close(); - - // the state should not stuck with ERROR, but transit to NOT_RUNNING in the end - TestUtils.waitForCondition( - () -> streamsStateListener.numChanges == 6, - "Streams never closed."); - Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); - } - @Test public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception { final StreamsBuilder builder = getBuilderWithSource(); @@ -535,8 +482,9 @@ public void testStateThreadClose() throws Exception { streams.threads.get(i).join(); } TestUtils.waitForCondition( - () -> streams.state() == KafkaStreams.State.ERROR, - "Streams never stopped."); + () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")), + "Streams never stopped" + ); } finally { streams.close(); } @@ -567,12 +515,18 @@ public void testStateGlobalThreadClose() throws Exception { () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, "Thread never stopped."); globalStreamThread.join(); - assertEquals(streams.state(), KafkaStreams.State.ERROR); + TestUtils.waitForCondition( + () -> streams.state() == KafkaStreams.State.PENDING_ERROR, + "Thread never stopped." + ); } finally { streams.close(); } - assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING); + TestUtils.waitForCondition( + () -> streams.state() == KafkaStreams.State.ERROR, + "Thread never stopped." + ); } @Test @@ -635,8 +589,7 @@ public void shouldNotAddThreadWhenError() { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); final int oldSize = streams.threads.size(); streams.start(); - streamThreadOne.shutdown(); - streamThreadTwo.shutdown(); + globalStreamThread.shutdown(); assertThat(streams.addStreamThread(), equalTo(Optional.empty())); assertThat(streams.threads.size(), equalTo(oldSize)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java index 4cc0268cf8632..67f0c9056b732 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() { } @Test - @SuppressWarnings("deprecation") public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException { final String appId = "shouldWorkWithUncleanShutdownWipeOutStateStore"; STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); @@ -135,7 +134,6 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE )); final KafkaStreams driver = new KafkaStreams(builder.build(), STREAMS_CONFIG); driver.cleanUp(); - driver.setUncaughtExceptionHandler((t, e) -> { }); driver.start(); final File stateDir = new File(String.join("/", TEST_FOLDER.getRoot().getPath(), appId, "0_0")); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 91609a6575b92..020423b883734 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAssignmentListener; @@ -113,14 +114,7 @@ public static Collection data() { private static final List> CRASH = Collections.unmodifiableList( Collections.singletonList( - KeyValue.pair(KafkaStreams.State.RUNNING, KafkaStreams.State.ERROR) - ) - ); - private static final List> CLOSE_CRASHED = - Collections.unmodifiableList( - Arrays.asList( - KeyValue.pair(KafkaStreams.State.ERROR, KafkaStreams.State.PENDING_SHUTDOWN), - KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING) + KeyValue.pair(State.PENDING_ERROR, State.ERROR) ) ); @@ -141,7 +135,6 @@ public static Collection data() { "there are too many exceptions thrown, please check standard error log for more info."; private final String storeName = "store"; - private final StableAssignmentListener assignmentListener = new StableAssignmentListener(); private final AtomicBoolean errorInjectedClient1 = new AtomicBoolean(false); @@ -395,11 +388,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { new HashMap<>(committedState) )); verifyUncommitted(expectedUncommittedResult); + waitForStateTransitionContains(stateTransitions1, CRASH); errorInjectedClient1.set(false); stateTransitions1.clear(); streams1Alpha.close(); - waitForStateTransition(stateTransitions1, CLOSE_CRASHED); assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); } @@ -533,12 +526,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForStateTransition(stateTransitions2, CRASH); + waitForStateTransitionContains(stateTransitions2, CRASH); commitErrorInjectedClient2.set(false); stateTransitions2.clear(); streams2Alpha.close(); - waitForStateTransition(stateTransitions2, CLOSE_CRASHED); assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); final List> expectedCommittedResultAfterFailure = @@ -624,12 +616,12 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { verifyUncommitted(expectedUncommittedResult); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForStateTransition(stateTransitions1, CRASH); + + waitForStateTransitionContains(stateTransitions1, CRASH); commitErrorInjectedClient1.set(false); stateTransitions1.clear(); streams1Beta.close(); - waitForStateTransition(stateTransitions1, CLOSE_CRASHED); assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); final List> expectedCommittedResultAfterFailure = @@ -747,11 +739,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { new HashMap<>(committedState) )); verifyUncommitted(expectedUncommittedResult); + waitForStateTransitionContains(stateTransitions2, CRASH); errorInjectedClient2.set(false); stateTransitions2.clear(); streams2AlphaTwo.close(); - waitForStateTransition(stateTransitions2, CLOSE_CRASHED); assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); } @@ -856,7 +848,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { } } } - @SuppressWarnings("deprecation") //Thread should no longer die by themselves + private KafkaStreams getKafkaStreams(final String appDir, final String processingGuarantee) { final StreamsBuilder builder = new StreamsBuilder(); @@ -952,7 +944,7 @@ public void close() {} ); final KafkaStreams streams = new KafkaStreams(builder.build(), config, new TestKafkaClientSupplier()); - streams.setUncaughtExceptionHandler((t, e) -> { + streams.setUncaughtExceptionHandler(e -> { if (!injectError) { // we don't expect any exception thrown in stop case e.printStackTrace(System.err); @@ -969,6 +961,7 @@ public void close() {} } exceptionCounts.put(appDir, exceptionCount); } + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; }); return streams; @@ -990,6 +983,19 @@ private void waitForStateTransition(final List observed.equals(expected), MAX_WAIT_TIME_MS, () -> "Client did not have the expected state transition on time. Observers transitions: " + observed + + "Expected transitions: " + expected + ); + } + + private void waitForStateTransitionContains(final List> observed, + final List> expected) + throws Exception { + + waitForCondition( + () -> observed.containsAll(expected), + MAX_WAIT_TIME_MS, + () -> "Client did not have the expected state transition on time. Observers transitions: " + observed + + "Expected transitions: " + expected ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index a8d1663887348..f27ddc3712fa8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -38,11 +38,13 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -67,6 +69,7 @@ * task towards a standby task is safe across restarts of the application. */ @RunWith(Parameterized.class) +@Category(IntegrationTest.class) public class StandbyTaskEOSIntegrationTest { private final static long REBALANCE_TIMEOUT = Duration.ofMinutes(2L).toMillis(); @@ -174,7 +177,6 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, } @Test - @Deprecated public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception { final String base = TestUtils.tempDirectory(appId).getPath(); @@ -198,9 +200,6 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc final KafkaStreams streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1") ) { // start first instance and wait for processing - streamInstanceOne.setUncaughtExceptionHandler((t, e) -> { }); - streamInstanceTwo.setUncaughtExceptionHandler((t, e) -> { }); - streamInstanceOneRecovery.setUncaughtExceptionHandler((t, e) -> { }); startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30)); IntegrationTestUtils.waitUntilMinRecordsReceived( @@ -311,7 +310,7 @@ public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exc ); waitForCondition( () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR, - "Stream instance 1 did not go into error state" + "Stream instance 1 did not go into error state. Is in " + streamInstanceOneRecovery.state() + " state." ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index 73f90f8653056..eed626feee21f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -128,8 +128,8 @@ public void shouldShutdownThreadUsingOldHandler() throws InterruptedException { TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time"); // should call the UncaughtExceptionHandler after rebalancing to another thread TestUtils.waitForCondition(() -> counter.get() == 2, DEFAULT_DURATION.toMillis(), "Handler was called 2nd time"); - // the stream should now turn into ERROR state after 2 threads are dead - waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); + // there is no threads running but the client is still in running + waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(2)); } @@ -145,7 +145,7 @@ public void shouldShutdownClient() throws InterruptedException { StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); produceMessages(0L, inputTopic, "A"); - waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION); + waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(1)); } @@ -194,7 +194,7 @@ public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throw StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); produceMessages(0L, inputTopic, "A"); - waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION); + waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(1)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index a1bb82876154c..6d37a0a392e7d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -526,6 +526,6 @@ private static void produceSynchronously(final String topic, final List !driver.state().isRunningOrRebalancing(), DEFAULT_TIMEOUT, "Streams didn't shut down."); - assertThat(driver.state(), is(KafkaStreams.State.PENDING_SHUTDOWN)); + waitForCondition(() -> driver.state() == KafkaStreams.State.ERROR, "Streams didn't transit to ERROR state"); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java index c2b77bc299cd1..29e75243f6527 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; @@ -68,7 +69,6 @@ public void after() throws InterruptedException { } @Test - @Deprecated //A single thread should no longer die public void shouldThrowErrorAfterSourceTopicDeleted() throws InterruptedException { final StreamsBuilder builder = new StreamsBuilder(); builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String())) @@ -88,11 +88,17 @@ public void shouldThrowErrorAfterSourceTopicDeleted() throws InterruptedExceptio final Topology topology = builder.build(); final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsConfiguration); final AtomicBoolean calledUncaughtExceptionHandler1 = new AtomicBoolean(false); - kafkaStreams1.setUncaughtExceptionHandler((thread, exception) -> calledUncaughtExceptionHandler1.set(true)); + kafkaStreams1.setUncaughtExceptionHandler(exception -> { + calledUncaughtExceptionHandler1.set(true); + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + }); kafkaStreams1.start(); final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, streamsConfiguration); final AtomicBoolean calledUncaughtExceptionHandler2 = new AtomicBoolean(false); - kafkaStreams2.setUncaughtExceptionHandler((thread, exception) -> calledUncaughtExceptionHandler2.set(true)); + kafkaStreams2.setUncaughtExceptionHandler(exception -> { + calledUncaughtExceptionHandler2.set(true); + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + }); kafkaStreams2.start(); TestUtils.waitForCondition(