Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
*/
public class FilteredDataSource implements DataSource
{

private final DataSource base;
private final DimFilter filter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import org.apache.druid.sql.calcite.rule.logical.DruidAggregateRemoveRedundancyRule;
import org.apache.druid.sql.calcite.rule.logical.DruidJoinRule;
import org.apache.druid.sql.calcite.rule.logical.DruidLogicalRules;
import org.apache.druid.sql.calcite.rule.logical.LogicalUnnestRule;
import org.apache.druid.sql.calcite.rule.logical.UnnestInputCleanupRule;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.hook.DruidHook;

Expand Down Expand Up @@ -280,7 +282,8 @@ private Program buildDecoupledLogicalOptimizationProgram(PlannerContext plannerC
builder.addRuleInstance(CoreRules.UNION_MERGE);
builder.addRuleInstance(JoinExtractFilterRule.Config.DEFAULT.toRule());
builder.addRuleInstance(FilterIntoJoinRuleConfig.DEFAULT.withPredicate(DruidJoinRule::isSupportedPredicate).toRule());

builder.addRuleInstance(new LogicalUnnestRule());
builder.addRuleInstance(new UnnestInputCleanupRule());
return Programs.of(builder.build(), true, DefaultRelMetadataProvider.INSTANCE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rex.RexBuilder;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.PDQVertexFactory.PDQVertex;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
Expand Down Expand Up @@ -232,6 +236,11 @@ boolean forceSubQuery(SourceDesc sourceDesc)
}
return this == RIGHT;
}

/**
 * Whether the source may be wrapped directly in a {@link FilteredDataSource} at this join position.
 *
 * NOTE(review): presumably this enum describes the side of a join this vertex occupies
 * (see {@code this == RIGHT} in forceSubQuery); NONE appears to mean "not under a join",
 * where pushing a filter into a FilteredDataSource is safe — confirm against the enum declaration.
 */
boolean filteredDatasourceAllowed()
{
return this == NONE;
}
}

/**
Expand Down Expand Up @@ -401,7 +410,13 @@ public SourceDesc unwrapSourceDesc()
if (canUnwrapSourceDesc()) {
DruidQuery q = buildQuery(false);
SourceDesc origInput = getSource();
return new SourceDesc(origInput.dataSource, q.getOutputRowSignature());
DataSource dataSource;
if (q.getFilter() == null) {
dataSource = origInput.dataSource;
} else {
dataSource = makeFilteredDataSource(origInput, q.getFilter());
}
return new SourceDesc(dataSource, q.getOutputRowSignature());
}
throw DruidException.defensive("Can't unwrap source of vertex[%s]", partialDruidQuery);
}
Expand All @@ -415,14 +430,29 @@ public boolean canUnwrapSourceDesc()
if (partialDruidQuery.stage() == Stage.SCAN) {
return true;
}
if (jst.filteredDatasourceAllowed() && partialDruidQuery.stage() == PartialDruidQuery.Stage.WHERE_FILTER) {
return true;
}
if (partialDruidQuery.stage() == PartialDruidQuery.Stage.SELECT_PROJECT &&
partialDruidQuery.getWhereFilter() == null &&
(jst.filteredDatasourceAllowed() || partialDruidQuery.getWhereFilter() == null) &&
partialDruidQuery.getSelectProject().isMapping()) {
return true;
}
return false;
}
}
}

/**
 * Wraps the given source's {@link DataSource} in a {@link FilteredDataSource}, optimizing the
 * filter against the source's row signature first.
 *
 * This method should not live here.
 *
 * The fact that {@link Filtration} has to be run on the filter is out-of-scope here.
 *
 * @param sd     source whose {@code dataSource} is wrapped and whose {@code rowSignature} is used
 *               to optimize the filter
 * @param filter filter to apply; optimized via {@link Filtration#optimizeFilterOnly} before use
 * @return a {@link FilteredDataSource} over {@code sd.dataSource} carrying the optimized filter
 */
public static FilteredDataSource makeFilteredDataSource(SourceDesc sd, DimFilter filter)
{
  Filtration filtration = Filtration.create(filter).optimizeFilterOnly(sd.rowSignature);
  DimFilter newFilter = filtration.getDimFilter();
  return FilteredDataSource.create(sd.dataSource, newFilter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ public void visitList(
/**
* Shuttle that replaces correlating variables with regular field accesses to the left-hand side.
*/
private static class CorrelatedFieldAccessToInputRef extends RexShuttle
public static class CorrelatedFieldAccessToInputRef extends RexShuttle
{
private final CorrelationId correlationId;

Expand All @@ -595,7 +595,6 @@ public RexNode visitFieldAccess(final RexFieldAccess fieldAccess)
return new RexInputRef(fieldAccess.getField().getIndex(), fieldAccess.getType());
}
}

return super.visitFieldAccess(fieldAccess);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public static boolean computeRightRequiresSubquery(final PlannerContext plannerC
&& DruidRels.druidTableIfLeafRel(right).filter(table -> table.getDataSource().isGlobal()).isPresent());
}

static Set<String> findExistingJoinPrefixes(DataSource... dataSources)
public static Set<String> findExistingJoinPrefixes(DataSource... dataSources)
{
final ArrayList<DataSource> copy = new ArrayList<>(Arrays.asList(dataSources));

Expand All @@ -442,7 +442,7 @@ static Set<String> findExistingJoinPrefixes(DataSource... dataSources)
* Returns a Pair of "rightPrefix" (for JoinDataSource) and the signature of rows that will result from
* applying that prefix.
*/
static Pair<String, RowSignature> computeJoinRowSignature(
public static Pair<String, RowSignature> computeJoinRowSignature(
final RowSignature leftSignature,
final RowSignature rightSignature,
final Set<String> prefixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,10 @@ VirtualColumns getVirtualColumns(final boolean includeDimensions)
public static List<DimFilter> getAllFiltersUnderDataSource(DataSource d, List<DimFilter> dimFilterList)
{
if (d instanceof FilteredDataSource) {
dimFilterList.add(((FilteredDataSource) d).getFilter());
DimFilter filter = ((FilteredDataSource) d).getFilter();
if (filter != null) {
dimFilterList.add(filter);
}
}
for (DataSource ds : d.getChildren()) {
dimFilterList.addAll(getAllFiltersUnderDataSource(ds, dimFilterList));
Expand Down Expand Up @@ -1741,4 +1744,9 @@ private RowSignature buildRowSignature(final VirtualColumns virtualColumns, fina
}
return builder.build();
}

/**
 * Returns the WHERE filter of this query, or null when no filter is present
 * (callers null-check the result before use).
 */
public DimFilter getFilter()
{
return filter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ private RelNode visitNode(RelNode other)
}
if (other instanceof DruidRel<?>) {
DruidRel<?> druidRel = (DruidRel<?>) other;
return druidRel.getPartialDruidQuery().leafRel();
if (druidRel.getPartialDruidQuery() != null && druidRel.getPartialDruidQuery().leafRel() != null) {
return druidRel.getPartialDruidQuery().leafRel();
}
}
return other;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public List<RelOptRule> rules()
Convention.NONE,
DruidLogicalConvention.instance(),
DruidJoinRule.class.getSimpleName()
)
),
DruidUnnestRule.INSTANCE
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.rule.logical;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;

import java.util.List;

/**
 * Unnest operator in the Druid logical convention.
 *
 * Produces a {@link SourceDesc} backed by an {@link UnnestDataSource} that evaluates the
 * unnest expression as a virtual column over the input source, optionally applying a filter
 * to the unnested column.
 */
public class DruidUnnest extends Unnest implements DruidLogicalNode, SourceDescProducer
{
protected DruidUnnest(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexNode unnestExpr,
RelDataType rowType, RexNode condition)
{
super(cluster, traits, input, unnestExpr, rowType, condition);
}

@Override
protected RelNode copy(RelTraitSet traitSet, RelNode input)
{
// The inherited "filter" field is passed as the constructor's "condition" argument.
return new DruidUnnest(getCluster(), traitSet, input, unnestExpr, rowType, filter);
}

/**
 * Builds the unnest source description from the single input source.
 *
 * The output signature is the input signature plus one trailing column holding the
 * unnested values; the optional filter is translated against that single column only.
 */
@Override
public SourceDesc getSourceDesc(PlannerContext plannerContext, List<SourceDesc> sources)
{
SourceDesc inputDesc = sources.get(0);

RowSignature outputRowSignature = computeRowOutputSignature(inputDesc);

// Single-column signature for the unnested (last) output column; used both to name the
// virtual column and to translate the filter.
// NOTE(review): getColumnType(...).get() assumes the unnested column's type is known;
// an unknown type would throw here — confirm upstream guarantees.
RowSignature filterRowSignature = RowSignature.builder().add(
outputRowSignature.getColumnName(outputRowSignature.size() - 1),
outputRowSignature.getColumnType(outputRowSignature.size() - 1).get()
).build();

VirtualColumn virtualColumn = buildUnnestVirtualColumn(
plannerContext,
inputDesc,
filterRowSignature.getColumnName(0)
);

DimFilter dimFilter = buildDimFilter(plannerContext, filterRowSignature);
DataSource dataSource = UnnestDataSource.create(inputDesc.dataSource, virtualColumn, dimFilter);
return new SourceDesc(dataSource, outputRowSignature);
}

/**
 * Translates the optional unnest filter into an optimized {@link DimFilter} over the
 * single-column signature of the unnested column. Returns null when there is no filter.
 */
private DimFilter buildDimFilter(PlannerContext plannerContext, RowSignature filterRowSignature)
{
if (filter == null) {
return null;
}
DimFilter dimFilter = Expressions.toFilter(
plannerContext,
filterRowSignature,
null,
filter
);
return Filtration.create(dimFilter).optimizeFilterOnly(filterRowSignature).getDimFilter();
}

/**
 * Builds the virtual column that evaluates the unnest expression over the input's
 * row signature, exposed under the given output column name.
 */
private VirtualColumn buildUnnestVirtualColumn(PlannerContext plannerContext, SourceDesc inputDesc, String columnName)
{
final DruidExpression expressionToUnnest = Expressions.toDruidExpression(
plannerContext,
inputDesc.rowSignature,
unnestExpr
);

VirtualColumn virtualColumn = expressionToUnnest.toVirtualColumn(
columnName,
Calcites.getColumnTypeForRelDataType(
unnestExpr.getType()
),
plannerContext.getExpressionParser()
);
return virtualColumn;
}

/**
 * Computes the output row signature: the input signature joined with a single "unnest"
 * column, using join-prefix machinery to avoid clashing with existing column prefixes.
 */
private RowSignature computeRowOutputSignature(SourceDesc inputDesc)
{
return DruidJoinQueryRel.computeJoinRowSignature(
inputDesc.rowSignature,
RowSignature.builder().add(
"unnest",
Calcites.getColumnTypeForRelDataType(getUnnestedType())
).build(),
DruidJoinQueryRel.findExistingJoinPrefixes(inputDesc.dataSource)
).rhs;
}

// The unnested element type is declared as the last field of this node's row type.
private RelDataType getUnnestedType()
{
return rowType.getFieldList().get(rowType.getFieldCount() - 1).getType();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.rule.logical;

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalConvention;

/**
 * Converter rule that turns a {@link LogicalUnnest} in {@link Convention#NONE} into a
 * {@link DruidUnnest} in the {@link DruidLogicalConvention}, converting its input as well.
 */
public class DruidUnnestRule extends ConverterRule
{
  // final: the config is immutable and shared through the INSTANCE singleton;
  // a mutable static would be an accidental global.
  private static final Config CONFIG = Config.INSTANCE.withConversion(
      LogicalUnnest.class,
      Convention.NONE,
      DruidLogicalConvention.instance(),
      DruidUnnestRule.class.getSimpleName()
  );

  public static final DruidUnnestRule INSTANCE = new DruidUnnestRule(CONFIG);

  private DruidUnnestRule(Config config)
  {
    super(config);
  }

  @Override
  public RelNode convert(RelNode rel)
  {
    LogicalUnnest unnest = (LogicalUnnest) rel;
    // Adopt the Druid logical convention on the new node and on the converted input.
    RelTraitSet newTrait = unnest.getTraitSet().replace(DruidLogicalConvention.instance());
    return new DruidUnnest(
        rel.getCluster(),
        newTrait,
        convert(
            unnest.getInput(),
            DruidLogicalConvention.instance()
        ),
        unnest.getUnnestExpr(),
        unnest.getRowType(),
        unnest.filter
    );
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
*/
public class DruidValuesRule extends ConverterRule
{

public DruidValuesRule(
Class<? extends RelNode> clazz,
RelTrait in,
Expand Down
Loading