From 22e01a7e8a4964a5e79f0105a45cf521dec08248 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 25 Jul 2024 10:25:05 +0530 Subject: [PATCH 1/9] init --- .../WindowOperatorQueryQueryToolChest.java | 42 +++++++++++++++++++ .../sql/calcite/CalciteWindowQueryTest.java | 3 +- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java index bec529eedefa..80bb146a02bb 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java @@ -23,9 +23,18 @@ import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.collect.ImmutableMap; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.allocation.MemoryAllocatorFactory; +import org.apache.druid.frame.segment.FrameCursorUtils; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.frame.write.FrameWriterUtils; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.DefaultQueryMetrics; +import org.apache.druid.query.FrameSignaturePair; +import org.apache.druid.query.IterableRowsCursorHelper; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -36,11 +45,14 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.column.NullColumn; +import org.apache.druid.segment.Cursor; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import java.io.Closeable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.Supplier; public class WindowOperatorQueryQueryToolChest extends QueryToolChest @@ -116,6 +128,36 @@ public Sequence resultsAsArrays( return (Sequence) resultSequence; } + @Override + public Optional> resultsAsFrames( + WindowOperatorQuery query, + Sequence resultSequence, + MemoryAllocatorFactory memoryAllocatorFactory, + boolean useNestedForUnknownTypes + ) + { + RowSignature rowSignature = resultArraySignature(query); + RowSignature modifiedRowSignature = useNestedForUnknownTypes + ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) + : rowSignature; + FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature); + FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory( + memoryAllocatorFactory, + modifiedRowSignature, + new ArrayList<>() + ); + Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence( + resultsAsArrays(query, resultSequence), + rowSignature + ); + Cursor cursor = cursorAndCloseable.lhs; + Closeable closeble = cursorAndCloseable.rhs; + + Sequence frames = FrameCursorUtils.cursorToFramesSequence(cursor, frameWriterFactory).withBaggage(closeble); + + return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature))); + } + /** * This class exists to unravel the RowsAndColumns that are used in this query and make it the return Sequence * actually be a Sequence of rows. This is relatively broken in a number of regards, the most obvious of which diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index b3d657b148f1..cbf0e7851a90 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -223,7 +223,8 @@ public void windowQueryTestWithCustomContextMaxSubqueryBytes(String filename) th .sql(testCase.getSql()) .queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true, PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000" + QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000", + QueryContexts.MAX_SUBQUERY_ROWS_KEY, "0" ) ) .addCustomVerification(QueryVerification.ofResults(testCase)) From ea1d57edf617e855bb35ca62d89519467debb797 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 19 Aug 2024 18:08:16 +0530 Subject: [PATCH 2/9] working --- .../druid/query/ResultSerializationMode.java | 29 +++++ .../WindowOperatorQueryQueryToolChest.java | 101 +++++++++--------- .../server/ClientQuerySegmentWalker.java | 85 +++++++++++---- .../server/ClientQuerySegmentWalkerUtils.java | 22 +++- .../server/ClientQuerySegmentWalkerTest.java | 4 +- 5 files changed, 167 insertions(+), 74 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java diff --git a/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java b/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java new file mode 100644 index 000000000000..ca866d130997 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +public enum ResultSerializationMode +{ + ROWS, + + FRAMES; + + public static String CTX_SERIALIZATION_PARAMETER = "serialization"; +} diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java index 80bb146a02bb..56cce4a828da 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java @@ -23,33 +23,28 @@ import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.collect.ImmutableMap; -import org.apache.druid.frame.Frame; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.allocation.MemoryAllocatorFactory; -import org.apache.druid.frame.segment.FrameCursorUtils; -import org.apache.druid.frame.write.FrameWriterFactory; -import org.apache.druid.frame.write.FrameWriterUtils; -import org.apache.druid.frame.write.FrameWriters; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.DefaultQueryMetrics; import org.apache.druid.query.FrameSignaturePair; -import org.apache.druid.query.IterableRowsCursorHelper; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.ResultSerializationMode; import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.column.NullColumn; -import org.apache.druid.segment.Cursor; +import org.apache.druid.query.rowsandcols.semantic.FrameMaker; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import java.io.Closeable; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -62,30 +57,45 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest mergeResults(QueryRunner runner) { - return new RowsAndColumnsUnravelingQueryRunner( - (queryPlus, responseContext) -> { - final WindowOperatorQuery query = (WindowOperatorQuery) queryPlus.getQuery(); - final List opFactories = query.getOperators(); - if (opFactories.isEmpty()) { - return runner.run(queryPlus, responseContext); - } - - Supplier opSupplier = () -> { - Operator retVal = new SequenceOperator( - runner.run( - queryPlus.withQuery(query.withOperators(new ArrayList())), - responseContext - ) - ); - for (OperatorFactory operatorFactory : opFactories) { - retVal = operatorFactory.wrap(retVal); - } - return retVal; - }; + QueryRunner baseRunner = (queryPlus, responseContext) -> { + final WindowOperatorQuery query = (WindowOperatorQuery) queryPlus.getQuery(); + final List opFactories = query.getOperators(); + if (opFactories.isEmpty()) { + return runner.run(queryPlus, responseContext); + } - return new OperatorSequence(opSupplier); + Supplier opSupplier = () -> { + Operator retVal = new SequenceOperator( + runner.run( + queryPlus.withQuery(query.withOperators(new ArrayList())), + responseContext + ) + ); + for (OperatorFactory operatorFactory : opFactories) { + retVal = operatorFactory.wrap(retVal); } - ); + return retVal; + }; + + return new OperatorSequence(opSupplier); + }; + + return (queryPlus, responseContext) -> { + final Query query = queryPlus.getQuery(); + final ResultSerializationMode serializationMode = query.context().getEnum( + ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, + ResultSerializationMode.class, + ResultSerializationMode.ROWS + ); + switch (serializationMode) { + case FRAMES: + return baseRunner.run(queryPlus, responseContext); + case ROWS: + return new RowsAndColumnsUnravelingQueryRunner(baseRunner).run(queryPlus, responseContext); + default: + throw DruidException.defensive("Serialization mode[%s] not supported", serializationMode); + } + }; } @Override @@ -136,26 +146,17 @@ public Optional> resultsAsFrames( boolean useNestedForUnknownTypes ) { - RowSignature rowSignature = resultArraySignature(query); - RowSignature modifiedRowSignature = useNestedForUnknownTypes - ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) - : rowSignature; - FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature); - FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory( - memoryAllocatorFactory, - modifiedRowSignature, - new ArrayList<>() - ); - Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence( - resultsAsArrays(query, resultSequence), - rowSignature + return Optional.of( + resultSequence.map( + rac -> { + FrameMaker frameMaker = FrameMaker.fromRAC(rac); + return new FrameSignaturePair( + frameMaker.toColumnBasedFrame(), + frameMaker.computeSignature() + ); + } + ) ); - Cursor cursor = cursorAndCloseable.lhs; - Closeable closeble = cursorAndCloseable.rhs; - - Sequence frames = FrameCursorUtils.cursorToFramesSequence(cursor, frameWriterFactory).withBaggage(closeble); - - return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature))); } /** diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 990878eda6e3..323d0528bebe 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -56,6 +56,7 @@ import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.ResultLevelCachingQueryRunner; +import org.apache.druid.query.ResultSerializationMode; import org.apache.druid.query.RetryQueryRunner; import org.apache.druid.query.RetryQueryRunnerConfig; import org.apache.druid.query.SegmentDescriptor; @@ -80,6 +81,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -359,7 +362,7 @@ private DataSource globalizeIfPossible( * @param dryRun if true, does not actually execute any subqueries, but will inline empty result sets. */ @SuppressWarnings({"rawtypes", "unchecked"}) // Subquery, toolchest, runner handling all use raw types - private DataSource inlineIfNecessary( + private DataSource inlineIfNecessary( final DataSource dataSource, @Nullable final QueryToolChest toolChestIfOutermost, final AtomicInteger subqueryRowLimitAccumulator, @@ -429,22 +432,18 @@ private DataSource inlineIfNecessary( } else if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) { // Subquery needs to be inlined. Assign it a subquery id and run it. - final Sequence queryResults; + final Function, QueryRunner> queryRunnerSupplier; if (dryRun) { - queryResults = Sequences.empty(); + queryRunnerSupplier = query -> (queryPlus, responseContext) -> Sequences.empty(); } else { - final QueryRunner subqueryRunner = subQuery.getRunner(this); - queryResults = subqueryRunner.run( - QueryPlus.wrap(subQuery), - DirectDruidClient.makeResponseContextForQuery() - ); + queryRunnerSupplier = query -> query.getRunner(this); } return toInlineDataSource( subQuery, - queryResults, warehouse.getToolChest(subQuery), + queryRunnerSupplier, subqueryRowLimitAccumulator, subqueryMemoryLimitAccumulator, cannotMaterializeToFrames, @@ -647,14 +646,11 @@ private DataSource insertSubqueryIds( .collect(Collectors.toList())); } - /** - */ /** * * Convert the results of a particular query into a materialized (List-based) InlineDataSource. * * @param query the query - * @param results query results * @param toolChest toolchest for the query * @param limitAccumulator an accumulator for tracking the number of accumulated rows in all subqueries for a * particular master query @@ -671,8 +667,8 @@ private DataSource insertSubqueryIds( */ private static > DataSource toInlineDataSource( final QueryType query, - final Sequence results, final QueryToolChest toolChest, + final Function, QueryRunner> queryRunnerSupplier, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, final AtomicBoolean cannotMaterializeToFrames, @@ -697,8 +693,8 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementSubqueriesWithRowLimit(); dataSource = materializeResultsAsArray( query, - results, toolChest, + queryRunnerSupplier, limitAccumulator, limit, subqueryStatsProvider, @@ -711,10 +707,12 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementQueriesExceedingByteLimit(); throw ResourceLimitExceededException.withMessage(byteLimitExceededMessage(memoryLimit)); } + AtomicReference> fallbackSequence = new AtomicReference<>(null); Optional maybeDataSource = materializeResultsAsFrames( query, - results, toolChest, + queryRunnerSupplier, + fallbackSequence, limitAccumulator, memoryLimitAccumulator, memoryLimit, @@ -734,7 +732,7 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit(); dataSource = materializeResultsAsArray( query, - results, + fallbackSequence.get(), toolChest, limitAccumulator, limit, @@ -759,8 +757,9 @@ private static > DataSource toInlineDataSource( */ private static > Optional materializeResultsAsFrames( final QueryType query, - final Sequence results, final QueryToolChest toolChest, + final Function, QueryRunner> queryRunnerSupplier, + final AtomicReference> resultSequence, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, final long memoryLimit, @@ -770,11 +769,20 @@ private static > Optional materializeR final ServiceEmitter emitter ) { - Optional> framesOptional; + Query queryWithSerializationParameter = query.withOverriddenContext( + Collections.singletonMap( + ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, + ResultSerializationMode.FRAMES.toString() + ) + ); + Sequence results = + queryRunnerSupplier + .apply(queryWithSerializationParameter) + .run(QueryPlus.wrap(queryWithSerializationParameter), DirectDruidClient.makeResponseContextForQuery()); boolean startedAccumulating = false; try { - framesOptional = toolChest.resultsAsFrames( + Optional> framesOptional = toolChest.resultsAsFrames( query, results, new ArenaMemoryAllocatorFactory(FRAME_SIZE), @@ -840,11 +848,49 @@ private static > Optional materializeR + "from the query context and/or the server config." ); } else { + resultSequence.set(results); return Optional.empty(); } } } + /** + * This method materializes the query results as {@code List} + */ + private static > DataSource materializeResultsAsArray( + final QueryType query, + final QueryToolChest toolChest, + final Function, QueryRunner> queryRunnerSupplier, + final AtomicInteger limitAccumulator, + final int limit, + final SubqueryCountStatsProvider subqueryStatsProvider, + boolean emitMetrics, + final ServiceEmitter emitter + ) + { + //noinspection unchecked + QueryType queryWithSerializationParameter = (QueryType) query.withOverriddenContext( + Collections.singletonMap( + ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, + ResultSerializationMode.ROWS.toString() + ) + ); + Sequence results = queryRunnerSupplier + .apply(queryWithSerializationParameter) + .run(QueryPlus.wrap(queryWithSerializationParameter), DirectDruidClient.makeResponseContextForQuery()); + + return materializeResultsAsArray( + queryWithSerializationParameter, + results, + toolChest, + limitAccumulator, + limit, + subqueryStatsProvider, + emitMetrics, + emitter + ); + } + /** * This method materializes the query results as {@code List} */ @@ -912,5 +958,4 @@ private static String rowLimitExceededMessage(final int rowLimitUsed) QueryContexts.MAX_SUBQUERY_ROWS_KEY ); } - } diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java index 6667cd961129..0435ed231935 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java @@ -19,6 +19,8 @@ package org.apache.druid.server; +import org.apache.druid.query.ResultSerializationMode; + /** * Utilities for {@link ClientQuerySegmentWalker} */ @@ -35,7 +37,13 @@ public enum SubqueryResultLimit * walker ensures that the cumulative number of rows of the results of subqueries of the given query donot exceed * the limit specified in the context or as the server default */ - ROW_LIMIT, + ROW_LIMIT { + @Override + public ResultSerializationMode serializationMode() + { + return ResultSerializationMode.ROWS; + } + }, /** * Subqueries limited by the BYTE_LIMIT are materialized as {@link org.apache.druid.frame.Frame}s on heap. Frames @@ -44,10 +52,18 @@ public enum SubqueryResultLimit * Frames in the broker memory) of a given query do not exceed the limit specified in the context or as the server * default */ - MEMORY_LIMIT + MEMORY_LIMIT { + @Override + public ResultSerializationMode serializationMode() + { + return ResultSerializationMode.FRAMES; + } + }; + + public abstract ResultSerializationMode serializationMode(); } - /** + /** * Returns the limit type to be used for a given subquery. * It returns MEMORY_LIMIT only if: * 1. The user has enabled the 'maxSubqueryBytes' explicitly in the query context or as the server default diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 467f375f9f7b..fa5585c374ed 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -52,6 +52,7 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.QueryToolChestTestHelper; import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.query.ResultSerializationMode; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; @@ -1721,7 +1722,8 @@ private static class ExpectedQuery .put(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true) .put(GroupingEngine.CTX_KEY_OUTERMOST, true) .put(GroupingEngine.CTX_KEY_FUDGE_TIMESTAMP, "1979") - .put(QueryContexts.QUERY_RESOURCE_ID, "dummy"); + .put(QueryContexts.QUERY_RESOURCE_ID, "dummy") + .put(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, "blast"); modifiedQuery = query.withOverriddenContext(contextBuilder.build()); From ef52b84bb490384961969abafa265fc7d6c74ace Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 20 Aug 2024 10:47:53 +0530 Subject: [PATCH 3/9] review comments - remove function, don't wrap query runners --- .../WindowOperatorQueryQueryToolChest.java | 179 +++++++++--------- .../server/ClientQuerySegmentWalker.java | 75 ++------ 2 files changed, 110 insertions(+), 144 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java index 56cce4a828da..1c99c0017261 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java @@ -29,7 +29,6 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.DefaultQueryMetrics; import org.apache.druid.query.FrameSignaturePair; -import org.apache.druid.query.Query; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -57,45 +56,30 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest mergeResults(QueryRunner runner) { - QueryRunner baseRunner = (queryPlus, responseContext) -> { - final WindowOperatorQuery query = (WindowOperatorQuery) queryPlus.getQuery(); - final List opFactories = query.getOperators(); - if (opFactories.isEmpty()) { - return runner.run(queryPlus, responseContext); - } + return new RowsAndColumnsSerializingQueryRunner( + (queryPlus, responseContext) -> { + final WindowOperatorQuery query = (WindowOperatorQuery) queryPlus.getQuery(); + final List opFactories = query.getOperators(); + if (opFactories.isEmpty()) { + return runner.run(queryPlus, responseContext); + } + + Supplier opSupplier = () -> { + Operator retVal = new SequenceOperator( + runner.run( + queryPlus.withQuery(query.withOperators(new ArrayList<>())), + responseContext + ) + ); + for (OperatorFactory operatorFactory : opFactories) { + retVal = operatorFactory.wrap(retVal); + } + return retVal; + }; - Supplier opSupplier = () -> { - Operator retVal = new SequenceOperator( - runner.run( - queryPlus.withQuery(query.withOperators(new ArrayList())), - responseContext - ) - ); - for (OperatorFactory operatorFactory : opFactories) { - retVal = operatorFactory.wrap(retVal); + return new OperatorSequence(opSupplier); } - return retVal; - }; - - return new OperatorSequence(opSupplier); - }; - - return (queryPlus, responseContext) -> { - final Query query = queryPlus.getQuery(); - final ResultSerializationMode serializationMode = query.context().getEnum( - ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, - ResultSerializationMode.class, - ResultSerializationMode.ROWS - ); - switch (serializationMode) { - case FRAMES: - return baseRunner.run(queryPlus, responseContext); - case ROWS: - return new RowsAndColumnsUnravelingQueryRunner(baseRunner).run(queryPlus, responseContext); - default: - throw DruidException.defensive("Serialization mode[%s] not supported", serializationMode); - } - }; + ); } @Override @@ -134,11 +118,12 @@ public Sequence resultsAsArrays( Sequence resultSequence ) { - // Dark magic; see RowsAndColumnsUnravelingQueryRunner. + // Dark magic; see RowsAndColumnsSerializingQueryRunner. return (Sequence) resultSequence; } @Override + @SuppressWarnings({"unchecked", "rawtypes"}) public Optional> resultsAsFrames( WindowOperatorQuery query, Sequence resultSequence, @@ -146,25 +131,16 @@ public Optional> resultsAsFrames( boolean useNestedForUnknownTypes ) { - return Optional.of( - resultSequence.map( - rac -> { - FrameMaker frameMaker = FrameMaker.fromRAC(rac); - return new FrameSignaturePair( - frameMaker.toColumnBasedFrame(), - frameMaker.computeSignature() - ); - } - ) - ); + // see RowsAndColumnsSerializingQueryRunner + return Optional.of((Sequence) resultSequence); } /** - * This class exists to unravel the RowsAndColumns that are used in this query and make it the return Sequence - * actually be a Sequence of rows. This is relatively broken in a number of regards, the most obvious of which - * is that it is going to run counter to the stated class on the Generic of the QueryToolChest. That is, the - * code makes it look like you are getting a Sequence of RowsAndColumns, but, by using this, the query will - * actually ultimately produce a Sequence of Object[]. This works because of type Erasure in Java (it's all Object + * This class exists to serialize the RowsAndColumns that are used in this query and make it the return Sequence + * actually be a Sequence of rows or frames, as the query requires. + * This is relatively broken in a number of regards, the most obvious of which is that it is going to run counter to the stated class on the Generic of the QueryToolChest. + * That is, the code makes it look like you are getting a Sequence of RowsAndColumns, but, by using this, the query will + * actually ultimately produce a Sequence of Object[] or Frames. This works because of type Erasure in Java (it's all Object * at the end of the day). *

* While it might seem like this will break all sorts of things, the Generic type is actually there more as a type @@ -175,12 +151,12 @@ public Optional> resultsAsFrames( * Not our proudest moment, but we use the tools available to us. */ @SuppressWarnings({"unchecked", "rawtypes"}) - private static class RowsAndColumnsUnravelingQueryRunner implements QueryRunner + private static class RowsAndColumnsSerializingQueryRunner implements QueryRunner { private final QueryRunner baseQueryRunner; - private RowsAndColumnsUnravelingQueryRunner( + private RowsAndColumnsSerializingQueryRunner( QueryRunner baseQueryRunner ) { @@ -201,42 +177,71 @@ public Sequence run( queryPlus.withQuery(query.withOverriddenContext(ImmutableMap.of("unravel", false))), responseContext ); + final ResultSerializationMode serializationMode = query.context().getEnum( + ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, + ResultSerializationMode.class, + ResultSerializationMode.ROWS + ); + switch (serializationMode) { + case ROWS: + return asRows(baseSequence, query); + case FRAMES: + return asFrames(baseSequence); + default: + throw DruidException.defensive("Serialization mode[%s] not supported", serializationMode); + } + } - final RowSignature rowSignature = query.getRowSignature(); - return baseSequence.flatMap( - rac -> { - List results = new ArrayList<>(rac.numRows()); - - ColumnAccessor[] accessors = new ColumnAccessor[rowSignature.size()]; - int index = 0; - for (String columnName : rowSignature.getColumnNames()) { - final Column column = rac.findColumn(columnName); - if (column == null) { - final ColumnType columnType = rowSignature - .getColumnType(columnName) - .orElse(ColumnType.UNKNOWN_COMPLEX); - - accessors[index] = new NullColumn.Accessor(columnType, rac.numRows()); - } else { - accessors[index] = column.toAccessor(); - } - ++index; - } + return baseQueryRunner.run(queryPlus, responseContext); + } - for (int i = 0; i < rac.numRows(); ++i) { - Object[] objArr = new Object[accessors.length]; - for (int j = 0; j < accessors.length; j++) { - objArr[j] = accessors[j].getObject(i); - } - results.add(objArr); + private static Sequence asRows(final Sequence baseSequence, final WindowOperatorQuery query) + { + final RowSignature rowSignature = query.getRowSignature(); + return baseSequence.flatMap( + rac -> { + List results = new ArrayList<>(rac.numRows()); + + ColumnAccessor[] accessors = new ColumnAccessor[rowSignature.size()]; + int index = 0; + for (String columnName : rowSignature.getColumnNames()) { + final Column column = rac.findColumn(columnName); + if (column == null) { + final ColumnType columnType = rowSignature + .getColumnType(columnName) + .orElse(ColumnType.UNKNOWN_COMPLEX); + + accessors[index] = new NullColumn.Accessor(columnType, rac.numRows()); + } else { + accessors[index] = column.toAccessor(); } + ++index; + } - return Sequences.simple(results); + for (int i = 0; i < rac.numRows(); ++i) { + Object[] objArr = new Object[accessors.length]; + for (int j = 0; j < accessors.length; j++) { + objArr[j] = accessors[j].getObject(i); + } + results.add(objArr); } - ); - } - return baseQueryRunner.run(queryPlus, responseContext); + return Sequences.simple(results); + } + ); + } + + private static Sequence asFrames(final Sequence baseSequence) + { + return baseSequence.map( + rac -> { + FrameMaker frameMaker = FrameMaker.fromRAC(rac); + return new FrameSignaturePair( + frameMaker.toColumnBasedFrame(), + frameMaker.computeSignature() + ); + } + ); } } } diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 323d0528bebe..24bbfe3bf391 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -82,7 +82,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -432,18 +431,28 @@ private DataSource inlineIfNecessary( } else if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) { // Subquery needs to be inlined. Assign it a subquery id and run it. - final Function, QueryRunner> queryRunnerSupplier; + final Sequence queryResults; if (dryRun) { - queryRunnerSupplier = query -> (queryPlus, responseContext) -> Sequences.empty(); + queryResults = Sequences.empty(); } else { - queryRunnerSupplier = query -> query.getRunner(this); + Query subQueryWithSerialization = subQuery.withOverriddenContext( + Collections.singletonMap( + ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, + ClientQuerySegmentWalkerUtils.getLimitType(maxSubqueryMemory, cannotMaterializeToFrames.get()) + .serializationMode() + .toString() + ) + ); + queryResults = subQueryWithSerialization + .getRunner(this) + .run(QueryPlus.wrap(subQueryWithSerialization), DirectDruidClient.makeResponseContextForQuery()); } return toInlineDataSource( subQuery, + queryResults, warehouse.getToolChest(subQuery), - queryRunnerSupplier, subqueryRowLimitAccumulator, subqueryMemoryLimitAccumulator, cannotMaterializeToFrames, @@ -667,8 +676,8 @@ private DataSource insertSubqueryIds( */ private static > DataSource toInlineDataSource( final QueryType query, + final Sequence queryResults, final QueryToolChest toolChest, - final Function, QueryRunner> queryRunnerSupplier, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, final AtomicBoolean cannotMaterializeToFrames, @@ -693,8 +702,8 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementSubqueriesWithRowLimit(); dataSource = materializeResultsAsArray( query, + queryResults, toolChest, - queryRunnerSupplier, limitAccumulator, limit, subqueryStatsProvider, @@ -710,8 +719,8 @@ private static > DataSource toInlineDataSource( AtomicReference> fallbackSequence = new AtomicReference<>(null); Optional maybeDataSource = materializeResultsAsFrames( query, + queryResults, toolChest, - queryRunnerSupplier, fallbackSequence, limitAccumulator, memoryLimitAccumulator, @@ -757,8 +766,8 @@ private static > DataSource toInlineDataSource( */ private static > Optional materializeResultsAsFrames( final QueryType query, + final Sequence results, final QueryToolChest toolChest, - final Function, QueryRunner> queryRunnerSupplier, final AtomicReference> resultSequence, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, @@ -769,17 +778,6 @@ private static > Optional materializeR final ServiceEmitter emitter ) { - Query queryWithSerializationParameter = query.withOverriddenContext( - Collections.singletonMap( - ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, - ResultSerializationMode.FRAMES.toString() - ) - ); - Sequence results = - queryRunnerSupplier - .apply(queryWithSerializationParameter) - .run(QueryPlus.wrap(queryWithSerializationParameter), DirectDruidClient.makeResponseContextForQuery()); - boolean startedAccumulating = false; try { Optional> framesOptional = toolChest.resultsAsFrames( @@ -854,43 +852,6 @@ private static > Optional materializeR } } - /** - * This method materializes the query results as {@code List} - */ - private static > DataSource materializeResultsAsArray( - final QueryType query, - final QueryToolChest toolChest, - final Function, QueryRunner> queryRunnerSupplier, - final AtomicInteger limitAccumulator, - final int limit, - final SubqueryCountStatsProvider subqueryStatsProvider, - boolean emitMetrics, - final ServiceEmitter emitter - ) - { - //noinspection unchecked - QueryType queryWithSerializationParameter = (QueryType) query.withOverriddenContext( - Collections.singletonMap( - ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, - ResultSerializationMode.ROWS.toString() - ) - ); - Sequence results = queryRunnerSupplier - .apply(queryWithSerializationParameter) - .run(QueryPlus.wrap(queryWithSerializationParameter), DirectDruidClient.makeResponseContextForQuery()); - - return materializeResultsAsArray( - queryWithSerializationParameter, - results, - toolChest, - limitAccumulator, - limit, - subqueryStatsProvider, - emitMetrics, - emitter - ); - } - /** * This method materializes the query results as {@code List} */ From 350661e41fc6cc5ec2d0a86bf3b46749e54bfc0a Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 20 Aug 2024 11:26:25 +0530 Subject: [PATCH 4/9] comment --- .../query/operator/WindowOperatorQueryQueryToolChest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java index 1c99c0017261..7fb67e8732db 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java @@ -195,6 +195,9 @@ public Sequence run( return baseQueryRunner.run(queryPlus, responseContext); } + /** + * Translates Sequence of RACs to a Sequence of Object[] + */ private static Sequence asRows(final Sequence baseSequence, final WindowOperatorQuery query) { final RowSignature rowSignature = query.getRowSignature(); @@ -231,6 +234,9 @@ private static Sequence asRows(final Sequence baseSequence, fina ); } + /** + * Translates a sequence of RACs to a Sequence of Frames + */ private static Sequence asFrames(final Sequence baseSequence) { return baseSequence.map( From 27ae254cf7fefdb2af5183a659d20adb829cdb6e Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 12 Sep 2024 17:30:55 +0530 Subject: [PATCH 5/9] review and comment --- .../apache/druid/query/ResultSerializationMode.java | 12 +++++++++++- .../druid/server/ClientQuerySegmentWalker.java | 7 +------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java b/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java index ca866d130997..3d42e8cbc828 100644 --- a/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java +++ b/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java @@ -19,11 +19,21 @@ package org.apache.druid.query; +/** + * Serialization medium of the query results on the broker. It is currently used to communicate the result's format between + * the main query processing walker and the individual toolchests while materializing subquery's rows + */ public enum ResultSerializationMode { + /** + * Materialize the inner results as rows + */ ROWS, + /** + * Materialize the inner results as frames + */ FRAMES; - public static String CTX_SERIALIZATION_PARAMETER = "serialization"; + public static final String CTX_SERIALIZATION_PARAMETER = "serialization"; } diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 24bbfe3bf391..37ae14f56c30 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -81,7 +81,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -716,12 +715,10 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementQueriesExceedingByteLimit(); throw ResourceLimitExceededException.withMessage(byteLimitExceededMessage(memoryLimit)); } - AtomicReference> fallbackSequence = new AtomicReference<>(null); Optional maybeDataSource = materializeResultsAsFrames( query, queryResults, toolChest, - fallbackSequence, limitAccumulator, memoryLimitAccumulator, memoryLimit, @@ -741,7 +738,7 @@ private static > DataSource toInlineDataSource( subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit(); dataSource = materializeResultsAsArray( query, - fallbackSequence.get(), + queryResults, toolChest, limitAccumulator, limit, @@ -768,7 +765,6 @@ private static > Optional materializeR final QueryType query, final Sequence results, final QueryToolChest toolChest, - final AtomicReference> resultSequence, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, final long memoryLimit, @@ -846,7 +842,6 @@ private static > Optional materializeR + "from the query context and/or the server config." ); } else { - resultSequence.set(results); return Optional.empty(); } } From b34f3be5d61d8f856874fa90c2288ae558d1f877 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 13 Sep 2024 18:18:11 +0530 Subject: [PATCH 6/9] tests fix --- .../sql/calcite/BaseCalciteQueryTest.java | 58 ++++++++++++++++++- .../sql/calcite/CalciteWindowQueryTest.java | 21 +++---- .../tests/window/arrayAggWithOrderBy.sqlTest | 4 +- .../tests/window/arrayConcatAgg.sqlTest | 4 +- 4 files changed, 70 insertions(+), 17 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index fcaaf0448247..b55650ddd51a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.Evals; +import org.apache.druid.math.expr.ExprEval; import org.apache.druid.query.DataSource; import org.apache.druid.query.Druids; import org.apache.druid.query.JoinDataSource; @@ -982,13 +983,40 @@ void validate(int row, int column, ValueType type, Object expectedCell, Object r mismatchMessage(row, column), (Float) expectedCell, (Float) resultCell, - ASSERTION_EPSILON); + ASSERTION_EPSILON + ); } else if (expectedCell instanceof Double) { assertEquals( mismatchMessage(row, column), (Double) expectedCell, (Double) resultCell, - ASSERTION_EPSILON); + ASSERTION_EPSILON + ); + } else if (expectedCell instanceof Object[] || expectedCell instanceof List) { + final Object[] expectedCellCasted; + if (expectedCell instanceof Object[]) { + expectedCellCasted = (Object[]) expectedCell; + } else { + expectedCellCasted = ExprEval.coerceListToArray((List) resultCell, true).rhs; + } + final Object[] resultCellCasted; + if (resultCell instanceof List) { + resultCellCasted = ExprEval.coerceListToArray((List) resultCell, true).rhs; + } else { + resultCellCasted = (Object[]) resultCell; + } + if (expectedCellCasted.length != resultCellCasted.length) { + throw new RE( + "Mismatched array lengths: expected[%s] with length[%d], actual[%s] with length[%d]", + expectedCellCasted, + expectedCellCasted.length, + resultCellCasted, + resultCellCasted.length + ); + } + for (int i = 0; i < expectedCellCasted.length; ++i) { + validate(row, column, type, expectedCellCasted[i], resultCellCasted[i]); + } } else { EQUALS.validate(row, column, type, expectedCell, resultCell); } @@ -1019,6 +1047,31 @@ void validate(int row, int column, ValueType type, Object expectedCell, Object r (Double) resultCell, eps ); + } else if (expectedCell instanceof Object[] || expectedCell instanceof List) { + final Object[] expectedCellCasted; + if (expectedCell instanceof Object[]) { + expectedCellCasted = (Object[]) expectedCell; + } else { + expectedCellCasted = ExprEval.coerceListToArray((List) resultCell, true).rhs; + } + final Object[] resultCellCasted; + if (resultCell instanceof List) { + resultCellCasted = ExprEval.coerceListToArray((List) resultCell, true).rhs; + } else { + resultCellCasted = (Object[]) resultCell; + } + if (expectedCellCasted.length != resultCellCasted.length) { + throw new RE( + "Mismatched array lengths: expected[%s] with length[%d], actual[%s] with length[%d]", + expectedCellCasted, + expectedCellCasted.length, + resultCellCasted, + resultCellCasted.length + ); + } + for (int i = 0; i < expectedCellCasted.length; ++i) { + validate(row, column, type, expectedCellCasted[i], resultCellCasted[i]); + } } else { EQUALS.validate(row, column, type, expectedCell, resultCell); } @@ -1031,7 +1084,6 @@ private static String mismatchMessage(int row, int column) { return StringUtils.format("column content mismatch at %d,%d", row, column); } - } /** diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index c9ea366271ea..2d0969180481 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -71,7 +71,8 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest private static final Map DEFAULT_QUERY_CONTEXT = ImmutableMap.of( PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.ENABLE_DEBUG, true + QueryContexts.ENABLE_DEBUG, true, + QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false ); private static final Map DEFAULT_QUERY_CONTEXT_WITH_SUBQUERY_BYTES = @@ -168,7 +169,7 @@ public void verifyResults(QueryResults results) throws Exception } } } - assertResultsValid(ResultMatchMode.RELAX_NULLS, input.expectedResults, results); + assertResultsValid(ResultMatchMode.EQUALS_RELATIVE_1000_ULPS, input.expectedResults, results); } private void validateOperators(List expectedOperators, List currentOperators) @@ -268,14 +269,14 @@ public void testWithArrayConcat() .expectedResults( ResultMatchMode.RELAX_NULLS, ImmutableList.of( - new Object[]{"Austria", null, "#de.wikipedia", "[\"abc\",\"#de.wikipedia\"]"}, - new Object[]{"Republic of Korea", null, "#en.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"}, - new Object[]{"Republic of Korea", null, "#ja.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"}, - new Object[]{"Republic of Korea", null, "#ko.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"}, - new Object[]{"Republic of Korea", "Seoul", "#ko.wikipedia", "[\"abc\",\"#ko.wikipedia\"]"}, - new Object[]{"Austria", "Vienna", "#de.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"}, - new Object[]{"Austria", "Vienna", "#es.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"}, - new Object[]{"Austria", "Vienna", "#tr.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"} + new Object[]{"Austria", null, "#de.wikipedia", ImmutableList.of("abc", "#de.wikipedia")}, + new Object[]{"Republic of Korea", null, "#en.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#en.wikipedia", "abc", "#ja.wikipedia", "abc", "#ko.wikipedia")}, + new Object[]{"Republic of Korea", null, "#ja.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#en.wikipedia", "abc", "#ja.wikipedia", "abc", "#ko.wikipedia")}, + new Object[]{"Republic of Korea", null, "#ko.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#en.wikipedia", "abc", "#ja.wikipedia", "abc", "#ko.wikipedia")}, + new Object[]{"Republic of Korea", "Seoul", "#ko.wikipedia", ImmutableList.of("abc", "#ko.wikipedia")}, + new Object[]{"Austria", "Vienna", "#de.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#es.wikipedia", "abc", "#tr.wikipedia")}, + new Object[]{"Austria", "Vienna", "#es.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#es.wikipedia", "abc", "#tr.wikipedia")}, + new Object[]{"Austria", "Vienna", "#tr.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#es.wikipedia", "abc", "#tr.wikipedia")} ) ) .run(); diff --git a/sql/src/test/resources/calcite/tests/window/arrayAggWithOrderBy.sqlTest b/sql/src/test/resources/calcite/tests/window/arrayAggWithOrderBy.sqlTest index bee3baeac0c4..3cbdf42af6fd 100644 --- a/sql/src/test/resources/calcite/tests/window/arrayAggWithOrderBy.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/arrayAggWithOrderBy.sqlTest @@ -10,5 +10,5 @@ sql: | ORDER BY d1, f1, m1 expectedResults: - - [2,"[1.0]","[1.0]","[1.0]"] - - [2,"[1.7]","[0.1]","[2.0]"] + - [2,[1.0],[1.0],[1.0]] + - [2,[1.7],[0.1],[2.0]] diff --git a/sql/src/test/resources/calcite/tests/window/arrayConcatAgg.sqlTest b/sql/src/test/resources/calcite/tests/window/arrayConcatAgg.sqlTest index 9ec451a94d94..2a648abc17bf 100644 --- a/sql/src/test/resources/calcite/tests/window/arrayConcatAgg.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/arrayConcatAgg.sqlTest @@ -9,5 +9,5 @@ sql: | GROUP BY cityName expectedResults: - - ["Horsching","[\"Horsching\"]"] - - ["Vienna","[\"Vienna\"]"] + - ["Horsching",["Horsching"]] + - ["Vienna",["Vienna"]] From d209f4a76313c5812644439a044f1a592d8718d6 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 16 Sep 2024 10:29:51 +0530 Subject: [PATCH 7/9] more tests, cov --- ...WindowOperatorQueryQueryToolChestTest.java | 162 ++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChestTest.java diff --git a/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChestTest.java new file mode 100644 index 000000000000..2100b36e57dc --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChestTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.testutil.FrameTestUtil; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.Druids; +import org.apache.druid.query.FrameSignaturePair; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.ResultSerializationMode; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.operator.window.WindowOperatorFactory; +import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.spec.LegacySegmentSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +public class WindowOperatorQueryQueryToolChestTest extends InitializedNullHandlingTest +{ + + private final WindowOperatorQueryQueryToolChest toolchest = new WindowOperatorQueryQueryToolChest(); + + @Test + public void mergeResultsWithRowResultSerializationMode() + { + RowSignature inputSignature = RowSignature.builder() + .add("length", ColumnType.LONG) + .build(); + RowSignature outputSignature = RowSignature.builder() + .addAll(inputSignature) + .add("w0", ColumnType.LONG) + .build(); + + final WindowOperatorQuery query = new WindowOperatorQuery( + new QueryDataSource( + Druids.newScanQueryBuilder() + .dataSource(new TableDataSource("test")) + .intervals(new LegacySegmentSpec(Intervals.ETERNITY)) + .columns("length") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(new HashMap<>()) + .build() + ), + new LegacySegmentSpec(Intervals.ETERNITY), + new HashMap<>(), + outputSignature, + ImmutableList.of( + new WindowOperatorFactory(new WindowRowNumberProcessor("w0")) + ), + ImmutableList.of() + ); + List results = toolchest.mergeResults( + (queryPlus, responseContext) -> Sequences.simple( + Collections.singletonList( + MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of("length", new IntArrayColumn(new int[]{1, 5, 10})) + ) + ) + ) + ).run(QueryPlus.wrap(query)).toList(); + + Assert.assertTrue(results.get(0) instanceof Object[]); + Assert.assertEquals(3, results.size()); + List expectedResults = ImmutableList.of( + new Object[]{1, 1}, + new Object[]{5, 2}, + new Object[]{10, 3} + ); + + for (int i = 0; i < 3; ++i) { + Assert.assertArrayEquals(expectedResults.get(i), (Object[]) results.get(i)); + } + } + + @Test + public void mergeResultsWithFrameResultSerializationMode() + { + RowSignature inputSignature = RowSignature.builder() + .add("length", ColumnType.LONG) + .build(); + RowSignature outputSignature = RowSignature.builder() + .addAll(inputSignature) + .add("w0", ColumnType.LONG) + .build(); + + final WindowOperatorQuery query = new WindowOperatorQuery( + new QueryDataSource( + Druids.newScanQueryBuilder() + .dataSource(new TableDataSource("test")) + .intervals(new LegacySegmentSpec(Intervals.ETERNITY)) + .columns("length") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(new HashMap<>()) + .build() + ), + new LegacySegmentSpec(Intervals.ETERNITY), + Collections.singletonMap(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, ResultSerializationMode.FRAMES.toString()), + outputSignature, + ImmutableList.of( + new WindowOperatorFactory(new WindowRowNumberProcessor("w0")) + ), + ImmutableList.of() + ); + List results = toolchest.mergeResults( + (queryPlus, responseContext) -> Sequences.simple( + Collections.singletonList( + MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of("length", new IntArrayColumn(new int[]{1, 5, 10})) + ) + ) + ) + ).run(QueryPlus.wrap(query)).toList(); + + Assert.assertTrue(results.get(0) instanceof FrameSignaturePair); + Assert.assertEquals(1, results.size()); + + FrameReader reader = FrameReader.create(((FrameSignaturePair) results.get(0)).getRowSignature()); + List> resultRows = FrameTestUtil.readRowsFromCursorFactory( + reader.makeCursorFactory(((FrameSignaturePair) results.get(0)).getFrame()) + ).toList(); + + List> expectedResults = ImmutableList.of( + ImmutableList.of(1L, 1L), + ImmutableList.of(5L, 2L), + ImmutableList.of(10L, 3L) + ); + Assertions.assertEquals(expectedResults, resultRows); + } +} From da9c53373b97501b9aaa20964e0e6c53c2683059 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 16 Sep 2024 11:14:24 +0530 Subject: [PATCH 8/9] test fix, add new match mode --- .../sql/calcite/BaseCalciteQueryTest.java | 19 +++++++++++++++++++ .../sql/calcite/CalciteWindowQueryTest.java | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index b55650ddd51a..ae4100875508 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -1076,6 +1076,25 @@ void validate(int row, int column, ValueType type, Object expectedCell, Object r EQUALS.validate(row, column, type, expectedCell, resultCell); } } + }, + + /** + * Relax nulls which accepts 1000 units of least precision. + */ + RELAX_NULLS_RELATIVE_1000_ULPS { + static final int ASSERTION_ERROR_ULPS = 1000; + + @Override + void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell) + { + if (expectedCell == null) { + if (resultCell == null) { + return; + } + expectedCell = NullHandling.defaultValueForType(type); + } + EQUALS_RELATIVE_1000_ULPS.validate(row, column, type, expectedCell, resultCell); + } }; abstract void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index 2d0969180481..5e413f1a2c2a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -169,7 +169,7 @@ public void verifyResults(QueryResults results) throws Exception } } } - assertResultsValid(ResultMatchMode.EQUALS_RELATIVE_1000_ULPS, input.expectedResults, results); + assertResultsValid(ResultMatchMode.RELAX_NULLS_RELATIVE_1000_ULPS, input.expectedResults, results); } private void validateOperators(List expectedOperators, List currentOperators) From 0741dc6dd7460c51fa918da8c01f5060155a30fc Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 16 Sep 2024 22:20:12 +0530 Subject: [PATCH 9/9] more changes --- .../sql/calcite/BaseCalciteQueryTest.java | 67 ++++++++++--------- .../sql/calcite/CalciteWindowQueryTest.java | 2 +- 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index ae4100875508..f310edae3913 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -35,6 +35,7 @@ import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.hll.VersionOneHyperLogLogCollector; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; @@ -993,24 +994,14 @@ void validate(int row, int column, ValueType type, Object expectedCell, Object r ASSERTION_EPSILON ); } else if (expectedCell instanceof Object[] || expectedCell instanceof List) { - final Object[] expectedCellCasted; - if (expectedCell instanceof Object[]) { - expectedCellCasted = (Object[]) expectedCell; - } else { - expectedCellCasted = ExprEval.coerceListToArray((List) resultCell, true).rhs; - } - final Object[] resultCellCasted; - if (resultCell instanceof List) { - resultCellCasted = ExprEval.coerceListToArray((List) resultCell, true).rhs; - } else { - resultCellCasted = (Object[]) resultCell; - } + final Object[] expectedCellCasted = homogenizeArray(expectedCell); + final Object[] resultCellCasted = homogenizeArray(resultCell); if (expectedCellCasted.length != resultCellCasted.length) { throw new RE( "Mismatched array lengths: expected[%s] with length[%d], actual[%s] with length[%d]", - expectedCellCasted, + Arrays.toString(expectedCellCasted), expectedCellCasted.length, - resultCellCasted, + Arrays.toString(resultCellCasted), resultCellCasted.length ); } @@ -1022,11 +1013,26 @@ void validate(int row, int column, ValueType type, Object expectedCell, Object r } } }, + + RELAX_NULLS_EPS { + @Override + void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell) + { + if (expectedCell == null) { + if (resultCell == null) { + return; + } + expectedCell = NullHandling.defaultValueForType(type); + } + EQUALS_EPS.validate(row, column, type, expectedCell, resultCell); + } + }, + /** * Comparision which accepts 1000 units of least precision. */ EQUALS_RELATIVE_1000_ULPS { - static final int ASSERTION_ERROR_ULPS = 1000; + private static final int ASSERTION_ERROR_ULPS = 1000; @Override void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell) @@ -1048,24 +1054,15 @@ void validate(int row, int column, ValueType type, Object expectedCell, Object r eps ); } else if (expectedCell instanceof Object[] || expectedCell instanceof List) { - final Object[] expectedCellCasted; - if (expectedCell instanceof Object[]) { - expectedCellCasted = (Object[]) expectedCell; - } else { - expectedCellCasted = ExprEval.coerceListToArray((List) resultCell, true).rhs; - } - final Object[] resultCellCasted; - if (resultCell instanceof List) { - resultCellCasted = ExprEval.coerceListToArray((List) resultCell, true).rhs; - } else { - resultCellCasted = (Object[]) resultCell; - } + final Object[] expectedCellCasted = homogenizeArray(expectedCell); + final Object[] resultCellCasted = homogenizeArray(resultCell); + if (expectedCellCasted.length != resultCellCasted.length) { throw new RE( "Mismatched array lengths: expected[%s] with length[%d], actual[%s] with length[%d]", - expectedCellCasted, + Arrays.toString(expectedCellCasted), expectedCellCasted.length, - resultCellCasted, + Arrays.toString(resultCellCasted), resultCellCasted.length ); } @@ -1082,8 +1079,6 @@ void validate(int row, int column, ValueType type, Object expectedCell, Object r * Relax nulls which accepts 1000 units of least precision. */ RELAX_NULLS_RELATIVE_1000_ULPS { - static final int ASSERTION_ERROR_ULPS = 1000; - @Override void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell) { @@ -1103,6 +1098,16 @@ private static String mismatchMessage(int row, int column) { return StringUtils.format("column content mismatch at %d,%d", row, column); } + + private static Object[] homogenizeArray(Object array) + { + if (array instanceof Object[]) { + return (Object[]) array; + } else if (array instanceof List) { + return ExprEval.coerceListToArray((List) array, true).rhs; + } + throw new ISE("Found array[%s] of type[%s] which is not handled", array.toString(), array.getClass().getName()); + } } /** diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index 5e413f1a2c2a..5850be0bd1c5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -169,7 +169,7 @@ public void verifyResults(QueryResults results) throws Exception } } } - assertResultsValid(ResultMatchMode.RELAX_NULLS_RELATIVE_1000_ULPS, input.expectedResults, results); + assertResultsValid(ResultMatchMode.RELAX_NULLS_EPS, input.expectedResults, results); } private void validateOperators(List expectedOperators, List currentOperators)