Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.query.Query;
import org.apache.druid.server.DruidNode;

/**
Expand Down Expand Up @@ -103,8 +104,13 @@ WorkerManager newWorkerManager(
WorkerClient newWorkerClient();

/**
* Default target partitions per worker for {@link QueryKit#makeQueryDefinition}. Can be overridden using
* {@link MultiStageQueryContext#CTX_TARGET_PARTITIONS_PER_WORKER}.
* Create a {@link QueryKitSpec}. This method provides controller contexts a way to customize parameters around the
* number of workers and partitions.
*/
int defaultTargetPartitionsPerWorker();
QueryKitSpec makeQueryKitSpec(
QueryKit<Query<?>> queryKit,
String queryId,
MSQSpec querySpec,
ControllerQueryKernelConfig queryKernelConfig

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'queryKernelConfig' is never used.
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.apache.druid.msq.kernel.controller.WorkerInputs;
import org.apache.druid.msq.querykit.MultiQueryKit;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.WindowOperatorQueryKit;
Expand Down Expand Up @@ -567,14 +568,9 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)

final QueryContext queryContext = querySpec.getQuery().context();
final QueryDefinition queryDef = makeQueryDefinition(
queryId(),
makeQueryControllerToolKit(),
context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig),
querySpec,
context.jsonMapper(),
MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
queryContext,
context.defaultTargetPartitionsPerWorker()
),
resultsContext
);

Expand Down Expand Up @@ -1201,7 +1197,7 @@ private Int2ObjectMap<Object> makeWorkerFactoryInfosForStage(
}

@SuppressWarnings("rawtypes")
private QueryKit makeQueryControllerToolKit()
private QueryKit<Query<?>> makeQueryControllerToolKit()
{
final Map<Class<? extends Query>, QueryKit> kitMap =
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
Expand Down Expand Up @@ -1725,11 +1721,9 @@ private void cleanUpDurableStorageIfNeeded()

@SuppressWarnings("unchecked")
private static QueryDefinition makeQueryDefinition(
final String queryId,
@SuppressWarnings("rawtypes") final QueryKit toolKit,
final QueryKitSpec queryKitSpec,
final MSQSpec querySpec,
final ObjectMapper jsonMapper,
final int targetPartitionsPerWorker,
final ResultsContext resultsContext
)
{
Expand Down Expand Up @@ -1773,13 +1767,10 @@ private static QueryDefinition makeQueryDefinition(
final QueryDefinition queryDef;

try {
queryDef = toolKit.makeQueryDefinition(
queryId,
queryDef = queryKitSpec.getQueryKit().makeQueryDefinition(
queryKitSpec,
queryToPlan,
toolKit,
resultShuffleSpecFactory,
tuningConfig.getMaxNumWorkers(),
targetPartitionsPerWorker,
0
);
}
Expand Down Expand Up @@ -1808,7 +1799,7 @@ private static QueryDefinition makeQueryDefinition(

// Add all query stages.
// Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage.
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId());

for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
if (stageDef.equals(finalShuffleStageDef)) {
Expand All @@ -1834,7 +1825,7 @@ private static QueryDefinition makeQueryDefinition(
// attaching new query results stage if the final stage does sort during shuffle so that results are ordered.
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
if (finalShuffleStageDef.doesSortDuringShuffle()) {
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId());
builder.addAll(queryDef);
builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
Expand Down Expand Up @@ -1871,15 +1862,15 @@ private static QueryDefinition makeQueryDefinition(
}

final ResultFormat resultFormat = exportMSQDestination.getResultFormat();
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId());
builder.addAll(queryDef);
builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.signature(queryDef.getFinalStageDefinition().getSignature())
.shuffleSpec(null)
.processorFactory(new ExportResultsFrameProcessorFactory(
queryId,
queryKitSpec.getQueryId(),
exportStorageProvider,
resultFormat,
columnMappings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
Expand Down Expand Up @@ -203,11 +206,26 @@ public WorkerManager newWorkerManager(
}

@Override
public int defaultTargetPartitionsPerWorker()
public QueryKitSpec makeQueryKitSpec(
final QueryKit<Query<?>> queryKit,
final String queryId,
final MSQSpec querySpec,
final ControllerQueryKernelConfig queryKernelConfig
)
{
// Assume tasks are symmetric: workers have the same number of processors available as a controller.
// Create one partition per processor per task, for maximum parallelism.
return memoryIntrospector.numProcessingThreads();
return new QueryKitSpec(
queryKit,
queryId,
querySpec.getTuningConfig().getMaxNumWorkers(),
querySpec.getTuningConfig().getMaxNumWorkers(),

// Assume tasks are symmetric: workers have the same number of processors available as a controller.
// Create one partition per processor per task, for maximum parallelism.
MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
querySpec.getQuery().context(),
memoryIntrospector.numProcessingThreads()
)
);
}

/**
Expand Down
Loading