Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,10 @@ private void emitSummaryMetrics(final MSQTaskReportPayload msqTaskReportPayload,
final MSQStagesReport stagesReport = msqTaskReportPayload.getStages();
if (stagesReport != null) {
for (MSQStagesReport.Stage stage : stagesReport.getStages()) {
boolean hasParentStage = stage.getStageDefinition().getInputSpecs().stream()
.anyMatch(stageInput -> stageInput instanceof StageInputSpec);
boolean hasParentStage = stage.getStageDefinition()
.getInputSpecs()
.stream()
.anyMatch(stageInput -> stageInput instanceof StageInputSpec);
if (!hasParentStage) {
stagesToInclude.add(stage.getStageNumber());
}
Expand All @@ -552,19 +554,20 @@ private void emitSummaryMetrics(final MSQTaskReportPayload msqTaskReportPayload,
long totalProcessedBytes = 0;

if (msqTaskReportPayload.getCounters() != null) {
totalProcessedBytes = msqTaskReportPayload.getCounters()
.copyMap()
.entrySet()
.stream()
.filter(entry -> stagesReport == null || stagesToInclude.contains(entry.getKey()))
.flatMap(counterSnapshotsMap -> counterSnapshotsMap.getValue().values().stream())
.flatMap(counterSnapshots -> counterSnapshots.getMap().entrySet().stream())
.filter(entry -> entry.getKey().startsWith("input"))
.mapToLong(entry -> {
ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) entry.getValue();
return snapshot.getBytes() == null ? 0L : Arrays.stream(snapshot.getBytes()).sum();
})
.sum();
totalProcessedBytes =
msqTaskReportPayload.getCounters()
.copyMap()
.entrySet()
.stream()
.filter(entry -> stagesReport == null || stagesToInclude.contains(entry.getKey()))
.flatMap(counterSnapshotsMap -> counterSnapshotsMap.getValue().values().stream())
.flatMap(counterSnapshots -> counterSnapshots.getMap().entrySet().stream())
.filter(entry -> entry.getKey().startsWith("input"))
.mapToLong(entry -> {
ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) entry.getValue();
return snapshot.getBytes() == null ? 0L : Arrays.stream(snapshot.getBytes()).sum();
})
.sum();
}

log.debug("Processed bytes[%d] for query[%s].", totalProcessedBytes, querySpec.getQuery());
Expand Down Expand Up @@ -700,14 +703,17 @@ private void addToRetryQueue(ControllerQueryKernel kernel, int worker, MSQFault
if (!retriableWorkOrders.isEmpty()) {
log.info("Submitting worker[%s] for relaunch because of fault[%s]", worker, fault);
retryCapableWorkerManager.submitForRelaunch(worker);
workOrdersToRetry.compute(worker, (workerNumber, workOrders) -> {
if (workOrders == null) {
return new HashSet<>(retriableWorkOrders);
} else {
workOrders.addAll(retriableWorkOrders);
return workOrders;
}
});
workOrdersToRetry.compute(
worker,
(workerNumber, workOrders) -> {
if (workOrders == null) {
return new HashSet<>(retriableWorkOrders);
} else {
workOrders.addAll(retriableWorkOrders);
return workOrders;
}
}
);
} else {
log.debug(
"Worker[%d] has no active workOrders that need relaunch therefore not relaunching",
Expand Down Expand Up @@ -939,11 +945,9 @@ public TaskReport.ReportMap liveReports()
/**
* @param isStageOutputEmpty {@code true} if the stage output is empty, {@code false} if the stage output is non-empty,
* {@code null} for stages where cluster key statistics are not gathered or are incomplete.
*
* @return the segments that will be generated by this job. Delegates to
* {@link #generateSegmentIdsWithShardSpecsForAppend} or {@link #generateSegmentIdsWithShardSpecsForReplace} as
* appropriate. This is a potentially expensive call, since it requires calling Overlord APIs.
*
* @throws MSQException with {@link InsertCannotAllocateSegmentFault} if an allocation cannot be made
*/
private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecs(
Expand Down Expand Up @@ -1105,7 +1109,14 @@ private void validateNumSegmentsPerBucketOrThrow(
for (final Map.Entry<DateTime, List<Pair<Integer, ClusterByPartition>>> bucketEntry : partitionsByBucket.entrySet()) {
final int numSegmentsInTimeChunk = bucketEntry.getValue().size();
if (numSegmentsInTimeChunk > maxNumSegments) {
throw new MSQException(new TooManySegmentsInTimeChunkFault(bucketEntry.getKey(), numSegmentsInTimeChunk, maxNumSegments, segmentGranularity));
throw new MSQException(
new TooManySegmentsInTimeChunkFault(
bucketEntry.getKey(),
numSegmentsInTimeChunk,
maxNumSegments,
segmentGranularity
)
);
}
}
}
Expand Down Expand Up @@ -1612,7 +1623,10 @@ private void handleQueryResults(
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider(), context.taskTempDir());
ExportMetadataManager exportMetadataManager = new ExportMetadataManager(
destination.getExportStorageProvider(),
context.taskTempDir()
);

final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
//noinspection unchecked
Expand All @@ -1621,7 +1635,10 @@ private void handleQueryResults(
Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId);
if (!(resultObjectForStage instanceof List)) {
// This might occur if all workers are running on an older version. We are not able to write a manifest file in this case.
log.warn("Unable to create export manifest file. Received result[%s] from worker instead of a list of file names.", resultObjectForStage);
log.warn(
"Unable to create export manifest file. Received result[%s] from worker instead of a list of file names.",
resultObjectForStage
);
return;
}
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -1708,15 +1725,12 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
// Collect all aggregators that are part of the current dataSchema, since a non-rollup query (isRollup() is false)
// moves metrics columns to dimensions in the final schema.
Set<String> aggregatorsInDataSchema = Arrays.stream(dataSchema.getAggregators())
.map(AggregatorFactory::getName)
.collect(
Collectors.toSet());
metricsSpec = new ArrayList<>(
groupByQuery.getAggregatorSpecs()
.stream()
.filter(aggregatorFactory -> aggregatorsInDataSchema.contains(aggregatorFactory.getName()))
.collect(Collectors.toList())
);
.map(AggregatorFactory::getName)
.collect(Collectors.toSet());
metricsSpec = groupByQuery.getAggregatorSpecs()
.stream()
.filter(aggregatorFactory -> aggregatorsInDataSchema.contains(aggregatorFactory.getName()))
.collect(Collectors.toList());
}

IndexSpec indexSpec = tuningConfig.getIndexSpec();
Expand Down Expand Up @@ -1848,11 +1862,12 @@ private static QueryDefinition makeQueryDefinition(

final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
return builder.add(
destination.getTerminalStageSpec()
.constructFinalStage(
queryDef,
querySpec,
jsonMapper)
destination.getTerminalStageSpec()
.constructFinalStage(
queryDef,
querySpec,
jsonMapper
)
)
.build();
} else if (MSQControllerTask.writeFinalResultsToTaskReport(querySpec)) {
Expand Down Expand Up @@ -1881,7 +1896,8 @@ private static QueryDefinition makeQueryDefinition(

try {
// Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export.
Iterator<String> filesIterator = exportStorageProvider.createStorageConnector(controllerContext.taskTempDir()).listDir("");
Iterator<String> filesIterator = exportStorageProvider.createStorageConnector(controllerContext.taskTempDir())
.listDir("");
if (filesIterator.hasNext()) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
Expand Down Expand Up @@ -2398,17 +2414,20 @@ private void retryFailedTasks() throws InterruptedException
queryKernel.workOrdersSentForWorker(stageWorkOrders.getKey(), workerNumber);

// remove successfully contacted workOrders from workOrdersToRetry
workOrdersToRetry.compute(workerNumber, (task, workOrderSet) -> {
if (workOrderSet == null
|| workOrderSet.size() == 0
|| !workOrderSet.remove(stageWorkOrders.getValue().get(workerNumber))) {
throw new ISE("Worker[%s] with number[%d] orders not found", workerId, workerNumber);
}
if (workOrderSet.size() == 0) {
return null;
}
return workOrderSet;
});
workOrdersToRetry.compute(
workerNumber,
(task, workOrderSet) -> {
if (workOrderSet == null
|| workOrderSet.size() == 0
|| !workOrderSet.remove(stageWorkOrders.getValue().get(workerNumber))) {
throw new ISE("Worker[%s] with number[%d] orders not found", workerId, workerNumber);
}
if (workOrderSet.size() == 0) {
return null;
}
return workOrderSet;
}
);
},
queryKernelConfig.isFaultTolerant()
);
Expand Down Expand Up @@ -2490,9 +2509,10 @@ private void submitParallelMergeRequests(StageId stageId, Set<String> tasks)
stageId,
tasks.stream().map(workerManager::getWorkerNumber).collect(Collectors.toSet())
);
workerSketchFetcher.inMemoryFullSketchMerging(ControllerImpl.this::addToKernelManipulationQueue,
stageId, tasks,
ControllerImpl.this::addToRetryQueue
workerSketchFetcher.inMemoryFullSketchMerging(
ControllerImpl.this::addToKernelManipulationQueue,
stageId, tasks,
ControllerImpl.this::addToRetryQueue
);
}

Expand Down Expand Up @@ -2734,7 +2754,7 @@ private boolean cleanUpEffectivelyFinishedStages()

/**
* Start a {@link ControllerQueryResultsReader} that pushes results to our {@link QueryListener}.
*
* <p>
* The reader runs in a single-threaded executor that is created by this method, and shut down when results
* are done being read.
*/
Expand Down