From 74599bb027854936e2f80c5981907d891a5a3b49 Mon Sep 17 00:00:00 2001 From: Volodymyr Vysotskyi Date: Sun, 18 Sep 2022 13:58:59 +0300 Subject: [PATCH] DRILL-8190: Fix mongo project pushdown for queries with joins --- .../JdbcIntermediatePrelConverterRule.java | 3 +- .../mongo/plan/MongoPluginImplementor.java | 5 +- .../store/mongo/TestMongoProjectPushDown.java | 22 +++++++++ .../PhoenixIntermediatePrelConverterRule.java | 3 +- .../cost/DrillRelMdDistinctRowCount.java | 9 ++-- .../planner/cost/DrillRelMdMaxRowCount.java | 7 ++- .../exec/planner/cost/DrillRelMdRowCount.java | 20 ++++++-- .../planner/cost/DrillRelMdSelectivity.java | 6 +-- .../ColumnConverterFactoryProvider.java | 2 + .../DynamicTypeResolverBuilder.java | 47 +++++++++++++++++++ .../enumerable/EnumerableBatchCreator.java | 2 +- .../store/enumerable/EnumerableSubScan.java | 2 +- ...umerableIntermediatePrelConverterRule.java | 3 +- .../store/enumerable/plan/VertexDrel.java | 18 +++++++ .../store/plan/rel/PluginAggregateRel.java | 2 +- .../plan/rel/StoragePluginTableScan.java | 20 ++++++++ .../PluginIntermediatePrelConverterRule.java | 3 +- .../org/apache/drill/exec/util/Utilities.java | 30 ++++++------ 18 files changed, 160 insertions(+), 44 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java index 8995aab6d49..62250b4662d 100644 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.store.enumerable.plan.VertexDrel; @@ -50,7 +51,7 @@ public void onMatch(RelOptRuleCall call) { VertexDrel in = call.rel(0); RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel( in.getCluster(), - in.getTraitSet().replace(outTrait), + in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON), in.getInput(0), username); call.transformTo(jdbcIntermediatePrel); } diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java index b55fa6b1730..774a23265bf 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java @@ -55,7 +55,6 @@ import org.apache.drill.exec.store.plan.rel.PluginSortRel; import org.apache.drill.exec.store.plan.rel.PluginUnionRel; import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan; -import org.apache.drill.exec.util.Utilities; import org.bson.BsonDocument; import org.bson.BsonElement; import org.bson.BsonInt32; @@ -220,8 +219,8 @@ public void implement(PluginUnionRel union) throws IOException { } @Override - public void implement(StoragePluginTableScan scan) throws IOException { - groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan(); + public void implement(StoragePluginTableScan scan) { + groupScan = (MongoGroupScan) scan.getGroupScan(); operations = this.groupScan.getScanSpec().getOperations().stream() .map(BsonDocument::parse) .collect(Collectors.toList()); diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java index a691443311e..372ec6d5e4e 100644 --- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java +++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java @@ -101,4 +101,26 @@ public void testStarProject() throws Exception { .go(); } + @Test // DRILL-8190 + public void testProjectWithJoin() throws Exception { + String query = "SELECT sum(s1.sales) s1_sales,\n" + + "sum(s2.sales) s2_sales\n" + + "FROM mongo.%s.`%s` s1\n" + + "JOIN mongo.%s.`%s` s2 ON s1._id = s2._id"; + + queryBuilder() + .sql(query, DONUTS_DB, DONUTS_COLLECTION, DONUTS_DB, DONUTS_COLLECTION) + .planMatcher() + .include("columns=\\[`_id`, `sales`]") + .exclude("columns=\\[`\\*\\*`") + .match(); + + testBuilder() + .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION, DONUTS_DB, DONUTS_COLLECTION) + .unOrdered() + .baselineColumns("s1_sales", "s2_sales") + .baselineValues(1194L, 1194L) + .go(); + } + } diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java index c5eaaf1ddae..7d6a88d0cd2 100644 --- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java +++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.store.enumerable.plan.VertexDrel; @@ -48,7 +49,7 @@ public void onMatch(RelOptRuleCall call) { VertexDrel in = call.rel(0); RelNode intermediatePrel = new PhoenixIntermediatePrel( in.getCluster(), - in.getTraitSet().replace(outTrait), + in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON), in.getInput(0)); call.transformTo(intermediatePrel); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java index 25c2fe2c39d..cdfeb6c594f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java @@ -30,6 +30,7 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.metadata.BuiltInMetadata; import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMdDistinctRowCount; import org.apache.calcite.rel.metadata.RelMdUtil; @@ -42,7 +43,6 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.util.BuiltInMethod; import org.apache.calcite.util.ImmutableBitSet; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.planner.common.DrillJoinRelBase; @@ -61,12 +61,9 @@ public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{ private static final Logger logger = LoggerFactory.getLogger(DrillRelMdDistinctRowCount.class); - private static final DrillRelMdDistinctRowCount INSTANCE = - new DrillRelMdDistinctRowCount(); - public static final RelMetadataProvider SOURCE = - ReflectiveRelMetadataProvider.reflectiveSource( - BuiltInMethod.DISTINCT_ROW_COUNT.method, INSTANCE); + ReflectiveRelMetadataProvider.reflectiveSource( + new DrillRelMdDistinctRowCount(), BuiltInMetadata.DistinctRowCount.Handler.class); /** * We need to override this method since Calcite and Drill calculate diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdMaxRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdMaxRowCount.java index ed96025189f..09e555c9ff6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdMaxRowCount.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdMaxRowCount.java @@ -18,18 +18,17 @@ package org.apache.drill.exec.planner.cost; import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.metadata.BuiltInMetadata; import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMdMaxRowCount; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.calcite.util.BuiltInMethod; public class DrillRelMdMaxRowCount extends RelMdMaxRowCount { - private static final DrillRelMdMaxRowCount INSTANCE = new DrillRelMdMaxRowCount(); - public static final RelMetadataProvider SOURCE = - ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.MAX_ROW_COUNT.method, INSTANCE); + ReflectiveRelMetadataProvider.reflectiveSource( + new DrillRelMdMaxRowCount(), BuiltInMetadata.MaxRowCount.Handler.class); // The method is overriden because of changes done in CALCITE-2991 and // TODO: should be discarded when CALCITE-1048 is fixed. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java index eaaf7d1158d..2f59ab30271 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java @@ -23,11 +23,12 @@ import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.metadata.BuiltInMetadata; import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMdRowCount; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.schema.Table; import org.apache.calcite.util.ImmutableBitSet; import org.apache.drill.exec.planner.common.DrillLimitRelBase; import org.apache.drill.exec.planner.common.DrillRelOptUtil; @@ -40,10 +41,12 @@ import org.apache.drill.metastore.statistics.TableStatisticsKind; -public class DrillRelMdRowCount extends RelMdRowCount{ - private static final DrillRelMdRowCount INSTANCE = new DrillRelMdRowCount(); +public class DrillRelMdRowCount extends RelMdRowCount { - public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.ROW_COUNT.method, INSTANCE); + public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource( + new DrillRelMdRowCount(), BuiltInMetadata.RowCount.Handler.class); + + private static final Double DEFAULT_SCAN_ROW_COUNT = 1e9; @Override public Double getRowCount(Aggregate rel, RelMetadataQuery mq) { @@ -96,7 +99,14 @@ public Double getRowCount(TableScan rel, RelMetadataQuery mq) { PlannerSettings settings = PrelUtil.getSettings(rel.getCluster()); // If guessing, return selectivity from RelMDRowCount if (DrillRelOptUtil.guessRows(rel)) { - return super.getRowCount(rel, mq); + if (rel instanceof DrillScanRelBase + || rel.getTable().unwrap(Table.class).getStatistic().getRowCount() != null) { + return super.getRowCount(rel, mq); + } else { + // if table doesn't have row count statistics, return large row count + // to make sure that limit will be pushed down + return DEFAULT_SCAN_ROW_COUNT; + } } // Return rowcount from statistics, if available. Otherwise, delegate to parent. try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java index 5732f910028..bda0b16d0b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java @@ -33,6 +33,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.BuiltInMetadata; import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMdSelectivity; import org.apache.calcite.rel.metadata.RelMdUtil; @@ -46,7 +47,6 @@ import org.apache.calcite.rex.RexVisitor; import org.apache.calcite.rex.RexVisitorImpl; import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.util.BuiltInMethod; import org.apache.calcite.util.Util; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.DbGroupScan; @@ -71,8 +71,8 @@ public class DrillRelMdSelectivity extends RelMdSelectivity { private static final Logger logger = LoggerFactory.getLogger(DrillRelMdSelectivity.class); - private static final DrillRelMdSelectivity INSTANCE = new DrillRelMdSelectivity(); - public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.SELECTIVITY.method, INSTANCE); + public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource( + new DrillRelMdSelectivity(), BuiltInMetadata.Selectivity.Handler.class); /* * For now, we are treating all LIKE predicates to have the same selectivity irrespective of the number or position * of wildcard characters (%). This is no different than the present Drill/Calcite behaviour w.r.t to LIKE predicates. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java index 28aec71d0f2..b91e4cbb133 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java @@ -18,10 +18,12 @@ package org.apache.drill.exec.store.enumerable; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.annotation.JsonTypeResolver; import org.apache.drill.exec.record.ColumnConverterFactory; import org.apache.drill.exec.record.metadata.TupleMetadata; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) +@JsonTypeResolver(DynamicTypeResolverBuilder.class) public interface ColumnConverterFactoryProvider { ColumnConverterFactory getFactory(TupleMetadata schema); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java new file mode 100644 index 00000000000..64fa9864cfd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.enumerable; + +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.jsontype.TypeDeserializer; +import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder; +import org.reflections.Reflections; + +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +public class DynamicTypeResolverBuilder extends StdTypeResolverBuilder { + + @Override + public TypeDeserializer buildTypeDeserializer(DeserializationConfig config, + JavaType baseType, Collection subtypes) { + + Reflections reflections = new Reflections("org.apache.drill.exec.store"); + @SuppressWarnings("unchecked") + Class rawClass = (Class) baseType.getRawClass(); + List dynamicSubtypes = reflections.getSubTypesOf(rawClass).stream() + .map(NamedType::new) + .collect(Collectors.toList()); + dynamicSubtypes.addAll(subtypes); + + return super.buildTypeDeserializer(config, baseType, dynamicSubtypes); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java index 2dec45a61aa..9930484a2b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java @@ -61,7 +61,7 @@ private ManagedScanFramework.ScanFrameworkBuilder createBuilder(EnumerableSubSca builder.providedSchema(subScan.getSchema()); ManagedReader reader = new EnumerableRecordReader(subScan.getColumns(), - subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.factoryProvider()); + subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.getConverterFactoryProvider()); ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(Collections.singletonList(reader).iterator()); builder.setReaderFactory(readerFactory); builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java index 4476be8c538..0c7245b6ccc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java @@ -79,7 +79,7 @@ public String getSchemaPath() { return schemaPath; } - public ColumnConverterFactoryProvider factoryProvider() { + public ColumnConverterFactoryProvider getConverterFactoryProvider() { return converterFactoryProvider; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java index 7272a36bf63..c5ede3a22e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.Prel; public class EnumerableIntermediatePrelConverterRule extends RelOptRule { @@ -48,7 +49,7 @@ public void onMatch(RelOptRuleCall call) { VertexDrel in = call.rel(0); RelNode intermediatePrel = new EnumerableIntermediatePrel( in.getCluster(), - in.getTraitSet().replace(outTrait), + in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON), in.getInput(0), context); call.transformTo(intermediatePrel); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java index edbc5912fbf..d202ebdb893 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java @@ -18,15 +18,22 @@ package org.apache.drill.exec.store.enumerable.plan; import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.exec.planner.logical.DrillImplementor; import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.util.Utilities; +import org.checkerframework.checker.nullness.qual.Nullable; import java.util.List; +import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST; + /** * The vertex simply holds the child nodes but contains its own traits. * Used for completing Drill logical planning when child nodes have some specific traits. @@ -51,4 +58,15 @@ protected Object clone() { public LogicalOperator implement(DrillImplementor implementor) { throw new UnsupportedOperationException(); } + + @Override + public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + double rowCount = estimateRowCount(mq); + double columnCount = Utilities.isStarQuery(getRowType()) ? STAR_COLUMN_COST : getRowType().getFieldCount(); + double valueCount = rowCount * columnCount; + // columns count is considered during cost calculation to make preferable plans + // with pushed plugin project operators since in the opposite case planner wouldn't consider + // a plan with additional plugin projection that reduces columns as better than a plan without it + return planner.getCostFactory().makeCost(rowCount, valueCount, 0).multiplyBy(0.1); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java index 02885e90abc..3c7f115843c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java @@ -51,7 +51,7 @@ public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet group @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { - return super.computeSelfCost(planner, mq).multiplyBy(0.1); + return super.computeLogicalAggCost(planner, mq).multiplyBy(0.1); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java index 2be69ef7c98..38525e2575a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.store.plan.rel; import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelWriter; @@ -27,11 +29,14 @@ import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.common.DrillScanRelBase; import org.apache.drill.exec.store.plan.PluginImplementor; +import org.apache.drill.exec.util.Utilities; import java.io.IOException; import java.util.List; import java.util.stream.Collectors; +import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST; + /** * Storage plugin table scan rel implementation. */ @@ -75,6 +80,21 @@ public boolean canImplement(PluginImplementor implementor) { return implementor.canImplement(this); } + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + List columns = groupScan.getColumns(); + // column count should be adjusted to consider the case of projecting nested columns, + // such a scan should be preferable compared to the scan where root columns are projected only + double columnCount = Utilities.isStarQuery(columns) + ? STAR_COLUMN_COST + : Math.pow(getRowType().getFieldCount(), 2) / Math.max(columns.size(), 1); + + double rowCount = estimateRowCount(mq); + double valueCount = rowCount * columnCount; + + return planner.getCostFactory().makeCost(rowCount, valueCount, 0).multiplyBy(0.1); + } + private static List getColumns(RelDataType rowType) { return rowType.getFieldList().stream() .map(filed -> filed.isDynamicStar() diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java index 279241efea3..a13dc25e329 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.store.enumerable.plan.VertexDrel; import org.apache.drill.exec.store.plan.PluginImplementor; @@ -53,7 +54,7 @@ public void onMatch(RelOptRuleCall call) { VertexDrel in = call.rel(0); RelNode intermediatePrel = new PluginIntermediatePrel( in.getCluster(), - in.getTraitSet().replace(outTrait), + in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON), in.getInput(0), implementorFactory); call.transformTo(intermediatePrel); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java index 87f22018049..1d160d07ed4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java @@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexLiteral; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; @@ -33,8 +34,6 @@ import org.apache.drill.exec.proto.ExecProtos; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.base.Predicate; -import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; public class Utilities { @@ -52,17 +51,13 @@ public static String getFileNameForQueryFragment(FragmentContext context, String int majorFragmentId = handle.getMajorFragmentId(); int minorFragmentId = handle.getMinorFragmentId(); - String fileName = String.format("%s//%s_%s_%s_%s", location, qid, majorFragmentId, minorFragmentId, tag); - - return fileName; + return String.format("%s//%s_%s_%s_%s", location, qid, majorFragmentId, minorFragmentId, tag); } /** * Create {@link org.apache.drill.exec.proto.BitControl.QueryContextInformation} with given defaultSchemaName. Rest of the members of the * QueryContextInformation is derived from the current state of the process. * - * @param defaultSchemaName - * @param sessionId * @return A {@link org.apache.drill.exec.proto.BitControl.QueryContextInformation} with given defaultSchemaName. */ public static QueryContextInformation createQueryContextInfo(final String defaultSchemaName, @@ -82,22 +77,25 @@ public static QueryContextInformation createQueryContextInfo(final String defaul * @return The Drill version. */ public static String getDrillVersion() { - String v = Utilities.class.getPackage().getImplementationVersion(); - return v; + return Utilities.class.getPackage().getImplementationVersion(); } /** * Return true if list of schema path has star column. - * @param projected + * * @return True if the list of {@link org.apache.drill.common.expression.SchemaPath}s has star column. */ public static boolean isStarQuery(Collection projected) { - return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate() { - @Override - public boolean apply(SchemaPath path) { - return Preconditions.checkNotNull(path).equals(SchemaPath.STAR_COLUMN); - } - }).isPresent(); + return Preconditions.checkNotNull(projected, COL_NULL_ERROR).stream() + .anyMatch(SchemaPath::isDynamicStar); + } + + /** + * Return true if the row type has star column. + */ + public static boolean isStarQuery(RelDataType projected) { + return projected.getFieldNames().stream() + .anyMatch(SchemaPath.DYNAMIC_STAR::equals); } /**