From 0790fe34c54d1e804d3d00fbab9c702b4b2bca54 Mon Sep 17 00:00:00 2001 From: Sensor Date: Wed, 17 Jan 2024 19:43:49 +0800 Subject: [PATCH 1/5] add pre-check for heavy log --- .../parallel/ParallelIndexSupervisorTask.java | 4 +- .../supervisor/SeekableStreamSupervisor.java | 18 ++++--- .../autoscaler/LagBasedAutoScaler.java | 4 +- .../clients/OverlordResourceTestClient.java | 12 +++-- .../processor/FrameProcessorExecutor.java | 5 +- .../guava/ParallelMergeCombiningSequence.java | 50 ++++++++++--------- .../util/common/io/smoosh/FileSmoosher.java | 12 +++-- .../druid/discovery/DataServerClient.java | 4 +- .../server/coordinator/DruidCoordinator.java | 10 ++-- .../loading/CuratorLoadQueuePeon.java | 14 +++--- .../calcite/parser/DruidSqlParserUtils.java | 4 +- 11 files changed, 82 insertions(+), 55 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 9940dfdcb4de..28a816112e37 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -752,7 +752,9 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except ); // This is for potential debugging in case we suspect bad estimation of cardinalities etc, - LOG.debug("intervalToNumShards: %s", intervalToNumShards.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("intervalToNumShards: %s", intervalToNumShards.toString()); + } } else { intervalToNumShards = CollectionUtils.mapValues( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 37596bffde9c..ba027d112f43 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1159,14 +1159,16 @@ public void tryInit() Instant handleNoticeEndTime = Instant.now(); Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime); String noticeType = notice.getType(); - log.debug( - "Handled notice [%s] from notices queue in [%d] ms, " - + "current notices queue size [%d] for datasource [%s]", - noticeType, - timeElapsed.toMillis(), - getNoticesQueueSize(), - dataSource - ); + if (log.isDebugEnabled()) { + log.debug( + "Handled notice [%s] from notices queue in [%d] ms, " + + "current notices queue size [%d] for datasource [%s]", + noticeType, + timeElapsed.toMillis(), + getNoticesQueueSize(), + dataSource + ); + } emitNoticeProcessTime(noticeType, timeElapsed.toMillis()); } catch (Throwable e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 8c645278d55b..544d6de4a9ac 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -149,7 +149,9 @@ private Runnable computeAndCollectLag() long totalLags = lagStats.getTotalLag(); lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L); } - log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource); + if (log.isDebugEnabled()) { + log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource); + } } else { log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index bf0f549b190f..1b7fec4105d8 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -127,7 +127,9 @@ public TaskStatusPlus getTaskStatus(String taskID) StringUtils.urlEncode(taskID) ) ); - LOG.debug("Index status response" + response.getContent()); + if (LOG.isDebugEnabled()) { + LOG.debug("Index status response" + response.getContent()); + } TaskStatusResponse taskStatusResponse = jsonMapper.readValue( response.getContent(), new TypeReference() @@ -185,7 +187,9 @@ private List getTasks(String identifier) HttpMethod.GET, StringUtils.format("%s%s", getIndexerURL(), identifier) ); - LOG.debug("Tasks %s response %s", identifier, response.getContent()); + if (LOG.isDebugEnabled()) { + LOG.debug("Tasks %s response %s", identifier, response.getContent()); + } return jsonMapper.readValue( response.getContent(), new TypeReference>() { @@ -204,7 +208,9 @@ public TaskPayloadResponse getTaskPayload(String taskId) HttpMethod.GET, StringUtils.format("%stask/%s", getIndexerURL(), StringUtils.urlEncode(taskId)) ); - LOG.debug("Task %s response %s", taskId, response.getContent()); + if (LOG.isDebugEnabled()) { + LOG.debug("Task %s response %s", taskId, response.getContent()); + } return jsonMapper.readValue( response.getContent(), new TypeReference() { diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java index 02161d5ad5ad..d43aa537d985 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java @@ -595,8 +595,9 @@ private static void logProcessorStatusString( sb.append("; cancel=").append(finishedFuture.isCancelled() ? "y" : "n"); sb.append("; done=").append(finishedFuture.isDone() ? "y" : "n"); - - log.debug(StringUtils.encodeForFormat(sb.toString())); + if (log.isDebugEnabled()) { + log.debug(StringUtils.encodeForFormat(sb.toString())); + } } } diff --git a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 1df3c9a9dc16..0875679e29ef 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -475,20 +475,22 @@ private int computeNumTasks() final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1); - LOG.debug( - "Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " - + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " - + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", - computedNumParallelTasks, - parallelism, - getPool().getActiveThreadCount(), - runningThreadCount, - submissionCount, - getPool().getQueuedTaskCount(), - getPool().getParallelism(), - getPool().getPoolSize(), - getPool().getStealCount() - ); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " + + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " + + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", + computedNumParallelTasks, + parallelism, + getPool().getActiveThreadCount(), + runningThreadCount, + submissionCount, + getPool().getQueuedTaskCount(), + getPool().getParallelism(), + getPool().getPoolSize(), + getPool().getStealCount() + ); + } return computedNumParallelTasks; } @@ -632,15 +634,17 @@ protected void compute() (nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1); final int adjustedNextYieldAfter = (int) Math.ceil(cumulativeMovingAverage); - LOG.debug( - "task recursion %s yielded %s results ran for %s millis (%s nanos), %s cpu nanos, next task yielding every %s operations", - recursionDepth, - yieldAfter, - TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS), - elapsedNanos, - elapsedCpuNanos, - adjustedNextYieldAfter - ); + if (LOG.isDebugEnabled()) { + LOG.debug( + "task recursion %s yielded %s results ran for %s millis (%s nanos), %s cpu nanos, next task yielding every %s operations", + recursionDepth, + yieldAfter, + TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS), + elapsedNanos, + elapsedCpuNanos, + adjustedNextYieldAfter + ); + } getPool().execute(new MergeCombineAction<>( pQueue, outputQueue, diff --git a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java index 17eb469e9497..1ada7dfc78e9 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java @@ -490,11 +490,13 @@ public boolean isOpen() public void close() throws IOException { closer.close(); - FileSmoosher.LOG.debug( - "Created smoosh file [%s] of size [%s] bytes.", - outFile.getAbsolutePath(), - outFile.length() - ); + if (LOG.isDebugEnabled()) { + FileSmoosher.LOG.debug( + "Created smoosh file [%s] of size [%s] bytes.", + outFile.getAbsolutePath(), + outFile.length() + ); + } } } } diff --git a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java index 479ba9d4142d..78c4acfe44aa 100644 --- a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java +++ b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java @@ -91,7 +91,9 @@ public Sequence run(Query query, ResponseContext responseContext, Java requestBuilder = requestBuilder.jsonContent(objectMapper, query); } - log.debug("Sending request to servers for query[%s], request[%s]", query.getId(), requestBuilder.toString()); + if (log.isDebugEnabled()) { + log.debug("Sending request to servers for query[%s], request[%s]", query.getId(), requestBuilder.toString()); + } ListenableFuture resultStreamFuture = serviceClient.asyncRequest( requestBuilder, new DataServerResponseHandler(query, responseContext, objectMapper) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 5558d204e81a..29fc00ecf7ce 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -538,10 +538,12 @@ List makeIndexingServiceDuties() if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) { duties.add(compactSegments); } - log.debug( - "Initialized indexing service duties [%s].", - duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()) - ); + if (log.isDebugEnabled()) { + log.debug( + "Initialized indexing service duties [%s].", + duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()) + ); + } return ImmutableList.copyOf(duties); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java index 2303be26d8b5..7ecedcb46d7b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java @@ -230,12 +230,14 @@ public void run() final String path = ZKPaths.makePath(basePath, segmentHolder.getSegmentIdentifier()); final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest()); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); - log.debug( - "ZKNode created for server to [%s] %s [%s]", - basePath, - segmentHolder.getAction(), - segmentHolder.getSegmentIdentifier() - ); + if (log.isDebugEnabled()) { + log.debug( + "ZKNode created for server to [%s] %s [%s]", + basePath, + segmentHolder.getAction(), + segmentHolder.getSegmentIdentifier() + ); + } final ScheduledFuture nodeDeletedCheck = scheduleNodeDeletedCheck(path); final Stat stat = curator.checkExists().usingWatcher( (CuratorWatcher) watchedEvent -> { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java index 202df0f705c9..1ac2009698f1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java @@ -90,7 +90,9 @@ public static Granularity convertSqlNodeToGranularityThrowingParseExceptions(Sql throw e; } catch (Exception e) { - log.debug(e, StringUtils.format("Unable to convert %s to a valid granularity.", sqlNode.toString())); + if (log.isDebugEnabled()) { + log.debug(e, StringUtils.format("Unable to convert %s to a valid granularity.", sqlNode.toString())); + } throw new ParseException(e.getMessage()); } } From 61f6ed1a1845f27aa1efe43d76c606efd66c0b0d Mon Sep 17 00:00:00 2001 From: Benedict Jin Date: Thu, 18 Jan 2024 11:35:49 +0800 Subject: [PATCH 2/5] Apply suggestions from code review Co-authored-by: Kashif Faraz --- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 4 +--- .../apache/druid/sql/calcite/parser/DruidSqlParserUtils.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 28a816112e37..7edc4a555bc7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -752,9 +752,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except ); // This is for potential debugging in case we suspect bad estimation of cardinalities etc, - if (LOG.isDebugEnabled()) { - LOG.debug("intervalToNumShards: %s", intervalToNumShards.toString()); - } + LOG.debug("intervalToNumShards: %s", intervalToNumShards); } else { intervalToNumShards = CollectionUtils.mapValues( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java index 1ac2009698f1..8aab850c119d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java @@ -90,9 +90,7 @@ public static Granularity convertSqlNodeToGranularityThrowingParseExceptions(Sql throw e; } catch (Exception e) { - if (log.isDebugEnabled()) { - log.debug(e, StringUtils.format("Unable to convert %s to a valid granularity.", sqlNode.toString())); - } + log.debug(e, "Unable to convert node[%s] to a valid granularity.", sqlNode); throw new ParseException(e.getMessage()); } } From 42f84c0376a5f5f315c9c04df62b17fffbfa635a Mon Sep 17 00:00:00 2001 From: Sensor Date: Thu, 18 Jan 2024 11:42:07 +0800 Subject: [PATCH 3/5] fix comment --- .../testing/clients/OverlordResourceTestClient.java | 4 +--- .../common/guava/ParallelMergeCombiningSequence.java | 11 ++++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 1b7fec4105d8..95f56ad4b759 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -127,9 +127,7 @@ public TaskStatusPlus getTaskStatus(String taskID) StringUtils.urlEncode(taskID) ) ); - if (LOG.isDebugEnabled()) { - LOG.debug("Index status response" + response.getContent()); - } + LOG.debug("Index status response" + response.getContent()); TaskStatusResponse taskStatusResponse = jsonMapper.readValue( response.getContent(), new TypeReference() diff --git a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 0875679e29ef..33b3e3f4914e 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -476,19 +476,20 @@ private int computeNumTasks() final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1); if (LOG.isDebugEnabled()) { + ForkJoinPool pool = getPool(); LOG.debug( "Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", computedNumParallelTasks, parallelism, - getPool().getActiveThreadCount(), + pool.getActiveThreadCount(), runningThreadCount, submissionCount, - getPool().getQueuedTaskCount(), - getPool().getParallelism(), - getPool().getPoolSize(), - getPool().getStealCount() + pool.getQueuedTaskCount(), + pool.getParallelism(), + pool.getPoolSize(), + pool.getStealCount() ); } From ac203d73419fe76933e9a974069c9c3ee7cb1c4f Mon Sep 17 00:00:00 2001 From: Sensor Date: Mon, 22 Jan 2024 11:37:51 +0800 Subject: [PATCH 4/5] modify the layout to improve test coverage --- .../supervisor/SeekableStreamSupervisor.java | 5 +-- .../processor/FrameProcessorExecutor.java | 4 +-- .../guava/ParallelMergeCombiningSequence.java | 32 +++++++------------ .../util/common/io/smoosh/FileSmoosher.java | 6 +--- .../server/coordinator/DruidCoordinator.java | 5 +-- .../loading/CuratorLoadQueuePeon.java | 7 +--- 6 files changed, 17 insertions(+), 42 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ba027d112f43..c0c0144579b1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1163,10 +1163,7 @@ public void tryInit() log.debug( "Handled notice [%s] from notices queue in [%d] ms, " + "current notices queue size [%d] for datasource [%s]", - noticeType, - timeElapsed.toMillis(), - getNoticesQueueSize(), - dataSource + noticeType, timeElapsed.toMillis(), getNoticesQueueSize(), dataSource ); } emitNoticeProcessTime(noticeType, timeElapsed.toMillis()); diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java index d43aa537d985..c0f79d30e581 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java @@ -595,9 +595,7 @@ private static void logProcessorStatusString( sb.append("; cancel=").append(finishedFuture.isCancelled() ? "y" : "n"); sb.append("; done=").append(finishedFuture.isDone() ? "y" : "n"); - if (log.isDebugEnabled()) { - log.debug(StringUtils.encodeForFormat(sb.toString())); - } + log.debug(StringUtils.encodeForFormat(sb.toString())); } } diff --git a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 33b3e3f4914e..517235a99f98 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -481,15 +481,9 @@ private int computeNumTasks() "Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", - computedNumParallelTasks, - parallelism, - pool.getActiveThreadCount(), - runningThreadCount, - submissionCount, - pool.getQueuedTaskCount(), - pool.getParallelism(), - pool.getPoolSize(), - pool.getStealCount() + computedNumParallelTasks, parallelism, + pool.getActiveThreadCount(), runningThreadCount, submissionCount, pool.getQueuedTaskCount(), + pool.getParallelism(), pool.getPoolSize(), pool.getStealCount() ); } @@ -635,17 +629,15 @@ protected void compute() (nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1); final int adjustedNextYieldAfter = (int) Math.ceil(cumulativeMovingAverage); - if (LOG.isDebugEnabled()) { - LOG.debug( - "task recursion %s yielded %s results ran for %s millis (%s nanos), %s cpu nanos, next task yielding every %s operations", - recursionDepth, - yieldAfter, - TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS), - elapsedNanos, - elapsedCpuNanos, - adjustedNextYieldAfter - ); - } + LOG.debug( + "task recursion %s yielded %s results ran for %s millis (%s nanos), %s cpu nanos, next task yielding every %s operations", + recursionDepth, + yieldAfter, + TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS), + elapsedNanos, + elapsedCpuNanos, + adjustedNextYieldAfter + ); getPool().execute(new MergeCombineAction<>( pQueue, outputQueue, diff --git a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java index 1ada7dfc78e9..c3c87e931119 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java @@ -491,11 +491,7 @@ public void close() throws IOException { closer.close(); if (LOG.isDebugEnabled()) { - FileSmoosher.LOG.debug( - "Created smoosh file [%s] of size [%s] bytes.", - outFile.getAbsolutePath(), - outFile.length() - ); + FileSmoosher.LOG.debug("Created smoosh file [%s] of size [%s] bytes.", outFile.getAbsolutePath(), outFile.length()); } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 29fc00ecf7ce..caa1c7fb66f0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -539,10 +539,7 @@ List makeIndexingServiceDuties() duties.add(compactSegments); } if (log.isDebugEnabled()) { - log.debug( - "Initialized indexing service duties [%s].", - duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()) - ); + log.debug("Initialized indexing service duties [%s].", duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())); } return ImmutableList.copyOf(duties); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java index 7ecedcb46d7b..18b83f391424 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java @@ -231,12 +231,7 @@ public void run() final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest()); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); if (log.isDebugEnabled()) { - log.debug( - "ZKNode created for server to [%s] %s [%s]", - basePath, - segmentHolder.getAction(), - segmentHolder.getSegmentIdentifier() - ); + log.debug("ZKNode created for server to [%s] %s [%s]", basePath, segmentHolder.getAction(), segmentHolder.getSegmentIdentifier()); } final ScheduledFuture nodeDeletedCheck = scheduleNodeDeletedCheck(path); final Stat stat = curator.checkExists().usingWatcher( From 2f64b0dd8110b2b69e14c6ab2df988cb8abea9a5 Mon Sep 17 00:00:00 2001 From: Sensor Date: Thu, 29 Feb 2024 11:09:53 +0800 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: Kashif Faraz --- .../supervisor/autoscaler/LagBasedAutoScaler.java | 4 +--- .../druid/java/util/common/io/smoosh/FileSmoosher.java | 2 +- .../java/org/apache/druid/discovery/DataServerClient.java | 2 +- .../server/coordinator/loading/CuratorLoadQueuePeon.java | 5 ++++- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 544d6de4a9ac..a06f358435f8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -149,9 +149,7 @@ private Runnable computeAndCollectLag() long totalLags = lagStats.getTotalLag(); lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L); } - if (log.isDebugEnabled()) { - log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource); - } + log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue); } else { log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource); } diff --git a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java index c3c87e931119..f4103ec18473 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java @@ -491,7 +491,7 @@ public void close() throws IOException { closer.close(); if (LOG.isDebugEnabled()) { - FileSmoosher.LOG.debug("Created smoosh file [%s] of size [%s] bytes.", outFile.getAbsolutePath(), outFile.length()); + LOG.debug("Created smoosh file [%s] of size [%s] bytes.", outFile.getAbsolutePath(), outFile.length()); } } } diff --git a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java index 78c4acfe44aa..ce7ac325b62b 100644 --- a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java +++ b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java @@ -92,7 +92,7 @@ public Sequence run(Query query, ResponseContext responseContext, Java } if (log.isDebugEnabled()) { - log.debug("Sending request to servers for query[%s], request[%s]", query.getId(), requestBuilder.toString()); + log.debug("Sending request to servers for query[%s], request[%s]", query.getId(), requestBuilder); } ListenableFuture resultStreamFuture = serviceClient.asyncRequest( requestBuilder, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java index 18b83f391424..7ea78a132fe9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java @@ -231,7 +231,10 @@ public void run() final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest()); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); if (log.isDebugEnabled()) { - log.debug("ZKNode created for server to [%s] %s [%s]", basePath, segmentHolder.getAction(), segmentHolder.getSegmentIdentifier()); + log.debug( + "ZKNode created for server to [%s] %s [%s]", + basePath, segmentHolder.getAction(), segmentHolder.getSegmentIdentifier() + ); } final ScheduledFuture nodeDeletedCheck = scheduleNodeDeletedCheck(path); final Stat stat = curator.checkExists().usingWatcher(