From fda07a41b6c5a15b601a309d0206df76ebe4d34e Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 20 Feb 2025 23:26:47 -0800 Subject: [PATCH] cleanup formatting --- .../apache/druid/msq/exec/ControllerImpl.java | 136 ++++++++++-------- 1 file changed, 78 insertions(+), 58 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index cb021813c5d9..4c8cc067294e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -542,8 +542,10 @@ private void emitSummaryMetrics(final MSQTaskReportPayload msqTaskReportPayload, final MSQStagesReport stagesReport = msqTaskReportPayload.getStages(); if (stagesReport != null) { for (MSQStagesReport.Stage stage : stagesReport.getStages()) { - boolean hasParentStage = stage.getStageDefinition().getInputSpecs().stream() - .anyMatch(stageInput -> stageInput instanceof StageInputSpec); + boolean hasParentStage = stage.getStageDefinition() + .getInputSpecs() + .stream() + .anyMatch(stageInput -> stageInput instanceof StageInputSpec); if (!hasParentStage) { stagesToInclude.add(stage.getStageNumber()); } @@ -552,19 +554,20 @@ private void emitSummaryMetrics(final MSQTaskReportPayload msqTaskReportPayload, long totalProcessedBytes = 0; if (msqTaskReportPayload.getCounters() != null) { - totalProcessedBytes = msqTaskReportPayload.getCounters() - .copyMap() - .entrySet() - .stream() - .filter(entry -> stagesReport == null || stagesToInclude.contains(entry.getKey())) - .flatMap(counterSnapshotsMap -> counterSnapshotsMap.getValue().values().stream()) - .flatMap(counterSnapshots -> counterSnapshots.getMap().entrySet().stream()) - .filter(entry -> entry.getKey().startsWith("input")) - .mapToLong(entry -> { - ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) entry.getValue(); - return snapshot.getBytes() == null ? 0L : Arrays.stream(snapshot.getBytes()).sum(); - }) - .sum(); + totalProcessedBytes = + msqTaskReportPayload.getCounters() + .copyMap() + .entrySet() + .stream() + .filter(entry -> stagesReport == null || stagesToInclude.contains(entry.getKey())) + .flatMap(counterSnapshotsMap -> counterSnapshotsMap.getValue().values().stream()) + .flatMap(counterSnapshots -> counterSnapshots.getMap().entrySet().stream()) + .filter(entry -> entry.getKey().startsWith("input")) + .mapToLong(entry -> { + ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) entry.getValue(); + return snapshot.getBytes() == null ? 0L : Arrays.stream(snapshot.getBytes()).sum(); + }) + .sum(); } log.debug("Processed bytes[%d] for query[%s].", totalProcessedBytes, querySpec.getQuery()); @@ -700,14 +703,17 @@ private void addToRetryQueue(ControllerQueryKernel kernel, int worker, MSQFault if (!retriableWorkOrders.isEmpty()) { log.info("Submitting worker[%s] for relaunch because of fault[%s]", worker, fault); retryCapableWorkerManager.submitForRelaunch(worker); - workOrdersToRetry.compute(worker, (workerNumber, workOrders) -> { - if (workOrders == null) { - return new HashSet<>(retriableWorkOrders); - } else { - workOrders.addAll(retriableWorkOrders); - return workOrders; - } - }); + workOrdersToRetry.compute( + worker, + (workerNumber, workOrders) -> { + if (workOrders == null) { + return new HashSet<>(retriableWorkOrders); + } else { + workOrders.addAll(retriableWorkOrders); + return workOrders; + } + } + ); } else { log.debug( "Worker[%d] has no active workOrders that need relaunch therefore not relaunching", @@ -939,11 +945,9 @@ public TaskReport.ReportMap liveReports() /** * @param isStageOutputEmpty {@code true} if the stage output is empty, {@code false} if the stage output is non-empty, * {@code null} for stages where cluster key statistics are not gathered or is incomplete. - * * @return the segments that will be generated by this job. Delegates to * {@link #generateSegmentIdsWithShardSpecsForAppend} or {@link #generateSegmentIdsWithShardSpecsForReplace} as * appropriate. This is a potentially expensive call, since it requires calling Overlord APIs. - * * @throws MSQException with {@link InsertCannotAllocateSegmentFault} if an allocation cannot be made */ private List generateSegmentIdsWithShardSpecs( @@ -1105,7 +1109,14 @@ private void validateNumSegmentsPerBucketOrThrow( for (final Map.Entry>> bucketEntry : partitionsByBucket.entrySet()) { final int numSegmentsInTimeChunk = bucketEntry.getValue().size(); if (numSegmentsInTimeChunk > maxNumSegments) { - throw new MSQException(new TooManySegmentsInTimeChunkFault(bucketEntry.getKey(), numSegmentsInTimeChunk, maxNumSegments, segmentGranularity)); + throw new MSQException( + new TooManySegmentsInTimeChunkFault( + bucketEntry.getKey(), + numSegmentsInTimeChunk, + maxNumSegments, + segmentGranularity + ) + ); } } } @@ -1612,7 +1623,10 @@ private void handleQueryResults( } else if (MSQControllerTask.isExport(querySpec)) { // Write manifest file. ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination(); - ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider(), context.taskTempDir()); + ExportMetadataManager exportMetadataManager = new ExportMetadataManager( + destination.getExportStorageProvider(), + context.taskTempDir() + ); final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); //noinspection unchecked @@ -1621,7 +1635,10 @@ private void handleQueryResults( Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId); if (!(resultObjectForStage instanceof List)) { // This might occur if all workers are running on an older version. We are not able to write a manifest file in this case. - log.warn("Unable to create export manifest file. Received result[%s] from worker instead of a list of file names.", resultObjectForStage); + log.warn( + "Unable to create export manifest file. Received result[%s] from worker instead of a list of file names.", + resultObjectForStage + ); return; } @SuppressWarnings("unchecked") @@ -1708,15 +1725,12 @@ private static Function, Set> addCompactionStateTo // Collect all aggregators that are part of the current dataSchema, since a non-rollup query (isRollup() is false) // moves metrics columns to dimensions in the final schema. Set aggregatorsInDataSchema = Arrays.stream(dataSchema.getAggregators()) - .map(AggregatorFactory::getName) - .collect( - Collectors.toSet()); - metricsSpec = new ArrayList<>( - groupByQuery.getAggregatorSpecs() - .stream() - .filter(aggregatorFactory -> aggregatorsInDataSchema.contains(aggregatorFactory.getName())) - .collect(Collectors.toList()) - ); + .map(AggregatorFactory::getName) + .collect(Collectors.toSet()); + metricsSpec = groupByQuery.getAggregatorSpecs() + .stream() + .filter(aggregatorFactory -> aggregatorsInDataSchema.contains(aggregatorFactory.getName())) + .collect(Collectors.toList()); } IndexSpec indexSpec = tuningConfig.getIndexSpec(); @@ -1848,11 +1862,12 @@ private static QueryDefinition makeQueryDefinition( final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); return builder.add( - destination.getTerminalStageSpec() - .constructFinalStage( - queryDef, - querySpec, - jsonMapper) + destination.getTerminalStageSpec() + .constructFinalStage( + queryDef, + querySpec, + jsonMapper + ) ) .build(); } else if (MSQControllerTask.writeFinalResultsToTaskReport(querySpec)) { @@ -1881,7 +1896,8 @@ private static QueryDefinition makeQueryDefinition( try { // Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export. - Iterator filesIterator = exportStorageProvider.createStorageConnector(controllerContext.taskTempDir()).listDir(""); + Iterator filesIterator = exportStorageProvider.createStorageConnector(controllerContext.taskTempDir()) + .listDir(""); if (filesIterator.hasNext()) { throw DruidException.forPersona(DruidException.Persona.USER) .ofCategory(DruidException.Category.RUNTIME_FAILURE) @@ -2398,17 +2414,20 @@ private void retryFailedTasks() throws InterruptedException queryKernel.workOrdersSentForWorker(stageWorkOrders.getKey(), workerNumber); // remove successfully contacted workOrders from workOrdersToRetry - workOrdersToRetry.compute(workerNumber, (task, workOrderSet) -> { - if (workOrderSet == null - || workOrderSet.size() == 0 - || !workOrderSet.remove(stageWorkOrders.getValue().get(workerNumber))) { - throw new ISE("Worker[%s] with number[%d] orders not found", workerId, workerNumber); - } - if (workOrderSet.size() == 0) { - return null; - } - return workOrderSet; - }); + workOrdersToRetry.compute( + workerNumber, + (task, workOrderSet) -> { + if (workOrderSet == null + || workOrderSet.size() == 0 + || !workOrderSet.remove(stageWorkOrders.getValue().get(workerNumber))) { + throw new ISE("Worker[%s] with number[%d] orders not found", workerId, workerNumber); + } + if (workOrderSet.size() == 0) { + return null; + } + return workOrderSet; + } + ); }, queryKernelConfig.isFaultTolerant() ); @@ -2490,9 +2509,10 @@ private void submitParallelMergeRequests(StageId stageId, Set tasks) stageId, tasks.stream().map(workerManager::getWorkerNumber).collect(Collectors.toSet()) ); - workerSketchFetcher.inMemoryFullSketchMerging(ControllerImpl.this::addToKernelManipulationQueue, - stageId, tasks, - ControllerImpl.this::addToRetryQueue + workerSketchFetcher.inMemoryFullSketchMerging( + ControllerImpl.this::addToKernelManipulationQueue, + stageId, tasks, + ControllerImpl.this::addToRetryQueue ); } @@ -2734,7 +2754,7 @@ private boolean cleanUpEffectivelyFinishedStages() /** * Start a {@link ControllerQueryResultsReader} that pushes results to our {@link QueryListener}. - * + *

* The reader runs in a single-threaded executor that is created by this method, and shut down when results * are done being read. */