From 29372545441911bd41489063d6ee44902d8c2f60 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 7 Mar 2025 11:22:50 +0530 Subject: [PATCH] More logging cleanup on Overlord --- .../actions/SegmentAllocationQueue.java | 30 +++++++++---------- .../druid/indexing/overlord/TaskLockbox.java | 6 ++-- .../druid/indexing/overlord/TaskQueue.java | 6 ++-- .../supervisor/SeekableStreamSupervisor.java | 2 +- .../autoscaler/LagBasedAutoScaler.java | 6 ++-- .../org/apache/druid/query/DruidMetrics.java | 16 +++++----- .../java/org/apache/druid/query/Druids.java | 5 ---- .../IndexerSQLMetadataStorageCoordinator.java | 3 +- 8 files changed, 35 insertions(+), 39 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 3c86e8e5637d..724364c27413 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -341,7 +341,7 @@ private boolean processBatch(AllocateRequestBatch requestBatch) } log.debug( - "Processing [%d] requests for batch [%s], queue time [%s].", + "Processing [%d] requests for batch[%s], queue time[%s].", requestBatch.size(), requestKey, requestBatch.getQueueTime() ); @@ -355,21 +355,21 @@ private boolean processBatch(AllocateRequestBatch requestBatch) emitBatchMetric("task/action/batch/attempts", 1L, requestKey); emitBatchMetric("task/action/batch/runTime", (System.currentTimeMillis() - startTimeMillis), requestKey); - log.info("Successfully processed [%d / %d] requests in batch [%s].", successCount, batchSize, requestKey); + log.debug("Successfully processed [%d / %d] requests in batch[%s].", successCount, batchSize, requestKey); if (requestBatch.isEmpty()) { return true; } // Requeue the batch only if used segments have changed - 
log.debug("There are [%d] failed requests in batch [%s].", requestBatch.size(), requestKey); + log.debug("There are [%d] failed requests in batch[%s].", requestBatch.size(), requestKey); final Set updatedUsedSegments = retrieveUsedSegments(requestKey); if (updatedUsedSegments.equals(usedSegments)) { log.warn( - "Completing [%d] failed requests in batch [%s] with null value as there" + "Completing [%d] failed requests in batch[%s] with null value as there" + " are conflicting segments. Cannot retry allocation until the set of" - + " used segments overlapping the allocation interval [%s] changes.", + + " used segments overlapping the allocation interval[%s] changes.", size(), requestKey, requestKey.preferredAllocationInterval ); @@ -452,9 +452,9 @@ private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set allocateSegments( boolean reduceMetadataIO ) { - log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval); + log.debug("Allocating [%d] segments for datasource[%s], interval[%s]", requests.size(), dataSource, interval); final boolean isTimeChunkLock = lockGranularity == LockGranularity.TIME_CHUNK; final AllocationHolderList holderList = new AllocationHolderList(requests, interval); @@ -1132,11 +1132,11 @@ private void unlock(final Task task, final Interval interval, @Nullable Integer if (match) { // Remove task from live list - log.info("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock); + log.debug("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock); final boolean removed = taskLockPosse.removeTask(task); if (taskLockPosse.isTasksEmpty()) { - log.info("TaskLock[%s] is now empty.", taskLock); + log.debug("TaskLock[%s] is now empty.", taskLock); possesHolder.remove(taskLockPosse); } if (possesHolder.isEmpty()) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 89bd441328a5..3bbafb749017 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -440,7 +440,7 @@ private void manageInternalCritical( continue; } if (taskIsReady) { - log.info("Asking taskRunner to run: %s", task.getId()); + log.info("Asking taskRunner to run task[%s]", task.getId()); runnerTaskFuture = taskRunner.run(task); } else { // Task.isReady() can internally lock intervals or segments. @@ -469,7 +469,7 @@ private void manageInternalPostCritical( // Kill tasks that shouldn't be running final Set tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds); if (!tasksToKill.isEmpty()) { - log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size()); + log.info("Asking taskRunner to clean up [%,d] tasks.", tasksToKill.size()); // On large installations running several thousands of tasks, // concatenating the list of known task ids can be computationally expensive. 
@@ -483,7 +483,7 @@ private void manageInternalPostCritical( taskRunner.shutdown(taskId, reason); } catch (Exception e) { - log.warn(e, "TaskRunner failed to clean up task: %s", taskId); + log.warn(e, "TaskRunner failed to clean up task[%s].", taskId); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 47781826265a..442dcc35f4c5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3222,7 +3222,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException final boolean stopTasksEarly; if (earlyStopTime != null && !earlyStopTime.isAfterNow()) { - log.info("Early stop requested, signalling tasks to complete."); + log.info("Early stop requested for supervisor[%s], signalling tasks to complete.", supervisorId); earlyStopTime = null; stopTasksEarly = true; } else { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 648d8a655e92..2786860ecad8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -178,7 +178,7 @@ private Runnable computeAndCollectLag() } log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue); } else { - log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource); + log.debug("Supervisor[%s] is 
suspended, skipping lag collection", dataSource); } } catch (Exception e) { @@ -237,7 +237,7 @@ private int computeDesiredTaskCount(List lags) int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount); if (currentActiveTaskCount == actualTaskCountMax) { - log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].", + log.debug("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].", dataSource ); emitter.emit(metricBuilder @@ -258,7 +258,7 @@ private int computeDesiredTaskCount(List lags) int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep(); int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount); if (currentActiveTaskCount == actualTaskCountMin) { - log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].", + log.debug("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource[%s].", dataSource ); emitter.emit(metricBuilder diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java index 1f478809333b..6551fc5c6004 100644 --- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java @@ -25,27 +25,29 @@ import java.util.List; /** + * Contains dimension names used while emitting metrics. 
*/ public class DruidMetrics { + // Query dimensions public static final String DATASOURCE = "dataSource"; public static final String TYPE = "type"; public static final String INTERVAL = "interval"; public static final String ID = "id"; public static final String SUBQUERY_ID = "subQueryId"; - public static final String TASK_ID = "taskId"; - public static final String GROUP_ID = "groupId"; public static final String STATUS = "status"; - public static final String TASK_INGESTION_MODE = "taskIngestionMode"; - - public static final String PARTITIONING_TYPE = "partitioningType"; - // task metrics + // Task dimensions + public static final String TASK_ID = "taskId"; + public static final String GROUP_ID = "groupId"; public static final String TASK_TYPE = "taskType"; public static final String TASK_STATUS = "taskStatus"; + // Ingestion dimensions + public static final String PARTITIONING_TYPE = "partitioningType"; + public static final String TASK_INGESTION_MODE = "taskIngestionMode"; + public static final String TASK_ACTION_TYPE = "taskActionType"; public static final String STREAM = "stream"; - public static final String PARTITION = "partition"; public static final String TAGS = "tags"; diff --git a/processing/src/main/java/org/apache/druid/query/Druids.java b/processing/src/main/java/org/apache/druid/query/Druids.java index 90feacc004cf..5ce472c9b31e 100644 --- a/processing/src/main/java/org/apache/druid/query/Druids.java +++ b/processing/src/main/java/org/apache/druid/query/Druids.java @@ -1065,9 +1065,4 @@ public static DataSourceMetadataQueryBuilder newDataSourceMetadataQueryBuilder() { return new DataSourceMetadataQueryBuilder(); } - - public static FilteredDataSource filteredDataSource(DataSource base, DimFilter filter) - { - return FilteredDataSource.create(base, filter); - } } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 85667d176dd8..fcdd31d0fa6a 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1261,7 +1261,7 @@ private Map createNewSegments( if (createdSegment != null) { pendingSegments.add(createdSegment.getId()); uniqueRequestToSegment.put(uniqueRequest, createdSegment); - log.info("Created new segment[%s]", createdSegment.getId()); + log.debug("Created new segment[%s]", createdSegment.getId()); } } @@ -1270,7 +1270,6 @@ private Map createNewSegments( } } - log.info("Created [%d] new segments for [%d] allocate requests.", uniqueRequestToSegment.size(), requests.size()); return createdSegments; }