From 8206b1ce959e6f9cb403331cff9d7dc74e044b4e Mon Sep 17 00:00:00 2001
From: abbccdda
Date: Tue, 5 Jan 2021 21:03:11 -0800
Subject: [PATCH 1/2] add restoration time tracking

---
 .../kafka/streams/processor/internals/StreamThread.java | 8 +++++++-
 .../processor/internals/StreamsRebalanceListener.java | 1 +
 .../kafka/streams/processor/internals/TaskManager.java | 2 +-
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0d6a42ba98e44..3aaa9239312ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -193,6 +193,10 @@ public State state() {
         return state;
     }
 
+    void setPartitionAssignedTime(final long lastPartitionAssignedMs) {
+        this.lastPartitionAssignedMs = lastPartitionAssignedMs;
+    }
+
     /**
      * Sets the state
      *
@@ -273,6 +277,7 @@ public boolean isRunning() {
     private long now;
     private long lastPollMs;
     private long lastCommitMs;
+    private long lastPartitionAssignedMs = 0;
     private int numIterations;
     private volatile State state = State.CREATED;
     private volatile ThreadMetadata threadMetadata;
@@ -777,7 +782,8 @@ private void initializeAndRestorePhase() {
 
             if (taskManager.tryToCompleteRestoration(now)) {
                 changelogReader.transitToUpdateStandby();
-
+                log.info("Restoration completed for all tasks {}, which took {} ms", taskManager.tasks().keySet(),
+                    time.milliseconds() - lastPartitionAssignedMs);
                 setState(State.RUNNING);
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index eb0fe013248db..ab8a5ff86e976 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -73,6 +73,7 @@ public void onPartitionsAssigned(final Collection partitions) {
         }
 
         streamThread.setState(State.PARTITIONS_ASSIGNED);
+        streamThread.setPartitionAssignedTime(time.milliseconds());
         taskManager.handleRebalanceComplete();
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 593717b1b97f0..1bf06ba67c92c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -505,7 +505,7 @@ boolean tryToCompleteRestoration(final long now) {
                     task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
                     log.debug(
                         String.format(
-                            "Could not complete restoration for %s due to the follosing exception; will retry",
+                            "Could not complete restoration for %s due to the following exception; will retry",
                             task.id()),
                         timeoutException
                     );

From 128bc75c0db129411e38245553ffc0270ff4adcc Mon Sep 17 00:00:00 2001
From: abbccdda
Date: Tue, 12 Jan 2021 19:10:10 -0800
Subject: [PATCH 2/2] test fix

---
 .../kafka/streams/processor/internals/StreamThread.java | 6 +++---
 .../processor/internals/StreamsRebalanceListenerTest.java | 6 +++++-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3aaa9239312ff..ca7b9f10bfe0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -277,7 +277,7 @@ public boolean isRunning() {
     private long now;
     private long lastPollMs;
     private long lastCommitMs;
-    private long lastPartitionAssignedMs = 0;
+    private long lastPartitionAssignedMs = -1L;
     private int numIterations;
     private volatile State state = State.CREATED;
     private volatile ThreadMetadata threadMetadata;
@@ -782,8 +782,8 @@ private void initializeAndRestorePhase() {
 
             if (taskManager.tryToCompleteRestoration(now)) {
                 changelogReader.transitToUpdateStandby();
-                log.info("Restoration completed for all tasks {}, which took {} ms", taskManager.tasks().keySet(),
-                    time.milliseconds() - lastPartitionAssignedMs);
+                log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs,
+                    taskManager.tasks().keySet());
                 setState(State.RUNNING);
             }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
index 3bdf83757b999..31177dd848a80 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
@@ -45,8 +45,9 @@ public class StreamsRebalanceListenerTest {
     private final TaskManager taskManager = mock(TaskManager.class);
     private final StreamThread streamThread = mock(StreamThread.class);
     private final AtomicInteger assignmentErrorCode = new AtomicInteger();
+    private final MockTime time = new MockTime();
     private final StreamsRebalanceListener streamsRebalanceListener = new StreamsRebalanceListener(
-        new MockTime(),
+        time,
         taskManager,
         streamThread,
         LoggerFactory.getLogger(StreamsRebalanceListenerTest.class),
@@ -78,6 +79,7 @@ public void shouldThrowMissingSourceTopicException() {
     @Test
     public void shouldSwallowVersionProbingError() {
         expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(State.PARTITIONS_REVOKED);
+        streamThread.setPartitionAssignedTime(time.milliseconds());
         taskManager.handleRebalanceComplete();
         replay(taskManager, streamThread);
         assignmentErrorCode.set(AssignorError.VERSION_PROBING.code());
@@ -129,6 +131,8 @@ public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() {
     public void shouldHandleAssignedPartitions() {
         taskManager.handleRebalanceComplete();
         expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING);
+        streamThread.setPartitionAssignedTime(time.milliseconds());
+
         replay(taskManager, streamThread);
         assignmentErrorCode.set(AssignorError.NONE.code());
 