From 3dd9dcacdd6487e8c3cac0366b580448fba90b65 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 21 Jun 2024 13:29:08 +0530 Subject: [PATCH 01/13] Add test --- .../apache/druid/msq/exec/MSQExportTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java index 71b816e78c58..538cd4714200 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java @@ -316,6 +316,52 @@ private List readResultsFromFile(File resultFile) throws IOException } } + @Test + public void testExportWithLimit() throws IOException + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG).build(); + + File exportDir = newTempFolder("export"); + + Map queryContext = new HashMap<>(DEFAULT_MSQ_CONTEXT); + queryContext.put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 1); + + final String sql = StringUtils.format("insert into extern(local(exportPath=>'%s')) as csv select cnt, dim1 from foo limit 3", exportDir.getAbsolutePath()); + + testIngestQuery().setSql(sql) + .setExpectedDataSource("foo1") + .setQueryContext(queryContext) + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of()) + .setExpectedResultRows(ImmutableList.of()) + .verifyResults(); + + Assert.assertEquals( + ImmutableList.of( + "cnt,dim1", + "1," + ), + readResultsFromFile(new File(exportDir, "query-test-query-worker0-partition0.csv")) + ); + Assert.assertEquals( + ImmutableList.of( + "cnt,dim1", + "1,10.1" + ), + readResultsFromFile(new File(exportDir, "query-test-query-worker0-partition1.csv")) + ); + Assert.assertEquals( + ImmutableList.of( + "cnt,dim1", + "1,2" + ), + readResultsFromFile(new File(exportDir, "query-test-query-worker0-partition2.csv")) + ); + } + private void verifyManifestFile(File exportDir, List resultFiles) throws IOException { final File manifestFile = new File(exportDir, ExportMetadataManager.MANIFEST_FILE); From 85c32e82e83e1a166c9c67dc2ef78be4dd614a84 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sat, 22 Jun 2024 23:14:32 +0530 Subject: [PATCH 02/13] Fix limiting --- .../apache/druid/msq/exec/ControllerImpl.java | 3 ++- .../druid/msq/indexing/MSQControllerTask.java | 10 ++++++++++ .../druid/msq/querykit/DataSourcePlan.java | 3 ++- .../druid/msq/querykit/MultiQueryKit.java | 6 ++++-- .../org/apache/druid/msq/querykit/QueryKit.java | 4 +++- .../msq/querykit/WindowOperatorQueryKit.java | 3 ++- .../msq/querykit/groupby/GroupByQueryKit.java | 17 ++++++++++++----- .../druid/msq/querykit/scan/ScanQueryKit.java | 8 +++++--- 8 files changed, 40 insertions(+), 14 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index b6541c7f26aa..8200d1815fa0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1724,7 +1724,8 @@ private static QueryDefinition makeQueryDefinition( toolKit, shuffleSpecFactory, tuningConfig.getMaxNumWorkers(), - 0 + 0, + MSQControllerTask.needsFinalShuffling(querySpec) ); } catch (MSQException e) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index bdaf3964b299..5f0d1f838886 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -315,6 +315,16 @@ public static boolean isExport(final MSQSpec querySpec) return querySpec.getDestination() instanceof ExportMSQDestination; } + public static boolean isDurableStorageQuery(final MSQSpec querySpec) + { + return querySpec.getDestination() instanceof DurableStorageMSQDestination; + } + + public static boolean needsFinalShuffling(final MSQSpec querySpec) + { + return isDurableStorageQuery(querySpec) || isExport(querySpec); + } + /** * Returns true if the task reads from the same table as the destination. In this case, we would prefer to fail * instead of reading any unused segments to ensure that old data is not read. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 831c9b139d3d..93977ee960fe 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -442,7 +442,8 @@ private static DataSourcePlan forQuery( queryKit, ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount), maxWorkerCount, - minStageNumber + minStageNumber, + false ); final int stageNumber = subQueryDef.getFinalStageDefinition().getStageNumber(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java index a795f6496053..23c48432af18 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java @@ -46,7 +46,8 @@ public QueryDefinition makeQueryDefinition( QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, - int minStageNumber + int minStageNumber, + boolean needsFinalShuffling ) { final QueryKit specificToolKit = toolKitMap.get(query.getClass()); @@ -59,7 +60,8 @@ public QueryDefinition makeQueryDefinition( this, resultShuffleSpecFactory, maxWorkerCount, - minStageNumber + minStageNumber, + needsFinalShuffling ); } else { throw new ISE("Unsupported query class [%s]", query.getClass().getName()); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java index b259022bba5b..7827eced3a88 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java @@ -40,6 +40,7 @@ public interface QueryKit> * @param minStageNumber lowest stage number to use for any generated stages. Useful if the resulting * {@link QueryDefinition} is going to be added to an existing * {@link org.apache.druid.msq.kernel.QueryDefinitionBuilder}. + * @param needsFinalShuffling Determines if a final shuffling is required. */ QueryDefinition makeQueryDefinition( String queryId, @@ -47,6 +48,7 @@ QueryDefinition makeQueryDefinition( QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, - int minStageNumber + int minStageNumber, + boolean needsFinalShuffling ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index d08d78ef791f..0a04794389e2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -62,7 +62,8 @@ public QueryDefinition makeQueryDefinition( QueryKit> queryKit, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, - int minStageNumber + int minStageNumber, + final boolean destinationNeedsSorting ) { // need to validate query first diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index f02e505d0c5a..d445632e1554 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -73,7 +73,8 @@ public QueryDefinition makeQueryDefinition( final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, - final int minStageNumber + final int minStageNumber, + final boolean destinationNeedsSorting ) { validateQuery(originalQuery); @@ -132,12 +133,12 @@ public QueryDefinition makeQueryDefinition( shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() ? ShuffleSpecFactories.singlePartition() : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); - shuffleSpecFactoryPostAggregation = doLimitOrOffset + shuffleSpecFactoryPostAggregation = doLimitOrOffset && !destinationNeedsSorting ? ShuffleSpecFactories.singlePartition() : resultShuffleSpecFactory; partitionBoost = true; } else { - shuffleSpecFactoryPreAggregation = doLimitOrOffset + shuffleSpecFactoryPreAggregation = doLimitOrOffset && !destinationNeedsSorting ? ShuffleSpecFactories.singlePartition() : resultShuffleSpecFactory; @@ -185,13 +186,16 @@ public QueryDefinition makeQueryDefinition( ); if (doLimitOrOffset) { + final ShuffleSpec finalShuffleSpec = destinationNeedsSorting ? + shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true) : + null; final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec(); queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 2) .inputs(new StageInputSpec(firstStageNumber + 1)) .signature(resultSignature) .maxWorkerCount(1) - .shuffleSpec(null) // no shuffling should be required after a limit processor. + .shuffleSpec(finalShuffleSpec) .processorFactory( new OffsetLimitFrameProcessorFactory( limitSpec.getOffset(), @@ -224,12 +228,15 @@ public QueryDefinition makeQueryDefinition( ); if (doLimitOrOffset) { final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec(); + final ShuffleSpec finalShuffleSpec = destinationNeedsSorting ? + shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true) : + null; queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 2) .inputs(new StageInputSpec(firstStageNumber + 1)) .signature(resultSignature) .maxWorkerCount(1) - .shuffleSpec(null) + .shuffleSpec(finalShuffleSpec) .processorFactory( new OffsetLimitFrameProcessorFactory( limitSpec.getOffset(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index 2927264382a4..b844c7d732bd 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -89,7 +89,8 @@ public QueryDefinition makeQueryDefinition( final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, - final int minStageNumber + final int minStageNumber, + final boolean destinationNeedsSorting ) { final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); @@ -119,7 +120,7 @@ public QueryDefinition makeQueryDefinition( // We ignore the resultShuffleSpecFactory in case: // 1. There is no cluster by // 2. There is an offset which means everything gets funneled into a single partition hence we use MaxCountShuffleSpec - if (queryToRun.getOrderBys().isEmpty() && hasLimitOrOffset) { + if (queryToRun.getOrderBys().isEmpty() && hasLimitOrOffset && !destinationNeedsSorting) { shuffleSpec = MixShuffleSpec.instance(); signatureToUse = scanSignature; } else { @@ -180,12 +181,13 @@ public QueryDefinition makeQueryDefinition( ); if (hasLimitOrOffset) { + final ShuffleSpec finalShuffleSpec = destinationNeedsSorting ? shuffleSpec : null; queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 1) .inputs(new StageInputSpec(firstStageNumber)) .signature(signatureToUse) .maxWorkerCount(1) - .shuffleSpec(null) // no shuffling should be required after a limit processor. + .shuffleSpec(finalShuffleSpec) .processorFactory( new OffsetLimitFrameProcessorFactory( queryToRun.getScanRowsOffset(), From e0e641c795aa660aaea1905edeb8f58a21b7fd9e Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 24 Jun 2024 15:59:40 +0530 Subject: [PATCH 03/13] Fix tests --- .../msq/querykit/WindowOperatorQueryKit.java | 2 +- .../msq/querykit/groupby/GroupByQueryKit.java | 14 ++-- .../druid/msq/querykit/scan/ScanQueryKit.java | 7 +- .../apache/druid/msq/exec/MSQSelectTest.java | 83 ++++++++++++++----- 4 files changed, 74 insertions(+), 32 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 0a04794389e2..3eee8193c8c2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -63,7 +63,7 @@ public QueryDefinition makeQueryDefinition( ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, int minStageNumber, - final boolean destinationNeedsSorting + boolean needsFinalShuffling ) { // need to validate query first diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index d445632e1554..8f348ed8a030 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -74,7 +74,7 @@ public QueryDefinition makeQueryDefinition( final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, final int minStageNumber, - final boolean destinationNeedsSorting + final boolean needsFinalShuffling ) { validateQuery(originalQuery); @@ -133,12 +133,12 @@ public QueryDefinition makeQueryDefinition( shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() ? ShuffleSpecFactories.singlePartition() : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); - shuffleSpecFactoryPostAggregation = doLimitOrOffset && !destinationNeedsSorting + shuffleSpecFactoryPostAggregation = doLimitOrOffset && !needsFinalShuffling ? ShuffleSpecFactories.singlePartition() : resultShuffleSpecFactory; partitionBoost = true; } else { - shuffleSpecFactoryPreAggregation = doLimitOrOffset && !destinationNeedsSorting + shuffleSpecFactoryPreAggregation = doLimitOrOffset && !needsFinalShuffling ? ShuffleSpecFactories.singlePartition() : resultShuffleSpecFactory; @@ -186,8 +186,8 @@ public QueryDefinition makeQueryDefinition( ); if (doLimitOrOffset) { - final ShuffleSpec finalShuffleSpec = destinationNeedsSorting ? - shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true) : + final ShuffleSpec finalShuffleSpec = needsFinalShuffling ? + shuffleSpecFactoryPreAggregation.build(resultClusterBy, true) : null; final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec(); queryDefBuilder.add( @@ -228,8 +228,8 @@ public QueryDefinition makeQueryDefinition( ); if (doLimitOrOffset) { final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec(); - final ShuffleSpec finalShuffleSpec = destinationNeedsSorting ? - shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true) : + final ShuffleSpec finalShuffleSpec = needsFinalShuffling ? + shuffleSpecFactoryPreAggregation.build(resultClusterBy, true) : null; queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 2) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index b844c7d732bd..232e0305337f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -90,7 +90,7 @@ public QueryDefinition makeQueryDefinition( final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, final int minStageNumber, - final boolean destinationNeedsSorting + final boolean needsFinalShuffling ) { final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); @@ -120,7 +120,8 @@ public QueryDefinition makeQueryDefinition( // We ignore the resultShuffleSpecFactory in case: // 1. There is no cluster by // 2. There is an offset which means everything gets funneled into a single partition hence we use MaxCountShuffleSpec - if (queryToRun.getOrderBys().isEmpty() && hasLimitOrOffset && !destinationNeedsSorting) { + // 3. The destination does not require shuffling after the limit stage to get outputs of a specific size. + if (queryToRun.getOrderBys().isEmpty() && hasLimitOrOffset && !needsFinalShuffling) { shuffleSpec = MixShuffleSpec.instance(); signatureToUse = scanSignature; } else { @@ -181,7 +182,7 @@ public QueryDefinition makeQueryDefinition( ); if (hasLimitOrOffset) { - final ShuffleSpec finalShuffleSpec = destinationNeedsSorting ? shuffleSpec : null; + final ShuffleSpec finalShuffleSpec = needsFinalShuffling ? shuffleSpec : null; queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 1) .inputs(new StageInputSpec(firstStageNumber)) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 84dddd526c1d..8a894fa8a895 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -49,6 +49,7 @@ import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; @@ -624,7 +625,17 @@ public void testSelectWithLimit(String contextName, Map context) .add("dim1", ColumnType.STRING) .build(); - testSelectQuery() + final ImmutableList expectedResults = ImmutableList.of( + new Object[]{1L, ""}, + new Object[]{1L, "10.1"}, + new Object[]{1L, "2"}, + new Object[]{1L, "1"}, + new Object[]{1L, "def"}, + new Object[]{1L, "abc"} + ); + final long fullResultsSize = expectedResults.size(); + + final SelectTester selectTester = testSelectQuery() .setSql("select cnt,dim1 from foo limit 10") .setExpectedMSQSpec( MSQSpec.builder() @@ -646,29 +657,59 @@ public void testSelectWithLimit(String contextName, Map context) ) .setQueryContext(context) .setExpectedRowSignature(resultSignature) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().totalFiles(1), - 0, 0, "input0" - ) - .setExpectedCountersForStageWorkerChannel( + .setExpectedResultRows(expectedResults); + + final QueryContext queryContext = QueryContext.of(context); + if (MSQSelectDestination.DURABLESTORAGE.equals(MultiStageQueryContext.getSelectDestinationOrNull(queryContext))) { + selectTester.setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(fullResultsSize).frames(1), + 0, 0, "output" + ); + if (!context.containsKey(MultiStageQueryContext.CTX_ROWS_PER_PAGE)) { + selectTester.setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(6).frames(1), - 0, 0, "output" - ) - .setExpectedCountersForStageWorkerChannel( + .with().rows(fullResultsSize).frames(1), + 0, 0, "shuffle" + ); + } else { + final long rowsPerPage = MultiStageQueryContext.getRowsPerPage(queryContext); + final int expectedPageCount = (int) (fullResultsSize / rowsPerPage); + long[] rows = new long[expectedPageCount]; + Arrays.fill(rows, rowsPerPage); + long[] frames = new long[expectedPageCount]; + Arrays.fill(frames, 1); + selectTester.setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(6).frames(1), + .with().rows(rows).frames(frames), 0, 0, "shuffle" - ) - .setExpectedResultRows(ImmutableList.of( - new Object[]{1L, ""}, - new Object[]{1L, "10.1"}, - new Object[]{1L, "2"}, - new Object[]{1L, "1"}, - new Object[]{1L, "def"}, - new Object[]{1L, "abc"} - )).verifyResults(); + ); + } + } else { + selectTester.setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(fullResultsSize).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(fullResultsSize).frames(1), + 0, 0, "shuffle" + ) + .setExpectedResultRows(expectedResults); + } + + selectTester.verifyResults(); } @MethodSource("data") From d2208890c5384aed12034baa27d7f16b7bab3229 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 24 Jun 2024 21:52:05 +0530 Subject: [PATCH 04/13] Include ingestion in list of destinations --- .../druid/msq/indexing/MSQControllerTask.java | 2 +- .../apache/druid/msq/exec/MSQFaultsTest.java | 3 +- .../apache/druid/msq/exec/MSQInsertTest.java | 35 ++++++++++++++++++- .../apache/druid/msq/exec/MSQSelectTest.java | 34 +++++++++--------- .../apache/druid/msq/test/MSQTestBase.java | 8 ++--- 5 files changed, 57 insertions(+), 25 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 5f0d1f838886..f4e25fd28968 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -322,7 +322,7 @@ public static boolean isDurableStorageQuery(final MSQSpec querySpec) public static boolean needsFinalShuffling(final MSQSpec querySpec) { - return isDurableStorageQuery(querySpec) || isExport(querySpec); + return isDurableStorageQuery(querySpec) || isExport(querySpec) || isIngestion(querySpec); } /** diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index 425609628b3a..5df33582514d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -43,6 +43,7 @@ import org.apache.druid.msq.indexing.error.TooManyPartitionsFault; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestTaskActionClient; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -288,7 +289,7 @@ public void testInsertWithTooManySegments() throws IOException { Map context = ImmutableMap.builder() .putAll(DEFAULT_MSQ_CONTEXT) - .put("rowsPerSegment", 1) + .put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 1) .build(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index ecdc30294dbd..befdb4277236 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -65,6 +65,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.stream.Collectors; public class MSQInsertTest extends MSQTestBase @@ -1392,7 +1393,7 @@ public void testInsertOffsetThrowsException(String contextName, Map context) + { + Map queryContext = ImmutableMap.builder() + .putAll(context) + .put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 2) + .build(); + + List expectedRows = expectedFooRows().stream().limit(4).collect(Collectors.toList()); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG).build(); + + testIngestQuery().setSql( + "insert into foo1 select __time, dim1, cnt from foo LIMIT 4 PARTITIONED by ALL") + .setExpectedDataSource("foo1") + .setQueryContext(queryContext) + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0), SegmentId.of("foo1", Intervals.ETERNITY, "test", 1))) + .setExpectedResultRows(expectedRows) + .setExpectedMSQSegmentReport( + new MSQSegmentReport( + NumberedShardSpec.class.getSimpleName(), + "Using NumberedShardSpec to generate segments since the query is inserting rows." + ) + ) + .verifyResults(); + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testCorrectNumberOfWorkersUsedAutoModeWithoutBytesLimit(String contextName, Map context) throws IOException diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 8a894fa8a895..257941f48f07 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -669,8 +669,8 @@ public void testSelectWithLimit(String contextName, Map context) .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher .with().rows(fullResultsSize).frames(1), - 0, 0, "output" - ); + 0, 0, "output"); + if (!context.containsKey(MultiStageQueryContext.CTX_ROWS_PER_PAGE)) { selectTester.setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher @@ -692,21 +692,21 @@ public void testSelectWithLimit(String contextName, Map context) } } else { selectTester.setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().totalFiles(1), - 0, 0, "input0" - ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().rows(fullResultsSize).frames(1), - 0, 0, "output" - ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().rows(fullResultsSize).frames(1), - 0, 0, "shuffle" - ) - .setExpectedResultRows(expectedResults); + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(fullResultsSize).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(fullResultsSize).frames(1), + 0, 0, "shuffle" + ) + .setExpectedResultRows(expectedResults); } selectTester.verifyResults(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 33c1374d2a73..b25af755757c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1285,12 +1285,10 @@ public void verifyResults() .filter(segmentId -> segmentId.getInterval() .contains((Long) row[0])) .collect(Collectors.toList()); - if (diskSegmentList.size() != 1) { - throw new IllegalStateException("Single key in multiple partitions"); - } - SegmentId diskSegment = diskSegmentList.get(0); // Checking if the row belongs to the correct segment interval - Assert.assertTrue(segmentIdVsOutputRowsMap.get(diskSegment).contains(Arrays.asList(row))); + Assert.assertTrue(diskSegmentList.stream() + .map(segmentIdVsOutputRowsMap::get) + .anyMatch(rows -> rows.contains(Arrays.asList(row)))); } } From 3bb783ff9ea1f2bb04530a945c7dd372e13338cb Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 25 Jun 2024 09:02:18 +0530 Subject: [PATCH 05/13] Fix flaky test --- .../java/org/apache/druid/msq/exec/MSQInsertTest.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index befdb4277236..005c0bd11d28 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -1411,7 +1411,12 @@ public void testInsertOnFoo1WithLimit(String contextName, Map co .put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 2) .build(); - List expectedRows = expectedFooRows().stream().limit(4).collect(Collectors.toList()); + List expectedRows = ImmutableList.of( + new Object[]{946684800000L, "", 1L}, + new Object[]{978307200000L, "1", 1L}, + new Object[]{946771200000L, "10.1", 1L}, + new Object[]{946857600000L, "2", 1L} + ); RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -1419,7 +1424,7 @@ public void testInsertOnFoo1WithLimit(String contextName, Map co .add("cnt", ColumnType.LONG).build(); testIngestQuery().setSql( - "insert into foo1 select __time, dim1, cnt from foo LIMIT 4 PARTITIONED by ALL") + "insert into foo1 select __time, dim1, cnt from foo LIMIT 4 PARTITIONED by ALL CLUSTERED BY dim1") .setExpectedDataSource("foo1") .setQueryContext(queryContext) .setExpectedRowSignature(rowSignature) From 809f90afd78afee68a57047610dc4569532d4e7f Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 25 Jun 2024 09:14:10 +0530 Subject: [PATCH 06/13] Fix checkstyle --- .../src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 005c0bd11d28..7f7f224549b3 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -65,7 +65,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.stream.Collectors; public class MSQInsertTest extends MSQTestBase From f092860ffeb4fa5ca9dbbda549a07885ebaf1e95 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 25 Jun 2024 14:49:40 +0530 Subject: [PATCH 07/13] Fix flaky test --- .../org/apache/druid/msq/exec/MSQInsertTest.java | 8 ++++---- .../java/org/apache/druid/msq/test/MSQTestBase.java | 13 ++++++++++--- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 7f7f224549b3..be8e6000b4aa 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -1411,10 +1411,10 @@ public void testInsertOnFoo1WithLimit(String contextName, Map co .build(); List expectedRows = ImmutableList.of( - new Object[]{946684800000L, "", 1L}, - new Object[]{978307200000L, "1", 1L}, new Object[]{946771200000L, "10.1", 1L}, - new Object[]{946857600000L, "2", 1L} + new Object[]{978307200000L, "1", 1L}, + new Object[]{946857600000L, "2", 1L}, + new Object[]{978480000000L, "abc", 1L} ); RowSignature rowSignature = RowSignature.builder() @@ -1423,7 +1423,7 @@ public void testInsertOnFoo1WithLimit(String contextName, Map co .add("cnt", ColumnType.LONG).build(); testIngestQuery().setSql( - "insert into foo1 select __time, dim1, cnt from foo LIMIT 4 PARTITIONED by ALL CLUSTERED BY dim1") + "insert into foo1 select __time, dim1, cnt from foo where dim1 != '' limit 4 partitioned by ALL clustered by dim1") .setExpectedDataSource("foo1") .setQueryContext(queryContext) .setExpectedRowSignature(rowSignature) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index b25af755757c..34c1fd5cb699 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1284,11 +1284,18 @@ public void verifyResults() .stream() .filter(segmentId -> segmentId.getInterval() .contains((Long) row[0])) + .filter(segmentId -> { + List> lists = segmentIdVsOutputRowsMap.get(segmentId); + return lists.contains(Arrays.asList(row)); + }) .collect(Collectors.toList()); // Checking if the row belongs to the correct segment interval - Assert.assertTrue(diskSegmentList.stream() - .map(segmentIdVsOutputRowsMap::get) - .anyMatch(rows -> rows.contains(Arrays.asList(row)))); + if (diskSegmentList.size() != 1) { + throw new IllegalStateException("Single key in multiple partitions"); + } + SegmentId diskSegment = diskSegmentList.get(0); + // Checking if the row belongs to the correct segment interval + Assert.assertTrue(segmentIdVsOutputRowsMap.get(diskSegment).contains(Arrays.asList(row))); } } From defb0c79a81ed8ffff3b476578a1ddace7902629 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 25 Jun 2024 14:50:43 +0530 Subject: [PATCH 08/13] Remove duplicate comment --- .../src/test/java/org/apache/druid/msq/test/MSQTestBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 34c1fd5cb699..71fe4485af4e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1289,7 +1289,6 @@ public void verifyResults() return lists.contains(Arrays.asList(row)); }) .collect(Collectors.toList()); - // Checking if the row belongs to the correct segment interval if (diskSegmentList.size() != 1) { throw new IllegalStateException("Single key in multiple partitions"); } From 5be62a48e77aa1c589fcc1e377ae3549d4a1fa0b Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 26 Jun 2024 16:10:19 +0530 Subject: [PATCH 09/13] Address review comments --- .../druid/msq/indexing/MSQControllerTask.java | 13 ++++++ .../apache/druid/msq/querykit/QueryKit.java | 4 +- .../apache/druid/msq/exec/MSQInsertTest.java | 2 +- .../apache/druid/msq/exec/MSQSelectTest.java | 46 +++++++++++++++++++ 4 files changed, 63 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index f4e25fd28968..6b26b7f1111c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -305,21 +305,34 @@ public Optional getDestinationResource() return querySpec.getDestination().getDestinationResource(); } + /** + * Checks whether the task is an ingestion into a Druid datasource. + */ public static boolean isIngestion(final MSQSpec querySpec) { return querySpec.getDestination() instanceof DataSourceMSQDestination; } + /** + * Checks whether the task is an export into external files. + */ public static boolean isExport(final MSQSpec querySpec) { return querySpec.getDestination() instanceof ExportMSQDestination; } + /** + * Checks whether the task is an async query which writes frame files containing the final results into durable storage. + */ public static boolean isDurableStorageQuery(final MSQSpec querySpec) { return querySpec.getDestination() instanceof DurableStorageMSQDestination; } + /** + * Returns true if the destination of the {@link #querySpec} requires a specific final shuffling, such as applying a + * rowsPerSegment or rowsPerPage limit. + */ public static boolean needsFinalShuffling(final MSQSpec querySpec) { return isDurableStorageQuery(querySpec) || isExport(querySpec) || isIngestion(querySpec); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java index 7827eced3a88..7056f50c41cf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java @@ -40,7 +40,9 @@ public interface QueryKit> * @param minStageNumber lowest stage number to use for any generated stages. Useful if the resulting * {@link QueryDefinition} is going to be added to an existing * {@link org.apache.druid.msq.kernel.QueryDefinitionBuilder}. - * @param needsFinalShuffling Determines if a final shuffling is required. + * @param needsFinalShuffling Determines if a final shuffling is required. If this is true, the + * query kit should ensure that the resultShuffleSpecFactory passed to it is used + * for the final shuffling. */ QueryDefinition makeQueryDefinition( String queryId, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index be8e6000b4aa..f0b6e71a0217 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -1423,7 +1423,7 @@ public void testInsertOnFoo1WithLimit(String contextName, Map co .add("cnt", ColumnType.LONG).build(); testIngestQuery().setSql( - "insert into foo1 select __time, dim1, cnt from foo where dim1 != '' limit 4 partitioned by ALL clustered by dim1") + "insert into foo1 select __time, dim1, cnt from foo where dim1 != '' limit 4 partitioned by all clustered by dim1") .setExpectedDataSource("foo1") .setQueryContext(queryContext) .setExpectedRowSignature(rowSignature) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 257941f48f07..c5b154ca4881 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -1740,6 +1740,52 @@ public void testScanWithMultiValueSelectQuery(String contextName, Map context) + { + RowSignature expectedResultSignature = RowSignature.builder() + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .build(); + + GroupByQuery query = GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim1", "d0"))) + .setAggregatorSpecs( + aggregators( + new CountAggregatorFactory( + "a0" + ) + ) + ) + .setDimFilter(not(equality("dim1", "", ColumnType.STRING))) + .setLimit(1) + .setContext(context) + .build(); + + testSelectQuery() + .setSql("SELECT dim1, cnt FROM (SELECT dim1, COUNT(*) AS cnt FROM foo GROUP BY dim1 HAVING dim1 != '' LIMIT 1) LIMIT 20") + .setExpectedMSQSpec(MSQSpec.builder() + .query(query) + .columnMappings(new ColumnMappings(ImmutableList.of( + new ColumnMapping("d0", "dim1"), + new ColumnMapping("a0", "cnt") + ))) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination(contextName, context) + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build()) + .setExpectedRowSignature(expectedResultSignature) + .setQueryContext(context) + .setExpectedResultRows(ImmutableList.of( + new Object[]{"1", 1L} + )).verifyResults(); + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testHavingOnApproximateCountDistinct(String contextName, Map context) From 604113caa7e7491564f11a24183d3cb9bf79f8f3 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 28 Jun 2024 16:11:08 +0530 Subject: [PATCH 10/13] Refactor --- .../apache/druid/msq/exec/ControllerImpl.java | 9 +- .../druid/msq/indexing/MSQControllerTask.java | 15 +- .../druid/msq/querykit/DataSourcePlan.java | 3 +- .../druid/msq/querykit/MultiQueryKit.java | 6 +- .../apache/druid/msq/querykit/QueryKit.java | 6 +- .../msq/querykit/WindowOperatorQueryKit.java | 3 +- .../msq/querykit/groupby/GroupByQueryKit.java | 15 +- .../druid/msq/querykit/scan/ScanQueryKit.java | 104 ++++----- .../apache/druid/msq/exec/MSQInsertTest.java | 3 +- .../apache/druid/msq/exec/MSQReplaceTest.java | 45 ++++ .../apache/druid/msq/exec/MSQSelectTest.java | 201 +++++++++++++----- 11 files changed, 259 insertions(+), 151 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 9f5cae248418..c9970e6dff27 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -101,9 +101,7 @@ import org.apache.druid.msq.indexing.WorkerCount; import org.apache.druid.msq.indexing.client.ControllerChatHandler; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; -import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination; import org.apache.druid.msq.indexing.destination.ExportMSQDestination; -import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; import org.apache.druid.msq.indexing.error.CanceledFault; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; import org.apache.druid.msq.indexing.error.FaultsExceededChecker; @@ -1769,8 +1767,7 @@ private static QueryDefinition makeQueryDefinition( toolKit, shuffleSpecFactory, tuningConfig.getMaxNumWorkers(), - 0, - MSQControllerTask.needsFinalShuffling(querySpec) + 0 ); } catch (MSQException e) { @@ -1829,9 +1826,9 @@ private static QueryDefinition makeQueryDefinition( ); return builder.build(); - } else if (querySpec.getDestination() instanceof TaskReportMSQDestination) { + } else if (MSQControllerTask.writeResultsToTaskReport(querySpec)) { return queryDef; - } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) { + } else if (MSQControllerTask.writeResultsToDurableStorage(querySpec)) { // attaching new query results stage if the final stage does sort during shuffle so that results are ordered. StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 6b26b7f1111c..8eb944f32e33 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -52,6 +52,7 @@ import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination; import org.apache.druid.msq.indexing.destination.ExportMSQDestination; import org.apache.druid.msq.indexing.destination.MSQDestination; +import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; import org.apache.druid.rpc.ServiceClientFactory; @@ -324,18 +325,17 @@ public static boolean isExport(final MSQSpec querySpec) /** * Checks whether the task is an async query which writes frame files containing the final results into durable storage. */ - public static boolean isDurableStorageQuery(final MSQSpec querySpec) + public static boolean writeResultsToDurableStorage(final MSQSpec querySpec) { return querySpec.getDestination() instanceof DurableStorageMSQDestination; } /** - * Returns true if the destination of the {@link #querySpec} requires a specific final shuffling, such as applying a - * rowsPerSegment or rowsPerPage limit. + * Checks whether the task is an async query which writes frame files containing the final results into durable storage. */ - public static boolean needsFinalShuffling(final MSQSpec querySpec) + public static boolean writeResultsToTaskReport(final MSQSpec querySpec) { - return isDurableStorageQuery(querySpec) || isExport(querySpec) || isIngestion(querySpec); + return querySpec.getDestination() instanceof TaskReportMSQDestination; } /** @@ -353,11 +353,6 @@ public static boolean isReplaceInputDataSourceTask(MSQSpec querySpec) } } - public static boolean writeResultsToDurableStorage(final MSQSpec querySpec) - { - return querySpec.getDestination() instanceof DurableStorageMSQDestination; - } - @Override public LookupLoadingSpec getLookupLoadingSpec() { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 93977ee960fe..831c9b139d3d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -442,8 +442,7 @@ private static DataSourcePlan forQuery( queryKit, ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount), maxWorkerCount, - minStageNumber, - false + minStageNumber ); final int stageNumber = subQueryDef.getFinalStageDefinition().getStageNumber(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java index 23c48432af18..a795f6496053 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java @@ -46,8 +46,7 @@ public QueryDefinition makeQueryDefinition( QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, - int minStageNumber, - boolean needsFinalShuffling + int minStageNumber ) { final QueryKit specificToolKit = toolKitMap.get(query.getClass()); @@ -60,8 +59,7 @@ public QueryDefinition makeQueryDefinition( this, resultShuffleSpecFactory, maxWorkerCount, - minStageNumber, - needsFinalShuffling + minStageNumber ); } else { throw new ISE("Unsupported query class [%s]", query.getClass().getName()); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java index 7056f50c41cf..b259022bba5b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java @@ -40,9 +40,6 @@ public interface QueryKit> * @param minStageNumber lowest stage number to use for any generated stages. Useful if the resulting * {@link QueryDefinition} is going to be added to an existing * {@link org.apache.druid.msq.kernel.QueryDefinitionBuilder}. - * @param needsFinalShuffling Determines if a final shuffling is required. If this is true, the - * query kit should ensure that the resultShuffleSpecFactory passed to it is used - * for the final shuffling. */ QueryDefinition makeQueryDefinition( String queryId, @@ -50,7 +47,6 @@ QueryDefinition makeQueryDefinition( QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, - int minStageNumber, - boolean needsFinalShuffling + int minStageNumber ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 3eee8193c8c2..d08d78ef791f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -62,8 +62,7 @@ public QueryDefinition makeQueryDefinition( QueryKit> queryKit, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, - int minStageNumber, - boolean needsFinalShuffling + int minStageNumber ) { // need to validate query first diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 8f348ed8a030..eb9953402bad 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -73,8 +73,7 @@ public QueryDefinition makeQueryDefinition( final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, - final int minStageNumber, - final boolean needsFinalShuffling + final int minStageNumber ) { validateQuery(originalQuery); @@ -133,12 +132,12 @@ public QueryDefinition makeQueryDefinition( shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() ? ShuffleSpecFactories.singlePartition() : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); - shuffleSpecFactoryPostAggregation = doLimitOrOffset && !needsFinalShuffling + shuffleSpecFactoryPostAggregation = doLimitOrOffset ? ShuffleSpecFactories.singlePartition() : resultShuffleSpecFactory; partitionBoost = true; } else { - shuffleSpecFactoryPreAggregation = doLimitOrOffset && !needsFinalShuffling + shuffleSpecFactoryPreAggregation = doLimitOrOffset ? ShuffleSpecFactories.singlePartition() : resultShuffleSpecFactory; @@ -186,9 +185,7 @@ public QueryDefinition makeQueryDefinition( ); if (doLimitOrOffset) { - final ShuffleSpec finalShuffleSpec = needsFinalShuffling ? - shuffleSpecFactoryPreAggregation.build(resultClusterBy, true) : - null; + final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false); final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec(); queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 2) @@ -228,9 +225,7 @@ public QueryDefinition makeQueryDefinition( ); if (doLimitOrOffset) { final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec(); - final ShuffleSpec finalShuffleSpec = needsFinalShuffling ? - shuffleSpecFactoryPreAggregation.build(resultClusterBy, true) : - null; + final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false); queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 2) .inputs(new StageInputSpec(firstStageNumber + 1)) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index 232e0305337f..1ab722845750 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -26,7 +26,6 @@ import org.apache.druid.frame.key.KeyOrder; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.msq.input.stage.StageInputSpec; -import org.apache.druid.msq.kernel.MixShuffleSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.ShuffleSpec; @@ -34,6 +33,7 @@ import org.apache.druid.msq.querykit.DataSourcePlan; import org.apache.druid.msq.querykit.QueryKit; import org.apache.druid.msq.querykit.QueryKitUtils; +import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -89,8 +89,7 @@ public QueryDefinition makeQueryDefinition( final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, - final int minStageNumber, - final boolean needsFinalShuffling + final int minStageNumber ) { final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); @@ -112,83 +111,74 @@ public QueryDefinition makeQueryDefinition( final ScanQuery queryToRun = originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); final RowSignature scanSignature = getAndValidateSignature(queryToRun, jsonMapper); - final ShuffleSpec shuffleSpec; - final RowSignature signatureToUse; final boolean hasLimitOrOffset = queryToRun.isLimited() || queryToRun.getScanRowsOffset() > 0; + final RowSignature.Builder signatureBuilder = RowSignature.builder().addAll(scanSignature); + final Granularity segmentGranularity = + QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); + final List clusterByColumns = new ArrayList<>(); + + // Add regular orderBys. + for (final ScanQuery.OrderBy orderBy : queryToRun.getOrderBys()) { + clusterByColumns.add( + new KeyColumn( + orderBy.getColumnName(), + orderBy.getOrder() == ScanQuery.Order.DESCENDING ? KeyOrder.DESCENDING : KeyOrder.ASCENDING + ) + ); + } - // We ignore the resultShuffleSpecFactory in case: - // 1. There is no cluster by - // 2. There is an offset which means everything gets funneled into a single partition hence we use MaxCountShuffleSpec - // 3. The destination does not require shuffling after the limit stage to get outputs of a specific size. - if (queryToRun.getOrderBys().isEmpty() && hasLimitOrOffset && !needsFinalShuffling) { - shuffleSpec = MixShuffleSpec.instance(); - signatureToUse = scanSignature; - } else { - final RowSignature.Builder signatureBuilder = RowSignature.builder().addAll(scanSignature); - final Granularity segmentGranularity = - QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); - final List clusterByColumns = new ArrayList<>(); - - // Add regular orderBys. - for (final ScanQuery.OrderBy orderBy : queryToRun.getOrderBys()) { - clusterByColumns.add( - new KeyColumn( - orderBy.getColumnName(), - orderBy.getOrder() == ScanQuery.Order.DESCENDING ? KeyOrder.DESCENDING : KeyOrder.ASCENDING - ) - ); - } - - // Update partition by of next window - final RowSignature signatureSoFar = signatureBuilder.build(); - boolean addShuffle = true; - if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { - final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext() - .get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); - for (KeyColumn c : windowClusterBy.getColumns()) { - if (!signatureSoFar.contains(c.columnName())) { - addShuffle = false; - break; - } - } - if (addShuffle) { - clusterByColumns.addAll(windowClusterBy.getColumns()); + // Update partition by of next window + final RowSignature signatureSoFar = signatureBuilder.build(); + boolean addShuffle = true; + if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { + final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext() + .get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); + for (KeyColumn c : windowClusterBy.getColumns()) { + if (!signatureSoFar.contains(c.columnName())) { + addShuffle = false; + break; } - } else { - // Add partition boosting column. - clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING)); - signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG); } + if (addShuffle) { + clusterByColumns.addAll(windowClusterBy.getColumns()); + } + } else { + // Add partition boosting column. + clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING)); + signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG); + } - final ClusterBy clusterBy = - QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity); - shuffleSpec = resultShuffleSpecFactory.build(clusterBy, false); - signatureToUse = QueryKitUtils.sortableSignature( - QueryKitUtils.signatureWithSegmentGranularity(signatureBuilder.build(), segmentGranularity), - clusterBy.getColumns() - ); - } + final ClusterBy clusterBy = + QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity); + final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(clusterBy, false); + + final RowSignature signatureToUse = QueryKitUtils.sortableSignature( + QueryKitUtils.signatureWithSegmentGranularity(signatureBuilder.build(), segmentGranularity), + clusterBy.getColumns() + ); + // If there is no limit spec, apply the final shuffling here itself. This will ensure partition sizes etc are respected. + // If there is a limit spec, it would write everything into one partition anyway, so the final shuffling should be + // applied after that. queryDefBuilder.add( StageDefinition.builder(Math.max(minStageNumber, queryDefBuilder.getNextStageNumber())) .inputs(dataSourcePlan.getInputSpecs()) .broadcastInputs(dataSourcePlan.getBroadcastInputs()) - .shuffleSpec(shuffleSpec) + .shuffleSpec(hasLimitOrOffset ? ShuffleSpecFactories.singlePartition().build(clusterBy, false) : finalShuffleSpec) .signature(signatureToUse) .maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount) .processorFactory(new ScanQueryFrameProcessorFactory(queryToRun)) ); if (hasLimitOrOffset) { - final ShuffleSpec finalShuffleSpec = needsFinalShuffling ? shuffleSpec : null; queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 1) .inputs(new StageInputSpec(firstStageNumber)) .signature(signatureToUse) .maxWorkerCount(1) - .shuffleSpec(finalShuffleSpec) + .shuffleSpec(finalShuffleSpec) // Apply the final shuffling after limit spec. .processorFactory( new OffsetLimitFrameProcessorFactory( queryToRun.getScanRowsOffset(), diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index af17383337dd..098b143b2772 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -1483,7 +1483,8 @@ public void testInsertOnFoo1WithLimit(String contextName, Map co RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG).build(); + .add("cnt", ColumnType.LONG) + .build(); testIngestQuery().setSql( "insert into foo1 select __time, dim1, cnt from foo where dim1 != '' limit 4 partitioned by all clustered by dim1") diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 7d7f4e310c62..66c86b0d7b15 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -906,6 +906,51 @@ public void testReplaceTimeChunks(String contextName, Map contex .verifyResults(); } + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void testReplaceOnFoo1WithLimit(String contextName, Map context) + { + Map queryContext = ImmutableMap.builder() + .putAll(context) + .put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 2) + .build(); + + List expectedRows = ImmutableList.of( + new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null,}, + new Object[]{978307200000L, "1"}, + new Object[]{946771200000L, "10.1"}, + new Object[]{946857600000L, "2"} + ); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .build(); + + testIngestQuery().setSql( + "REPLACE INTO \"foo1\" OVERWRITE ALL\n" + + "SELECT\n" + + " \"__time\",\n" + + " \"dim1\"\n" + + "FROM foo\n" + + "LIMIT 4\n" + + "PARTITIONED BY ALL\n" + + "CLUSTERED BY dim1") + .setExpectedDataSource("foo1") + .setQueryContext(queryContext) + .setExpectedRowSignature(rowSignature) + .setExpectedShardSpec(DimensionRangeShardSpec.class) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0), SegmentId.of("foo1", Intervals.ETERNITY, "test", 1))) + .setExpectedResultRows(expectedRows) + .setExpectedMSQSegmentReport( + new MSQSegmentReport( + DimensionRangeShardSpec.class.getSimpleName(), + "Using RangeShardSpec to generate segments." + ) + ) + .verifyResults(); + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testReplaceTimeChunksLargerThanData(String contextName, Map context) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index c5b154ca4881..2d14e7434974 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.InlineInputSource; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.systemfield.SystemFields; @@ -49,7 +50,6 @@ import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; @@ -633,10 +633,9 @@ public void testSelectWithLimit(String contextName, Map context) new Object[]{1L, "def"}, new Object[]{1L, "abc"} ); - final long fullResultsSize = expectedResults.size(); - final SelectTester selectTester = testSelectQuery() - .setSql("select cnt,dim1 from foo limit 10") + testSelectQuery() + .setSql("select cnt, dim1 from foo limit 10") .setExpectedMSQSpec( MSQSpec.builder() .query( @@ -657,59 +656,39 @@ public void testSelectWithLimit(String contextName, Map context) ) .setQueryContext(context) .setExpectedRowSignature(resultSignature) - .setExpectedResultRows(expectedResults); - - final QueryContext queryContext = QueryContext.of(context); - if (MSQSelectDestination.DURABLESTORAGE.equals(MultiStageQueryContext.getSelectDestinationOrNull(queryContext))) { - selectTester.setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().totalFiles(1), - 0, 0, "input0" - ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().rows(fullResultsSize).frames(1), - 0, 0, "output"); - - if (!context.containsKey(MultiStageQueryContext.CTX_ROWS_PER_PAGE)) { - selectTester.setExpectedCountersForStageWorkerChannel( + .setExpectedResultRows(expectedResults) + .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(fullResultsSize).frames(1), - 0, 0, "shuffle" - ); - } else { - final long rowsPerPage = MultiStageQueryContext.getRowsPerPage(queryContext); - final int expectedPageCount = (int) (fullResultsSize / rowsPerPage); - long[] rows = new long[expectedPageCount]; - Arrays.fill(rows, rowsPerPage); - long[] frames = new long[expectedPageCount]; - Arrays.fill(frames, 1); - selectTester.setExpectedCountersForStageWorkerChannel( + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( CounterSnapshotMatcher - .with().rows(rows).frames(frames), + .with().rows(6), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6), 0, 0, "shuffle" - ); - } - } else { - selectTester.setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().totalFiles(1), - 0, 0, "input0" - ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().rows(fullResultsSize).frames(1), - 0, 0, "output" - ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher - .with().rows(fullResultsSize).frames(1), - 0, 0, "shuffle" - ) - .setExpectedResultRows(expectedResults); - } - - selectTester.verifyResults(); + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6), + 1, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(!context.containsKey(MultiStageQueryContext.CTX_ROWS_PER_PAGE) ? new long[] {6} : new long[] {2, 2, 2}), + 1, 0, "shuffle" + ) + .setExpectedResultRows(expectedResults) + .verifyResults(); } @MethodSource("data") @@ -1786,6 +1765,120 @@ public void testGroupByWithLimit(String contextName, Map context )).verifyResults(); } + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void testGroupByWithLimitAndOrdering(String contextName, Map context) + { + RowSignature rowSignature = RowSignature.builder() + .add("dim1", ColumnType.STRING) + .add("count", ColumnType.LONG) + .build(); + + GroupByQuery query = GroupByQuery.builder() + .setDataSource( + new ExternalDataSource( + new InlineInputSource("dim1\nabc\nxyz\ndef\nxyz\nabc\nxyz\nabc\nxyz\ndef\nbbb\naaa"), + new CsvInputFormat(null, null, null, true, 0), + RowSignature.builder().add("dim1", ColumnType.STRING).build() + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .addOrderByColumn(new OrderByColumnSpec("a0", OrderByColumnSpec.Direction.DESCENDING, StringComparators.NUMERIC)) + .addOrderByColumn(new OrderByColumnSpec("d0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.LEXICOGRAPHIC)) + .setDimensions(dimensions(new DefaultDimensionSpec("dim1", "d0"))) + .setAggregatorSpecs( + aggregators( + new CountAggregatorFactory( + "a0" + ) + ) + ) + .setLimit(4) + .setContext(context) + .build(); + + List expectedRows = ImmutableList.of( + new Object[]{"xyz", 4L}, + new Object[]{"abc", 3L}, + new Object[]{"def", 2L}, + new Object[]{"aaa", 1L} + ); + + testSelectQuery() + .setSql("WITH \"ext\" AS (\n" + + " SELECT *\n" + + " FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"inline\",\"data\":\"dim1\\nabc\\nxyz\\ndef\\nxyz\\nabc\\nxyz\\nabc\\nxyz\\ndef\\nbbb\\naaa\"}',\n" + + " '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n" + + " )\n" + + " ) EXTEND (\"dim1\" VARCHAR)\n" + + ")\n" + + "SELECT\n" + + " \"dim1\",\n" + + " COUNT(*) AS \"count\"\n" + + "FROM \"ext\"\n" + + "GROUP BY 1\n" + + "ORDER BY 2 DESC, 1\n" + + "LIMIT 4\n") + .setExpectedMSQSpec(MSQSpec.builder() + .query(query) + .columnMappings(new ColumnMappings(ImmutableList.of( + new ColumnMapping("d0", "dim1"), + new ColumnMapping("a0", "count") + ))) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination(contextName, context) + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build()) + .setExpectedRowSignature(rowSignature) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(5), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(5), + 1, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(5), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(5), + 1, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(5), + 2, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(4), + 2, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(!context.containsKey(MultiStageQueryContext.CTX_ROWS_PER_PAGE) ? new long[] {4} : new long[] {2, 2}), + 2, 0, "shuffle" + ) + .setQueryContext(context) + .setExpectedResultRows(expectedRows) + .verifyResults(); + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testHavingOnApproximateCountDistinct(String contextName, Map context) From c7ddafceb3115d2f431fb4edc376e010773f5d38 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 28 Jun 2024 16:34:40 +0530 Subject: [PATCH 11/13] Refactor --- .../src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 66c86b0d7b15..2e05d4479103 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -916,7 +916,7 @@ public void testReplaceOnFoo1WithLimit(String contextName, Map c .build(); List expectedRows = ImmutableList.of( - new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null,}, + new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null}, new Object[]{978307200000L, "1"}, new Object[]{946771200000L, "10.1"}, new Object[]{946857600000L, "2"} From 3ebae3a17ec0aba0ec6cf404366e153447277c91 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 5 Jul 2024 09:18:01 +0530 Subject: [PATCH 12/13] Rename functions --- .../java/org/apache/druid/msq/exec/ControllerImpl.java | 8 ++++---- .../org/apache/druid/msq/indexing/MSQControllerTask.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index c9970e6dff27..3ca6d5780de1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1826,9 +1826,9 @@ private static QueryDefinition makeQueryDefinition( ); return builder.build(); - } else if (MSQControllerTask.writeResultsToTaskReport(querySpec)) { + } else if (MSQControllerTask.writeFinalResultsToTaskReport(querySpec)) { return queryDef; - } else if (MSQControllerTask.writeResultsToDurableStorage(querySpec)) { + } else if (MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec)) { // attaching new query results stage if the final stage does sort during shuffle so that results are ordered. StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); @@ -2931,12 +2931,12 @@ private void startQueryResultsReader() final InputChannelFactory inputChannelFactory; - if (queryKernelConfig.isDurableStorage() || MSQControllerTask.writeResultsToDurableStorage(querySpec)) { + if (queryKernelConfig.isDurableStorage() || MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec)) { inputChannelFactory = DurableStorageInputChannelFactory.createStandardImplementation( queryId(), MSQTasks.makeStorageConnector(context.injector()), closer, - MSQControllerTask.writeResultsToDurableStorage(querySpec) + MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec) ); } else { inputChannelFactory = new WorkerInputChannelFactory(netClient, () -> taskIds); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 8eb944f32e33..b9c8ebe3b806 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -325,7 +325,7 @@ public static boolean isExport(final MSQSpec querySpec) /** * Checks whether the task is an async query which writes frame files containing the final results into durable storage. */ - public static boolean writeResultsToDurableStorage(final MSQSpec querySpec) + public static boolean writeFinalStageResultsToDurableStorage(final MSQSpec querySpec) { return querySpec.getDestination() instanceof DurableStorageMSQDestination; } @@ -333,7 +333,7 @@ public static boolean writeResultsToDurableStorage(final MSQSpec querySpec) /** * Checks whether the task is an async query which writes frame files containing the final results into durable storage. */ - public static boolean writeResultsToTaskReport(final MSQSpec querySpec) + public static boolean writeFinalResultsToTaskReport(final MSQSpec querySpec) { return querySpec.getDestination() instanceof TaskReportMSQDestination; } From 8db7a05efdd06384af58c532ca1ff494647d4627 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 5 Jul 2024 22:04:45 +0530 Subject: [PATCH 13/13] Remove unneeded shuffling --- .../druid/msq/querykit/scan/ScanQueryKit.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index 1ab722845750..48a17a9e84e2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -26,6 +26,7 @@ import org.apache.druid.frame.key.KeyOrder; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.msq.input.stage.StageInputSpec; +import org.apache.druid.msq.kernel.MixShuffleSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.ShuffleSpec; @@ -149,7 +150,6 @@ public QueryDefinition makeQueryDefinition( signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG); } - final ClusterBy clusterBy = QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity); final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(clusterBy, false); @@ -159,14 +159,30 @@ public QueryDefinition makeQueryDefinition( clusterBy.getColumns() ); - // If there is no limit spec, apply the final shuffling here itself. This will ensure partition sizes etc are respected. - // If there is a limit spec, it would write everything into one partition anyway, so the final shuffling should be - // applied after that. + ShuffleSpec scanShuffleSpec; + if (!hasLimitOrOffset) { + // If there is no limit spec, apply the final shuffling here itself. This will ensure partition sizes etc are respected. + scanShuffleSpec = finalShuffleSpec; + } else { + // If there is a limit spec, check if there are any non-boost columns to sort in. + boolean requiresSort = clusterByColumns.stream() + .anyMatch(keyColumn -> !QueryKitUtils.PARTITION_BOOST_COLUMN.equals(keyColumn.columnName())); + if (requiresSort) { + // If yes, do a sort into a single partition. + scanShuffleSpec = ShuffleSpecFactories.singlePartition().build(clusterBy, false); + } else { + // If the only clusterBy column is the boost column, we just use a mix shuffle to avoid unused shuffling. + // Note that we still need the boost column to be present in the row signature, since the limit stage would + // need it to be populated to do its own shuffling later. + scanShuffleSpec = MixShuffleSpec.instance(); + } + } + queryDefBuilder.add( StageDefinition.builder(Math.max(minStageNumber, queryDefBuilder.getNextStageNumber())) .inputs(dataSourcePlan.getInputSpecs()) .broadcastInputs(dataSourcePlan.getBroadcastInputs()) - .shuffleSpec(hasLimitOrOffset ? ShuffleSpecFactories.singlePartition().build(clusterBy, false) : finalShuffleSpec) + .shuffleSpec(scanShuffleSpec) .signature(signatureToUse) .maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount) .processorFactory(new ScanQueryFrameProcessorFactory(queryToRun))