diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index addef5a9f1565..bb2f2ddd5b755 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -320,7 +320,7 @@ private KafkaFutureImpl restoreConsumerInstanceId(final Duration timeout) private void handleRuntimeException(final RuntimeException runtimeException) { - log.error("An unexpected error occurred within the state updater thread: " + runtimeException); + log.error("An unexpected error occurred within the state updater thread: {}", String.valueOf(runtimeException)); addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException); isRunning.set(false); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 25f3f0e587f75..adc71ebc116e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -290,7 +290,11 @@ private void shouldThrowIfAddingTasksWithSameId(final Task task1, final Task tas stateUpdater.add(task2); verifyFailedTasks(IllegalStateException.class, task1); - assertFalse(stateUpdater.isRunning()); + waitForCondition( + () -> !stateUpdater.isRunning(), + VERIFICATION_TIMEOUT, + "Did not switch to non-running within the given timeout!" + ); } @Test @@ -1015,6 +1019,8 @@ public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception { verifyRestoredActiveTasks(); verifyUpdatingTasks(task2); verifyExceptionsAndFailedTasks(); + // shutdown ensures that the test does not end before changelog reader methods verified below are called + stateUpdater.shutdown(Duration.ofMinutes(1)); verify(changelogReader, times(1)).enforceRestoreActive(); verify(changelogReader, times(1)).transitToUpdateStandby(); } @@ -1152,6 +1158,8 @@ public void shouldIdleWhenAllTasksPaused() throws Exception { public void shouldResumeStandbyTask() throws Exception { final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldResumeStatefulTask(task); + // shutdown ensures that the test does not end before changelog reader methods verified below are called + stateUpdater.shutdown(Duration.ofMinutes(1)); verify(changelogReader, times(2)).transitToUpdateStandby(); }