Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ public State state() {
return state;
}

void setPartitionAssignedTime(final long lastPartitionAssignedMs) {
this.lastPartitionAssignedMs = lastPartitionAssignedMs;
}

/**
* Sets the state
*
Expand Down Expand Up @@ -273,6 +277,7 @@ public boolean isRunning() {
private long now;
private long lastPollMs;
private long lastCommitMs;
private long lastPartitionAssignedMs = -1L;
private int numIterations;
private volatile State state = State.CREATED;
private volatile ThreadMetadata threadMetadata;
Expand Down Expand Up @@ -777,7 +782,8 @@ private void initializeAndRestorePhase() {

if (taskManager.tryToCompleteRestoration(now)) {
changelogReader.transitToUpdateStandby();

log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs,
taskManager.tasks().keySet());
setState(State.RUNNING);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
}

streamThread.setState(State.PARTITIONS_ASSIGNED);
streamThread.setPartitionAssignedTime(time.milliseconds());
taskManager.handleRebalanceComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ boolean tryToCompleteRestoration(final long now) {
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
log.debug(
String.format(
"Could not complete restoration for %s due to the follosing exception; will retry",
"Could not complete restoration for %s due to the following exception; will retry",
task.id()),
timeoutException
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ public class StreamsRebalanceListenerTest {
private final TaskManager taskManager = mock(TaskManager.class);
private final StreamThread streamThread = mock(StreamThread.class);
private final AtomicInteger assignmentErrorCode = new AtomicInteger();
private final MockTime time = new MockTime();
private final StreamsRebalanceListener streamsRebalanceListener = new StreamsRebalanceListener(
new MockTime(),
time,
taskManager,
streamThread,
LoggerFactory.getLogger(StreamsRebalanceListenerTest.class),
Expand Down Expand Up @@ -78,6 +79,7 @@ public void shouldThrowMissingSourceTopicException() {
@Test
public void shouldSwallowVersionProbingError() {
expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(State.PARTITIONS_REVOKED);
streamThread.setPartitionAssignedTime(time.milliseconds());
taskManager.handleRebalanceComplete();
replay(taskManager, streamThread);
assignmentErrorCode.set(AssignorError.VERSION_PROBING.code());
Expand Down Expand Up @@ -129,6 +131,8 @@ public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() {
public void shouldHandleAssignedPartitions() {
taskManager.handleRebalanceComplete();
expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING);
streamThread.setPartitionAssignedTime(time.milliseconds());

replay(taskManager, streamThread);
assignmentErrorCode.set(AssignorError.NONE.code());

Expand Down