From ca48e0a3c8b0c19e1bc82b088121f22ddf519af6 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 19 Jun 2024 12:02:57 +0530 Subject: [PATCH 1/8] init --- .../sql/DoublesSketchSqlAggregatorTest.java | 141 +++++++++++++++++- .../druid/frame/segment/FrameCursorUtils.java | 4 + .../server/ClientQuerySegmentWalker.java | 35 +++-- .../sql/calcite/BaseCalciteQueryTest.java | 2 +- .../sql/calcite/CalciteSubqueryTest.java | 71 +++++++++ .../druid/sql/calcite/QueryTestRunner.java | 2 +- 6 files changed, 234 insertions(+), 21 deletions(-) diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index bce6a306c800..252058b74a6f 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -263,6 +263,118 @@ public void testQuantileOnComplexColumn() ); } + @Test + public void testtest() + { + final List expectedResults; + if (NullHandling.replaceWithDefault()) { + expectedResults = ImmutableList.of( + new Object[]{ + 0.0, + 0.0, + 10.1, + 10.1, + 20.2, + 0.0, + 10.1, + 0.0 + } + ); + } else { + expectedResults = ImmutableList.of( + new Object[]{ + 1.0, + 2.0, + 10.1, + 10.1, + 20.2, + Double.NaN, + 2.0, + Double.NaN + } + ); + } + + testQuery( + "SELECT\n" + + " DATE_TRUNC('DAY', CURRENT_TIMESTAMP) AS __time,\n" + + " table_nb_days.\"user\",\n" + + " table_nb_days.number_days_interactions,\n" + + " (CASE WHEN table_nb_days.number_days_interactions < table_quantiles.first_quartile THEN 'Low_NB_DAYS' \n" + + " WHEN table_nb_days.number_days_interactions >= table_quantiles.first_quartile AND table_nb_days.number_days_interactions < table_quantiles.third_quartile THEN 'Medium_NB_DAYS' \n" + + " WHEN table_nb_days.number_days_interactions >= table_quantiles.third_quartile THEN 'High_NB_DAYS' END) AS NB_Days_Segment\n" + + "FROM (\n" + + " SELECT\n" + + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.25) AS first_quartile,\n" + + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.75) AS third_quartile\n" + + " FROM (\n" + + " SELECT\n" + + " dim1 \"user\",\n" + + " COUNT(DISTINCT __time) AS number_days_interactions\n" + + " FROM \"foo\"\n" + + " WHERE __time >= TIMESTAMP '1990-01-23' AND __time <= TIMESTAMP '2024-03-23'\n" + + " GROUP BY 1\n" + + " ) AS table_nb_days\n" + + ") AS table_quantiles, (\n" + + " SELECT\n" + + " dim1 \"user\",\n" + + " COUNT(DISTINCT __time) AS number_days_interactions\n" + + " FROM \"foo\"\n" + + " WHERE __time >= TIMESTAMP '1990-01-23' AND __time <= TIMESTAMP '2024-03-23' \n" + + " GROUP BY 1\n" + + ") AS table_nb_days\n", + QUERY_CONTEXT_WITH_SUBQUERY_MEMORY_LIMIT, + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .virtualColumns( + new ExpressionVirtualColumn( + "v0", + "CAST(\"dim1\", 'DOUBLE')", + ColumnType.FLOAT, + TestExprMacroTable.INSTANCE + ), + new ExpressionVirtualColumn( + "v1", + "(CAST(\"dim1\", 'DOUBLE') * 2)", + ColumnType.FLOAT, + TestExprMacroTable.INSTANCE + ) + ) + .aggregators(ImmutableList.of( + new DoublesSketchAggregatorFactory("a0:agg", "v0", 128), + new DoublesSketchAggregatorFactory("a1:agg", "v0", 64), + new DoublesSketchAggregatorFactory("a2:agg", "v0", 256), + new DoublesSketchAggregatorFactory("a4:agg", "v1", 128), + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a5:agg", "v0", 128), + equality("dim2", "abc", ColumnType.STRING) + ), + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a6:agg", "v0", 128), + not(equality("dim2", "abc", ColumnType.STRING)) + ) + )) + .postAggregators( + new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), + new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f), + new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f), + new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f), + new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f), + new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f), + new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f), + new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expectedResults + ); + } + + @Test public void testQuantileOnCastedString() { @@ -963,9 +1075,32 @@ public void testSuccessWithSmallMaxStreamLength() ); testQuery( "SELECT\n" - + "APPROX_QUANTILE_DS(m1, 0.01),\n" - + "APPROX_QUANTILE_DS(cnt, 0.5)\n" - + "FROM foo", + + " DATE_TRUNC('DAY', CURRENT_TIMESTAMP) AS __time,\n" + + " table_nb_days.\"user\",\n" + + " table_nb_days.number_days_interactions,\n" + + " (CASE WHEN table_nb_days.number_days_interactions < table_quantiles.first_quartile THEN 'Low_NB_DAYS' \n" + + " WHEN table_nb_days.number_days_interactions >= table_quantiles.first_quartile AND table_nb_days.number_days_interactions < table_quantiles.third_quartile THEN 'Medium_NB_DAYS' \n" + + " WHEN table_nb_days.number_days_interactions >= table_quantiles.third_quartile THEN 'High_NB_DAYS' END) AS NB_Days_Segment\n" + + "FROM (\n" + + " SELECT\n" + + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.25) AS first_quartile,\n" + + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.75) AS third_quartile\n" + + " FROM (\n" + + " SELECT\n" + + " dim1,\n" + + " COUNT(DISTINCT __time) AS number_days_interactions\n" + + " FROM \"foo\"\n" + + " WHERE __time >= TIMESTAMP '2010-01-23' AND __time <= TIMESTAMP '2024-03-23'\n" + + " GROUP BY 1\n" + + " ) AS table_nb_days\n" + + ") AS table_quantiles, (\n" + + " SELECT\n" + + " dim1,\n" + + " COUNT(DISTINCT __time) AS number_days_interactions\n" + + " FROM \"foo\"\n" + + " WHERE __time >= TIMESTAMP '2010-01-23' AND __time <= TIMESTAMP '2024-03-23' \n" + + " GROUP BY 1\n" + + ") AS table_nb_days\n", context, Collections.singletonList( Druids.newTimeseriesQueryBuilder() diff --git a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java index 3cb5c686e9d6..2be08660fc22 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java @@ -107,6 +107,10 @@ public static Iterable cursorToFramesIterable( final FrameWriterFactory frameWriterFactory ) { + + frameWriterFactory.signature() + + return () -> new Iterator() { @Override diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 1d3b38b2fdbd..5e1ab92cf53f 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -78,6 +78,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -425,16 +426,18 @@ private DataSource inlineIfNecessary( } else if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) { // Subquery needs to be inlined. Assign it a subquery id and run it. - final Sequence queryResults; + final Supplier> queryResults; if (dryRun) { - queryResults = Sequences.empty(); + queryResults = Sequences::empty; } else { - final QueryRunner subqueryRunner = subQuery.getRunner(this); - queryResults = subqueryRunner.run( - QueryPlus.wrap(subQuery), - DirectDruidClient.makeResponseContextForQuery() - ); + queryResults = () -> { + final QueryRunner subqueryRunner = subQuery.getRunner(this); + return subqueryRunner.run( + QueryPlus.wrap(subQuery), + DirectDruidClient.makeResponseContextForQuery() + ); + }; } return toInlineDataSource( @@ -645,7 +648,7 @@ private DataSource insertSubqueryIds( * Convert the results of a particular query into a materialized (List-based) InlineDataSource. * * @param query the query - * @param results query results + * @param resultsSupplier query results * @param toolChest toolchest for the query * @param limitAccumulator an accumulator for tracking the number of accumulated rows in all subqueries for a * particular master query @@ -655,7 +658,7 @@ private DataSource insertSubqueryIds( */ private static > DataSource toInlineDataSource( final QueryType query, - final Sequence results, + final Supplier> resultsSupplier, final QueryToolChest toolChest, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, @@ -679,7 +682,7 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementSubqueriesWithRowLimit(); dataSource = materializeResultsAsArray( query, - results, + resultsSupplier, toolChest, limitAccumulator, limit, @@ -693,7 +696,7 @@ private static > DataSource toInlineDataSource( } Optional maybeDataSource = materializeResultsAsFrames( query, - results, + resultsSupplier, toolChest, limitAccumulator, memoryLimitAccumulator, @@ -712,7 +715,7 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit(); dataSource = materializeResultsAsArray( query, - results, + resultsSupplier, toolChest, limitAccumulator, limit, @@ -735,7 +738,7 @@ private static > DataSource toInlineDataSource( */ private static > Optional materializeResultsAsFrames( final QueryType query, - final Sequence results, + final Supplier> results, final QueryToolChest toolChest, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, @@ -749,7 +752,7 @@ private static > Optional materializeR try { framesOptional = toolChest.resultsAsFrames( query, - results, + (Sequence) results.get(), new ArenaMemoryAllocatorFactory(FRAME_SIZE), useNestedForUnknownTypeInSubquery ); @@ -795,7 +798,7 @@ private static > Optional materializeR */ private static > DataSource materializeResultsAsArray( final QueryType query, - final Sequence results, + final Supplier> resultsSupplier, final QueryToolChest toolChest, final AtomicInteger limitAccumulator, final int limit, @@ -807,7 +810,7 @@ private static > DataSource materializeResultsAsAr final ArrayList resultList = new ArrayList<>(); - toolChest.resultsAsArrays(query, results).accumulate( + toolChest.resultsAsArrays(query, (Sequence) resultsSupplier.get()).accumulate( resultList, (acc, in) -> { if (limitAccumulator.getAndIncrement() >= rowLimitToUse) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index a2e45c4af8c2..f6fdedb92fb7 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -256,7 +256,7 @@ public static void setupNullValues() .putAll(QUERY_CONTEXT_DEFAULT) .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000") // Disallows the fallback to row based limiting - .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "1") + .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "100") .build(); // Add additional context to the given context map for when the diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java index 6269e2a5c8cc..745958dfb014 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java @@ -1330,6 +1330,77 @@ public void testSingleValueStringAgg(String testName, Map queryC .run(); } + @MethodSource("constructorFeeder") + @ParameterizedTest(name = "{0}") + public void testtest(String testName, Map queryContext) + { + testBuilder() + .sql( + "SELECT\n" + + " DATE_TRUNC('DAY', CURRENT_TIMESTAMP) AS __time,\n" + + " table_nb_days.\"user\",\n" + + " table_nb_days.number_days_interactions,\n" + + " (CASE WHEN table_nb_days.number_days_interactions < table_quantiles.first_quartile THEN 'Low_NB_DAYS' \n" + + " WHEN table_nb_days.number_days_interactions >= table_quantiles.first_quartile AND table_nb_days.number_days_interactions < table_quantiles.third_quartile THEN 'Medium_NB_DAYS' \n" + + " WHEN table_nb_days.number_days_interactions >= table_quantiles.third_quartile THEN 'High_NB_DAYS' END) AS NB_Days_Segment\n" + + "FROM (\n" + + " SELECT\n" + + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.25) AS first_quartile,\n" + + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.75) AS third_quartile\n" + + " FROM (\n" + + " SELECT\n" + + " dim1,\n" + + " COUNT(DISTINCT __time) AS number_days_interactions\n" + + " FROM \"foo\"\n" + + " WHERE __time >= TIMESTAMP '2010-01-23' AND __time <= TIMESTAMP '2024-03-23'\n" + + " GROUP BY 1\n" + + " ) AS table_nb_days\n" + + ") AS table_quantiles, (\n" + + " SELECT\n" + + " dim1,\n" + + " COUNT(DISTINCT __time) AS number_days_interactions\n" + + " FROM \"foo\"\n" + + " WHERE __time >= TIMESTAMP '2010-01-23' AND __time <= TIMESTAMP '2024-03-23' \n" + + " GROUP BY 1\n" + + ") AS table_nb_days\n" + ) + .expectedQuery( + Druids.newTimeseriesQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.WIKIPEDIA), + new QueryDataSource( + Druids.newScanQueryBuilder() + .dataSource(CalciteTests.WIKIPEDIA) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .offset(6L) + .limit(1L) + .order(ScanQuery.Order.DESCENDING) + .columns("__time", "channel") + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + "j0.", + "(\"channel\" == \"j0.channel\")", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ) + .expectedResults( + ImmutableList.of( + new Object[] {1256L} + ) + ) + .run(); + } + @MethodSource("constructorFeeder") @ParameterizedTest(name = "{0}") public void testSingleValueStringMultipleRowsAgg(String testName, Map queryContext) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestRunner.java b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestRunner.java index 3430e10edfcb..f12e9cc4b3b1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestRunner.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestRunner.java @@ -410,7 +410,7 @@ public VerifyNativeQueries(BaseExecuteQuery execStep) public void verify() { for (QueryResults queryResults : execStep.results()) { - verifyQuery(queryResults); +// verifyQuery(queryResults); } } From a832ad255f91c83ace02abcb175d8c312a360120 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 27 Jun 2024 13:26:27 +0530 Subject: [PATCH 2/8] working --- .../sql/DoublesSketchSqlAggregatorTest.java | 38 ++++------------ .../druid/frame/segment/FrameCursorUtils.java | 26 ++++++++--- .../groupby/GroupByQueryQueryToolChest.java | 2 + .../TimeseriesQueryQueryToolChest.java | 2 + .../query/topn/TopNQueryQueryToolChest.java | 2 + .../server/ClientQuerySegmentWalker.java | 45 +++++++------------ .../druid/sql/calcite/QueryTestRunner.java | 2 +- 7 files changed, 53 insertions(+), 64 deletions(-) diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index 252058b74a6f..8f36b6743150 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -264,36 +264,16 @@ public void testQuantileOnComplexColumn() } @Test - public void testtest() + public void testSubqueryWithNestedGroupBy() { - final List expectedResults; - if (NullHandling.replaceWithDefault()) { - expectedResults = ImmutableList.of( - new Object[]{ - 0.0, - 0.0, - 10.1, - 10.1, - 20.2, - 0.0, - 10.1, - 0.0 - } - ); - } else { - expectedResults = ImmutableList.of( - new Object[]{ - 1.0, - 2.0, - 10.1, - 10.1, - 20.2, - Double.NaN, - 2.0, - Double.NaN - } - ); - } + final List expectedResults = ImmutableList.of( + new Object[]{946684800000L, "", 1L, "High_NB_DAYS"}, + new Object[]{946684800000L, "1", 1L, "High_NB_DAYS"}, + new Object[]{946684800000L, "10.1", 1L, "High_NB_DAYS"}, + new Object[]{946684800000L, "2", 1L, "High_NB_DAYS"}, + new Object[]{946684800000L, "abc", 1L, "High_NB_DAYS"}, + new Object[]{946684800000L, "def", 1L, "High_NB_DAYS"} + ); testQuery( "SELECT\n" diff --git a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java index 2be08660fc22..f842b7e21248 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java @@ -23,6 +23,7 @@ import org.apache.druid.frame.Frame; import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.frame.write.UnsupportedColumnTypeException; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -32,6 +33,7 @@ import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.filter.BoundFilter; import org.apache.druid.segment.filter.Filters; import org.joda.time.Interval; @@ -40,6 +42,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; public class FrameCursorUtils { @@ -100,16 +103,17 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval) /** * Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor, * and writes the columns to the frames. The iterable is lazy, and it traverses the required portion of the cursor - * as required + * as required. + *

+ * If the type is missing from the signature, the method throws an exception without advancing/modifying/closing the + * cursor */ public static Iterable cursorToFramesIterable( final Cursor cursor, final FrameWriterFactory frameWriterFactory ) { - - frameWriterFactory.signature() - + throwIfColumnsHaveUnknownType(frameWriterFactory.signature()); return () -> new Iterator() { @@ -162,7 +166,19 @@ public static Sequence cursorToFramesSequence( final FrameWriterFactory frameWriterFactory ) { - return Sequences.simple(cursorToFramesIterable(cursor, frameWriterFactory)); } + + /** + * Throws {@link UnsupportedColumnTypeException} if the row signature has columns with unknown types. This is used to + * pre-determine if the frames can be materialized as rows, without touching the resource generating the frames. + */ + public static void throwIfColumnsHaveUnknownType(final RowSignature rowSignature) + { + for (int i = 0; i < rowSignature.size(); ++i) { + if (!rowSignature.getColumnType(i).isPresent()) { + throw new UnsupportedColumnTypeException(rowSignature.getColumnName(i), null); + } + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 7588848cf5be..b19b479c26d1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -816,6 +816,8 @@ public Optional> resultsAsFrames( ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) : rowSignature; + FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature); + FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory( memoryAllocatorFactory, modifiedRowSignature, diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 8527d551cf5c..17a2f8be956b 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -485,6 +485,8 @@ public Optional> resultsAsFrames( RowSignature modifiedRowSignature = useNestedForUnknownTypes ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) : rowSignature; + FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature); + FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory( memoryAllocatorFactory, modifiedRowSignature, diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 02d07e255709..25a4284aa427 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -569,6 +569,8 @@ public Optional> resultsAsFrames( RowSignature modifiedRowSignature = useNestedForUnknownTypes ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) : rowSignature; + FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature); + FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory( memoryAllocatorFactory, rowSignature, diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 5e1ab92cf53f..ae918d1a17c4 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -78,7 +78,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -426,18 +425,16 @@ private DataSource inlineIfNecessary( } else if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) { // Subquery needs to be inlined. Assign it a subquery id and run it. - final Supplier> queryResults; + final Sequence queryResults; if (dryRun) { - queryResults = Sequences::empty; + queryResults = Sequences.empty(); } else { - queryResults = () -> { - final QueryRunner subqueryRunner = subQuery.getRunner(this); - return subqueryRunner.run( - QueryPlus.wrap(subQuery), - DirectDruidClient.makeResponseContextForQuery() - ); - }; + final QueryRunner subqueryRunner = subQuery.getRunner(this); + queryResults = subqueryRunner.run( + QueryPlus.wrap(subQuery), + DirectDruidClient.makeResponseContextForQuery() + ); } return toInlineDataSource( @@ -648,7 +645,7 @@ private DataSource insertSubqueryIds( * Convert the results of a particular query into a materialized (List-based) InlineDataSource. * * @param query the query - * @param resultsSupplier query results + * @param results query results * @param toolChest toolchest for the query * @param limitAccumulator an accumulator for tracking the number of accumulated rows in all subqueries for a * particular master query @@ -658,7 +655,7 @@ private DataSource insertSubqueryIds( */ private static > DataSource toInlineDataSource( final QueryType query, - final Supplier> resultsSupplier, + final Sequence results, final QueryToolChest toolChest, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, @@ -682,7 +679,7 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementSubqueriesWithRowLimit(); dataSource = materializeResultsAsArray( query, - resultsSupplier, + results, toolChest, limitAccumulator, limit, @@ -696,7 +693,7 @@ private static > DataSource toInlineDataSource( } Optional maybeDataSource = materializeResultsAsFrames( query, - resultsSupplier, + results, toolChest, limitAccumulator, memoryLimitAccumulator, @@ -715,7 +712,7 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit(); dataSource = materializeResultsAsArray( query, - resultsSupplier, + results, toolChest, limitAccumulator, limit, @@ -738,7 +735,7 @@ private static > DataSource toInlineDataSource( */ private static > Optional materializeResultsAsFrames( final QueryType query, - final Supplier> results, + final Sequence results, final QueryToolChest toolChest, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, @@ -752,7 +749,7 @@ private static > Optional materializeR try { framesOptional = toolChest.resultsAsFrames( query, - (Sequence) results.get(), + results, new ArenaMemoryAllocatorFactory(FRAME_SIZE), useNestedForUnknownTypeInSubquery ); @@ -775,22 +772,12 @@ private static > Optional materializeR } ); return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query))); - - } - catch (ResourceLimitExceededException e) { - throw e; } catch (UnsupportedColumnTypeException e) { subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo(); log.debug(e, "Type info in signature insufficient to materialize rows as frames."); return Optional.empty(); } - catch (Exception e) { - subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason(); - log.debug(e, "Unable to materialize the results as frames due to an unhandleable exception " - + "while conversion. Defaulting to materializing the results as rows"); - return Optional.empty(); - } } /** @@ -798,7 +785,7 @@ private static > Optional materializeR */ private static > DataSource materializeResultsAsArray( final QueryType query, - final Supplier> resultsSupplier, + final Sequence results, final QueryToolChest toolChest, final AtomicInteger limitAccumulator, final int limit, @@ -810,7 +797,7 @@ private static > DataSource materializeResultsAsAr final ArrayList resultList = new ArrayList<>(); - toolChest.resultsAsArrays(query, (Sequence) resultsSupplier.get()).accumulate( + toolChest.resultsAsArrays(query, results).accumulate( resultList, (acc, in) -> { if (limitAccumulator.getAndIncrement() >= rowLimitToUse) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestRunner.java b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestRunner.java index f12e9cc4b3b1..3430e10edfcb 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestRunner.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestRunner.java @@ -410,7 +410,7 @@ public VerifyNativeQueries(BaseExecuteQuery execStep) public void verify() { for (QueryResults queryResults : execStep.results()) { -// verifyQuery(queryResults); + verifyQuery(queryResults); } } From fea583a96378ee099fe3ef6b7e4575b10db910f9 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 1 Jul 2024 17:36:32 +0530 Subject: [PATCH 3/8] tests fix --- .../sql/DoublesSketchSqlAggregatorTest.java | 204 ++++++++++-------- .../druid/frame/segment/FrameCursorUtils.java | 1 - 2 files changed, 113 insertions(+), 92 deletions(-) diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index 8f36b6743150..8a7f222ac0b7 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -24,8 +24,10 @@ import com.google.inject.Injector; import org.apache.druid.common.config.NullHandling; import org.apache.druid.guice.DruidInjectorBuilder; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Druids; +import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -33,6 +35,7 @@ import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule; @@ -43,6 +46,7 @@ import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToRankPostAggregator; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator; import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchSqlAggregatorTest.DoublesSketchComponentSupplier; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -53,6 +57,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.join.JoinType; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; @@ -267,88 +272,128 @@ public void testQuantileOnComplexColumn() public void testSubqueryWithNestedGroupBy() { final List expectedResults = ImmutableList.of( - new Object[]{946684800000L, "", 1L, "High_NB_DAYS"}, - new Object[]{946684800000L, "1", 1L, "High_NB_DAYS"}, - new Object[]{946684800000L, "10.1", 1L, "High_NB_DAYS"}, - new Object[]{946684800000L, "2", 1L, "High_NB_DAYS"}, - new Object[]{946684800000L, "abc", 1L, "High_NB_DAYS"}, - new Object[]{946684800000L, "def", 1L, "High_NB_DAYS"} + new Object[]{946684800000L, "", 1L, "val1"}, + new Object[]{946684800000L, "1", 1L, "val1"}, + new Object[]{946684800000L, "10.1", 1L, "val1"}, + new Object[]{946684800000L, "2", 1L, "val1"}, + new Object[]{946684800000L, "abc", 1L, "val1"}, + new Object[]{946684800000L, "def", 1L, "val1"} ); testQuery( "SELECT\n" - + " DATE_TRUNC('DAY', CURRENT_TIMESTAMP) AS __time,\n" - + " table_nb_days.\"user\",\n" - + " table_nb_days.number_days_interactions,\n" - + " (CASE WHEN table_nb_days.number_days_interactions < table_quantiles.first_quartile THEN 'Low_NB_DAYS' \n" - + " WHEN table_nb_days.number_days_interactions >= table_quantiles.first_quartile AND table_nb_days.number_days_interactions < table_quantiles.third_quartile THEN 'Medium_NB_DAYS' \n" - + " WHEN table_nb_days.number_days_interactions >= table_quantiles.third_quartile THEN 'High_NB_DAYS' END) AS NB_Days_Segment\n" + + " MILLIS_TO_TIMESTAMP(946684800000) AS __time,\n" + + " alias.\"user\",\n" + + " alias.days,\n" + + " (CASE WHEN alias.days < quantiles.first_quartile THEN 'val2' \n" + + " WHEN alias.days >= quantiles.first_quartile AND alias.days < quantiles.third_quartile THEN 'val3' \n" + + " WHEN alias.days >= quantiles.third_quartile THEN 'val1' END) AS val4\n" + "FROM (\n" + " SELECT\n" - + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.25) AS first_quartile,\n" - + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.75) AS third_quartile\n" + + " APPROX_QUANTILE_DS(alias.days, 0.25) AS first_quartile,\n" + + " APPROX_QUANTILE_DS(alias.days, 0.75) AS third_quartile\n" + " FROM (\n" + " SELECT\n" + " dim1 \"user\",\n" - + " COUNT(DISTINCT __time) AS number_days_interactions\n" + + " COUNT(DISTINCT __time) AS days\n" + " FROM \"foo\"\n" - + " WHERE __time >= TIMESTAMP '1990-01-23' AND __time <= TIMESTAMP '2024-03-23'\n" + " GROUP BY 1\n" - + " ) AS table_nb_days\n" - + ") AS table_quantiles, (\n" + + " ) AS alias\n" + + ") AS quantiles, (\n" + " SELECT\n" + " dim1 \"user\",\n" - + " COUNT(DISTINCT __time) AS number_days_interactions\n" + + " COUNT(DISTINCT __time) AS days\n" + " FROM \"foo\"\n" - + " WHERE __time >= TIMESTAMP '1990-01-23' AND __time <= TIMESTAMP '2024-03-23' \n" + " GROUP BY 1\n" - + ") AS table_nb_days\n", + + ") AS alias\n", QUERY_CONTEXT_WITH_SUBQUERY_MEMORY_LIMIT, ImmutableList.of( - Druids.newTimeseriesQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) - .granularity(Granularities.ALL) - .virtualColumns( - new ExpressionVirtualColumn( - "v0", - "CAST(\"dim1\", 'DOUBLE')", - ColumnType.FLOAT, - TestExprMacroTable.INSTANCE - ), - new ExpressionVirtualColumn( - "v1", - "(CAST(\"dim1\", 'DOUBLE') * 2)", - ColumnType.FLOAT, - TestExprMacroTable.INSTANCE - ) - ) - .aggregators(ImmutableList.of( - new DoublesSketchAggregatorFactory("a0:agg", "v0", 128), - new DoublesSketchAggregatorFactory("a1:agg", "v0", 64), - new DoublesSketchAggregatorFactory("a2:agg", "v0", 256), - new DoublesSketchAggregatorFactory("a4:agg", "v1", 128), - new FilteredAggregatorFactory( - new DoublesSketchAggregatorFactory("a5:agg", "v0", 128), - equality("dim2", "abc", ColumnType.STRING) - ), - new FilteredAggregatorFactory( - new DoublesSketchAggregatorFactory("a6:agg", "v0", 128), - not(equality("dim2", "abc", ColumnType.STRING)) - ) - )) - .postAggregators( - new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), - new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f), - new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f), - new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f), - new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f), - new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f), - new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f), - new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f) - ) - .context(QUERY_CONTEXT_DEFAULT) - .build() + newScanQueryBuilder() + .dataSource( + JoinDataSource.create( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource("foo") + .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) + .setGranularity(Granularities.ALL) + .addDimension(new DefaultDimensionSpec( + "dim1", + "d0", + ColumnType.STRING + )) + .addAggregator(new CardinalityAggregatorFactory( + "a0:a", + null, + Collections.singletonList(new DefaultDimensionSpec( + "__time", + "__time", + ColumnType.LONG + )), + false, + true + )) + .setPostAggregatorSpecs(new HyperUniqueFinalizingPostAggregator( + "a0", + "a0:a" + )) + .build() + ) + ) + .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) + .setGranularity(Granularities.ALL) + .addAggregator(new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)) + .setPostAggregatorSpecs( + new DoublesSketchToQuantilePostAggregator( + "_a0", + new FieldAccessPostAggregator("_a0:agg", "_a0:agg"), + 0.25 + ), + new DoublesSketchToQuantilePostAggregator( + "_a1", + new FieldAccessPostAggregator("_a0:agg", "_a0:agg"), + 0.75 + ) + ) + .build() + + ), + new QueryDataSource( + GroupByQuery.builder() + .setDataSource("foo") + .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) + .setGranularity(Granularities.ALL) + .addDimension(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING)) + .addAggregator(new CardinalityAggregatorFactory( + "a0", + null, + Collections.singletonList(new DefaultDimensionSpec( + "__time", + "__time", + ColumnType.LONG + )), + false, + true + )) + .build() + ), + "j0.", + "1", + JoinType.INNER, + null, + TestExprMacroTable.INSTANCE, + null + ) + ) + .intervals(querySegmentSpec(Intervals.ETERNITY)) + .virtualColumns( + new ExpressionVirtualColumn("v0", "946684800000", ColumnType.LONG, TestExprMacroTable.INSTANCE), + new ExpressionVirtualColumn("v1", "case_searched((\"j0.a0\" < \"_a0\"),'val2',((\"j0.a0\" >= \"_a0\") && (\"j0.a0\" < \"_a1\")),'val3',(\"j0.a0\" >= \"_a1\"),'val1',null)", ColumnType.STRING, TestExprMacroTable.INSTANCE) + ) + .columns("j0.a0", "j0.d0", "v0", "v1") + .build() ), expectedResults ); @@ -1055,32 +1100,9 @@ public void testSuccessWithSmallMaxStreamLength() ); testQuery( "SELECT\n" - + " DATE_TRUNC('DAY', CURRENT_TIMESTAMP) AS __time,\n" - + " table_nb_days.\"user\",\n" - + " table_nb_days.number_days_interactions,\n" - + " (CASE WHEN table_nb_days.number_days_interactions < table_quantiles.first_quartile THEN 'Low_NB_DAYS' \n" - + " WHEN table_nb_days.number_days_interactions >= table_quantiles.first_quartile AND table_nb_days.number_days_interactions < table_quantiles.third_quartile THEN 'Medium_NB_DAYS' \n" - + " WHEN table_nb_days.number_days_interactions >= table_quantiles.third_quartile THEN 'High_NB_DAYS' END) AS NB_Days_Segment\n" - + "FROM (\n" - + " SELECT\n" - + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.25) AS first_quartile,\n" - + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.75) AS third_quartile\n" - + " FROM (\n" - + " SELECT\n" - + " dim1,\n" - + " COUNT(DISTINCT __time) AS number_days_interactions\n" - + " FROM \"foo\"\n" - + " WHERE __time >= TIMESTAMP '2010-01-23' AND __time <= TIMESTAMP '2024-03-23'\n" - + " GROUP BY 1\n" - + " ) AS table_nb_days\n" - + ") AS table_quantiles, (\n" - + " SELECT\n" - + " dim1,\n" - + " COUNT(DISTINCT __time) AS number_days_interactions\n" - + " FROM \"foo\"\n" - + " WHERE __time >= TIMESTAMP '2010-01-23' AND __time <= TIMESTAMP '2024-03-23' \n" - + " GROUP BY 1\n" - + ") AS table_nb_days\n", + + "APPROX_QUANTILE_DS(m1, 0.01),\n" + + "APPROX_QUANTILE_DS(cnt, 0.5)\n" + + "FROM foo", context, Collections.singletonList( Druids.newTimeseriesQueryBuilder() diff --git a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java index f842b7e21248..de970363bb4f 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java @@ -42,7 +42,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.Objects; public class FrameCursorUtils { From ce12c442768ca20eafaf8b6aee6691af442c2ed2 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 2 Jul 2024 08:46:10 +0530 Subject: [PATCH 4/8] tests --- .../sql/DoublesSketchSqlAggregatorTest.java | 8 ++- .../sql/calcite/BaseCalciteQueryTest.java | 2 +- .../sql/calcite/CalciteSubqueryTest.java | 71 ------------------- 3 files changed, 8 insertions(+), 73 deletions(-) diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index 8a7f222ac0b7..ac10c6b35ef2 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Druids; import org.apache.druid.query.JoinDataSource; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -306,7 +307,12 @@ public void testSubqueryWithNestedGroupBy() + " FROM \"foo\"\n" + " GROUP BY 1\n" + ") AS alias\n", - QUERY_CONTEXT_WITH_SUBQUERY_MEMORY_LIMIT, + ImmutableMap.builder() + .putAll(QUERY_CONTEXT_DEFAULT) + .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000") + // Disallows the fallback to row based limiting + .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "1") + .build(), ImmutableList.of( newScanQueryBuilder() .dataSource( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index f6fdedb92fb7..a2e45c4af8c2 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -256,7 +256,7 @@ public static void setupNullValues() .putAll(QUERY_CONTEXT_DEFAULT) .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000") // Disallows the fallback to row based limiting - .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "100") + .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "1") .build(); // Add additional context to the given context map for when the diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java index 745958dfb014..6269e2a5c8cc 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java @@ -1330,77 +1330,6 @@ public void testSingleValueStringAgg(String testName, Map queryC .run(); } - @MethodSource("constructorFeeder") - @ParameterizedTest(name = "{0}") - public void testtest(String testName, Map queryContext) - { - testBuilder() - .sql( - "SELECT\n" - + " DATE_TRUNC('DAY', CURRENT_TIMESTAMP) AS __time,\n" - + " table_nb_days.\"user\",\n" - + " table_nb_days.number_days_interactions,\n" - + " (CASE WHEN table_nb_days.number_days_interactions < table_quantiles.first_quartile THEN 'Low_NB_DAYS' \n" - + " WHEN table_nb_days.number_days_interactions >= table_quantiles.first_quartile AND table_nb_days.number_days_interactions < table_quantiles.third_quartile THEN 'Medium_NB_DAYS' \n" - + " WHEN table_nb_days.number_days_interactions >= table_quantiles.third_quartile THEN 'High_NB_DAYS' END) AS NB_Days_Segment\n" - + "FROM (\n" - + " SELECT\n" - + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.25) AS first_quartile,\n" - + " APPROX_QUANTILE_DS(table_nb_days.number_days_interactions, 0.75) AS third_quartile\n" - + " FROM (\n" - + " SELECT\n" - + " dim1,\n" - + " COUNT(DISTINCT __time) AS number_days_interactions\n" - + " FROM \"foo\"\n" - + " WHERE __time >= TIMESTAMP '2010-01-23' AND __time <= TIMESTAMP '2024-03-23'\n" - + " GROUP BY 1\n" - + " ) AS table_nb_days\n" - + ") AS table_quantiles, (\n" - + " SELECT\n" - + " dim1,\n" - + " COUNT(DISTINCT __time) AS number_days_interactions\n" - + " FROM \"foo\"\n" - + " WHERE __time >= TIMESTAMP '2010-01-23' AND __time <= TIMESTAMP '2024-03-23' \n" - + " GROUP BY 1\n" - + ") AS table_nb_days\n" - ) - .expectedQuery( - Druids.newTimeseriesQueryBuilder() - .dataSource( - join( - new TableDataSource(CalciteTests.WIKIPEDIA), - new QueryDataSource( - Druids.newScanQueryBuilder() - .dataSource(CalciteTests.WIKIPEDIA) - .intervals(querySegmentSpec(Filtration.eternity())) - .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) - .offset(6L) - .limit(1L) - .order(ScanQuery.Order.DESCENDING) - .columns("__time", "channel") - .legacy(false) - .context(QUERY_CONTEXT_DEFAULT) - .build() - ), - "j0.", - "(\"channel\" == \"j0.channel\")", - JoinType.INNER - ) - ) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .aggregators(aggregators(new CountAggregatorFactory("a0"))) - .context(QUERY_CONTEXT_DEFAULT) - .build() - ) - .expectedResults( - ImmutableList.of( - new Object[] {1256L} - ) - ) - .run(); - } - @MethodSource("constructorFeeder") @ParameterizedTest(name = "{0}") public void testSingleValueStringMultipleRowsAgg(String testName, Map queryContext) From 1bbfefe062f94143711d26e836b32689b92634ce Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 2 Jul 2024 10:26:11 +0530 Subject: [PATCH 5/8] fallback, window tests --- .../server/ClientQuerySegmentWalker.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index ae918d1a17c4..d54b43336ca2 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -746,6 +746,7 @@ private static > Optional materializeR { Optional> framesOptional; + boolean startedAccumulating = false; try { framesOptional = toolChest.resultsAsFrames( query, @@ -760,6 +761,9 @@ private static > Optional materializeR Sequence frames = framesOptional.get(); List frameSignaturePairs = new ArrayList<>(); + + startedAccumulating = true; + frames.forEach( frame -> { limitAccumulator.addAndGet(frame.getFrame().numRows()); @@ -778,6 +782,24 @@ private static > Optional materializeR log.debug(e, "Type info in signature insufficient to materialize rows as frames."); return Optional.empty(); } + catch (ResourceLimitExceededException e) { + throw e; + } + catch (Exception e) { + if (startedAccumulating) { + // If we have opened the resultSequence, we can't fall back safely as the resultSequence might hold some resources + // somewhere, due to which we need to surface the exception. + throw DruidException.defensive() + .build( + e, + "Unable to materialize the results as frames for estimating the byte footprint. " + + "Please disable the 'maxSubqueryBytes' by setting it to 'disabled' in the query context or removing it altogether " + + "from the query context and/or the server config." + ); + } else { + return Optional.empty(); + } + } } /** From 0a728c263a9755631af123cc794f237294df86b1 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 2 Jul 2024 10:31:10 +0530 Subject: [PATCH 6/8] better comment --- .../java/org/apache/druid/server/ClientQuerySegmentWalker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index d54b43336ca2..d49ce3909f71 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -788,7 +788,7 @@ private static > Optional materializeR catch (Exception e) { if (startedAccumulating) { // If we have opened the resultSequence, we can't fall back safely as the resultSequence might hold some resources - // somewhere, due to which we need to surface the exception. + // that we release on exception, and we need to throw the exception to disable the 'maxSubqueryBytes' configuration throw DruidException.defensive() .build( e, From 33ee74cef6a320e46457c6dd1c5b91c90d4c54d5 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 2 Jul 2024 12:03:36 +0530 Subject: [PATCH 7/8] increase rows for fallback --- .../quantiles/sql/DoublesSketchSqlAggregatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index ac10c6b35ef2..4e9291254c35 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -311,7 +311,7 @@ public void testSubqueryWithNestedGroupBy() .putAll(QUERY_CONTEXT_DEFAULT) .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000") // Disallows the fallback to row based limiting - .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "1") + .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "10") .build(), ImmutableList.of( newScanQueryBuilder() From a3a36cf1081d9abb3326256cf99425f2ad9235c2 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 2 Jul 2024 15:11:30 +0530 Subject: [PATCH 8/8] Trigger Build