diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 959f0cf0d43bac..23112d377dd067 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -1568,6 +1568,7 @@ functionCallExpression : functionIdentifier LEFT_PAREN ( (DISTINCT|ALL)? + (LEFT_BRACKET identifier RIGHT_BRACKET)? arguments+=expression (COMMA arguments+=expression)* (ORDER BY sortItem (COMMA sortItem)*)? )? RIGHT_PAREN diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundFunction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundFunction.java index 9b46293086ad73..c10b0a52880404 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundFunction.java @@ -38,6 +38,7 @@ public class UnboundFunction extends Function implements Unbound, PropagateNullable { private final String dbName; private final boolean isDistinct; + private final boolean isSkew; // for create view stmt, the start and end position of the function string in original sql private final Optional> indexInSqlString; // the start and end position of the function string in original sql @@ -69,29 +70,31 @@ public FunctionIndexInSql indexInQueryPart(int offset) { } public UnboundFunction(String name, List arguments) { - this(null, name, false, arguments, Optional.empty(), Optional.empty()); + this(null, name, false, arguments, false, Optional.empty(), Optional.empty()); } public UnboundFunction(String dbName, String name, List arguments) { - this(dbName, name, false, arguments, Optional.empty(), Optional.empty()); + this(dbName, name, false, arguments, false, Optional.empty(), Optional.empty()); } public UnboundFunction(String name, boolean isDistinct, List arguments) { - this(null, name, 
isDistinct, arguments, Optional.empty(), Optional.empty()); + this(null, name, isDistinct, arguments, false, Optional.empty(), Optional.empty()); } - public UnboundFunction(String dbName, String name, boolean isDistinct, List arguments) { - this(dbName, name, isDistinct, arguments, Optional.empty(), Optional.empty()); + public UnboundFunction(String dbName, String name, boolean isDistinct, List arguments, boolean isSkew) { + this(dbName, name, isDistinct, arguments, isSkew, Optional.empty(), Optional.empty()); } + /**UnboundFunction*/ public UnboundFunction(String dbName, String name, boolean isDistinct, - List arguments, Optional functionIndexInSql, + List arguments, boolean isSkew, Optional functionIndexInSql, Optional> indexInSqlString) { super(name, arguments); this.dbName = dbName; this.isDistinct = isDistinct; this.functionIndexInSql = functionIndexInSql; this.indexInSqlString = indexInSqlString; + this.isSkew = isSkew; } @Override @@ -110,6 +113,10 @@ public boolean isDistinct() { return isDistinct; } + public boolean isSkew() { + return isSkew; + } + public List getArguments() { return children(); } @@ -135,7 +142,8 @@ public R accept(ExpressionVisitor visitor, C context) { @Override public UnboundFunction withChildren(List children) { - return new UnboundFunction(dbName, getName(), isDistinct, children, functionIndexInSql, indexInSqlString); + return new UnboundFunction(dbName, getName(), isDistinct, children, isSkew, functionIndexInSql, + indexInSqlString); } public Optional getFunctionIndexInSql() { @@ -143,7 +151,8 @@ public Optional getFunctionIndexInSql() { } public UnboundFunction withIndexInSqlString(Optional functionIndexInSql) { - return new UnboundFunction(dbName, getName(), isDistinct, children, functionIndexInSql, indexInSqlString); + return new UnboundFunction(dbName, getName(), isDistinct, children, isSkew, functionIndexInSql, + indexInSqlString); } @Override @@ -167,7 +176,7 @@ public int computeHashCode() { } public UnboundFunction 
withIndexInSql(Pair index) { - return new UnboundFunction(dbName, getName(), isDistinct, children, functionIndexInSql, + return new UnboundFunction(dbName, getName(), isDistinct, children, isSkew, functionIndexInSql, Optional.ofNullable(index)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index 80c8760d0e545e..72d9d953301c09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -20,6 +20,7 @@ import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.rewrite.CostBasedRewriteJob; import org.apache.doris.nereids.jobs.rewrite.RewriteJob; +import org.apache.doris.nereids.rules.JoinSplitForNullSkew; import org.apache.doris.nereids.rules.RuleSet; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.analysis.AdjustAggregateNullableForEmptySet; @@ -325,7 +326,8 @@ public class Rewriter extends AbstractBatchJobExecutor { bottomUp(new EliminateEmptyRelation()), // when union has empty relation child and constantExprsList is not empty, // after EliminateEmptyRelation, project can be pushed into union - topDown(new PushProjectIntoUnion()) + topDown(new PushProjectIntoUnion()), + costBased(topDown(new JoinSplitForNullSkew())) ), topic("infer In-predicate from Or-predicate", topDown(new InferInPredicateFromOr()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 67fa8eafdb6f5b..61181dd3ba082d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -2692,6 +2692,7 @@ public Expression 
visitFunctionCallExpression(DorisParser.FunctionCallExpression params.addAll(visit(ctx.expression(), Expression.class)); List orderKeys = visit(ctx.sortItem(), OrderKey.class); params.addAll(orderKeys.stream().map(OrderExpression::new).collect(Collectors.toList())); + boolean isSkew = ctx.identifier() != null && ctx.identifier().getText().equalsIgnoreCase("skew"); List unboundStars = ExpressionUtils.collectAll(params, UnboundStar.class::isInstance); if (!unboundStars.isEmpty()) { @@ -2717,7 +2718,7 @@ public Expression visitFunctionCallExpression(DorisParser.FunctionCallExpression if (ctx.functionIdentifier().dbName != null) { dbName = ctx.functionIdentifier().dbName.getText(); } - UnboundFunction function = new UnboundFunction(dbName, functionName, isDistinct, params); + UnboundFunction function = new UnboundFunction(dbName, functionName, isDistinct, params, isSkew); if (ctx.windowSpec() != null) { if (isDistinct) { throw new ParseException("DISTINCT not allowed in analytic function: " + functionName, ctx); @@ -4182,7 +4183,7 @@ public Object visitCallProcedure(CallProcedureContext ctx) { .map(this::typedVisit) .collect(ImmutableList.toImmutableList()); UnboundFunction unboundFunction = new UnboundFunction(procedureName.getDbName(), procedureName.getName(), - true, arguments); + true, arguments, false); return new CallCommand(unboundFunction, getOriginSql(ctx)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/JoinSplitForNullSkew.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/JoinSplitForNullSkew.java new file mode 100644 index 00000000000000..d4161a47dddfd1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/JoinSplitForNullSkew.java @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules; + +import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; +import org.apache.doris.nereids.trees.copier.DeepCopierContext; +import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.IsNull; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Not; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.util.ArrayList; +import java.util.List; + +/** + * 
LogicalLeftOuterJoin(hashConjuncts:t1.a=t2.a) + * +--Plan1(output:t1.a) + * +--Plan2(output:t2.a) + * -> + * LogicalUnion + * +--LogicalProject + * +--LogicalFilter(t1.a is null) + * +--Plan1 + * +--LogicalLeftOuterJoin(t1.a=t2.a) + * +--LogicalFilter(t1.a is not null) + * +--Plan1 + * +--Plan2 + * + * LogicalRightOuterJoin(hashConjuncts:t1.a=t2.a) + * +--Plan1(output:t1.a) + * +--Plan2(output:t2.a) + * -> + * LogicalUnion + * +--LogicalProject + * +--LogicalFilter(t2.a is null) + * +--Plan2 + * +--LogicalRightOuterJoin(t1.a=t2.a) + * +--Plan1 + * +--LogicalFilter(t2.a is not null) + * +--Plan2 + * */ +public class JoinSplitForNullSkew extends OneRewriteRuleFactory { + @Override + public Rule build() { + return logicalJoin() + .when(join -> join.getJoinType().isOneSideOuterJoin()) + .whenNot(join -> join.isMarkJoin() || !join.getMarkJoinConjuncts().isEmpty()) + .when(join -> join.getHashJoinConjuncts().size() == 1) + .then(this::splitJoin) + .toRule(RuleType.JOIN_SPLIT_FOR_NULL_SKEW); + } + + private Plan splitJoin(LogicalJoin join) { + boolean isLeftJoin = join.getJoinType().isLeftOuterJoin(); + Plan primarySide = isLeftJoin ? join.left() : join.right(); + Plan associatedSide = isLeftJoin ? 
join.right() : join.left(); + Expression conjunct = join.getHashJoinConjuncts().get(0); + if (!(conjunct instanceof EqualTo)) { + return null; + } + EqualTo equalTo = (EqualTo) conjunct; + Expression splitExpr; + if (primarySide.getOutputSet().containsAll(equalTo.left().getInputSlots())) { + splitExpr = equalTo.left(); + } else { + splitExpr = equalTo.right(); + } + if (!splitExpr.nullable()) { + return null; + } + // avoid duplicate application of rules + Expression isNotNull = new Not(new IsNull(splitExpr)); + if (primarySide instanceof LogicalFilter + && ((LogicalFilter) primarySide).getConjuncts().contains(isNotNull)) { + return null; + } + + // is not null side construct + LogicalFilter isNotNullFilter = new LogicalFilter<>(ImmutableSet.of(isNotNull), primarySide); + LogicalJoin newJoin; + if (isLeftJoin) { + newJoin = join.withChildren(ImmutableList.of(isNotNullFilter, associatedSide)); + } else { + newJoin = join.withChildren(ImmutableList.of(associatedSide, isNotNullFilter)); + } + Plan deepCopyJoin = LogicalPlanDeepCopier.INSTANCE.deepCopy(newJoin, new DeepCopierContext()); + + // is null side construct + LogicalFilter isNullFilter = new LogicalFilter<>(ImmutableSet.of(new IsNull(splitExpr)), primarySide); + Plan deepCopyFilter = LogicalPlanDeepCopier.INSTANCE.deepCopy(isNullFilter, new DeepCopierContext()); + List newProjects = new ArrayList<>(join.getOutput().size()); + if (isLeftJoin) { + newProjects.addAll(deepCopyFilter.getOutput()); + for (Slot slot : associatedSide.getOutput()) { + newProjects.add(new Alias(new NullLiteral(slot.getDataType()))); + } + } else { + for (Slot slot : associatedSide.getOutput()) { + newProjects.add(new Alias(new NullLiteral(slot.getDataType()))); + } + newProjects.addAll(deepCopyFilter.getOutput()); + } + LogicalProject isNullProject = new LogicalProject<>(newProjects, deepCopyFilter); + + // regularChildrenOutputs construct + List> regularChildrenOutputs = new ArrayList<>(); + regularChildrenOutputs.add((List) 
isNullProject.getOutput()); + regularChildrenOutputs.add((List) deepCopyJoin.getOutput()); + + return new LogicalUnion(Qualifier.ALL, (List) join.getOutput(), regularChildrenOutputs, + ImmutableList.of(), false, ImmutableList.of(isNullProject, deepCopyJoin)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 9cee3077f1331f..b07fc257427703 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -335,7 +335,7 @@ public enum RuleType { CROSS_TO_INNER_JOIN(RuleTypeClass.REWRITE), PRUNE_EMPTY_PARTITION(RuleTypeClass.REWRITE), PROJECT_OTHER_JOIN_CONDITION(RuleTypeClass.REWRITE), - + JOIN_SPLIT_FOR_NULL_SKEW(RuleTypeClass.REWRITE), // split limit SPLIT_LIMIT(RuleTypeClass.REWRITE), PULL_UP_JOIN_FROM_UNION_ALL(RuleTypeClass.REWRITE), @@ -505,6 +505,7 @@ public enum RuleType { LOGICAL_INTERSECT_TO_PHYSICAL_INTERSECT(RuleTypeClass.IMPLEMENTATION), LOGICAL_GENERATE_TO_PHYSICAL_GENERATE(RuleTypeClass.IMPLEMENTATION), LOGICAL_WINDOW_TO_PHYSICAL_WINDOW_RULE(RuleTypeClass.IMPLEMENTATION), + COUNT_DISTINCT_AGG_SKEW_REWRITE(RuleTypeClass.IMPLEMENTATION), IMPLEMENTATION_SENTINEL(RuleTypeClass.IMPLEMENTATION), // sentinel, use to count rules diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java index 485ba677ff4ecc..1f606e51ecc57b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java @@ -405,6 +405,7 @@ public Expression visitUnboundFunction(UnboundFunction unboundFunction, Expressi List arguments = unboundFunction.isDistinct() ? 
ImmutableList.builderWithExpectedSize(unboundFunction.arity() + 1) .add(unboundFunction.isDistinct()) + .addAll(unboundFunction.isSkew() ? ImmutableList.of(true) : ImmutableList.of()) /* forward the skew flag only when the hint is present, so every other DISTINCT call keeps its (distinct, args...) constructor lookup */ .addAll(unboundFunction.getArguments()) .build() : (List) unboundFunction.getArguments(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index 0a1b1c4e9b2d30..4d28251c4b1cb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -37,8 +37,10 @@ import org.apache.doris.nereids.trees.expressions.AggregateExpression; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.IsNull; +import org.apache.doris.nereids.trees.expressions.Mod; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Or; import org.apache.doris.nereids.trees.expressions.OrderExpression; @@ -56,8 +58,11 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0; import org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct; import org.apache.doris.nereids.trees.expressions.functions.scalar.If; +import org.apache.doris.nereids.trees.expressions.functions.scalar.XxHash32; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.expressions.literal.SmallIntLiteral; +import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; import org.apache.doris.nereids.trees.plans.AggMode; import 
org.apache.doris.nereids.trees.plans.AggPhase; import org.apache.doris.nereids.trees.plans.GroupPlan; @@ -73,11 +78,16 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate.PushDownAggOp; +import org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.types.SmallIntType; +import org.apache.doris.nereids.types.StringType; import org.apache.doris.nereids.types.TinyIntType; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.TypeCoercionUtils; +import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; import com.google.common.base.Function; @@ -88,6 +98,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -327,6 +338,7 @@ public List buildRules() { basePattern .when(agg -> agg.getDistinctArguments().size() == 1 && couldConvertToMulti(agg)) .when(agg -> agg.supportAggregatePhase(AggregatePhase.TWO)) + .whenNot(agg -> agg.hasSkewHint() && agg.canSkewRewrite()) .thenApplyMulti(ctx -> twoPhaseAggregateWithMultiDistinct(ctx.root, ctx.connectContext)) ), RuleType.TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT.build( @@ -347,6 +359,7 @@ && couldConvertToMulti(agg)) basePattern .when(agg -> agg.getDistinctArguments().size() == 1) .whenNot(agg -> agg.mustUseMultiDistinctAgg()) + .whenNot(agg -> agg.hasSkewHint() && agg.canSkewRewrite()) .when(agg -> agg.supportAggregatePhase(AggregatePhase.THREE)) .thenApplyMulti(ctx -> threePhaseAggregateWithDistinct(ctx.root, 
ctx.connectContext)) ), @@ -421,6 +434,7 @@ && couldConvertToMulti(agg)) return couldConvertToMulti(agg); }) .when(agg -> agg.supportAggregatePhase(AggregatePhase.FOUR)) + .whenNot(agg -> agg.hasSkewHint() && agg.canSkewRewrite()) .thenApplyMulti(ctx -> { Function, RequireProperties> secondPhaseRequireGroupByAndDistinctHash = groupByAndDistinct -> RequireProperties.of( @@ -438,6 +452,9 @@ && couldConvertToMulti(agg)) secondPhaseRequireGroupByAndDistinctHash, fourPhaseRequireGroupByHash ); }) + ), + RuleType.COUNT_DISTINCT_AGG_SKEW_REWRITE.build( + basePattern.thenApply(ctx -> countDistinctSkewRewrite(ctx.root, ctx.cascadesContext)) ) ); } @@ -2088,4 +2105,113 @@ private boolean couldConvertToMulti(LogicalAggregate aggregate) } return true; } + + // select a , count(distinct b) group by a + PhysicalHashAggregate countDistinctSkewRewrite(LogicalAggregate logicalAgg, + CascadesContext cascadesContext) { + // check + if (!logicalAgg.canSkewRewrite()) { + return null; + } + + // 1.local agg + List localAggGroupBy = new ArrayList<>(logicalAgg.getGroupByExpressions()); + Count count = (Count) logicalAgg.getAggregateFunctions().iterator().next(); + if (!(count.child(0) instanceof Slot)) { + return null; + } + localAggGroupBy.add(count.child(0)); + List localAggOutput = Utils.fastToImmutableList((List) localAggGroupBy); + RequireProperties requireAny = RequireProperties.of(PhysicalProperties.ANY); + boolean maybeUsingStreamAgg = maybeUsingStreamAgg(cascadesContext.getConnectContext(), + localAggGroupBy); + boolean couldBanned = false; + AggregateParam localParam = new AggregateParam(AggPhase.LOCAL, AggMode.INPUT_TO_BUFFER, couldBanned); + PhysicalHashAggregate localAgg = new PhysicalHashAggregate<>( + Utils.fastToImmutableList(localAggGroupBy), localAggOutput, + Optional.empty(), localParam, + maybeUsingStreamAgg, Optional.empty(), null, + requireAny, logicalAgg.child()); + // add shuffle expr in project + List projections = new ArrayList<>(localAgg.getOutputs()); + Alias 
modAlias = getShuffleExpr(count, cascadesContext); + projections.add(modAlias); + PhysicalProject physicalProject = new PhysicalProject<>(Utils.fastToImmutableList(projections), + null, localAgg); + + // 2. second phase agg: group by a, h (h is the shuffle bucket slot); compute count(distinct b) via its multi_distinct form + List secondPhaseAggGroupBy = new ArrayList<>(logicalAgg.getGroupByExpressions()); + secondPhaseAggGroupBy.add(modAlias.toSlot()); + List secondPhaseAggOutput = new ArrayList<>((List) secondPhaseAggGroupBy); + Alias aliasTarget = new Alias(new TinyIntLiteral((byte) 0)); + for (NamedExpression ne : logicalAgg.getOutputExpressions()) { + if (ne instanceof Alias) { + if (((Alias) ne).child().equals(count)) { + aliasTarget = (Alias) ne; + } + } + } + AggregateParam secondParam = new AggregateParam(AggPhase.GLOBAL, AggMode.INPUT_TO_RESULT, couldBanned); + AggregateFunction multiDistinct = count.convertToMultiDistinct(); + Alias multiDistinctAlias = new Alias(new AggregateExpression(multiDistinct, secondParam)); + secondPhaseAggOutput.add(multiDistinctAlias); + List shuffleIds = new ArrayList<>(); + for (Expression expr : secondPhaseAggGroupBy) { + if (expr instanceof Slot) { + shuffleIds.add(((Slot) expr).getExprId()); + } + } + RequireProperties secondRequireProperties = RequireProperties.of( + PhysicalProperties.createHash(shuffleIds, ShuffleType.REQUIRE)); + PhysicalHashAggregate secondPhaseAgg = new PhysicalHashAggregate<>( + secondPhaseAggGroupBy, Utils.fastToImmutableList(secondPhaseAggOutput), + Optional.empty(), secondParam, + false, Optional.empty(), logicalAgg.getLogicalProperties(), + secondRequireProperties, physicalProject); + + // 3. 
third phase agg + List thirdPhaseAggGroupBy = Utils.fastToImmutableList(logicalAgg.getGroupByExpressions()); + List thirdPhaseAggOutput = new ArrayList<>((List) thirdPhaseAggGroupBy); + AggregateParam thirdParam = new AggregateParam(AggPhase.DISTINCT_LOCAL, AggMode.INPUT_TO_BUFFER, couldBanned); + Count thirdCount = new Count(multiDistinctAlias.toSlot()); + Alias thirdCountAlias = new Alias(new AggregateExpression(thirdCount, thirdParam)); + thirdPhaseAggOutput.add(thirdCountAlias); + PhysicalHashAggregate thirdPhaseAgg = new PhysicalHashAggregate<>( + thirdPhaseAggGroupBy, Utils.fastToImmutableList(thirdPhaseAggOutput), + Optional.empty(), thirdParam, + false, Optional.empty(), logicalAgg.getLogicalProperties(), + secondRequireProperties, secondPhaseAgg); + + // 4. fourth phase agg + List fourthPhaseAggOutput = new ArrayList<>((List) thirdPhaseAggGroupBy); + AggregateParam fourthParam = new AggregateParam(AggPhase.DISTINCT_GLOBAL, AggMode.BUFFER_TO_RESULT, couldBanned); + Alias sumAliasFour = new Alias(aliasTarget.getExprId(), + new AggregateExpression(thirdCount, fourthParam, thirdCountAlias.toSlot()), + aliasTarget.getName()); + fourthPhaseAggOutput.add(sumAliasFour); + List shuffleIdsFour = new ArrayList<>(); + for (Expression expr : logicalAgg.getExpressions()) { + if (expr instanceof Slot) { + shuffleIdsFour.add(((Slot) expr).getExprId()); + } + } + RequireProperties fourthRequireProperties = RequireProperties.of( + PhysicalProperties.createHash(shuffleIdsFour, ShuffleType.REQUIRE)); + return new PhysicalHashAggregate<>(thirdPhaseAggGroupBy, + Utils.fastToImmutableList(fourthPhaseAggOutput), Optional.empty(), fourthParam, + false, Optional.empty(), logicalAgg.getLogicalProperties(), + fourthRequireProperties, thirdPhaseAgg); + } + + private Alias getShuffleExpr(Count count, CascadesContext cascadesContext) { + int bucketNum = cascadesContext.getConnectContext().getSessionVariable().aggDistinctSkewBucketNum; + Preconditions.checkState(bucketNum > 0 && 
bucketNum <= 65536); + DataType type = bucketNum <= 256 ? TinyIntType.INSTANCE : SmallIntType.INSTANCE; + int bucket = bucketNum / 2; + Mod mod = new Mod(new XxHash32(TypeCoercionUtils.castIfNotSameType( + count.child(0), + StringType.INSTANCE)), new SmallIntLiteral((short) bucket)); + Cast cast = new Cast(mod, type); + return new Alias(cast); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Count.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Count.java index ba16b07ed5fafa..4af0d8690f85ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Count.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Count.java @@ -46,22 +46,29 @@ public class Count extends NotNullableAggregateFunction ); private final boolean isStar; + private final boolean isSkew; public Count() { super("count"); this.isStar = true; + this.isSkew = false; } /** * this constructor use for COUNT(c1, c2) to get correct error msg. */ public Count(Expression child, Expression... varArgs) { - this(false, child, varArgs); + this(false, false, child, varArgs); } public Count(boolean distinct, Expression arg0, Expression... varArgs) { + this(distinct, false, arg0, varArgs); + } + + public Count(boolean distinct, boolean isSkew, Expression arg0, Expression... 
varArgs) { super("count", distinct, ExpressionUtils.mergeArguments(arg0, varArgs)); this.isStar = false; + this.isSkew = isSkew; } public boolean isCountStar() { @@ -93,6 +100,10 @@ public boolean isStar() { return isStar; } + public boolean isSkew() { + return isSkew; + } + @Override public boolean isConstant() { return false; @@ -111,9 +122,9 @@ public Count withDistinctAndChildren(boolean distinct, List children } return new Count(); } else if (children.size() == 1) { - return new Count(distinct, children.get(0)); + return new Count(distinct, isSkew, children.get(0)); } else { - return new Count(distinct, children.get(0), + return new Count(distinct, isSkew, children.get(0), children.subList(1, children.size()).toArray(new Expression[0])); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java index 7a283c740e5912..a30d33f41bbdc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java @@ -21,6 +21,7 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; +import org.apache.doris.nereids.trees.expressions.functions.agg.Count; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.UnaryPlan; import org.apache.doris.nereids.trees.plans.logical.OutputPrunable; @@ -28,6 +29,7 @@ import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -106,4 +108,22 @@ default boolean isDistinct() { return getOutputExpressions().stream().allMatch(e -> e instanceof Slot) && getGroupByExpressions().stream().allMatch(e -> 
e instanceof Slot); } + + default boolean hasSkewHint() { + return getAggregateFunctions().stream().anyMatch(f -> f instanceof Count && ((Count) f).isSkew()); + } + + /**canSkewRewrite*/ + default boolean canSkewRewrite() { + Set distinctArguments = getDistinctArguments(); + if (distinctArguments.size() == 1 + && getAggregateFunctions().size() == 1 + && getAggregateFunctions().iterator().next() instanceof Count + && getAggregateFunctions().iterator().next().arity() == 1 + && ((Count) getAggregateFunctions().iterator().next()).isSkew() + && !getGroupByExpressions().equals(new ArrayList<>(distinctArguments))) { + return true; + } + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 86c47de612fe63..057b4cbf1710aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -716,6 +716,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_TEXT_VALIDATE_UTF8 = "enable_text_validate_utf8"; + public static final String AGG_DISTINCT_SKEW_BUCKET_NUM = "agg_distinct_skew_bucket_num"; + /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. 
*/ @@ -2190,6 +2192,10 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { needForward = true) public boolean enableExternalTableBatchMode = true; + @VariableMgr.VarAttr(name = AGG_DISTINCT_SKEW_BUCKET_NUM, needForward = true, + description = {"agg distinct 倾斜场景的聚合分桶数"}) + public int aggDistinctSkewBucketNum = 1024; + public Set getIgnoredRuntimeFilterIds() { Set ids = Sets.newLinkedHashSet(); if (ignoreRuntimeFilterIds.isEmpty()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateStrategiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateStrategiesTest.java index e1e03e64d9849a..de8ffe9f9ba27b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateStrategiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggregateStrategiesTest.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.expressions.Add; import org.apache.doris.nereids.trees.expressions.AggregateExpression; import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; @@ -451,4 +452,50 @@ private boolean verifyAlwaysNullableFlag(Set functions, boole } return true; } + + @Test + public void skewCountDistinctRewrite() { + Slot id = rStudent.getOutput().get(0).toSlot(); + Slot age = rStudent.getOutput().get(3).toSlot(); + List groupExpressionList = Lists.newArrayList(); + groupExpressionList.add(age); + List outputExpressionList = Lists.newArrayList( + age, new Alias(new Count(true, true, id), "count_id")); + Plan root = new LogicalAggregate<>(groupExpressionList, outputExpressionList, + true, Optional.empty(), rStudent); + + // select count(distinct id) group by age; + PlanChecker.from(MemoTestUtils.createConnectContext(), 
root) + .applyImplementation(skewRewriteRule()) + .matches( + physicalHashAggregate( + physicalHashAggregate( + physicalHashAggregate( + physicalProject( + physicalHashAggregate() + .when(agg -> agg.getAggPhase().equals(AggPhase.LOCAL)) + .when(agg -> agg.getGroupByExpressions().get(0).equals(age) + && agg.getGroupByExpressions().get(1).equals(id)) + ) + .when(proj -> proj.getProjects().get(2).child(0) instanceof Cast) + ) + .when(agg -> agg.getAggPhase().equals(AggPhase.GLOBAL)) + .when(agg -> agg.getGroupByExpressions().get(0).equals(age) && agg.getGroupByExpressions().size() == 2) + ) + .when(agg -> agg.getAggPhase().equals(AggPhase.DISTINCT_LOCAL)) + .when(agg -> agg.getGroupByExpressions().get(0).equals(age)) + ) + .when(agg -> agg.getAggPhase().equals(AggPhase.DISTINCT_GLOBAL)) + .when(agg -> agg.getGroupByExpressions().get(0).equals(age)) + ); + } + + private Rule skewRewriteRule() { + return new AggregateStrategies().buildRules() + .stream() + .filter(rule -> rule.getRuleType() == RuleType.COUNT_DISTINCT_AGG_SKEW_REWRITE) + .findFirst() + .get(); + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/JoinSplitForNullSkewTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/JoinSplitForNullSkewTest.java new file mode 100644 index 00000000000000..98a8555597479b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/JoinSplitForNullSkewTest.java @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.IsNull; +import org.apache.doris.nereids.trees.expressions.Not; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Test; + +public class JoinSplitForNullSkewTest extends TestWithFeService implements MemoPatternMatchSupported { + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + connectContext.setDatabase("test"); + createTables( + "create table split_join_for_null_skew_t(a int null, b int not null, c varchar(10) null, d date, dt datetime)" + + "distributed by hash(a) properties('replication_num'='1')" + ); + connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION, LOGICAL_SEMI_JOIN_COMMUTE"); + } + + @Test + void testRewriteLeftJoin() { + PlanChecker.from(connectContext) + .analyze("select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.c,t2.dt from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.a=t2.a order by 1,2,3,4") + .rewrite() + .printlnTree() + .matches( + logicalUnion( + logicalProject( + logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof IsNull)), + logicalProject( + logicalJoin( + logicalProject( + 
logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof Not)), + any() + ) + ) + ) + ); + } + + @Test + void testRewriteLeftJoinSelectAll() { + PlanChecker.from(connectContext) + .analyze("select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ * from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.a=t2.a order by 1,2,3,4") + .rewrite() + .printlnTree() + .matches( + logicalUnion( + logicalProject( + logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof IsNull)), + logicalProject( + logicalJoin( + logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof Not), + any() + ) + ) + ) + ); + } + + @Test + void testRewriteLeftJoinSelectSomeColumns() { + PlanChecker.from(connectContext) + .analyze("select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.dt,t2.c from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.c=t2.c order by 1,2,3,4") + .rewrite() + .printlnTree() + .matches(logicalUnion(logicalProject(logicalFilter(any()).when(f -> { + if (f.getConjuncts().size() != 1) { + return false; + } + Expression firstConjunct = f.getConjuncts().iterator().next(); + if (!(firstConjunct instanceof IsNull)) { + return false; + } + if (!(firstConjunct.child(0) instanceof SlotReference)) { + return false; + } + if (!((SlotReference) firstConjunct.child(0)).getQualifiedName().equals("internal.test.t1.c")) { + return false; + } + return true; + })), + logicalProject(logicalJoin(logicalProject(logicalFilter(any()).when(f -> { + if (f.getConjuncts().size() != 1) { + return false; + } + Expression firstConjunct = f.getConjuncts().iterator().next(); + if (!(firstConjunct instanceof Not) || !(firstConjunct.child(0) instanceof IsNull)) { + return false; + } + Expression expr = firstConjunct.child(0).child(0); + if (!(expr instanceof SlotReference)) { + return false; + } + if 
(!((SlotReference) expr).getQualifiedName().equals("internal.test.t1.c")) { + return false; + } + return true; + })), any())))); + } + + @Test + void testRewriteRightJoinSelectSomeColumns() { + PlanChecker.from(connectContext) + .analyze("select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.dt,t2.c from split_join_for_null_skew_t t1 right join split_join_for_null_skew_t t2 on t1.c=t2.c order by 1,2,3,4") + .rewrite() + .printlnTree() + .matches(logicalUnion(logicalProject(logicalFilter(any()).when(f -> { + if (f.getConjuncts().size() != 1) { + return false; + } + Expression firstConjunct = f.getConjuncts().iterator().next(); + if (!(firstConjunct instanceof IsNull)) { + return false; + } + if (!(firstConjunct.child(0) instanceof SlotReference)) { + return false; + } + if (!((SlotReference) firstConjunct.child(0)).getQualifiedName().equals("internal.test.t2.c")) { + return false; + } + return true; + })), + logicalProject(logicalJoin(any(), logicalProject(logicalFilter(any()).when(f -> { + if (f.getConjuncts().size() != 1) { + return false; + } + Expression firstConjunct = f.getConjuncts().iterator().next(); + if (!(firstConjunct instanceof Not) || !(firstConjunct.child(0) instanceof IsNull)) { + return false; + } + Expression expr = firstConjunct.child(0).child(0); + if (!(expr instanceof SlotReference)) { + return false; + } + if (!((SlotReference) expr).getQualifiedName().equals("internal.test.t2.c")) { + return false; + } + return true; + })))))); + } + + @Test + void testRewriteWhenLeftChildHasIsNotNullFilter() { + PlanChecker.from(connectContext) + .analyze("select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.dt,t1.b,t2.a,t2.b from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.a=t2.a where t1.a is not null order by 1,2,3,4") + .rewrite() + .printlnTree() + .nonMatch(logicalUnion()); + } + + @Test + void testRewriteRightJoin() { + PlanChecker.from(connectContext) + .analyze("select 
/*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.c,t2.dt from split_join_for_null_skew_t t1 right join split_join_for_null_skew_t t2 on t1.a=t2.a order by 1,2,3,4") + .rewrite() + .printlnTree() + .matches( + logicalUnion( + logicalProject( + logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof IsNull)), + logicalProject( + logicalJoin( + any(), + logicalProject( + logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof Not)) + ) + ) + ) + ); + } + + @Test + void testRewriteRightJoinWhenSplitExprNonNullable() { + PlanChecker.from(connectContext) + .analyze("select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.c,t2.dt from split_join_for_null_skew_t t1 right join split_join_for_null_skew_t t2 on t1.b=t2.b order by 1,2,3,4") + .rewrite() + .printlnTree() + .nonMatch( + logicalUnion( + logicalProject( + logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof IsNull)), + logicalProject( + logicalJoin( + any(), + logicalProject( + logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof Not)) + ) + ) + ) + ); + } + + @Test + void testRewriteRightJoinSelectAll() { + PlanChecker.from(connectContext) + .analyze("select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ * from split_join_for_null_skew_t t1 right join split_join_for_null_skew_t t2 on t1.a=t2.a order by 1,2,3,4") + .rewrite() + .printlnTree() + .matches( + logicalUnion( + logicalProject( + logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof IsNull)), + logicalProject( + logicalJoin( + any(), + logicalFilter().when(f -> f.getConjuncts().size() == 1 && f.getConjuncts().iterator().next() instanceof Not)) + ) + ) + ); + } +} diff --git a/regression-test/data/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.out 
b/regression-test/data/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.out new file mode 100644 index 00000000000000..a6fb0e886221f3 --- /dev/null +++ b/regression-test/data/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.out @@ -0,0 +1,36 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !hint -- +1 2 +2 2 +3 6 + +-- !hint_other_agg_func -- +1 2 3 +2 2 2 +3 6 6 + +-- !hint_other_agg_func_expr -- +1 2 +2 2 +3 6 + +-- !hint_same_column_with_group_by -- +2 1 +3 1 +4 1 +5 1 +6 1 +7 1 +8 1 +10 1 + +-- !hint_same_column_with_group_by_expr -- +2 1 +3 1 +4 1 +5 1 +6 1 +7 1 +8 1 +10 1 + diff --git a/regression-test/data/nereids_rules_p0/split_join_for_null_skew/split_join_for_null_skew.out b/regression-test/data/nereids_rules_p0/split_join_for_null_skew/split_join_for_null_skew.out new file mode 100644 index 00000000000000..321a738ad18256 --- /dev/null +++ b/regression-test/data/nereids_rules_p0/split_join_for_null_skew/split_join_for_null_skew.out @@ -0,0 +1,424 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !int -- +\N 103 \N \N +\N 107 \N \N +1 100 apple 2023-01-01T10:00 +1 100 apple 2023-01-01T10:00 +1 100 apple 2023-01-01T10:00 +1 100 apple 2023-01-01T10:00 +2 101 banana 2023-01-02T11:00 +3 102 cherry 2023-01-03T12:00 +3 102 cherry 2023-01-03T12:00 +3 102 cherry 2023-01-03T12:00 +3 102 cherry 2023-01-03T12:00 +4 104 elderberry 2023-01-05T14:00 +5 105 \N 2023-01-06T15:00 +5 105 \N 2023-01-06T15:00 +5 105 \N 2023-01-06T15:00 +5 105 \N 2023-01-06T15:00 +6 106 fig 2023-01-07T16:00 + +-- !on_condition_has_plus_expr -- +\N 103 \N \N +\N 107 \N \N +1 100 2 101 +1 100 2 101 +2 101 3 102 +2 101 3 102 +3 102 4 104 +3 102 4 104 +4 104 5 105 +4 104 5 105 +5 105 6 106 +5 105 6 106 +6 106 \N \N + +-- !on_condition_has_abs_expr -- +\N 103 \N \N +\N 107 \N \N +1 100 1 100 +1 100 1 100 +1 100 1 100 +1 100 1 100 +2 101 2 101 +3 102 3 102 +3 102 3 102 +3 102 3 102 +3 102 3 102 +4 104 4 104 +5 105 5 105 +5 105 5 105 +5 105 5 105 +5 105 5 105 +6 106 6 106 + +-- !varchar -- +\N 103 2023-01-04T13:00 date +\N 107 2023-01-08T17:00 grape +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +2 101 2023-01-02T11:00 banana +3 102 2023-01-03T12:00 cherry +3 102 2023-01-03T12:00 cherry +3 102 2023-01-03T12:00 cherry +3 102 2023-01-03T12:00 cherry +4 104 2023-01-05T14:00 elderberry +5 105 \N \N +5 105 \N \N +6 106 2023-01-07T16:00 fig + +-- !datetime -- +100 100 1 apple +100 100 1 apple +100 100 1 apple +100 100 1 apple +101 101 2 banana +102 102 3 cherry +102 102 3 cherry +102 102 3 cherry +102 102 3 cherry +103 103 \N date +104 104 4 elderberry +105 105 5 \N +105 105 5 \N +105 105 5 \N +105 105 5 \N +106 106 6 fig +107 107 \N grape + +-- !int_has_filter -- +1 100 2023-01-01T10:00 100 +1 100 2023-01-01T10:00 100 +1 100 2023-01-01T10:00 100 +1 100 2023-01-01T10:00 100 + +-- !int_has_is_not_null -- +2023-01-01T10:00 100 1 100 +2023-01-01T10:00 100 1 100 +2023-01-01T10:00 100 
1 100 +2023-01-01T10:00 100 1 100 +2023-01-02T11:00 101 2 101 +2023-01-03T12:00 102 3 102 +2023-01-03T12:00 102 3 102 +2023-01-03T12:00 102 3 102 +2023-01-03T12:00 102 3 102 +2023-01-05T14:00 104 4 104 +2023-01-06T15:00 105 5 105 +2023-01-06T15:00 105 5 105 +2023-01-06T15:00 105 5 105 +2023-01-06T15:00 105 5 105 +2023-01-07T16:00 106 6 106 + +-- !int_has_is_null -- +2023-01-04T13:00 103 \N \N +2023-01-08T17:00 107 \N \N + +-- !multi_slots_split_expr -- +\N 103 \N \N +\N 107 \N \N +1 100 \N \N +1 100 \N \N +2 101 \N \N +3 102 \N \N +3 102 \N \N +4 104 \N \N +5 105 \N \N +5 105 \N \N +6 106 \N \N + +-- !have_nonequal_join_conjuncts -- +\N 103 \N \N +\N 107 \N \N +1 100 \N \N +1 100 \N \N +2 101 \N \N +3 102 \N \N +3 102 \N \N +4 104 \N \N +5 105 \N \N +5 105 \N \N +6 106 \N \N + +-- !test_multi_left_join -- +\N 103 \N \N +\N 107 \N \N +1 100 \N \N +1 100 \N \N +1 100 \N \N +1 100 \N \N +2 101 \N \N +3 102 \N \N +3 102 \N \N +3 102 \N \N +3 102 \N \N +4 104 \N \N +5 105 \N \N +5 105 \N \N +5 105 \N \N +5 105 \N \N +6 106 \N \N + +-- !test_upper_ref_agg -- +1 100 +2 101 +3 102 +4 104 +5 105 +6 106 + +-- !right_join_varchar -- +\N \N 2023-01-06T15:00 \N +\N \N 2023-01-06T15:00 \N +\N 103 2023-01-04T13:00 date +\N 107 2023-01-08T17:00 grape +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +2 101 2023-01-02T11:00 banana +3 102 2023-01-03T12:00 cherry +3 102 2023-01-03T12:00 cherry +3 102 2023-01-03T12:00 cherry +3 102 2023-01-03T12:00 cherry +4 104 2023-01-05T14:00 elderberry +6 106 2023-01-07T16:00 fig + +-- !right_join_datetime -- +100 100 1 apple +100 100 1 apple +100 100 1 apple +100 100 1 apple +101 101 2 banana +102 102 3 cherry +102 102 3 cherry +102 102 3 cherry +102 102 3 cherry +103 103 \N date +104 104 4 elderberry +105 105 5 \N +105 105 5 \N +105 105 5 \N +105 105 5 \N +106 106 6 fig +107 107 \N grape + +-- !right_join_varchar_has_filter -- +\N \N 2023-01-03T12:00 cherry +\N \N 
2023-01-03T12:00 cherry +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple + +-- !right_join_has_filter_int -- +\N \N 2023-01-03T12:00 cherry +\N \N 2023-01-03T12:00 cherry +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple + +-- !right_join_split_expt_not_null_not_transform -- +\N \N 2023-01-03T12:00 cherry +\N \N 2023-01-03T12:00 cherry +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple +1 100 2023-01-01T10:00 apple + +-- !on_condition_has_plus_expr_right_join -- +\N \N \N 103 +\N \N \N 107 +\N \N 1 100 +\N \N 1 100 +\N \N 2 101 +\N \N 3 102 +\N \N 3 102 +\N \N 4 104 +\N \N 5 105 +\N \N 5 105 +\N \N 6 106 + +-- !on_condition_has_abs_expr_right_join -- +\N \N \N 103 +\N \N \N 107 +1 100 1 100 +1 100 1 100 +1 100 1 100 +1 100 1 100 +2 101 2 101 +3 102 3 102 +3 102 3 102 +3 102 3 102 +3 102 3 102 +4 104 4 104 +5 105 5 105 +5 105 5 105 +5 105 5 105 +5 105 5 105 +6 106 6 106 + +-- !test_join_key_has_cast -- + +-- !multi_left_join_and_non_equal -- +\N 103 \N \N 2023-01-04 +\N 107 \N \N \N +1 100 \N \N 2023-01-01 +1 100 \N \N 2023-01-01 +2 101 \N banana 2023-01-02 +3 102 \N \N 2023-01-03 +3 102 \N \N 2023-01-03 +4 104 \N \N 2023-01-05 +5 105 \N elderberry \N +5 105 \N elderberry \N +6 106 \N \N \N + +-- !multi_right_join_and_non_equal -- +\N 102 \N \N +\N 102 \N \N +\N 103 \N \N +\N 104 \N \N +\N 106 \N \N +\N 107 \N \N +1 100 1 apple +1 100 1 apple +1 100 1 apple +1 100 1 apple +2 101 2 banana +5 105 5 elderberry +5 105 5 elderberry +5 105 5 elderberry +5 105 5 elderberry + +-- !mixed_join_types -- +\N 104 \N 2023-01-04 +1 100 apple 2023-01-01 +1 100 apple 2023-01-01 +1 100 apple 2023-01-01 +1 100 apple 2023-01-01 +2 101 banana 2023-01-02 +3 \N \N 2023-01-03 +3 \N \N 2023-01-03 +4 105 \N 2023-01-05 +4 105 \N 2023-01-05 + +-- !non_equi_join_chain -- +\N \N \N \N +\N \N \N \N +1 \N 201 \N +1 \N 
201 \N +1 \N 203 \N +1 \N 203 \N +1 \N 204 \N +1 \N 204 \N +2 \N 203 \N +2 \N 204 \N +3 \N 203 \N +3 \N 203 \N +3 \N 204 \N +3 \N 204 \N +4 \N 204 \N +5 \N \N \N +5 \N \N \N +6 \N \N \N + +-- !mixed_inner_left -- +\N 104 \N 2023-01-04 +1 100 apple 2023-01-01 +1 100 apple 2023-01-01 +1 100 apple 2023-01-01 +1 100 apple 2023-01-01 +2 101 banana 2023-01-02 +3 \N \N 2023-01-03 +3 \N \N 2023-01-03 +4 105 \N 2023-01-05 +4 105 \N 2023-01-05 + +-- !mixed_inner_right -- +\N 102 \N \N +\N 102 \N \N +\N 103 \N \N +\N 104 \N \N +\N 106 \N \N +\N 107 \N \N +1 100 apple 2023-01-01 +1 100 apple 2023-01-01 +1 100 apple 2023-01-01 +1 100 apple 2023-01-01 +2 101 banana 2023-01-02 +5 105 elderberry 2023-01-05 +5 105 elderberry 2023-01-05 +5 105 elderberry 2023-01-05 +5 105 elderberry 2023-01-05 + +-- !join_with_agg -- +\N \N +1 200.0 +2 201.0 +3 \N +4 203.0 +5 204.0 +6 \N + +-- !join_with_window -- +\N \N 1 +\N \N 2 +1 100 1 +1 100 2 +1 100 3 +1 100 4 +2 101 1 +3 102 1 +3 102 2 +3 102 3 +3 102 4 +4 104 1 +5 105 1 +5 105 2 +5 105 3 +5 105 4 +6 106 1 + +-- !join_with_sort_filter -- +5 304 elderberry +5 304 elderberry +2 301 banana +\N 302 \N +\N 303 \N + +-- !multi_column_expr_join -- +\N 103 \N \N +\N 107 \N \N +1 100 \N \N +1 100 \N \N +2 101 \N \N +3 102 \N \N +3 102 \N \N +4 104 \N \N +5 105 5 204 +5 105 5 204 +6 106 4 203 + +-- !mixed_left_right_condition -- +\N 302 \N +\N 303 \N +\N 304 \N +1 300 apple +1 300 apple +2 301 banana + +-- !complex_expr_join -- +\N 103 cherry +\N 107 cherry +1 100 apple +1 100 apple +2 101 banana +3 102 \N +3 102 \N +4 104 \N +5 105 elderberry +5 105 elderberry +6 106 \N + diff --git a/regression-test/suites/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.groovy b/regression-test/suites/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.groovy new file mode 100644 index 00000000000000..e206c10724a450 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.groovy @@ -0,0 +1,27 @@ +// Licensed to the Apache 
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_agg_skew_hint") { + sql "drop table if exists test_skew_hint" + sql "create table test_skew_hint (a int, b int, c int) distributed by hash(a) properties('replication_num'='1');" + sql "insert into test_skew_hint values(1,2,3),(1,2,4),(1,3,4),(2,3,5),(2,4,5),(3,4,5),(3,5,6),(3,6,7),(3,7,8),(3,8,9),(3,10,11);" + qt_hint "select a , count(distinct [skew] b)from test_skew_hint group by a order by 1,2" + qt_hint_other_agg_func "select a , count(distinct [skew] b), count(a) from test_skew_hint group by a order by 1,2" + qt_hint_other_agg_func_expr "select a , count(distinct [skew] b+1) from test_skew_hint group by a order by 1,2" + qt_hint_same_column_with_group_by "select b , count(distinct [skew] b) from test_skew_hint group by b order by 1,2" + qt_hint_same_column_with_group_by_expr "select b , count(distinct [skew] b+1) from test_skew_hint group by b order by 1,2" +} \ No newline at end of file diff --git a/regression-test/suites/nereids_rules_p0/split_join_for_null_skew/split_join_for_null_skew.groovy b/regression-test/suites/nereids_rules_p0/split_join_for_null_skew/split_join_for_null_skew.groovy new file mode 100644 index 00000000000000..b933d81272931a --- /dev/null +++ 
b/regression-test/suites/nereids_rules_p0/split_join_for_null_skew/split_join_for_null_skew.groovy @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("split_join_for_null_skew") { + sql "drop table if exists split_join_for_null_skew_t" + sql """create table split_join_for_null_skew_t(a int null, b int not null, c varchar(10) null, d date, dt datetime) + distributed by hash(a) properties("replication_num"="1"); + """ + sql """ + INSERT INTO split_join_for_null_skew_t (a, b, c, d, dt) VALUES + (1, 100, 'apple', '2023-01-01', '2023-01-01 10:00:00'), + (1, 100, 'apple', '2023-01-01', '2023-01-01 10:00:00'), + (2, 101, 'banana', '2023-01-02', '2023-01-02 11:00:00'), + (3, 102, 'cherry', '2023-01-03', '2023-01-03 12:00:00'), + (3, 102, 'cherry', '2023-01-03', '2023-01-03 12:00:00'), + (NULL, 103, 'date', '2023-01-04', '2023-01-04 13:00:00'), + (4, 104, 'elderberry', '2023-01-05', '2023-01-05 14:00:00'), + (5, 105, NULL, '2023-01-06', '2023-01-06 15:00:00'), + (5, 105, NULL, '2023-01-06', '2023-01-06 15:00:00'), + (6, 106, 'fig', '2023-01-07', '2023-01-07 16:00:00'), + (NULL, 107, 'grape', '2023-01-08', '2023-01-08 17:00:00'); + """ + + // left join on slot + qt_int "select 
/*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.c,t2.dt from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.a=t2.a order by 1,2,3,4" + qt_on_condition_has_plus_expr "select/*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.a,t2.b from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.a+1=t2.a order by 1,2,3,4" + qt_on_condition_has_abs_expr "select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.a,t2.b from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on abs(t1.a)=t2.a order by 1,2,3,4" + qt_varchar "select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.dt,t2.c from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.c=t2.c order by 1,2,3,4" + qt_datetime "select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t2.b,t1.b,t2.a,t2.c from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.dt=t2.dt order by 1,2,3,4" + + // left join child has filter + qt_int_has_filter "select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.dt,t2.b from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.a=t2.a where t1.a =1 order by 1,2,3,4" + qt_int_has_is_not_null "select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.dt,t1.b,t2.a,t2.b from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.a=t2.a where t1.a is not null order by 1,2,3,4" + qt_int_has_is_null "select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.dt,t1.b,t2.a,t2.b from split_join_for_null_skew_t t1 left join split_join_for_null_skew_t t2 on t1.a=t2.a where t1.a is null order by 1,2,3,4" + + // split expr has multi slots + qt_multi_slots_split_expr """select /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.c,t2.dt from split_join_for_null_skew_t t1 + left join split_join_for_null_skew_t t2 on t1.a+t1.b=t2.a order by 1,2,3,4;""" + qt_have_nonequal_join_conjuncts """select 
/*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ t1.a,t1.b,t2.c,t2.dt from split_join_for_null_skew_t t1 + left join split_join_for_null_skew_t t2 on t1.a=t2.a and t1.b 200 + LEFT JOIN t4 ON t1.d = t4.d + ORDER BY 1,2,3,4,5; + """ + + qt_multi_right_join_and_non_equal """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, t2.b, t3.x, t4.c + FROM split_join_for_null_skew_t t1 + RIGHT JOIN t3 ON t1.a = t3.x + RIGHT JOIN t4 ON t3.z = t4.c + RIGHT JOIN split_join_for_null_skew_t t2 ON t4.a = t2.a + ORDER BY 1,2,3,4; + """ + qt_mixed_join_types """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, t2.b, t3.z, t4.d + FROM split_join_for_null_skew_t t1 + LEFT JOIN t3 ON t1.a = t3.x + RIGHT JOIN t4 ON t1.d = t4.d + LEFT JOIN split_join_for_null_skew_t t2 ON t4.a = t2.a + ORDER BY 1,2,3,4; + """ + + qt_non_equi_join_chain """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, t2.b, t3.y, t4.c + FROM split_join_for_null_skew_t t1 + LEFT JOIN t3 ON t1.a < t3.x + LEFT JOIN t4 ON t3.y > t4.b + LEFT JOIN split_join_for_null_skew_t t2 ON t4.a = t2.a + ORDER BY 1,2,3,4; + """ + + qt_mixed_inner_left """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, t2.b, t3.z, t4.d + FROM split_join_for_null_skew_t t1 + LEFT JOIN t3 ON t1.a = t3.x + INNER JOIN t4 ON t1.d = t4.d + LEFT JOIN split_join_for_null_skew_t t2 ON t4.a = t2.a + ORDER BY 1,2,3,4; + """ + + qt_mixed_inner_right """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, t2.b, t3.z, t4.d + FROM split_join_for_null_skew_t t1 + RIGHT JOIN t3 ON t1.a = t3.x + INNER JOIN t4 ON t3.z = t4.c + RIGHT JOIN split_join_for_null_skew_t t2 ON t4.a = t2.a + ORDER BY 1,2,3,4; + """ + + qt_join_with_agg """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, AVG(t3.y) + FROM split_join_for_null_skew_t t1 + LEFT JOIN t3 ON t1.a = t3.x + LEFT JOIN t4 ON t3.x = t4.a + GROUP BY t1.a + ORDER BY 1,2; + """ + + qt_join_with_window """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, 
t2.b, + ROW_NUMBER() OVER (PARTITION BY t1.a ORDER BY t2.dt) + FROM split_join_for_null_skew_t t1 + LEFT JOIN split_join_for_null_skew_t t2 ON t1.a = t2.a + ORDER BY 1,2,3; + """ + + qt_join_with_sort_filter """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, t4.b, t3.z + FROM split_join_for_null_skew_t t1 + LEFT JOIN t3 ON t1.a = t3.x + RIGHT JOIN t4 ON t3.z = t4.c + WHERE t4.b > 300 + ORDER BY t1.a DESC, t4.b; + """ + + qt_multi_column_expr_join """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, t1.b, t3.x, t3.y + FROM split_join_for_null_skew_t t1 + LEFT JOIN t3 ON t1.a + t3.x = 10 AND t1.b < t3.y + ORDER BY 1,2,3,4; + """ + + qt_mixed_left_right_condition """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, t4.b, t3.z + FROM split_join_for_null_skew_t t1 + LEFT JOIN t3 ON t1.a = t3.x AND t3.y > t1.b + RIGHT JOIN t4 ON t1.d = t4.d AND t4.a = t3.x + ORDER BY 1,2,3; + """ + + qt_complex_expr_join """ + SELECT /*+use_cbo_rule(JOIN_SPLIT_FOR_NULL_SKEW)*/ + t1.a, t1.b, t3.z + FROM split_join_for_null_skew_t t1 + LEFT JOIN t3 ON COALESCE(t1.a, 0) = COALESCE(t3.x, 0) + ORDER BY 1,2,3; + """ +} \ No newline at end of file