@@ -43,7 +43,6 @@
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.InlineDataSource;
@@ -424,21 +423,11 @@ private static DataSourcePlan forQuery(
@Nullable final QueryContext parentContext
)
{
// check if parentContext has a window operator
final Map<String, Object> windowShuffleMap = new HashMap<>();
if (parentContext != null && parentContext.containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
windowShuffleMap.put(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL, parentContext.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL));
}
final QueryDefinition subQueryDef = queryKit.makeQueryDefinition(
queryId,
// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
windowShuffleMap.isEmpty()
? dataSource.getQuery()
.withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
: dataSource.getQuery()
.withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
.withOverriddenContext(windowShuffleMap),
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
queryKit,
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount),
maxWorkerCount,
@@ -20,7 +20,6 @@
package org.apache.druid.msq.querykit;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
@@ -88,17 +87,6 @@ public QueryDefinition makeQueryDefinition(
List<List<OperatorFactory>> operatorList = getOperatorListFromQuery(originalQuery);
log.info("Created operatorList with operator factories: [%s]", operatorList);

ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
// add this shuffle spec to the last stage of the inner query

final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId);
if (nextShuffleSpec != null) {
final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
originalQuery = (WindowOperatorQuery) originalQuery.withOverriddenContext(ImmutableMap.of(
MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
windowClusterBy
));
}
final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
queryKit,
queryId,
@@ -112,7 +100,8 @@
false
);

dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);
ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryId, dataSourcePlan, nextShuffleSpec);

final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
@@ -309,12 +298,16 @@ private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorF
}
}

if (partition == null || partition.getPartitionColumns().isEmpty()) {
if (partition == null) {
// If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage.
// This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
return null;
}

if (partition.getPartitionColumns().isEmpty()) {
return MixShuffleSpec.instance();
}

List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (String partitionColumn : partition.getPartitionColumns()) {
KeyColumn kc;
@@ -328,4 +321,29 @@ private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorF

return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount);
}

/**
* Override the shuffle spec of the last stage based on the shuffling required by the first window stage.
* @param queryId        query ID of the query definition being built
* @param dataSourcePlan plan whose sub-query definition supplies the stages to copy over
* @param shuffleSpec    shuffle spec required by the first window stage, applied to the final copied stage
* @return a {@link QueryDefinitionBuilder} containing the copied stages, with the final stage's shuffle spec and signature overridden
*/
private QueryDefinitionBuilder makeQueryDefinitionBuilder(String queryId, DataSourcePlan dataSourcePlan, ShuffleSpec shuffleSpec)
{
final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId);
int previousStageNumber = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getStageNumber();
for (final StageDefinition stageDef : dataSourcePlan.getSubQueryDefBuilder().get().build().getStageDefinitions()) {
if (stageDef.getStageNumber() == previousStageNumber) {
RowSignature rowSignature = QueryKitUtils.sortableSignature(
stageDef.getSignature(),
shuffleSpec.clusterBy().getColumns()
);
queryDefBuilder.add(StageDefinition.builder(stageDef).shuffleSpec(shuffleSpec).signature(rowSignature));
Member:
To be 100% sure that we are not causing correctness issues: can we validate whether we are OK to override the old shuffleSpec? If it's not null (or something else we could allow), we should preferably throw an exception; or, if that's not really possible, the safest option would be to add a new dummy stage which just re-shuffles.

I guess the cases where it's not safe to do so may need further investigation, as those shuffles could possibly be moved "after" the window query. I see clusterBy as something which should probably wrap around the full built query, regardless of whether it's Scan / GroupBy / Window / "anything"; but that could be a refactor of its own, which may close that gap for window queries as well.

Note: I think if we don't handle clusterBy in this class, then writing a windowed query to files might lead to unexpected results; but I guess that's not really a problem right now :)

Contributor Author:
If we throw an error when the shuffleSpec is non-null for the stage that's getting overridden, a lot of tests fail in MSQDrillWindowQueryTest: 754 tests failed, 114 tests passed.

So we can't simply throw an error when it's non-null 😅

Contributor:
Currently, it is entirely possible that the final old shuffle spec contains non-null shuffling. When creating the previous stages, it is the windowQueryKit which gives hints about what it wants the final shuffling to be, via the resultShuffleSpecFactory parameter. Currently this is globalSortWithMaxPartitionCount (if this were a normal subquery, we would want it to run on as many workers as that stage allows).

I think it might be better to change the resultShuffleSpec we pass from windowQueryKit, and then do an assert on the shuffle spec.

@kgyrtkirk Do you know of any example query which might cause issues if we change the shuffle spec? I have thought about it a bit, but I can't think of one. Since it is the window function which reads the results of this shuffle, and it does not care about the ordering (I'm assuming this is the case, since we want to change it to MixShuffleSpec), there should not be an issue with this change from what I can tell, but I might have to think more about it.

} else {
queryDefBuilder.add(StageDefinition.builder(stageDef));
}
}
return queryDefBuilder;
}
}
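
Relating to the review thread above, here is a minimal sketch (not part of this PR) of the kind of guard the reviewer suggests adding before makeQueryDefinitionBuilder overrides the last stage's shuffle spec. It assumes StageDefinition exposes its current shuffle spec through a getShuffleSpec() accessor, which is not shown in this diff, and it checks for a non-trivial clusterBy rather than plain non-null, since the author notes that a plain non-null check fails most of MSQDrillWindowQueryTest.

// Hypothetical validation sketch, using only identifiers visible in this diff plus an
// assumed StageDefinition#getShuffleSpec() accessor.
private static void validateShuffleSpecOverride(StageDefinition stageDef, ShuffleSpec newShuffleSpec)
{
  // Assumption: the stage's existing (possibly null) shuffle spec can be read back from the definition.
  final ShuffleSpec existing = stageDef.getShuffleSpec();
  if (existing != null && !existing.clusterBy().getColumns().isEmpty()) {
    // Option discussed in the thread: refuse to silently drop a meaningful ordering/partitioning.
    throw new IllegalStateException(
        "Refusing to override shuffle spec [" + existing + "] of stage ["
        + stageDef.getStageNumber() + "] with [" + newShuffleSpec + "]"
    );
    // Alternative from the thread: instead of throwing, add an extra stage that re-shuffles by
    // newShuffleSpec and leave the original stage's shuffle spec untouched.
  }
}

The other direction raised in the thread, changing the resultShuffleSpecFactory that WindowOperatorQueryKit passes down and then asserting on the resulting spec, would turn this hard failure into an assertion on an expected invariant.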
@@ -28,7 +28,6 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.HashShuffleSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.ShuffleSpec;
@@ -39,7 +38,6 @@
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DimensionComparisonUtils;
import org.apache.druid.query.Query;
import org.apache.druid.query.dimension.DimensionSpec;
@@ -168,104 +166,40 @@ public QueryDefinition makeQueryDefinition(
partitionBoost
);

final ShuffleSpec nextShuffleWindowSpec = getShuffleSpecForNextWindow(originalQuery, maxWorkerCount);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
.signature(resultSignature)
.maxWorkerCount(maxWorkerCount)
.shuffleSpec(
shuffleSpecFactoryPostAggregation != null
? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
: null
)
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
);

if (nextShuffleWindowSpec == null) {
if (doLimitOrOffset) {
final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false);
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(maxWorkerCount)
.shuffleSpec(
shuffleSpecFactoryPostAggregation != null
? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
: null
)
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
);

if (doLimitOrOffset) {
final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false);
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(1)
.shuffleSpec(finalShuffleSpec)
.processorFactory(
new OffsetLimitFrameProcessorFactory(
limitSpec.getOffset(),
limitSpec.isLimited() ? (long) limitSpec.getLimit() : null
)
)
);
}
} else {
final RowSignature stageSignature;
// sort the signature to make sure the prefix is aligned
stageSignature = QueryKitUtils.sortableSignature(
resultSignature,
nextShuffleWindowSpec.clusterBy().getColumns()
);


queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
.signature(stageSignature)
.maxWorkerCount(maxWorkerCount)
.shuffleSpec(doLimitOrOffset ? (shuffleSpecFactoryPostAggregation != null
? shuffleSpecFactoryPostAggregation.build(
resultClusterBy,
false
.maxWorkerCount(1)
.shuffleSpec(finalShuffleSpec)
.processorFactory(
new OffsetLimitFrameProcessorFactory(
limitSpec.getOffset(),
limitSpec.isLimited() ? (long) limitSpec.getLimit() : null
)
)
: null) : nextShuffleWindowSpec)
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
);
if (doLimitOrOffset) {
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(1)
.shuffleSpec(finalShuffleSpec)
.processorFactory(
new OffsetLimitFrameProcessorFactory(
limitSpec.getOffset(),
limitSpec.isLimited() ? (long) limitSpec.getLimit() : null
)
)
);
}
}

return queryDefBuilder.build();
}

/**
* @param originalQuery which has the context for the next shuffle if that's present in the next window
* @param maxWorkerCount max worker count
* @return shuffle spec without partition boosting for next stage, null if there is no partition by for next window
*/
private ShuffleSpec getShuffleSpecForNextWindow(GroupByQuery originalQuery, int maxWorkerCount)
{
final ShuffleSpec nextShuffleWindowSpec;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
nextShuffleWindowSpec = new HashShuffleSpec(
windowClusterBy,
maxWorkerCount
);
} else {
nextShuffleWindowSpec = null;
}
return nextShuffleWindowSpec;
}

/**
* Intermediate signature of a particular {@link GroupByQuery}. Does not include post-aggregators, and all
* aggregations are nonfinalized.
@@ -37,7 +37,6 @@
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
@@ -129,26 +128,8 @@ public QueryDefinition makeQueryDefinition(
);
}

// Update partition by of next window
final RowSignature signatureSoFar = signatureBuilder.build();
boolean addShuffle = true;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
for (KeyColumn c : windowClusterBy.getColumns()) {
if (!signatureSoFar.contains(c.columnName())) {
addShuffle = false;
break;
}
}
if (addShuffle) {
clusterByColumns.addAll(windowClusterBy.getColumns());
}
} else {
// Add partition boosting column.
clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
}
clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);

final ClusterBy clusterBy =
QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity);
@@ -167,8 +167,6 @@ public class MultiStageQueryContext
public static final String CTX_ARRAY_INGEST_MODE = "arrayIngestMode";
public static final ArrayIngestMode DEFAULT_ARRAY_INGEST_MODE = ArrayIngestMode.MVD;

public static final String NEXT_WINDOW_SHUFFLE_COL = "__windowShuffleCol";

public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "maxRowsMaterializedInWindow";

public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification";