diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java index 57067e50a50b..81f45f237b4f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java @@ -23,6 +23,7 @@ import com.google.common.io.BaseEncoding; import com.google.common.primitives.Chars; import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexLiteral; @@ -378,4 +379,37 @@ public static String makePrefixedName(final String prefix, final String suffix) { return StringUtils.format("%s:%s", prefix, suffix); } + + public static int getInt(RexNode rex, int defaultValue) + { + return rex == null ? defaultValue : RexLiteral.intValue(rex); + } + + public static int getOffset(Sort sort) + { + return Calcites.getInt(sort.offset, 0); + } + + public static int getFetch(Sort sort) + { + return Calcites.getInt(sort.fetch, -1); + } + + public static int collapseFetch(int innerFetch, int outerFetch, int outerOffset) + { + final int fetch; + if (innerFetch < 0 && outerFetch < 0) { + // Neither has a limit => no limit overall. + fetch = -1; + } else if (innerFetch < 0) { + // Outer limit only. + fetch = outerFetch; + } else if (outerFetch < 0) { + // Inner limit only. + fetch = Math.max(0, innerFetch - outerOffset); + } else { + fetch = Math.max(0, Math.min(innerFetch - outerOffset, outerFetch)); + } + return fetch; + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 9349286b4696..54a9ad807794 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -19,11 +19,11 @@ package org.apache.druid.sql.calcite.planner; -import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.primitives.Ints; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.interpreter.BindableConvention; @@ -35,6 +35,7 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexBuilder; @@ -139,31 +140,21 @@ private PlannerResult planWithDruidConvention( if (explain != null) { return planExplanation(druidRel, explain, dataSourceNames); } else { - final Supplier> resultsSupplier = new Supplier>() - { - @Override - public Sequence get() - { - if (root.isRefTrivial()) { - return druidRel.runQuery(); - } else { - // Add a mapping on top to accommodate root.fields. - return Sequences.map( - druidRel.runQuery(), - new Function() - { - @Override - public Object[] apply(final Object[] input) - { - final Object[] retVal = new Object[root.fields.size()]; - for (int i = 0; i < root.fields.size(); i++) { - retVal[i] = input[root.fields.get(i).getKey()]; - } - return retVal; - } + final Supplier> resultsSupplier = () -> { + if (root.isRefTrivial()) { + return druidRel.runQuery(); + } else { + // Add a mapping on top to accommodate root.fields. + return Sequences.map( + druidRel.runQuery(), + input -> { + final Object[] retVal = new Object[root.fields.size()]; + for (int i = 0; i < root.fields.size(); i++) { + retVal[i] = input[root.fields.get(i).getKey()]; } - ); - } + return retVal; + } + ); } }; @@ -212,9 +203,9 @@ private PlannerResult planWithBindableConvention( new BaseSequence.IteratorMaker>() { @Override - public EnumeratorIterator make() + public EnumeratorIterator make() { - return new EnumeratorIterator(new Iterator() + return new EnumeratorIterator<>(new Iterator() { @Override public boolean hasNext() @@ -236,16 +227,19 @@ public void cleanup(EnumeratorIterator iterFromMake) } } - ), () -> enumerator.close()); + ), enumerator::close); }; return new PlannerResult(resultsSupplier, root.validatedRowType, ImmutableSet.of()); } } /** - * This method wraps the root with a logical sort that applies a limit (no ordering change). - * The CTX_SQL_OUTER_LIMIT flag that controls this wrapping is meant for internal use only by the - * web console, allowing it to apply a limit to queries without rewriting the original SQL. + * This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel + * is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in + * {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}. + * + * The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by + * the web console, allowing it to apply a limit to queries without rewriting the original SQL. * * @param root root node * @return root node wrapped with a limiting logical sort if a limit is specified in the query context. @@ -261,6 +255,23 @@ private RelNode possiblyWrapRootWithOuterLimitFromContext( return root.rel; } + if (root.rel instanceof Sort) { + Sort innerSort = (Sort) root.rel; + final int offset = Calcites.getOffset(innerSort); + final int fetch = Calcites.collapseFetch( + Calcites.getFetch(innerSort), + Ints.checkedCast(outerLimit), + 0 + ); + + return LogicalSort.create( + innerSort.getInput(), + innerSort.collation, + makeBigIntLiteral(offset), + makeBigIntLiteral(fetch) + ); + } + return LogicalSort.create( root.rel, root.collation, @@ -282,7 +293,7 @@ private static class EnumeratorIterator implements Iterator { private final Iterator it; - public EnumeratorIterator(Iterator it) + EnumeratorIterator(Iterator it) { this.it = it; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java index c3e91233f990..bad8bdef7ed1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java @@ -22,7 +22,7 @@ import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rex.RexLiteral; +import org.apache.druid.sql.calcite.planner.Calcites; /** * Collapses two adjacent Sort operations together. Useful for queries like @@ -45,48 +45,30 @@ public static SortCollapseRule instance() @Override public void onMatch(final RelOptRuleCall call) { - // First is the inner sort, second is the outer sort. - final Sort first = call.rel(1); - final Sort second = call.rel(0); + final Sort outerSort = call.rel(0); + final Sort innerSort = call.rel(1); - if (second.collation.getFieldCollations().isEmpty() - || second.collation.getFieldCollations().equals(first.collation.getFieldCollations())) { - // Add up the offsets. - final int firstOffset = (first.offset != null ? RexLiteral.intValue(first.offset) : 0); - final int secondOffset = (second.offset != null ? RexLiteral.intValue(second.offset) : 0); - - final int offset = firstOffset + secondOffset; - final int fetch; + if (outerSort.collation.getFieldCollations().isEmpty() + || outerSort.collation.getFieldCollations().equals(innerSort.collation.getFieldCollations())) { + final int innerOffset = Calcites.getOffset(innerSort); + final int innerFetch = Calcites.getFetch(innerSort); + final int outerOffset = Calcites.getOffset(outerSort); + final int outerFetch = Calcites.getFetch(outerSort); - if (first.fetch == null && second.fetch == null) { - // Neither has a limit => no limit overall. - fetch = -1; - } else if (first.fetch == null) { - // Outer limit only. - fetch = RexLiteral.intValue(second.fetch); - } else if (second.fetch == null) { - // Inner limit only. - fetch = Math.max(0, RexLiteral.intValue(first.fetch) - secondOffset); - } else { - fetch = Math.max( - 0, - Math.min( - RexLiteral.intValue(first.fetch) - secondOffset, - RexLiteral.intValue(second.fetch) - ) - ); - } + // Add up the offsets. + final int offset = innerOffset + outerOffset; + final int fetch = Calcites.collapseFetch(innerFetch, outerFetch, outerOffset); - final Sort combined = first.copy( - first.getTraitSet(), - first.getInput(), - first.getCollation(), + final Sort combined = innerSort.copy( + innerSort.getTraitSet(), + innerSort.getInput(), + innerSort.getCollation(), offset == 0 ? null : call.builder().literal(offset), fetch < 0 ? null : call.builder().literal(fetch) ); call.transformTo(combined); - call.getPlanner().setImportance(second, 0.0); + call.getPlanner().setImportance(outerSort, 0.0); } } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 4d22779491c2..4ac6f92e8dbe 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -214,6 +214,8 @@ public int getMaxSemiJoinRowsInMemory() // Matches QUERY_CONTEXT_LOS_ANGELES public static final Map TIMESERIES_CONTEXT_LOS_ANGELES = new HashMap<>(); + public static final Map OUTER_LIMIT_CONTEXT = new HashMap<>(QUERY_CONTEXT_DEFAULT); + public static QueryRunnerFactoryConglomerate conglomerate; public static Closer resourceCloser; @@ -236,6 +238,8 @@ public int getMaxSemiJoinRowsInMemory() TIMESERIES_CONTEXT_LOS_ANGELES.put("skipEmptyBuckets", true); TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS); TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); + + OUTER_LIMIT_CONTEXT.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 2); } // Generate timestamps for expected results diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 0c1b9f96fa11..0f9ffb764e72 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -786,13 +786,14 @@ public void testSelectStarFromSelectSingleColumnWithLimitDescending() throws Exc new Object[]{"10.1"} ) ); + } - // The outer limit wrapping behavior that was used in the query above can be applied with a context flag instead - Map outerLimitContext = new HashMap<>(QUERY_CONTEXT_DEFAULT); - outerLimitContext.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 2); + @Test + public void testSelectLimitWrapping() throws Exception + { testQuery( "SELECT dim1 FROM druid.foo ORDER BY __time DESC", - outerLimitContext, + OUTER_LIMIT_CONTEXT, ImmutableList.of( newScanQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) @@ -801,7 +802,7 @@ public void testSelectStarFromSelectSingleColumnWithLimitDescending() throws Exc .limit(2) .order(ScanQuery.Order.DESCENDING) .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) - .context(outerLimitContext) + .context(OUTER_LIMIT_CONTEXT) .build() ), ImmutableList.of( @@ -811,6 +812,138 @@ public void testSelectStarFromSelectSingleColumnWithLimitDescending() throws Exc ); } + @Test + public void testTopNLimitWrapping() throws Exception + { + List expected; + if (NullHandling.replaceWithDefault()) { + expected = ImmutableList.of( + new Object[]{"", 1L}, + new Object[]{"def", 1L} + ); + } else { + expected = ImmutableList.of( + new Object[]{"def", 1L}, + new Object[]{"abc", 1L} + ); + } + testQuery( + "SELECT dim1, COUNT(*) FROM druid.foo GROUP BY dim1 ORDER BY dim1 DESC", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("dim1", "d0", ValueType.STRING)) + .threshold(2) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .metric( + new InvertedTopNMetricSpec( + new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC) + ) + ) + .context(OUTER_LIMIT_CONTEXT) + .build() + ), + expected + ); + } + + + @Test + public void testTopNLimitWrappingOrderByAgg() throws Exception + { + testQuery( + "SELECT dim1, COUNT(*) FROM druid.foo GROUP BY 1 ORDER BY 2 DESC", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("dim1", "d0", ValueType.STRING)) + .threshold(2) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .metric("a0") + .context(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of(new Object[]{"", 1L}, new Object[]{"1", 1L}) + ); + } + + @Test + public void testGroupByLimitWrapping() throws Exception + { + List expected; + if (NullHandling.replaceWithDefault()) { + expected = ImmutableList.of( + new Object[]{"def", "abc", 1L}, + new Object[]{"abc", "", 1L} + ); + } else { + expected = ImmutableList.of( + new Object[]{"def", "abc", 1L}, + new Object[]{"abc", null, 1L} + ); + } + testQuery( + "SELECT dim1, dim2, COUNT(*) FROM druid.foo GROUP BY dim1, dim2 ORDER BY dim1 DESC", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + new GroupByQuery.Builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0", ValueType.STRING), + new DefaultDimensionSpec("dim2", "d1", ValueType.STRING) + ) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("d0", Direction.DESCENDING, StringComparators.LEXICOGRAPHIC)), + 2 + ) + ) + .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) + .setContext(OUTER_LIMIT_CONTEXT) + .build() + ), + expected + ); + } + + @Test + public void testGroupByLimitWrappingOrderByAgg() throws Exception + { + testQuery( + "SELECT dim1, dim2, COUNT(*) FROM druid.foo GROUP BY 1, 2 ORDER BY 3 DESC", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + new GroupByQuery.Builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0", ValueType.STRING), + new DefaultDimensionSpec("dim2", "d1", ValueType.STRING) + ) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("a0", Direction.DESCENDING, StringComparators.NUMERIC) + ), + 2 + ) + ) + .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) + .setContext(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of(new Object[]{"", "a", 1L}, new Object[]{"def", "abc", 1L}) + ); + } + @Test public void testSelectProjectionFromSelectSingleColumnWithInnerLimitDescending() throws Exception { diff --git a/web-console/src/views/query-view/query-view.tsx b/web-console/src/views/query-view/query-view.tsx index 2f1d7499bec8..45729071c2a4 100644 --- a/web-console/src/views/query-view/query-view.tsx +++ b/web-console/src/views/query-view/query-view.tsx @@ -131,13 +131,6 @@ export class QueryView extends React.PureComponent { const { queryString, queryContext, wrapQueryLimit } = queryWithContext; - const actualQuery = QueryView.wrapInLimitIfNeeded(queryString, wrapQueryLimit); - const explainPayload: Record = { - query: QueryView.wrapInExplainIfNeeded(actualQuery), + query: QueryView.wrapInExplainIfNeeded(queryString), resultFormat: 'object', }; - if (!isEmptyContext(queryContext)) explainPayload.context = queryContext; + if (!isEmptyContext(queryContext) || wrapQueryLimit) { + explainPayload.context = Object.assign({}, queryContext || {}); + explainPayload.context.sqlOuterLimit = wrapQueryLimit; + } const result = await queryDruidSql(explainPayload); return parseQueryPlan(result[0]['PLAN']);