From 3427d347d2767354942b6e8d39553c352a5c65c9 Mon Sep 17 00:00:00 2001 From: Harini Rajendran Date: Tue, 6 Jul 2021 17:01:02 -0500 Subject: [PATCH] Adding more debug logs to increase visibility into StreamSupervisor notices queue size and processing time. --- .../supervisor/SeekableStreamSupervisor.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index f593f7c68b41..d5e7257b0bcd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -93,6 +93,8 @@ import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -825,10 +827,10 @@ public void stop(boolean stopGracefully) synchronized (stopLock) { if (stopGracefully) { log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish"); - notices.add(new GracefulShutdownNotice()); + addNotice(new GracefulShutdownNotice()); } else { log.info("Posting ShutdownNotice"); - notices.add(new ShutdownNotice()); + addNotice(new ShutdownNotice()); } long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis(); @@ -865,7 +867,7 @@ public void stop(boolean stopGracefully) public void reset(DataSourceMetadata dataSourceMetadata) { log.info("Posting ResetNotice"); - notices.add(new ResetNotice(dataSourceMetadata)); + addNotice(new ResetNotice(dataSourceMetadata)); } public ReentrantLock getRecordSupplierLock() @@ -902,7 +904,11 @@ public void tryInit() } try { + Instant handleNoticeStartTime = Instant.now(); notice.handle(); + Instant handleNoticeEndTime = Instant.now(); + Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime); + log.debug("Handled notice [%s] from notices queue in [%d] ms, current notices queue size [%d]", notice.getClass().getName(), timeElapsed.toMillis(), getNoticesQueueSize()); } catch (Throwable e) { stateManager.recordThrowableEvent(e); @@ -956,7 +962,7 @@ public Runnable buildDynamicAllocationTask(Callable scaleAction) private Runnable buildRunTask() { - return () -> notices.add(new RunNotice()); + return () -> addNotice(new RunNotice()); } @Override @@ -1274,7 +1280,7 @@ public void locationChanged(final String taskId, final TaskLocation newLocation) @Override public void statusChanged(String taskId, TaskStatus status) { - notices.add(new RunNotice()); + addNotice(new RunNotice()); } }, Execs.directExecutor() ); @@ -3109,6 +3115,7 @@ private void createNewTasks() throws JsonProcessingException private void addNotice(Notice notice) { + log.debug("Adding notice [%s] to notices queue", notice.getClass().getName()); notices.add(notice); }