From 4c8738072afbb960cd98820775f6cc94f330c642 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 25 Sep 2024 21:44:27 +0530 Subject: [PATCH 1/9] Refactor WindowOperatorQueryKit to use WindowStage class for representing different window stages --- .../msq/querykit/WindowOperatorQueryKit.java | 174 +++++++++++------- 1 file changed, 107 insertions(+), 67 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 02542f8e7366..d4bfcf358a30 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -82,8 +82,8 @@ public QueryDefinition makeQueryDefinition( .map(of -> (NaivePartitioningOperatorFactory) of) .anyMatch(of -> of.getPartitionColumns().isEmpty()); - List> operatorList = getOperatorListFromQuery(originalQuery); - log.info("Created operatorList with operator factories: [%s]", operatorList); + final List stages = getWindowStagesFromQuery(originalQuery); + log.info("Created window stages with operator factories: [%s]", stages); final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( queryKitSpec, @@ -97,7 +97,7 @@ public QueryDefinition makeQueryDefinition( ); ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow( - operatorList.get(0), + stages.get(0), queryKitSpec.getNumPartitionsForShuffle() ); final QueryDefinitionBuilder queryDefBuilder = @@ -154,32 +154,23 @@ public QueryDefinition makeQueryDefinition( List partitionColumnNames = new ArrayList<>(); - /* - operatorList is a List>, where each List corresponds to the operator factories - to be used for a different window stage. - - We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder. 
- */ - for (int i = 0; i < operatorList.size(); i++) { - for (OperatorFactory operatorFactory : operatorList.get(i)) { - if (operatorFactory instanceof WindowOperatorFactory) { - List outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames(); - - // Need to add column names which are present in outputColumnNames and rowSignature but not in bob, - // since they need to be present in the row signature for this window stage. - for (String columnName : outputColumnNames) { - int indexInRowSignature = rowSignature.indexOf(columnName); - if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) { - ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get(); - bob.add(columnName, columnType); - log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType); - } else { - throw new ISE( - "Found unexpected column [%s] already present in row signature [%s].", - columnName, - rowSignature - ); - } + // Iterate over the list of window stages, and add the definition for each window stage to QueryDefinitionBuilder. + for (int i = 0; i < stages.size(); i++) { + for (WindowOperatorFactory operatorFactory : stages.get(i).getWindowOperatorFactories()) { + // Need to add column names which are present in outputColumnNames and rowSignature but not in bob, + // since they need to be present in the row signature for this window stage. 
+ for (String columnName : operatorFactory.getProcessor().getOutputColumnNames()) { + int indexInRowSignature = rowSignature.indexOf(columnName); + if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) { + ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get(); + bob.add(columnName, columnType); + log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType); + } else { + throw new ISE( + "Found unexpected column [%s] already present in row signature [%s].", + columnName, + rowSignature + ); } } } @@ -187,12 +178,11 @@ public QueryDefinition makeQueryDefinition( final RowSignature intermediateSignature = bob.build(); final RowSignature stageRowSignature; - if (i + 1 == operatorList.size()) { + if (i + 1 == stages.size()) { stageRowSignature = finalWindowStageRowSignature; nextShuffleSpec = finalWindowStageShuffleSpec; } else { - nextShuffleSpec = - findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle()); + nextShuffleSpec = findShuffleSpecForNextWindow(stages.get(i + 1), queryKitSpec.getNumPartitionsForShuffle()); if (nextShuffleSpec == null) { stageRowSignature = intermediateSignature; } else { @@ -205,18 +195,8 @@ public QueryDefinition makeQueryDefinition( log.info("Using row signature [%s] for window stage.", stageRowSignature); - boolean partitionOperatorExists = false; - List currentPartitionColumns = new ArrayList<>(); - for (OperatorFactory of : operatorList.get(i)) { - if (of instanceof NaivePartitioningOperatorFactory) { - for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) { - currentPartitionColumns.add(s); - partitionOperatorExists = true; - } - } - } - - if (partitionOperatorExists) { + final List currentPartitionColumns = stages.get(i).getPartitionColumns(); + if (!currentPartitionColumns.isEmpty()) { partitionColumnNames = currentPartitionColumns; } @@ -233,7 +213,7 @@ public QueryDefinition 
makeQueryDefinition( .shuffleSpec(nextShuffleSpec) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, - operatorList.get(i), + stages.get(i).getOperatorFactories(), stageRowSignature, maxRowsMaterialized, partitionColumnNames @@ -247,34 +227,33 @@ public QueryDefinition makeQueryDefinition( /** * * @param originalQuery - * @return A list of list of operator factories, where each list represents the operator factories for a particular - * window stage. + * @return A list of {@link WindowStage}. */ - private List> getOperatorListFromQuery(WindowOperatorQuery originalQuery) + private List getWindowStagesFromQuery(WindowOperatorQuery originalQuery) { - List> operatorList = new ArrayList<>(); + final List stages = new ArrayList<>(); final List operators = originalQuery.getOperators(); - List currentStage = new ArrayList<>(); + WindowStage currentStage = new WindowStage(); for (int i = 0; i < operators.size(); i++) { OperatorFactory of = operators.get(i); - currentStage.add(of); + currentStage.addOperatorFactory(of); if (of instanceof WindowOperatorFactory) { // Process consecutive window operators while (i + 1 < operators.size() && operators.get(i + 1) instanceof WindowOperatorFactory) { i++; - currentStage.add(operators.get(i)); + currentStage.addOperatorFactory(operators.get(i)); } // Finalize the current stage - operatorList.add(new ArrayList<>(currentStage)); - currentStage.clear(); + stages.add(currentStage); + currentStage = new WindowStage(); } } // There shouldn't be any operators left in currentStage. The last operator should always be WindowOperatorFactory. 
- if (!currentStage.isEmpty()) { + if (!currentStage.getOperatorFactories().isEmpty()) { throw new ISE( "Found unexpected operators [%s] present in the list of operators [%s].", currentStage, @@ -282,20 +261,13 @@ private List> getOperatorListFromQuery(WindowOperatorQuery ); } - return operatorList; + return stages; } - private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int partitionCount) + private ShuffleSpec findShuffleSpecForNextWindow(WindowStage windowStage, int partitionCount) { - NaivePartitioningOperatorFactory partition = null; - NaiveSortOperatorFactory sort = null; - for (OperatorFactory of : operatorFactories) { - if (of instanceof NaivePartitioningOperatorFactory) { - partition = (NaivePartitioningOperatorFactory) of; - } else if (of instanceof NaiveSortOperatorFactory) { - sort = (NaiveSortOperatorFactory) of; - } - } + final NaivePartitioningOperatorFactory partition = windowStage.getPartitioningOperatorFactory(); + final NaiveSortOperatorFactory sort = windowStage.getSortOperatorFactory(); Map sortColumnsMap = new HashMap<>(); if (sort != null) { @@ -305,7 +277,7 @@ private ShuffleSpec findShuffleSpecForNextWindow(List operatorF } if (partition == null) { - // If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage. + // If the window stage doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage. // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling. return null; } @@ -377,4 +349,72 @@ private static RowSignature computeSignatureForFinalWindowStage(RowSignature row finalWindowClusterBy.getColumns() ); } + + /** + * Represents a window stage in a query execution. + * Each stage can contain a sort operator, a partition operator, and multiple window operators. 
+ */ + private static class WindowStage + { + private NaiveSortOperatorFactory sortOperatorFactory; + private NaivePartitioningOperatorFactory partitioningOperatorFactory; + private final List<WindowOperatorFactory> windowOperatorFactories; + + public WindowStage() + { + this.windowOperatorFactories = new ArrayList<>(); + } + + public void addOperatorFactory(OperatorFactory op) + { + if (op instanceof NaiveSortOperatorFactory) { + this.sortOperatorFactory = (NaiveSortOperatorFactory) op; + } else if (op instanceof NaivePartitioningOperatorFactory) { + this.partitioningOperatorFactory = (NaivePartitioningOperatorFactory) op; + } else { + this.windowOperatorFactories.add((WindowOperatorFactory) op); + } + } + + public List<OperatorFactory> getOperatorFactories() + { + List<OperatorFactory> operatorFactories = new ArrayList<>(); + if (sortOperatorFactory != null) { + operatorFactories.add(sortOperatorFactory); + } + if (partitioningOperatorFactory != null) { + operatorFactories.add(partitioningOperatorFactory); + } + operatorFactories.addAll(windowOperatorFactories); + return operatorFactories; + } + + public List<String> getPartitionColumns() + { + return partitioningOperatorFactory != null ? 
partitioningOperatorFactory.getPartitionColumns() : new ArrayList<>(); + } + + public NaiveSortOperatorFactory getSortOperatorFactory() + { + return sortOperatorFactory; + } + + public NaivePartitioningOperatorFactory getPartitioningOperatorFactory() + { + return partitioningOperatorFactory; + } + + public List getWindowOperatorFactories() + { + return windowOperatorFactories; + } + + @Override + public String toString() + { + return "WindowStage{" + + "operatorFactories=" + getOperatorFactories() + + '}'; + } + } } From 713bacebd0edc0ad2a340387539f8c549d5a04a2 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 2 Oct 2024 00:02:39 +0530 Subject: [PATCH 2/9] Address review comments --- .../msq/querykit/WindowOperatorQueryKit.java | 82 ++++++++----------- 1 file changed, 33 insertions(+), 49 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index d4bfcf358a30..3e7500b7238f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -96,10 +96,7 @@ public QueryDefinition makeQueryDefinition( false ); - ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow( - stages.get(0), - queryKitSpec.getNumPartitionsForShuffle() - ); + ShuffleSpec nextShuffleSpec = stages.get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle()); final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); @@ -182,7 +179,7 @@ public QueryDefinition makeQueryDefinition( stageRowSignature = finalWindowStageRowSignature; nextShuffleSpec = finalWindowStageShuffleSpec; } else { - nextShuffleSpec = findShuffleSpecForNextWindow(stages.get(i + 1), 
queryKitSpec.getNumPartitionsForShuffle()); + nextShuffleSpec = stages.get(i + 1).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle()); if (nextShuffleSpec == null) { stageRowSignature = intermediateSignature; } else { @@ -264,42 +261,6 @@ private List getWindowStagesFromQuery(WindowOperatorQuery originalQ return stages; } - private ShuffleSpec findShuffleSpecForNextWindow(WindowStage windowStage, int partitionCount) - { - final NaivePartitioningOperatorFactory partition = windowStage.getPartitioningOperatorFactory(); - final NaiveSortOperatorFactory sort = windowStage.getSortOperatorFactory(); - - Map sortColumnsMap = new HashMap<>(); - if (sort != null) { - for (ColumnWithDirection sortColumn : sort.getSortColumns()) { - sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection()); - } - } - - if (partition == null) { - // If the window stage doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage. - // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling. - return null; - } - - if (partition.getPartitionColumns().isEmpty()) { - return MixShuffleSpec.instance(); - } - - List keyColsOfWindow = new ArrayList<>(); - for (String partitionColumn : partition.getPartitionColumns()) { - KeyColumn kc; - if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) { - kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING); - } else { - kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING); - } - keyColsOfWindow.add(kc); - } - - return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); - } - /** * Override the shuffle spec of the last stage based on the shuffling required by the first window stage. * @param queryId @@ -394,19 +355,42 @@ public List getPartitionColumns() return partitioningOperatorFactory != null ? 
partitioningOperatorFactory.getPartitionColumns() : new ArrayList<>(); } - public NaiveSortOperatorFactory getSortOperatorFactory() + public List<WindowOperatorFactory> getWindowOperatorFactories() { - return sortOperatorFactory; + return windowOperatorFactories; } - public NaivePartitioningOperatorFactory getPartitioningOperatorFactory() + public ShuffleSpec findShuffleSpec(int partitionCount) { - return partitioningOperatorFactory; - } + Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>(); + if (sortOperatorFactory != null) { + for (ColumnWithDirection sortColumn : sortOperatorFactory.getSortColumns()) { + sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection()); + } + } - public List<WindowOperatorFactory> getWindowOperatorFactories() - { - return windowOperatorFactories; + if (partitioningOperatorFactory == null) { + // If the window stage doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage. + // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling. 
+ return null; + } + + if (partitioningOperatorFactory.getPartitionColumns().isEmpty()) { + return MixShuffleSpec.instance(); + } + + List keyColsOfWindow = new ArrayList<>(); + for (String partitionColumn : partitioningOperatorFactory.getPartitionColumns()) { + KeyColumn kc; + if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) { + kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING); + } else { + kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING); + } + keyColsOfWindow.add(kc); + } + + return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); } @Override From 3f305a334a67a8c7296181f02b76306bfbf92cc5 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 9 Oct 2024 20:36:49 +0530 Subject: [PATCH 3/9] Address review comments --- .../msq/querykit/WindowOperatorQueryKit.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 3e7500b7238f..78cb97df0a6d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -232,32 +232,17 @@ private List getWindowStagesFromQuery(WindowOperatorQuery originalQ final List operators = originalQuery.getOperators(); WindowStage currentStage = new WindowStage(); - for (int i = 0; i < operators.size(); i++) { - OperatorFactory of = operators.get(i); - currentStage.addOperatorFactory(of); - - if (of instanceof WindowOperatorFactory) { - // Process consecutive window operators - while (i + 1 < operators.size() && operators.get(i + 1) instanceof WindowOperatorFactory) { - i++; - currentStage.addOperatorFactory(operators.get(i)); - } - - // Finalize the current 
stage + for (OperatorFactory of : operators) { + if (!currentStage.getOperatorFactories().isEmpty() && !currentStage.canAccept(of)) { stages.add(currentStage); currentStage = new WindowStage(); } + currentStage.addOperatorFactory(of); } - // There shouldn't be any operators left in currentStage. The last operator should always be WindowOperatorFactory. if (!currentStage.getOperatorFactories().isEmpty()) { - throw new ISE( - "Found unexpected operators [%s] present in the list of operators [%s].", - currentStage, - operators - ); + stages.add(currentStage); } - return stages; } @@ -393,11 +378,26 @@ public ShuffleSpec findShuffleSpec(int partitionCount) return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); } + public boolean canAccept(OperatorFactory operatorFactory) + { + if (operatorFactory instanceof NaiveSortOperatorFactory) { + return false; + } + + if (operatorFactory instanceof WindowOperatorFactory) { + return true; + } + // If it's a PartitioningOperatorFactory, we can add it to current stage if NaiveSortOperatorFactory is present. 
+ return sortOperatorFactory != null; + } + @Override public String toString() { return "WindowStage{" + - "operatorFactories=" + getOperatorFactories() + + "sortOperatorFactory=" + sortOperatorFactory + + ", partitioningOperatorFactory=" + partitioningOperatorFactory + + ", windowOperatorFactories=" + windowOperatorFactories + '}'; } } From 2b3ac1713597c489a6ba85908d4fdaf6bd107b8c Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Sat, 12 Oct 2024 13:19:09 +0530 Subject: [PATCH 4/9] Refactor logic into WindowStages and WindowStage class --- .../msq/querykit/WindowOperatorQueryKit.java | 380 ++++++++++-------- 1 file changed, 206 insertions(+), 174 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 78cb97df0a6d..327610bd3966 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -34,6 +34,7 @@ import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.ShuffleSpec; import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.kernel.StageDefinitionBuilder; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.operator.ColumnWithDirection; import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; @@ -68,23 +69,6 @@ public QueryDefinition makeQueryDefinition( int minStageNumber ) { - // Need to validate query first. - // Populate the group of operators to be processed at each stage. - // The size of the operators is the number of serialized stages. - // Later we should also check if these can be parallelized. - // Check if there is an empty OVER() clause or not. 
- RowSignature rowSignature = originalQuery.getRowSignature(); - log.info("Row signature received for query is [%s].", rowSignature); - - boolean isEmptyOverPresent = originalQuery.getOperators() - .stream() - .filter(of -> of instanceof NaivePartitioningOperatorFactory) - .map(of -> (NaivePartitioningOperatorFactory) of) - .anyMatch(of -> of.getPartitionColumns().isEmpty()); - - final List stages = getWindowStagesFromQuery(originalQuery); - log.info("Created window stages with operator factories: [%s]", stages); - final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( queryKitSpec, originalQuery.context(), @@ -95,44 +79,41 @@ public QueryDefinition makeQueryDefinition( minStageNumber, false ); + final RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature(); + + final WindowStages windowStages = new WindowStages( + originalQuery, + jsonMapper, + queryKitSpec.getNumPartitionsForShuffle(), + queryKitSpec.getMaxNonLeafWorkerCount(), + resultShuffleSpecFactory, + signatureFromInput + ); - ShuffleSpec nextShuffleSpec = stages.get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle()); - final QueryDefinitionBuilder queryDefBuilder = - makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); + final ShuffleSpec nextShuffleSpec = windowStages.getStages().get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle()); + final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); - final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); + final int firstWindowStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); - // Get segment granularity from query context, and create ShuffleSpec and 
RowSignature to be used for the final window stage. - final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); - final ClusterBy finalWindowClusterBy = computeClusterByForFinalWindowStage(segmentGranularity); - final ShuffleSpec finalWindowStageShuffleSpec = resultShuffleSpecFactory.build(finalWindowClusterBy, false); - final RowSignature finalWindowStageRowSignature = computeSignatureForFinalWindowStage(rowSignature, finalWindowClusterBy, segmentGranularity); - - final int maxRowsMaterialized; - if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) { - maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); - } else { - maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW; - } - - if (isEmptyOverPresent) { + if (windowStages.hasEmptyOverClause()) { // Move everything to a single partition since we have to load all the data on a single worker anyway to compute empty over() clause. log.info( "Empty over clause is present in the query. 
Creating a single stage with all operator factories [%s].", queryToRun.getOperators() ); + final RowSignature finalWindowStageRowSignature = windowStages.getSignatureForFinalWindowStage(); queryDefBuilder.add( - StageDefinition.builder(firstStageNumber) - .inputs(new StageInputSpec(firstStageNumber - 1)) + StageDefinition.builder(firstWindowStageNumber) + .inputs(new StageInputSpec(firstWindowStageNumber - 1)) .signature(finalWindowStageRowSignature) .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) - .shuffleSpec(finalWindowStageShuffleSpec) + .shuffleSpec(windowStages.getShuffleSpecForFinalWindowStage()) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, queryToRun.getOperators(), finalWindowStageRowSignature, - maxRowsMaterialized, + windowStages.getMaxRowsMaterialized(), Collections.emptyList() )) ); @@ -141,111 +122,16 @@ public QueryDefinition makeQueryDefinition( // Create stages for each window in the query. // These stages will be serialized. // The partition by clause of the next window will be the shuffle key for the previous window. - RowSignature.Builder bob = RowSignature.builder(); - RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature(); log.info("Row signature received from last stage is [%s].", signatureFromInput); - for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) { - bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get()); - } - - List partitionColumnNames = new ArrayList<>(); - // Iterate over the list of window stages, and add the definition for each window stage to QueryDefinitionBuilder. 
- for (int i = 0; i < stages.size(); i++) { - for (WindowOperatorFactory operatorFactory : stages.get(i).getWindowOperatorFactories()) { - // Need to add column names which are present in outputColumnNames and rowSignature but not in bob, - // since they need to be present in the row signature for this window stage. - for (String columnName : operatorFactory.getProcessor().getOutputColumnNames()) { - int indexInRowSignature = rowSignature.indexOf(columnName); - if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) { - ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get(); - bob.add(columnName, columnType); - log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType); - } else { - throw new ISE( - "Found unexpected column [%s] already present in row signature [%s].", - columnName, - rowSignature - ); - } - } - } - - final RowSignature intermediateSignature = bob.build(); - final RowSignature stageRowSignature; - - if (i + 1 == stages.size()) { - stageRowSignature = finalWindowStageRowSignature; - nextShuffleSpec = finalWindowStageShuffleSpec; - } else { - nextShuffleSpec = stages.get(i + 1).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle()); - if (nextShuffleSpec == null) { - stageRowSignature = intermediateSignature; - } else { - stageRowSignature = QueryKitUtils.sortableSignature( - intermediateSignature, - nextShuffleSpec.clusterBy().getColumns() - ); - } - } - - log.info("Using row signature [%s] for window stage.", stageRowSignature); - - final List currentPartitionColumns = stages.get(i).getPartitionColumns(); - if (!currentPartitionColumns.isEmpty()) { - partitionColumnNames = currentPartitionColumns; - } - - log.info( - "Columns which would be used to define partitioning boundaries for this window stage are [%s]", - partitionColumnNames - ); - - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + i) - .inputs(new StageInputSpec(firstStageNumber + i - 
1)) - .signature(stageRowSignature) - .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) - .shuffleSpec(nextShuffleSpec) - .processorFactory(new WindowOperatorQueryFrameProcessorFactory( - queryToRun, - stages.get(i).getOperatorFactories(), - stageRowSignature, - maxRowsMaterialized, - partitionColumnNames - )) - ); + for (int i = 0; i < windowStages.getStages().size(); i++) { + queryDefBuilder.add(windowStages.getStageDefinitionBuilder(firstWindowStageNumber + i, i)); } } return queryDefBuilder.build(); } - /** - * - * @param originalQuery - * @return A list of {@link WindowStage}. - */ - private List getWindowStagesFromQuery(WindowOperatorQuery originalQuery) - { - final List stages = new ArrayList<>(); - final List operators = originalQuery.getOperators(); - WindowStage currentStage = new WindowStage(); - - for (OperatorFactory of : operators) { - if (!currentStage.getOperatorFactories().isEmpty() && !currentStage.canAccept(of)) { - stages.add(currentStage); - currentStage = new WindowStage(); - } - currentStage.addOperatorFactory(of); - } - - if (!currentStage.getOperatorFactories().isEmpty()) { - stages.add(currentStage); - } - return stages; - } - /** * Override the shuffle spec of the last stage based on the shuffling required by the first window stage. * @param queryId @@ -272,30 +158,174 @@ private QueryDefinitionBuilder makeQueryDefinitionBuilder(String queryId, DataSo } /** - * Computes the ClusterBy for the final window stage. We don't have to take the CLUSTERED BY columns into account, - * as they are handled as {@link org.apache.druid.query.scan.ScanQuery#orderBys}. + * Represents the window stages to be added to {@link QueryDefinitionBuilder}. + * This class is responsible for creating the window stages. 
- */ - private static ClusterBy computeClusterByForFinalWindowStage(Granularity segmentGranularity) + private static class WindowStages { - final List<KeyColumn> clusterByColumns = Collections.singletonList(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING)); - return QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity); - } + private final List<WindowStage> stages; + private final WindowOperatorQuery query; + private final int numPartitionsForShuffle; + private final int maxNonLeafWorkerCount; + private final ShuffleSpec finalWindowStageShuffleSpec; + private final RowSignature finalWindowStageRowSignature; + private final RowSignature.Builder rowSignatureBuilder; + + private WindowStages( + WindowOperatorQuery query, + ObjectMapper jsonMapper, + int numPartitionsForShuffle, + int maxNonLeafWorkerCount, + ShuffleSpecFactory resultShuffleSpecFactory, + RowSignature signatureFromInput + ) + { + this.stages = new ArrayList<>(); + this.query = query; + this.numPartitionsForShuffle = numPartitionsForShuffle; + this.maxNonLeafWorkerCount = maxNonLeafWorkerCount; + + final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, query.getContext()); + final ClusterBy finalWindowClusterBy = computeClusterByForFinalWindowStage(segmentGranularity); + this.finalWindowStageShuffleSpec = computeShuffleSpecForFinalWindowStage(resultShuffleSpecFactory, finalWindowClusterBy); + this.finalWindowStageRowSignature = computeSignatureForFinalWindowStage(query.getRowSignature(), finalWindowClusterBy, segmentGranularity); + + this.rowSignatureBuilder = RowSignature.builder().addAll(signatureFromInput); + populateStages(); + } - /** - * Computes the signature for the final window stage. The finalWindowClusterBy will always have the - * partition boost column as computed in {@link #computeClusterByForFinalWindowStage(Granularity)}. 
- */ - private static RowSignature computeSignatureForFinalWindowStage(RowSignature rowSignature, ClusterBy finalWindowClusterBy, Granularity segmentGranularity) - { - final RowSignature.Builder finalWindowStageRowSignatureBuilder = RowSignature.builder() - .addAll(rowSignature) - .add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG); - return QueryKitUtils.sortableSignature( - QueryKitUtils.signatureWithSegmentGranularity(finalWindowStageRowSignatureBuilder.build(), segmentGranularity), - finalWindowClusterBy.getColumns() - ); - } + private void populateStages() + { + WindowStage currentStage = new WindowStage(); + for (OperatorFactory of : query.getOperators()) { + if (!currentStage.canAccept(of)) { + stages.add(currentStage); + currentStage = new WindowStage(); + } + currentStage.addOperatorFactory(of); + } + if (!currentStage.getOperatorFactories().isEmpty()) { + stages.add(currentStage); + } + } + + private List<WindowStage> getStages() + { + return stages; + } + + private RowSignature getRowSignatureForStage(int windowStageIndex, ShuffleSpec shuffleSpec) + { + if (windowStageIndex == stages.size() - 1) { + return finalWindowStageRowSignature; + } + + final WindowStage stage = stages.get(windowStageIndex); + for (WindowOperatorFactory operatorFactory : stage.getWindowOperatorFactories()) { + for (String columnName : operatorFactory.getProcessor().getOutputColumnNames()) { + int indexInRowSignature = query.getRowSignature().indexOf(columnName); + if (indexInRowSignature != -1 && rowSignatureBuilder.build().indexOf(columnName) == -1) { + ColumnType columnType = query.getRowSignature().getColumnType(indexInRowSignature).get(); + rowSignatureBuilder.add(columnName, columnType); + } + } + } + + final RowSignature intermediateSignature = rowSignatureBuilder.build(); + + final RowSignature stageRowSignature; + if (shuffleSpec == null) { + stageRowSignature = intermediateSignature; + } else { + stageRowSignature = QueryKitUtils.sortableSignature( + intermediateSignature, + 
shuffleSpec.clusterBy().getColumns() + ); + } + + log.info("Using row signature [%s] for window stage.", stageRowSignature); + return stageRowSignature; + } + + private StageDefinitionBuilder getStageDefinitionBuilder(int stageNumber, int windowStageIndex) + { + final WindowStage stage = stages.get(windowStageIndex); + final ShuffleSpec shuffleSpec = (windowStageIndex == stages.size() - 1) ? + finalWindowStageShuffleSpec : + stages.get(windowStageIndex + 1).findShuffleSpec(numPartitionsForShuffle); + + final RowSignature stageRowSignature = getRowSignatureForStage(windowStageIndex, shuffleSpec); + + return StageDefinition.builder(stageNumber) + .inputs(new StageInputSpec(stageNumber - 1)) + .signature(stageRowSignature) + .maxWorkerCount(maxNonLeafWorkerCount) + .shuffleSpec(shuffleSpec) + .processorFactory(new WindowOperatorQueryFrameProcessorFactory( + query, + stage.getOperatorFactories(), + stageRowSignature, + getMaxRowsMaterialized(), + stage.getPartitionColumns() + )); + } + + /** + * Computes the ClusterBy for the final window stage. We don't have to take the CLUSTERED BY columns into account, + * as they are handled as {@link org.apache.druid.query.scan.ScanQuery#orderBys}. + */ + private ClusterBy computeClusterByForFinalWindowStage(Granularity segmentGranularity) + { + final List clusterByColumns = Collections.singletonList(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING)); + return QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity); + } + + /** + * Computes the signature for the final window stage. The finalWindowClusterBy will always have the + * partition boost column as computed in {@link #computeClusterByForFinalWindowStage(Granularity)}. 
+ */ + private RowSignature computeSignatureForFinalWindowStage(RowSignature rowSignature, ClusterBy finalWindowClusterBy, Granularity segmentGranularity) + { + final RowSignature.Builder finalWindowStageRowSignatureBuilder = RowSignature.builder() + .addAll(rowSignature) + .add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG); + return QueryKitUtils.sortableSignature( + QueryKitUtils.signatureWithSegmentGranularity(finalWindowStageRowSignatureBuilder.build(), segmentGranularity), + finalWindowClusterBy.getColumns() + ); + } + + private ShuffleSpec computeShuffleSpecForFinalWindowStage(ShuffleSpecFactory resultShuffleSpecFactory, ClusterBy finalWindowClusterBy) + { + return resultShuffleSpecFactory.build(finalWindowClusterBy, false); + } + + private RowSignature getSignatureForFinalWindowStage() + { + return this.finalWindowStageRowSignature; + } + + private ShuffleSpec getShuffleSpecForFinalWindowStage() + { + return this.finalWindowStageShuffleSpec; + } + + private int getMaxRowsMaterialized() + { + return query.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW) ? + (int) query.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW) : + Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW; + } + private boolean hasEmptyOverClause() + { + return query.getOperators().stream() + .filter(of -> of instanceof NaivePartitioningOperatorFactory) + .map(of -> (NaivePartitioningOperatorFactory) of) + .anyMatch(of -> of.getPartitionColumns().isEmpty()); + } + } + /** * Represents a window stage in a query execution. * Each stage can contain a sort operator, a partition operator, and multiple window operators. 
@@ -306,12 +336,12 @@ private static class WindowStage private NaivePartitioningOperatorFactory partitioningOperatorFactory; private final List windowOperatorFactories; - public WindowStage() + private WindowStage() { this.windowOperatorFactories = new ArrayList<>(); } - public void addOperatorFactory(OperatorFactory op) + private void addOperatorFactory(OperatorFactory op) { if (op instanceof NaiveSortOperatorFactory) { this.sortOperatorFactory = (NaiveSortOperatorFactory) op; @@ -322,7 +352,7 @@ public void addOperatorFactory(OperatorFactory op) } } - public List getOperatorFactories() + private List getOperatorFactories() { List operatorFactories = new ArrayList<>(); if (sortOperatorFactory != null) { @@ -335,17 +365,17 @@ public List getOperatorFactories() return operatorFactories; } - public List getPartitionColumns() + private List getPartitionColumns() { return partitioningOperatorFactory != null ? partitioningOperatorFactory.getPartitionColumns() : new ArrayList<>(); } - public List getWindowOperatorFactories() + private List getWindowOperatorFactories() { return windowOperatorFactories; } - public ShuffleSpec findShuffleSpec(int partitionCount) + private ShuffleSpec findShuffleSpec(int partitionCount) { Map sortColumnsMap = new HashMap<>(); if (sortOperatorFactory != null) { @@ -364,33 +394,35 @@ public ShuffleSpec findShuffleSpec(int partitionCount) return MixShuffleSpec.instance(); } - List keyColsOfWindow = new ArrayList<>(); + final List keyColsOfWindow = new ArrayList<>(); for (String partitionColumn : partitioningOperatorFactory.getPartitionColumns()) { - KeyColumn kc; - if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) { - kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING); - } else { - kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING); - } + KeyColumn kc = new KeyColumn( + partitionColumn, + sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC ? 
KeyOrder.DESCENDING : KeyOrder.ASCENDING + ); keyColsOfWindow.add(kc); } return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); } - public boolean canAccept(OperatorFactory operatorFactory) + private boolean canAccept(OperatorFactory operatorFactory) { + if (getOperatorFactories().isEmpty()) { + return true; + } if (operatorFactory instanceof NaiveSortOperatorFactory) { return false; } - if (operatorFactory instanceof WindowOperatorFactory) { return true; } - // If it's a PartitioningOperatorFactory, we can add it to current stage if NaiveSortOperatorFactory is present. - return sortOperatorFactory != null; + if (operatorFactory instanceof NaivePartitioningOperatorFactory) { + return sortOperatorFactory != null; + } + throw new ISE("Encountered unexpected operatorFactory type: [%s]", operatorFactory.getClass().getName()); } - + @Override public String toString() { From 684b8775ff7ade58ea5f1910e076d7dd91ab0083 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Sat, 12 Oct 2024 16:47:52 +0530 Subject: [PATCH 5/9] Add log message --- .../org/apache/druid/msq/querykit/WindowOperatorQueryKit.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 327610bd3966..15e0f2c0c2d1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -207,6 +207,8 @@ private void populateStages() if (!currentStage.getOperatorFactories().isEmpty()) { stages.add(currentStage); } + + log.info("Created window stages: [%s]", stages); } private List getStages() From c5b2166a3f9345084404a848a6c49dee10de3c51 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 30 Oct 2024 14:28:16 +0530 
Subject: [PATCH 6/9] Revert unnecessary diff --- .../msq/querykit/WindowOperatorQueryKit.java | 78 +++++++++---------- 1 file changed, 36 insertions(+), 42 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index c82e56ad7a65..66ee5d3f1200 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -93,14 +93,8 @@ public QueryDefinition makeQueryDefinition( signatureFromInput ); - final ShuffleSpec nextShuffleSpec = windowStages.getStages() - .get(0) - .findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle()); - final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder( - queryKitSpec.getQueryId(), - dataSourcePlan, - nextShuffleSpec - ); + final ShuffleSpec nextShuffleSpec = windowStages.getStages().get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle()); + final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); final int firstWindowStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); log.info("Row signature received from last stage is [%s].", signatureFromInput); @@ -112,40 +106,6 @@ public QueryDefinition makeQueryDefinition( return queryDefBuilder.build(); } - /** - * Override the shuffle spec of the last stage based on the shuffling required by the first window stage. 
- * - * @param queryId - * @param dataSourcePlan - * @param shuffleSpec - * @return - */ - private QueryDefinitionBuilder makeQueryDefinitionBuilder( - String queryId, - DataSourcePlan dataSourcePlan, - ShuffleSpec shuffleSpec - ) - { - final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); - int previousStageNumber = dataSourcePlan.getSubQueryDefBuilder() - .get() - .build() - .getFinalStageDefinition() - .getStageNumber(); - for (final StageDefinition stageDef : dataSourcePlan.getSubQueryDefBuilder().get().build().getStageDefinitions()) { - if (stageDef.getStageNumber() == previousStageNumber) { - RowSignature rowSignature = QueryKitUtils.sortableSignature( - stageDef.getSignature(), - shuffleSpec.clusterBy().getColumns() - ); - queryDefBuilder.add(StageDefinition.builder(stageDef).shuffleSpec(shuffleSpec).signature(rowSignature)); - } else { - queryDefBuilder.add(StageDefinition.builder(stageDef)); - } - } - return queryDefBuilder; - } - /** * Represents the window stages to be added to {@link QueryDefinitionBuilder}. * This class is responsible for creating the window stages. @@ -434,4 +394,38 @@ public String toString() '}'; } } + + /** + * Override the shuffle spec of the last stage based on the shuffling required by the first window stage. 
+ * + * @param queryId + * @param dataSourcePlan + * @param shuffleSpec + * @return + */ + private QueryDefinitionBuilder makeQueryDefinitionBuilder( + String queryId, + DataSourcePlan dataSourcePlan, + ShuffleSpec shuffleSpec + ) + { + final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); + int previousStageNumber = dataSourcePlan.getSubQueryDefBuilder() + .get() + .build() + .getFinalStageDefinition() + .getStageNumber(); + for (final StageDefinition stageDef : dataSourcePlan.getSubQueryDefBuilder().get().build().getStageDefinitions()) { + if (stageDef.getStageNumber() == previousStageNumber) { + RowSignature rowSignature = QueryKitUtils.sortableSignature( + stageDef.getSignature(), + shuffleSpec.clusterBy().getColumns() + ); + queryDefBuilder.add(StageDefinition.builder(stageDef).shuffleSpec(shuffleSpec).signature(rowSignature)); + } else { + queryDefBuilder.add(StageDefinition.builder(stageDef)); + } + } + return queryDefBuilder; + } } From 8e96e3df2dc26d970a311058a610d7c688472553 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 30 Oct 2024 14:29:16 +0530 Subject: [PATCH 7/9] Revert unnecessary diff --- .../druid/msq/querykit/WindowOperatorQueryKit.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 66ee5d3f1200..6b010917164c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -403,18 +403,10 @@ public String toString() * @param shuffleSpec * @return */ - private QueryDefinitionBuilder makeQueryDefinitionBuilder( - String queryId, - DataSourcePlan dataSourcePlan, - ShuffleSpec shuffleSpec - ) + private 
QueryDefinitionBuilder makeQueryDefinitionBuilder(String queryId, DataSourcePlan dataSourcePlan, ShuffleSpec shuffleSpec) { final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); - int previousStageNumber = dataSourcePlan.getSubQueryDefBuilder() - .get() - .build() - .getFinalStageDefinition() - .getStageNumber(); + int previousStageNumber = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getStageNumber(); for (final StageDefinition stageDef : dataSourcePlan.getSubQueryDefBuilder().get().build().getStageDefinitions()) { if (stageDef.getStageNumber() == previousStageNumber) { RowSignature rowSignature = QueryKitUtils.sortableSignature( From 3a1502fba12b0c28cc33abb2345bd257383c4b85 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 30 Oct 2024 14:30:40 +0530 Subject: [PATCH 8/9] Revert unnecessary diff --- .../org/apache/druid/msq/querykit/WindowOperatorQueryKit.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 6b010917164c..64602e86fb1b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -397,7 +397,6 @@ public String toString() /** * Override the shuffle spec of the last stage based on the shuffling required by the first window stage. 
- * * @param queryId * @param dataSourcePlan * @param shuffleSpec From 7965540a537d538421c720dec04848d6ca290013 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 12 Nov 2024 10:05:12 +0530 Subject: [PATCH 9/9] Fix refactoring due to merge commit --- .../msq/querykit/WindowOperatorQueryKit.java | 55 +++++++------------ 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 0f67acdbb458..ca9cacef7bf4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -225,7 +225,7 @@ private StageDefinitionBuilder getStageDefinitionBuilder(int stageNumber, int wi final RowSignature stageRowSignature = getRowSignatureForStage(windowStageIndex, shuffleSpec); final List operatorFactories = isOperatorTransformationEnabled - ? getTransformedOperatorFactoryListForStageDefinition(stage.getOperatorFactories()) + ? stage.getTransformedOperatorFactories() : stage.getOperatorFactories(); return StageDefinition.builder(stageNumber) @@ -292,39 +292,6 @@ private int getMaxRowsMaterialized() { return MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context()); } - - - /** - * This method converts the operator chain received from native plan into MSQ plan. - * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). - * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage. - * This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished. 
- * @param operatorFactoryListFromQuery - * @return - */ - private List getTransformedOperatorFactoryListForStageDefinition(List operatorFactoryListFromQuery) - { - final List operatorFactoryList = new ArrayList<>(); - final List sortOperatorFactoryList = new ArrayList<>(); - for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) { - if (operatorFactory instanceof AbstractPartitioningOperatorFactory) { - AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory; - operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), getMaxRowsMaterialized())); - } else if (operatorFactory instanceof AbstractSortOperatorFactory) { - AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory; - sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns())); - } else { - // Add all the PartitionSortOperator(s) before every window operator. - operatorFactoryList.addAll(sortOperatorFactoryList); - sortOperatorFactoryList.clear(); - operatorFactoryList.add(operatorFactory); - } - } - - operatorFactoryList.addAll(sortOperatorFactoryList); - sortOperatorFactoryList.clear(); - return operatorFactoryList; - } } /** @@ -368,6 +335,26 @@ private List getOperatorFactories() return operatorFactories; } + /** + * This method converts the operator chain received from native plan into MSQ plan. + * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). + * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage. + * This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished. 
+ * @return + */ + private List getTransformedOperatorFactories() + { + List operatorFactories = new ArrayList<>(); + if (partitioningOperatorFactory != null) { + operatorFactories.add(new GlueingPartitioningOperatorFactory(partitioningOperatorFactory.getPartitionColumns(), maxRowsMaterialized)); + } + if (sortOperatorFactory != null) { + operatorFactories.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns())); + } + operatorFactories.addAll(windowOperatorFactories); + return operatorFactories; + } + private List getWindowOperatorFactories() { return windowOperatorFactories;