diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 98e4ff340dd8..43f87513659c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -51,7 +51,6 @@ import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; -import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.groupby.GroupByQuery; @@ -61,7 +60,6 @@ import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; -import org.apache.druid.query.select.PagingSpec; import org.apache.druid.query.select.SelectQuery; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.topn.DimensionTopNMetricSpec; @@ -93,7 +91,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.stream.Collectors; /** * A fully formed Druid query, built from a {@link PartialDruidQuery}. The work to develop this query is done @@ -145,7 +142,13 @@ public DruidQuery( // Now the fun begins. this.filter = computeWhereFilter(partialQuery, plannerContext, sourceQuerySignature); this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceQuerySignature); - this.grouping = computeGrouping(partialQuery, plannerContext, sourceQuerySignature, rexBuilder, finalizeAggregations); + this.grouping = computeGrouping( + partialQuery, + plannerContext, + sourceQuerySignature, + rexBuilder, + finalizeAggregations + ); final RowSignature sortingInputRowSignature; if (this.selectProjection != null) { @@ -436,9 +439,9 @@ private static ProjectRowOrderAndPostAggregations computePostAggregations( /** * Returns dimensions corresponding to {@code aggregate.getGroupSet()}, in the same order. * - * @param partialQuery partial query - * @param plannerContext planner context - * @param querySignature source row signature and re-usable virtual column references + * @param partialQuery partial query + * @param plannerContext planner context + * @param querySignature source row signature and re-usable virtual column references * * @return dimensions * @@ -452,12 +455,19 @@ private static List computeDimensions( { final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate()); final List dimensions = new ArrayList<>(); - final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(querySignature.getRowSignature().getRowOrder())); + final String outputNamePrefix = Calcites.findUnusedPrefix( + "d", + new TreeSet<>(querySignature.getRowSignature().getRowOrder()) + ); int outputNameCounter = 0; for (int i : aggregate.getGroupSet()) { // Dimension might need to create virtual columns. Avoid giving it a name that would lead to colliding columns. - final RexNode rexNode = Expressions.fromFieldAccess(querySignature.getRowSignature(), partialQuery.getSelectProject(), i); + final RexNode rexNode = Expressions.fromFieldAccess( + querySignature.getRowSignature(), + partialQuery.getSelectProject(), + i + ); final DruidExpression druidExpression = Expressions.toDruidExpression( plannerContext, querySignature.getRowSignature(), @@ -478,7 +488,11 @@ private static List computeDimensions( final String dimOutputName; if (!druidExpression.isSimpleExtraction()) { - virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(plannerContext, druidExpression, sqlTypeName); + virtualColumn = querySignature.getOrCreateVirtualColumnForExpression( + plannerContext, + druidExpression, + sqlTypeName + ); dimOutputName = virtualColumn.getOutputName(); } else { dimOutputName = outputNamePrefix + outputNameCounter++; @@ -515,7 +529,10 @@ private static List computeAggregations( { final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate()); final List aggregations = new ArrayList<>(); - final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(querySignature.getRowSignature().getRowOrder())); + final String outputNamePrefix = Calcites.findUnusedPrefix( + "a", + new TreeSet<>(querySignature.getRowSignature().getRowOrder()) + ); for (int i = 0; i < aggregate.getAggCallList().size(); i++) { final String aggName = outputNamePrefix + i; @@ -745,11 +762,6 @@ private Query computeQuery() return scanQuery; } - final SelectQuery selectQuery = toSelectQuery(); - if (selectQuery != null) { - return selectQuery; - } - throw new CannotBuildQueryException("Cannot convert query parts into an actual query"); } @@ -966,9 +978,12 @@ public ScanQuery toScanQuery() // Scan cannot GROUP BY. return null; } - - if (limitSpec != null && limitSpec.getColumns().size() > 0) { - // Scan cannot ORDER BY. + if (limitSpec != null && + (limitSpec.getColumns().size() > 1 + || (limitSpec.getColumns().size() == 1 && !Iterables.getOnlyElement(limitSpec.getColumns()) + .getDimension() + .equals(ColumnHolder.TIME_COLUMN_NAME)))) { + // Scan cannot ORDER BY non-time columns. return null; } @@ -984,6 +999,15 @@ public ScanQuery toScanQuery() ? 0L : (long) limitSpec.getLimit(); + ScanQuery.Order order; + if (limitSpec == null || limitSpec.getColumns().size() == 0) { + order = ScanQuery.Order.NONE; + } else if (limitSpec.getColumns().get(0).getDirection() == OrderByColumnSpec.Direction.ASCENDING) { + order = ScanQuery.Order.ASCENDING; + } else { + order = ScanQuery.Order.DESCENDING; + } + return new ScanQuery( dataSource, filtration.getQuerySegmentSpec(), @@ -991,90 +1015,11 @@ public ScanQuery toScanQuery() ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST, 0, scanLimit, - null, // Will default to "none" + order, // Will default to "none" filtration.getDimFilter(), Ordering.natural().sortedCopy(ImmutableSet.copyOf(outputRowSignature.getRowOrder())), false, ImmutableSortedMap.copyOf(plannerContext.getQueryContext()) ); } - - /** - * Return this query as a Select query, or null if this query is not compatible with Select. - * - * @return query or null - */ - @Nullable - public SelectQuery toSelectQuery() - { - if (grouping != null) { - return null; - } - - final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature); - final boolean descending; - final int threshold; - - if (limitSpec != null) { - // Safe to assume limitSpec has zero or one entry; DruidSelectSortRule wouldn't push in anything else. - if (limitSpec.getColumns().size() == 0) { - descending = false; - } else if (limitSpec.getColumns().size() == 1) { - final OrderByColumnSpec orderBy = Iterables.getOnlyElement(limitSpec.getColumns()); - if (!orderBy.getDimension().equals(ColumnHolder.TIME_COLUMN_NAME)) { - // Select cannot handle sorting on anything other than __time. - return null; - } - descending = orderBy.getDirection() == OrderByColumnSpec.Direction.DESCENDING; - } else { - // Select cannot handle sorting on more than one column. - return null; - } - - threshold = limitSpec.getLimit(); - } else { - descending = false; - threshold = 0; - } - - // We need to ask for dummy columns to prevent Select from returning all of them. - String dummyColumn = "dummy"; - while (sourceQuerySignature.getRowSignature().getColumnType(dummyColumn) != null - || outputRowSignature.getRowOrder().contains(dummyColumn)) { - dummyColumn = dummyColumn + "_"; - } - - final List metrics = new ArrayList<>(); - - if (selectProjection != null) { - metrics.addAll(selectProjection.getDirectColumns()); - metrics.addAll(selectProjection.getVirtualColumns() - .stream() - .map(VirtualColumn::getOutputName) - .collect(Collectors.toList())); - } else { - // No projection, rowOrder should reference direct columns. - metrics.addAll(outputRowSignature.getRowOrder()); - } - - if (metrics.isEmpty()) { - metrics.add(dummyColumn); - } - - // Not used for actual queries (will be replaced by QueryMaker) but the threshold is important for the planner. - final PagingSpec pagingSpec = new PagingSpec(null, threshold); - - return new SelectQuery( - dataSource, - filtration.getQuerySegmentSpec(), - descending, - filtration.getDimFilter(), - Granularities.ALL, - ImmutableList.of(new DefaultDimensionSpec(dummyColumn, dummyColumn)), - metrics.stream().sorted().distinct().collect(Collectors.toList()), - getVirtualColumns(true), - pagingSpec, - ImmutableSortedMap.copyOf(plannerContext.getQueryContext()) - ); - } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 687887cc5b62..03f78141cc47 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -21,7 +21,6 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -62,7 +61,6 @@ import org.apache.druid.query.lookup.RegisteredLookupExtractionFn; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; -import org.apache.druid.query.select.PagingSpec; import org.apache.druid.query.topn.DimensionTopNMetricSpec; import org.apache.druid.query.topn.InvertedTopNMetricSpec; import org.apache.druid.query.topn.NumericTopNMetricSpec; @@ -157,7 +155,9 @@ public void testSelectCountStart() throws Exception CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of(Druids.newTimeseriesQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Intervals.of("2999-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z"))) + .intervals(querySegmentSpec(Intervals.of( + "2999-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z")) + ) .granularity(Granularities.ALL) .aggregators(aggregators( new CountAggregatorFactory("a0"), @@ -308,7 +308,7 @@ public void testInformationSchemaTables() throws Exception .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) - .build() + .build() ); } @@ -443,7 +443,16 @@ public void testSelectStar() throws Exception new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, hyperLogLogCollectorClassName}, new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, hyperLogLogCollectorClassName}, new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5.0, hyperLogLogCollectorClassName}, - new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_VALUE, NULL_VALUE, 6f, 6.0, hyperLogLogCollectorClassName} + new Object[]{ + timestamp("2001-01-03"), + 1L, + "abc", + NULL_VALUE, + NULL_VALUE, + 6f, + 6.0, + hyperLogLogCollectorClassName + } ) ); } @@ -576,16 +585,15 @@ public void testSelectStarWithLimitTimeDescending() throws Exception "SELECT * FROM druid.foo ORDER BY __time DESC LIMIT 2", CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of( - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")) - .descending(true) - .pagingSpec(FIRST_PAGING_SPEC) - .context(QUERY_CONTEXT_DEFAULT) - .build() + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")) + .limit(2) + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() ), ImmutableList.of( new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_VALUE, NULL_VALUE, 6f, 6d, HLLC_STRING}, @@ -603,32 +611,15 @@ public void testSelectStarWithoutLimitTimeAscending() throws Exception "SELECT * FROM druid.foo ORDER BY __time", CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of( - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")) - .descending(false) - .pagingSpec(FIRST_PAGING_SPEC) - .context(QUERY_CONTEXT_DEFAULT) - .build(), - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .granularity(Granularities.ALL) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")) - .descending(false) - .pagingSpec( - new PagingSpec( - ImmutableMap.of("foo_1970-01-01T00:00:00.000Z_2001-01-03T00:00:00.001Z_1", 5), - 1000, - true - ) - ) - .context(QUERY_CONTEXT_DEFAULT) - .build() + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")) + .limit(Long.MAX_VALUE) + .order(ScanQuery.Order.ASCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() ), ImmutableList.of( new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, HLLC_STRING}, @@ -669,17 +660,15 @@ public void testSelectSingleColumnWithLimitDescending() throws Exception testQuery( "SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 2", ImmutableList.of( - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .dimensionSpecs(dimensions(new DefaultDimensionSpec("dim1", "d1"))) - .granularity(Granularities.ALL) - .descending(true) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "dim1")) - .pagingSpec(FIRST_PAGING_SPEC) - .context(QUERY_CONTEXT_DEFAULT) - .build() + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "dim1")) + .limit(2) + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() ), ImmutableList.of( new Object[]{"abc"}, @@ -1714,8 +1703,8 @@ public void testEmptyStringEquality() throws Exception .intervals(querySegmentSpec(Filtration.eternity())) .granularity(Granularities.ALL) .filters(expressionFilter("case_searched((\"dim2\" == 'a')," - + (NullHandling.replaceWithDefault() ? "1" : "0") - + ",(\"dim2\" == ''))")) + + (NullHandling.replaceWithDefault() ? "1" : "0") + + ",(\"dim2\" == ''))")) .aggregators(aggregators(new CountAggregatorFactory("a0"))) .context(TIMESERIES_CONTEXT_DEFAULT) .build() @@ -1743,8 +1732,8 @@ public void testNullStringEquality() throws Exception .intervals(querySegmentSpec(Filtration.eternity())) .granularity(Granularities.ALL) .filters(expressionFilter("case_searched((\"dim2\" == 'a')," - + (NullHandling.replaceWithDefault() ? "1" : "0") - + ",(\"dim2\" == null))")) + + (NullHandling.replaceWithDefault() ? "1" : "0") + + ",(\"dim2\" == null))")) .aggregators(aggregators(new CountAggregatorFactory("a0"))) .context(TIMESERIES_CONTEXT_DEFAULT) .build() @@ -2022,7 +2011,9 @@ public void testCountNullableExpression() throws Exception .virtualColumns( expressionVirtualColumn( "v0", - "case_searched((\"dim2\" == 'abc'),'yes',(\"dim2\" == 'def'),'yes'," + DruidExpression.nullLiteral() + ")", + "case_searched((\"dim2\" == 'abc'),'yes',(\"dim2\" == 'def'),'yes'," + + DruidExpression.nullLiteral() + + ")", ValueType.STRING ) ) @@ -2303,7 +2294,8 @@ public void testFilterOnStringAsNumber() throws Exception expressionVirtualColumn( "v0", "floor(CAST(\"dim1\", 'DOUBLE'))", - ValueType.DOUBLE) + ValueType.DOUBLE + ) ) .setDimFilter( or( @@ -4296,7 +4288,11 @@ public void testMinMaxAvgDailyCountWithLimit() throws Exception ValueType.LONG ) ) - .setDimensions(dimensions(new DefaultDimensionSpec("v0", "v0", ValueType.LONG))) + .setDimensions(dimensions(new DefaultDimensionSpec( + "v0", + "v0", + ValueType.LONG + ))) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -4355,13 +4351,21 @@ public void testAvgDailyCountDistinct() throws Exception ValueType.LONG ) ) - .setDimensions(dimensions(new DefaultDimensionSpec("v0", "v0", ValueType.LONG))) + .setDimensions(dimensions(new DefaultDimensionSpec( + "v0", + "v0", + ValueType.LONG + ))) .setAggregatorSpecs( aggregators( new CardinalityAggregatorFactory( "a0:a", null, - dimensions(new DefaultDimensionSpec("cnt", "cnt", ValueType.LONG)), + dimensions(new DefaultDimensionSpec( + "cnt", + "cnt", + ValueType.LONG + )), false, true ) @@ -7471,7 +7475,10 @@ public void testRequireTimeConditionPositive() throws Exception ImmutableList.of( GroupByQuery.builder() .setDataSource(CalciteTests.DATASOURCE1) - .setInterval(querySegmentSpec(Intervals.utc(DateTimes.of("2000-01-01").getMillis(), JodaUtils.MAX_INSTANT))) + .setInterval(querySegmentSpec(Intervals.utc( + DateTimes.of("2000-01-01").getMillis(), + JodaUtils.MAX_INSTANT + ))) .setGranularity(Granularities.ALL) .setDimFilter(not(selector("dim1", "", null))) .setDimensions(dimensions(new ExtractionDimensionSpec( @@ -7483,7 +7490,10 @@ public void testRequireTimeConditionPositive() throws Exception .build(), Druids.newTimeseriesQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Intervals.utc(DateTimes.of("2000-01-01").getMillis(), JodaUtils.MAX_INSTANT))) + .intervals(querySegmentSpec(Intervals.utc( + DateTimes.of("2000-01-01").getMillis(), + JodaUtils.MAX_INSTANT + ))) .granularity(Granularities.ALL) .filters(in( "dim2", @@ -7630,8 +7640,8 @@ public void testTrigonometricFunction() throws Exception PLANNER_CONFIG_DEFAULT, QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS, "SELECT exp(count(*)) + 10, sin(pi / 6), cos(pi / 6), tan(pi / 6), cot(pi / 6)," + - "asin(exp(count(*)) / 2), acos(exp(count(*)) / 2), atan(exp(count(*)) / 2), atan2(exp(count(*)), 1) " + - "FROM druid.foo WHERE dim2 = 0", + "asin(exp(count(*)) / 2), acos(exp(count(*)) / 2), atan(exp(count(*)) / 2), atan2(exp(count(*)), 1) " + + "FROM druid.foo WHERE dim2 = 0", CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of(Druids.newTimeseriesQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index de8cf2a0b38d..af0e89454328 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -19,15 +19,17 @@ package org.apache.druid.sql.calcite.util; -import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.common.io.Closeables; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.FunctionalIterable; -import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.Druids; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; @@ -39,15 +41,15 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; -import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; @@ -98,9 +100,16 @@ public QueryRunner getQueryRunnerForIntervals( final Iterable intervals ) { - final QueryRunnerFactory> factory = conglomerate.findFactory(query); + Query newQuery = query; + if (query instanceof ScanQuery && ((ScanQuery) query).getOrder() != ScanQuery.Order.NONE) { + newQuery = (Query) Druids.ScanQueryBuilder.copy((ScanQuery) query) + .intervals(new MultipleSpecificSegmentSpec(ImmutableList.of())) + .build(); + } + + final QueryRunnerFactory> factory = conglomerate.findFactory(newQuery); if (factory == null) { - throw new ISE("Unknown query type[%s].", query.getClass()); + throw new ISE("Unknown query type[%s].", newQuery.getClass()); } final QueryToolChest> toolChest = factory.getToolchest(); @@ -109,56 +118,46 @@ public QueryRunner getQueryRunnerForIntervals( toolChest.postMergeQueryDecoration( toolChest.mergeResults( toolChest.preMergeQueryDecoration( - new QueryRunner() - { - @Override - public Sequence run(QueryPlus queryPlus, Map responseContext) - { - Query query = queryPlus.getQuery(); - final VersionedIntervalTimeline timeline = getTimelineForTableDataSource(query); - return makeBaseRunner( - query, - toolChest, - factory, - FunctionalIterable - .create(intervals) - .transformCat( - new Function>>() - { - @Override - public Iterable> apply(final Interval interval) - { - return timeline.lookup(interval); - } - } - ) - .transformCat( - new Function, Iterable>() - { - @Override - public Iterable apply(final TimelineObjectHolder holder) - { - return FunctionalIterable - .create(holder.getObject()) - .transform( - new Function, SegmentDescriptor>() - { - @Override - public SegmentDescriptor apply(final PartitionChunk chunk) - { - return new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - chunk.getChunkNumber() - ); - } - } - ); - } - } - ) - ).run(queryPlus, responseContext); + (queryPlus, responseContext) -> { + Query query1 = queryPlus.getQuery(); + Query newQuery1 = query1; + if (query instanceof ScanQuery && ((ScanQuery) query).getOrder() != ScanQuery.Order.NONE) { + newQuery1 = (Query) Druids.ScanQueryBuilder.copy((ScanQuery) query) + .intervals(new MultipleSpecificSegmentSpec( + ImmutableList.of(new SegmentDescriptor( + Intervals.of("2015-04-12/2015-04-13"), + "4", + 0 + )))) + .context(ImmutableMap.of( + ScanQuery.CTX_KEY_OUTERMOST, + false + )) + .build(); } + final VersionedIntervalTimeline timeline = getTimelineForTableDataSource( + newQuery1); + return makeBaseRunner( + newQuery1, + toolChest, + factory, + FunctionalIterable + .create(intervals) + .transformCat( + interval -> timeline.lookup(interval) + ) + .transformCat( + holder -> FunctionalIterable + .create(holder.getObject()) + .transform( + chunk -> new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + chunk.getChunkNumber() + ) + ) + ) + ).run(QueryPlus.wrap(newQuery1), responseContext); } ) ) @@ -228,31 +227,19 @@ private QueryRunner makeBaseRunner( FunctionalIterable .create(specs) .transformCat( - new Function>>() - { - @Override - public Iterable> apply(final SegmentDescriptor descriptor) - { - final PartitionHolder holder = timeline.findEntry( - descriptor.getInterval(), - descriptor.getVersion() - ); - - return Iterables.transform( - holder, - new Function, QueryRunner>() - { - @Override - public QueryRunner apply(PartitionChunk chunk) - { - return new SpecificSegmentQueryRunner( - factory.createRunner(chunk.getObject()), - new SpecificSegmentSpec(descriptor) - ); - } - } - ); - } + descriptor -> { + final PartitionHolder holder = timeline.findEntry( + descriptor.getInterval(), + descriptor.getVersion() + ); + + return Iterables.transform( + holder, + chunk -> new SpecificSegmentQueryRunner( + factory.createRunner(chunk.getObject()), + new SpecificSegmentSpec(descriptor) + ) + ); } ) )