Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ private KafkaFutureImpl<Uuid> restoreConsumerInstanceId(final Duration timeout)


private void handleRuntimeException(final RuntimeException runtimeException) {
log.error("An unexpected error occurred within the state updater thread: " + runtimeException);
log.error("An unexpected error occurred within the state updater thread: {}", String.valueOf(runtimeException));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix on the side.

addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);
isRunning.set(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,11 @@ private void shouldThrowIfAddingTasksWithSameId(final Task task1, final Task tas
stateUpdater.add(task2);

verifyFailedTasks(IllegalStateException.class, task1);
assertFalse(stateUpdater.isRunning());
waitForCondition(
() -> !stateUpdater.isRunning(),
VERIFICATION_TIMEOUT,
"Did not switch to non-running within the given timeout!"
);
}

@Test
Expand Down Expand Up @@ -1015,6 +1019,8 @@ public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
verifyRestoredActiveTasks();
verifyUpdatingTasks(task2);
verifyExceptionsAndFailedTasks();
// shutdown ensures that the test does not end before changelog reader methods verified below are called
stateUpdater.shutdown(Duration.ofMinutes(1));
verify(changelogReader, times(1)).enforceRestoreActive();
verify(changelogReader, times(1)).transitToUpdateStandby();
}
Expand Down Expand Up @@ -1152,6 +1158,8 @@ public void shouldIdleWhenAllTasksPaused() throws Exception {
public void shouldResumeStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldResumeStatefulTask(task);
// shutdown ensures that the test does not end before changelog reader methods verified below are called
stateUpdater.shutdown(Duration.ofMinutes(1));
verify(changelogReader, times(2)).transitToUpdateStandby();
}

Expand Down