From af05a97c9d9749d63ae7d712dd6ab2ffe634e841 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 18 Dec 2024 11:58:39 +0100 Subject: [PATCH] MINOR: Fix flaky state updater test The tests are flaky because the tests end before the verified calls are executed. This happens because the state updater thread executes the verified calls, but the thread that executes the tests with the verifications is a different thread. This commit fixes the flaky tests by enusring that the calls were performed by the state updater by either shutting down the state updater or waiting for the condition. --- .../processor/internals/DefaultStateUpdater.java | 2 +- .../processor/internals/DefaultStateUpdaterTest.java | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index addef5a9f1565..bb2f2ddd5b755 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -320,7 +320,7 @@ private KafkaFutureImpl restoreConsumerInstanceId(final Duration timeout) private void handleRuntimeException(final RuntimeException runtimeException) { - log.error("An unexpected error occurred within the state updater thread: " + runtimeException); + log.error("An unexpected error occurred within the state updater thread: {}", String.valueOf(runtimeException)); addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException); isRunning.set(false); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 25f3f0e587f75..adc71ebc116e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -290,7 +290,11 @@ private void shouldThrowIfAddingTasksWithSameId(final Task task1, final Task tas stateUpdater.add(task2); verifyFailedTasks(IllegalStateException.class, task1); - assertFalse(stateUpdater.isRunning()); + waitForCondition( + () -> !stateUpdater.isRunning(), + VERIFICATION_TIMEOUT, + "Did not switch to non-running within the given timeout!" + ); } @Test @@ -1015,6 +1019,8 @@ public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception { verifyRestoredActiveTasks(); verifyUpdatingTasks(task2); verifyExceptionsAndFailedTasks(); + // shutdown ensures that the test does not end before changelog reader methods verified below are called + stateUpdater.shutdown(Duration.ofMinutes(1)); verify(changelogReader, times(1)).enforceRestoreActive(); verify(changelogReader, times(1)).transitToUpdateStandby(); } @@ -1152,6 +1158,8 @@ public void shouldIdleWhenAllTasksPaused() throws Exception { public void shouldResumeStandbyTask() throws Exception { final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldResumeStatefulTask(task); + // shutdown ensures that the test does not end before changelog reader methods verified below are called + stateUpdater.shutdown(Duration.ofMinutes(1)); verify(changelogReader, times(2)).transitToUpdateStandby(); }