From bda8d060d1996bfbe1b4e112eaf37fffa4b087d6 Mon Sep 17 00:00:00 2001 From: Boyang Chen Date: Wed, 13 Jan 2021 09:05:23 -0800 Subject: [PATCH] MINOR: Add restoration time tracking (#9830) Add Stream restoration time tracking log Reviewers: John Roesler --- .../kafka/streams/processor/internals/StreamThread.java | 8 +++++++- .../processor/internals/StreamsRebalanceListener.java | 1 + .../processor/internals/StreamsRebalanceListenerTest.java | 6 +++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 332a035eb6874..7d5e3d0456738 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -190,6 +190,10 @@ public State state() { return state; } + void setPartitionAssignedTime(final long lastPartitionAssignedMs) { + this.lastPartitionAssignedMs = lastPartitionAssignedMs; + } + /** * Sets the state * @@ -269,6 +273,7 @@ public boolean isRunning() { private long now; private long lastPollMs; private long lastCommitMs; + private long lastPartitionAssignedMs = -1L; private int numIterations; private volatile State state = State.CREATED; private volatile ThreadMetadata threadMetadata; @@ -735,7 +740,8 @@ private void initializeAndRestorePhase() { if (taskManager.tryToCompleteRestoration()) { changelogReader.transitToUpdateStandby(); - + log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs, + taskManager.tasks().keySet()); setState(State.RUNNING); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java index a46a652e09236..c23eab61c5c35 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java @@ -66,6 +66,7 @@ public void onPartitionsAssigned(final Collection partitions) { } streamThread.setState(State.PARTITIONS_ASSIGNED); + streamThread.setPartitionAssignedTime(time.milliseconds()); taskManager.handleRebalanceComplete(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java index aecab44c04cef..76a1673e0b416 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java @@ -43,8 +43,9 @@ public class StreamsRebalanceListenerTest { private final TaskManager taskManager = mock(TaskManager.class); private final StreamThread streamThread = mock(StreamThread.class); private final AtomicInteger assignmentErrorCode = new AtomicInteger(); + private final MockTime time = new MockTime(); private final StreamsRebalanceListener streamsRebalanceListener = new StreamsRebalanceListener( - new MockTime(), + time, taskManager, streamThread, LoggerFactory.getLogger(StreamsRebalanceListenerTest.class), @@ -74,6 +75,7 @@ public void shouldThrowMissingSourceTopicException() { @Test public void shouldSwallowVersionProbingError() { expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(State.PARTITIONS_REVOKED); + streamThread.setPartitionAssignedTime(time.milliseconds()); taskManager.handleRebalanceComplete(); replay(taskManager, streamThread); assignmentErrorCode.set(AssignorError.VERSION_PROBING.code()); @@ -111,6 +113,8 @@ public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() { public void shouldHandleAssignedPartitions() { taskManager.handleRebalanceComplete(); expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING); + streamThread.setPartitionAssignedTime(time.milliseconds()); + replay(taskManager, streamThread); assignmentErrorCode.set(AssignorError.NONE.code());