Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.store.enumerable.plan.VertexDrel;

Expand All @@ -50,7 +51,7 @@ public void onMatch(RelOptRuleCall call) {
VertexDrel in = call.rel(0);
RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel(
in.getCluster(),
in.getTraitSet().replace(outTrait),
in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
in.getInput(0), username);
call.transformTo(jdbcIntermediatePrel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.drill.exec.store.plan.rel.PluginSortRel;
import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
import org.apache.drill.exec.util.Utilities;
import org.bson.BsonDocument;
import org.bson.BsonElement;
import org.bson.BsonInt32;
Expand Down Expand Up @@ -220,8 +219,8 @@ public void implement(PluginUnionRel union) throws IOException {
}

@Override
public void implement(StoragePluginTableScan scan) throws IOException {
groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
public void implement(StoragePluginTableScan scan) {
groupScan = (MongoGroupScan) scan.getGroupScan();
operations = this.groupScan.getScanSpec().getOperations().stream()
.map(BsonDocument::parse)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,26 @@ public void testStarProject() throws Exception {
.go();
}

@Test // DRILL-8190
public void testProjectWithJoin() throws Exception {
  // Self-join of the same collection on `_id`, summing `sales` from each side.
  String joinSql = "SELECT sum(s1.sales) s1_sales,\n" +
      "sum(s2.sales) s2_sales\n" +
      "FROM mongo.%s.`%s` s1\n" +
      "JOIN mongo.%s.`%s` s2 ON s1._id = s2._id";
  // Both sides of the join read the same database / collection pair.
  Object[] joinSides = {DONUTS_DB, DONUTS_COLLECTION, DONUTS_DB, DONUTS_COLLECTION};

  // Plan check: the scan must list only the referenced columns and must not
  // fall back to the `**` star projection.
  queryBuilder()
      .sql(joinSql, joinSides)
      .planMatcher()
      .include("columns=\\[`_id`, `sales`]")
      .exclude("columns=\\[`\\*\\*`")
      .match();

  // Result check: both aggregates are expected to yield the same total.
  testBuilder()
      .sqlQuery(joinSql, joinSides)
      .unOrdered()
      .baselineColumns("s1_sales", "s2_sales")
      .baselineValues(1194L, 1194L)
      .go();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.store.enumerable.plan.VertexDrel;

Expand All @@ -48,7 +49,7 @@ public void onMatch(RelOptRuleCall call) {
VertexDrel in = call.rel(0);
RelNode intermediatePrel = new PhoenixIntermediatePrel(
in.getCluster(),
in.getTraitSet().replace(outTrait),
in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
in.getInput(0));
call.transformTo(intermediatePrel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.metadata.BuiltInMetadata;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
import org.apache.calcite.rel.metadata.RelMdUtil;
Expand All @@ -42,7 +43,6 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.planner.common.DrillJoinRelBase;
Expand All @@ -61,12 +61,9 @@
public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{
private static final Logger logger = LoggerFactory.getLogger(DrillRelMdDistinctRowCount.class);

private static final DrillRelMdDistinctRowCount INSTANCE =
new DrillRelMdDistinctRowCount();

public static final RelMetadataProvider SOURCE =
ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.DISTINCT_ROW_COUNT.method, INSTANCE);
ReflectiveRelMetadataProvider.reflectiveSource(
new DrillRelMdDistinctRowCount(), BuiltInMetadata.DistinctRowCount.Handler.class);

/**
* We need to override this method since Calcite and Drill calculate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@
package org.apache.drill.exec.planner.cost;

import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.metadata.BuiltInMetadata;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdMaxRowCount;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.BuiltInMethod;

public class DrillRelMdMaxRowCount extends RelMdMaxRowCount {

private static final DrillRelMdMaxRowCount INSTANCE = new DrillRelMdMaxRowCount();

public static final RelMetadataProvider SOURCE =
ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.MAX_ROW_COUNT.method, INSTANCE);
ReflectiveRelMetadataProvider.reflectiveSource(
new DrillRelMdMaxRowCount(), BuiltInMetadata.MaxRowCount.Handler.class);

// The method is overridden because of changes done in CALCITE-2991.
// TODO: should be discarded when CALCITE-1048 is fixed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.metadata.BuiltInMetadata;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdRowCount;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.schema.Table;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.planner.common.DrillLimitRelBase;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
Expand All @@ -40,10 +41,12 @@
import org.apache.drill.metastore.statistics.TableStatisticsKind;


public class DrillRelMdRowCount extends RelMdRowCount{
private static final DrillRelMdRowCount INSTANCE = new DrillRelMdRowCount();
public class DrillRelMdRowCount extends RelMdRowCount {

public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.ROW_COUNT.method, INSTANCE);
public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
new DrillRelMdRowCount(), BuiltInMetadata.RowCount.Handler.class);

private static final Double DEFAULT_SCAN_ROW_COUNT = 1e9;

@Override
public Double getRowCount(Aggregate rel, RelMetadataQuery mq) {
Expand Down Expand Up @@ -96,7 +99,14 @@ public Double getRowCount(TableScan rel, RelMetadataQuery mq) {
PlannerSettings settings = PrelUtil.getSettings(rel.getCluster());
// If guessing, return row count from RelMdRowCount
if (DrillRelOptUtil.guessRows(rel)) {
return super.getRowCount(rel, mq);
if (rel instanceof DrillScanRelBase
|| rel.getTable().unwrap(Table.class).getStatistic().getRowCount() != null) {
return super.getRowCount(rel, mq);
} else {
// if table doesn't have row count statistics, return large row count
// to make sure that limit will be pushed down
return DEFAULT_SCAN_ROW_COUNT;
}
}
// Return rowcount from statistics, if available. Otherwise, delegate to parent.
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.BuiltInMetadata;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdSelectivity;
import org.apache.calcite.rel.metadata.RelMdUtil;
Expand All @@ -46,7 +47,6 @@
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.Util;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.DbGroupScan;
Expand All @@ -71,8 +71,8 @@
public class DrillRelMdSelectivity extends RelMdSelectivity {
private static final Logger logger = LoggerFactory.getLogger(DrillRelMdSelectivity.class);

private static final DrillRelMdSelectivity INSTANCE = new DrillRelMdSelectivity();
public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.SELECTIVITY.method, INSTANCE);
public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
new DrillRelMdSelectivity(), BuiltInMetadata.Selectivity.Handler.class);
/*
* For now, we are treating all LIKE predicates to have the same selectivity irrespective of the number or position
* of wildcard characters (%). This is no different than the present Drill/Calcite behaviour w.r.t to LIKE predicates.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.drill.exec.store.enumerable;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonTypeResolver;
import org.apache.drill.exec.record.ColumnConverterFactory;
import org.apache.drill.exec.record.metadata.TupleMetadata;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonTypeResolver(DynamicTypeResolverBuilder.class)
public interface ColumnConverterFactoryProvider {

ColumnConverterFactory getFactory(TupleMetadata schema);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.enumerable;

import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder;
import org.reflections.Reflections;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class DynamicTypeResolverBuilder extends StdTypeResolverBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this new serde builder become necessary because of the changes made in this PR for pushing down projections to Mongo?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fixes for Mongo (in the java-exec module) helped to find out that serde for enumerable plugins is broken, so fixed it here.


/**
 * Builds the type deserializer for {@code baseType}, extending the subtypes supplied by the
 * caller with every subtype of the base class that the {@code Reflections} scan finds under
 * the {@code org.apache.drill.exec.store} package.
 *
 * @param config   deserialization configuration, passed through to the parent builder
 * @param baseType declared base type; its raw class is used to look up subtypes
 * @param subtypes subtypes already resolved by the caller; {@code null} is treated as empty
 * @return the deserializer created by the parent builder for the merged subtype list
 */
@Override
public TypeDeserializer buildTypeDeserializer(DeserializationConfig config,
    JavaType baseType, Collection<NamedType> subtypes) {

  Reflections reflections = new Reflections("org.apache.drill.exec.store");
  @SuppressWarnings("unchecked")
  Class<Object> rawClass = (Class<Object>) baseType.getRawClass();
  // subtypes discovered by scanning come first, explicitly supplied ones are appended
  List<NamedType> dynamicSubtypes = reflections.getSubTypesOf(rawClass).stream()
      .map(NamedType::new)
      .collect(Collectors.toList());
  // guard against a missing collection instead of failing with NullPointerException
  if (subtypes != null) {
    dynamicSubtypes.addAll(subtypes);
  }

  return super.buildTypeDeserializer(config, baseType, dynamicSubtypes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private ManagedScanFramework.ScanFrameworkBuilder createBuilder(EnumerableSubSca
builder.providedSchema(subScan.getSchema());

ManagedReader<SchemaNegotiator> reader = new EnumerableRecordReader(subScan.getColumns(),
subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.factoryProvider());
subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.getConverterFactoryProvider());
ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(Collections.singletonList(reader).iterator());
builder.setReaderFactory(readerFactory);
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public String getSchemaPath() {
return schemaPath;
}

public ColumnConverterFactoryProvider factoryProvider() {
public ColumnConverterFactoryProvider getConverterFactoryProvider() {
return converterFactoryProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.Prel;

public class EnumerableIntermediatePrelConverterRule extends RelOptRule {
Expand All @@ -48,7 +49,7 @@ public void onMatch(RelOptRuleCall call) {
VertexDrel in = call.rel(0);
RelNode intermediatePrel = new EnumerableIntermediatePrel(
in.getCluster(),
in.getTraitSet().replace(outTrait),
in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
in.getInput(0),
context);
call.transformTo(intermediatePrel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@
package org.apache.drill.exec.store.enumerable.plan;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.exec.planner.logical.DrillImplementor;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.util.Utilities;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.List;

import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST;

/**
* The vertex simply holds the child nodes but contains its own traits.
* Used for completing Drill logical planning when child nodes have some specific traits.
Expand All @@ -51,4 +58,15 @@ protected Object clone() {
public LogicalOperator implement(DrillImplementor implementor) {
throw new UnsupportedOperationException();
}

/**
 * Computes the self cost of this vertex from the estimated row count and the width of its row type.
 * The cost is built from the row count and from row count multiplied by column count,
 * with a star row type charged STAR_COLUMN_COST columns, and is then scaled by 0.1.
 *
 * @param planner planner whose cost factory creates the cost
 * @param mq metadata query passed to the row count estimation
 * @return cost created by the planner cost factory, multiplied by 0.1
 */
@Override
public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
double rowCount = estimateRowCount(mq);
// a star row type is charged the fixed STAR_COLUMN_COST instead of its field count
double columnCount = Utilities.isStarQuery(getRowType()) ? STAR_COLUMN_COST : getRowType().getFieldCount();
double valueCount = rowCount * columnCount;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was it only queries with a pushed down join that were affected by there being no cost saving for adding a plugin projection?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think more queries were affected, but probably it was simpler to reproduce it on queries with joins.

// The column count is part of the cost so that plans with pushed-down plugin project
// operators are preferred. Otherwise the planner would not consider a plan with an additional
// plugin projection that reduces the number of columns to be better than a plan without it.
return planner.getCostFactory().makeCost(rowCount, valueCount, 0).multiplyBy(0.1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet group

@Override
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
return super.computeSelfCost(planner, mq).multiplyBy(0.1);
return super.computeLogicalAggCost(planner, mq).multiplyBy(0.1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these Plugin*Rel RelNodes used when operations are pushed down to storage plugins?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they represent part of the pushed-down query, so it is possible to find out the most optimal query for both Drill and actual storage where the query is pushed down.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.drill.exec.store.plan.rel;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelWriter;
Expand All @@ -27,11 +29,14 @@
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
import org.apache.drill.exec.store.plan.PluginImplementor;
import org.apache.drill.exec.util.Utilities;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST;

/**
* Storage plugin table scan rel implementation.
*/
Expand Down Expand Up @@ -75,6 +80,21 @@ public boolean canImplement(PluginImplementor implementor) {
return implementor.canImplement(this);
}

/**
 * Computes the self cost of this scan from the estimated row count and an adjusted column count.
 * A star projection is charged STAR_COLUMN_COST columns. Otherwise the squared row-type field
 * count is divided by the number of projected group scan columns (at least 1).
 *
 * @param planner planner whose cost factory creates the cost
 * @param mq metadata query passed to the row count estimation
 * @return cost created by the planner cost factory, multiplied by 0.1
 */
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
  List<SchemaPath> projected = groupScan.getColumns();

  double columnCount;
  if (Utilities.isStarQuery(projected)) {
    columnCount = STAR_COLUMN_COST;
  } else {
    // The adjustment accounts for projecting nested columns: such a scan should be
    // preferable to a scan where only root columns are projected.
    double squaredFieldCount = Math.pow(getRowType().getFieldCount(), 2);
    columnCount = squaredFieldCount / Math.max(projected.size(), 1);
  }

  double rowCount = estimateRowCount(mq);
  double valueCount = rowCount * columnCount;

  RelOptCost unscaledCost = planner.getCostFactory().makeCost(rowCount, valueCount, 0);
  return unscaledCost.multiplyBy(0.1);
}

private static List<SchemaPath> getColumns(RelDataType rowType) {
return rowType.getFieldList().stream()
.map(filed -> filed.isDynamicStar()
Expand Down
Loading