Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,14 @@ public enum Stage

// WHERE_FILTER, SELECT_PROJECT may be present on any query, except ones with WINDOW.
WHERE_FILTER,
SELECT_PROJECT,
SELECT_PROJECT {
@Override
public boolean canFollow(Stage stage)
{
// SELECT_PROJECT can be stacked on top of another SELECT_PROJECT.
return stage.compareTo(this) <= 0;
}
},

// AGGREGATE, HAVING_FILTER, AGGREGATE_PROJECT can be present on non-WINDOW aggregating queries.
AGGREGATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,31 @@ public DruidOuterQueryRule(
@Override
public boolean matches(final RelOptRuleCall call)
{
// Only consider doing a subquery when the stage cannot be fused into a single query.
final DruidRel<?> druidRel = call.rel(call.getRelList().size() - 1);
return !stage.canFollow(druidRel.getPartialDruidQuery().stage());
final DruidRel<?> lowerDruidRel = call.rel(call.getRelList().size() - 1);
final RelNode lowerRel = lowerDruidRel.getPartialDruidQuery().leafRel();
final PartialDruidQuery.Stage lowerStage = lowerDruidRel.getPartialDruidQuery().stage();

if (stage.canFollow(lowerStage)
|| (stage == PartialDruidQuery.Stage.WHERE_FILTER
&& PartialDruidQuery.Stage.HAVING_FILTER.canFollow(lowerStage))
|| (stage == PartialDruidQuery.Stage.SELECT_PROJECT
&& PartialDruidQuery.Stage.SORT_PROJECT.canFollow(lowerStage))) {
// Don't consider cases that can be fused into a single query.
return false;
} else if (stage == PartialDruidQuery.Stage.WHERE_FILTER && lowerRel instanceof Filter) {
// Don't consider filter-on-filter. FilterMergeRule will handle it.
return false;
} else if (stage == PartialDruidQuery.Stage.WHERE_FILTER
&& lowerStage == PartialDruidQuery.Stage.SELECT_PROJECT) {
// Don't consider filter-on-project. ProjectFilterTransposeRule will handle it by swapping them.
return false;
} else if (stage == PartialDruidQuery.Stage.SELECT_PROJECT && lowerRel instanceof Project) {
// Don't consider project-on-project. ProjectMergeRule will handle it.
return false;
} else {
// Consider subqueries in all other cases.
return true;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2718,8 +2718,9 @@ public void testLeftJoinOnTwoInlineDataSourcesWithTimeFilter(Map<String, Object>
JoinType.LEFT
)
)
.virtualColumns(expressionVirtualColumn("_v0", "'10.1'", ColumnType.STRING))
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "v0")
.columns("__time", "_v0")
.filters(new SelectorDimFilter("v0", "10.1", null))
.context(queryContext)
.build()
Expand Down Expand Up @@ -2829,8 +2830,10 @@ public void testLeftJoinOnTwoInlineDataSourcesWithOuterWhere(Map<String, Object>
JoinType.LEFT
)
)
.virtualColumns(expressionVirtualColumn("_v0", "'10.1'", ColumnType.STRING))
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "v0")
.filters(selector("v0", "10.1", null))
.columns("__time", "_v0")
.context(queryContext)
.build()
),
Expand Down Expand Up @@ -3022,8 +3025,9 @@ public void testInnerJoinOnTwoInlineDataSourcesWithOuterWhere(Map<String, Object
JoinType.INNER
)
)
.virtualColumns(expressionVirtualColumn("_v0", "'10.1'", ColumnType.STRING))
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "v0")
.columns("__time", "_v0")
.context(queryContext);

testQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6898,12 +6898,9 @@ public void testNestedGroupBy()
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
useDefault ? dimensions(
dimensions(
new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE),
new DefaultDimensionSpec("dim1", "d1")
) : dimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE)
)
)
.setDimFilter(new SelectorDimFilter("m1", "5.0", null))
Expand All @@ -6922,15 +6919,15 @@ public void testNestedGroupBy()
)
.setDimensions(dimensions(
new DefaultDimensionSpec("v0", "_d0", ColumnType.LONG),
new DefaultDimensionSpec(useDefault ? "d1" : "d0", "_d1", ColumnType.STRING)
new DefaultDimensionSpec("d1", "_d1", ColumnType.STRING)
))
.setAggregatorSpecs(
aggregators(
useDefault
? new CountAggregatorFactory("_a0")
: new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0"),
not(selector("d1", null, null))
not(selector("d0", null, null))
)
)
)
Expand Down Expand Up @@ -10400,6 +10397,27 @@ public void testGroupByTimeFloorAndDimOnGroupByTimeFloorAndDim()
)
)
.setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("a0", "m1")))
.setPostAggregatorSpecs(
ImmutableList.of(
expressionPostAgg(
"p0",
"timestamp_floor(\"d0\",'P1M',null,'UTC')"
)
)
)
.setHavingSpec(
having(
bound(
"a0",
"1",
null,
true,
false,
null,
StringComparators.NUMERIC
)
)
)
.setContext(
withTimestampResultContext(
QUERY_CONTEXT_DEFAULT,
Expand All @@ -10413,29 +10431,10 @@ public void testGroupByTimeFloorAndDimOnGroupByTimeFloorAndDim()
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setVirtualColumns(
expressionVirtualColumn(
"v0",
"timestamp_floor(\"d0\",'P1M',null,'UTC')",
ColumnType.LONG
)
)
.setDimensions(
dimensions(
new DefaultDimensionSpec("d1", "_d0"),
new DefaultDimensionSpec("v0", "_d1", ColumnType.LONG)
)
)
.setDimFilter(
new BoundDimFilter(
"a0",
"1",
null,
true,
null,
null,
null,
StringComparators.NUMERIC
new DefaultDimensionSpec("p0", "_d1", ColumnType.LONG)
)
)
.setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", "a0")))
Expand Down