From 6b61a2a43ce4e1211f3d89d04f14455dce28cd50 Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Tue, 20 Aug 2024 17:40:06 +0800 Subject: [PATCH] query cache in fe --- .../java/org/apache/doris/analysis/Expr.java | 25 +- .../doris/analysis/FunctionCallExpr.java | 12 + .../org/apache/doris/analysis/SlotRef.java | 12 + .../org/apache/doris/catalog/FunctionSet.java | 26 + .../apache/doris/catalog/PartitionKey.java | 60 ++ .../doris/catalog/RangePartitionInfo.java | 16 + .../apache/doris/nereids/NereidsPlanner.java | 35 +- .../org/apache/doris/nereids/util/Utils.java | 65 +++ .../apache/doris/planner/AggregationNode.java | 73 +++ .../apache/doris/planner/OlapScanNode.java | 122 ++++- .../apache/doris/planner/PlanFragment.java | 11 + .../org/apache/doris/planner/PlanNode.java | 93 ++++ .../NormalizedPartitionPredicates.java | 56 ++ .../doris/planner/normalize/Normalizer.java | 40 ++ .../PartitionRangePredicateNormalizer.java | 220 ++++++++ .../planner/normalize/PredicateToRange.java | 178 ++++++ .../normalize/QueryCacheNormalizer.java | 230 ++++++++ .../java/org/apache/doris/qe/Coordinator.java | 12 +- .../org/apache/doris/qe/SessionVariable.java | 48 ++ .../planner/QueryCacheNormalizerTest.java | 515 ++++++++++++++++++ .../doris/utframe/TestWithFeService.java | 1 + gensrc/thrift/Normalization.thrift | 61 +++ gensrc/thrift/Planner.thrift | 5 +- gensrc/thrift/QueryCache.thrift | 53 ++ 24 files changed, 1950 insertions(+), 19 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/normalize/NormalizedPartitionPredicates.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/normalize/Normalizer.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/normalize/PartitionRangePredicateNormalizer.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/normalize/PredicateToRange.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java create mode 100644 gensrc/thrift/Normalization.thrift create mode 100644 gensrc/thrift/QueryCache.thrift diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 6e9e713df9df9b..0b5e68e1f30346 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -44,6 +44,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.nereids.util.Utils; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.planner.normalize.Normalizer; import org.apache.doris.qe.SessionVariable; import org.apache.doris.rewrite.mvrewrite.MVExprEquivalent; import org.apache.doris.statistics.ExprStats; @@ -996,8 +997,12 @@ public void setFn(Function fn) { this.fn = fn; } - // Append a flattened version of this expr, including all children, to 'container'. protected void treeToThriftHelper(TExpr container) { + treeToThriftHelper(container, ((expr, exprNode) -> expr.toThrift(exprNode))); + } + + // Append a flattened version of this expr, including all children, to 'container'. 
+ protected void treeToThriftHelper(TExpr container, ExprVisitor visitor) { TExprNode msg = new TExprNode(); msg.type = type.toThrift(); msg.num_children = children.size(); @@ -1009,13 +1014,17 @@ protected void treeToThriftHelper(TExpr container) { } msg.output_scale = getOutputScale(); msg.setIsNullable(nullableFromNereids.isPresent() ? nullableFromNereids.get() : isNullable()); - toThrift(msg); + visitor.visit(this, msg); container.addToNodes(msg); for (Expr child : children) { - child.treeToThriftHelper(container); + child.treeToThriftHelper(container, visitor); } } + public interface ExprVisitor { + void visit(Expr expr, TExprNode exprNode); + } + public static Type getAssignmentCompatibleType(List children) { Type assignmentCompatibleType = Type.INVALID; for (int i = 0; i < children.size() @@ -2193,6 +2202,16 @@ public String getStringValueForArray(FormatOptions options) { return null; } + public final TExpr normalize(Normalizer normalizer) { + TExpr result = new TExpr(); + treeToThriftHelper(result, (expr, texprNode) -> expr.normalize(texprNode, normalizer)); + return result; + } + + protected void normalize(TExprNode msg, Normalizer normalizer) { + this.toThrift(msg); + } + public static Expr getFirstBoundChild(Expr expr, List tids) { for (Expr child : expr.getChildren()) { if (child.isBoundByTupleIds(tids)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java index 4041fa4873b5be..c7a892630fab05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java @@ -42,6 +42,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.planner.normalize.Normalizer; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.thrift.TExprNode; @@ -2406,6 +2407,17 @@ public void readFields(DataInput in) throws IOException { isMergeAggFn = in.readBoolean(); } + @Override + protected void normalize(TExprNode msg, Normalizer normalizer) { + String functionName = fnName.getFunction().toUpperCase(); + if (FunctionSet.nonDeterministicFunctions.contains(functionName) + || "NOW".equals(functionName) + || (FunctionSet.nonDeterministicTimeFunctions.contains(functionName) && children.isEmpty())) { + throw new IllegalStateException("Can not normalize non deterministic functions"); + } + super.normalize(msg, normalizer); + } + public static FunctionCallExpr read(DataInput in) throws IOException { FunctionCallExpr func = new FunctionCallExpr(); func.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java index 2fe2b7b411500a..8db81c9ac10f34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java @@ -30,6 +30,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.ToSqlContext; +import org.apache.doris.planner.normalize.Normalizer; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TExprNode; import org.apache.doris.thrift.TExprNodeType; @@ -353,6 +354,17 @@ protected void toThrift(TExprNode msg) { msg.setLabel(label); } + @Override + 
protected void normalize(TExprNode msg, Normalizer normalizer) { + msg.node_type = TExprNodeType.SLOT_REF; + // we should eliminate the different tuple id to reuse query cache + msg.slot_ref = new TSlotRef( + normalizer.normalizeSlotId(desc.getId().asInt()), + 0 + ); + msg.slot_ref.setColUniqueId(desc.getUniqueId()); + } + @Override public void markAgg() { desc.setIsAgg(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java index 74b2778373eacd..4165f9362214ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java @@ -155,6 +155,32 @@ public boolean isNullResultWithOneNullParamFunctions(String funcName) { .put(Type.DECIMAL128, Type.DECIMAL128) .build(); + public static final Set nonDeterministicFunctions = + ImmutableSet.builder() + .add("RAND") + .add("RANDOM") + .add("RANDOM_BYTES") + .add("CONNECTION_ID") + .add("DATABASE") + .add("USER") + .add("UUID") + .add("CURRENT_USER") + .add("UUID_NUMERIC") + .build(); + + public static final Set nonDeterministicTimeFunctions = + ImmutableSet.builder() + .add("NOW") + .add("CURDATE") + .add("CURRENT_DATE") + .add("UTC_TIMESTAMP") + .add("CURTIME") + .add("CURRENT_TIMESTAMP") + .add("CURRENT_TIME") + .add("UNIX_TIMESTAMP") + .add() + .build(); + private static final Map STDDEV_UPDATE_SYMBOL = ImmutableMap.builder() .put(Type.TINYINT, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java index d76293d6a75d25..f9640ccae14023 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java @@ -55,7 +55,9 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.math.BigInteger; import java.nio.ByteBuffer; +import java.time.LocalDateTime; import java.util.List; import java.util.stream.Collectors; import java.util.zip.CRC32; @@ -304,6 +306,64 @@ public int compareTo(PartitionKey other) { return Integer.compare(thisKeyLen, otherKeyLen); } + public PartitionKey successor() throws AnalysisException { + Preconditions.checkState( + keys.size() == 1, + "Only support compute successor for one partition column" + ); + LiteralExpr literal = keys.get(0); + PrimitiveType type = types.get(0); + + PartitionKey successor = new PartitionKey(); + + switch (type) { + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + long maxValueOfType = (1L << ((type.getSlotSize() << 3 /* multiply 8 bit */) - 1)) - 1L; + long successorInt = ((IntLiteral) literal).getValue(); + successorInt += successorInt < maxValueOfType ? 1 : 0; + successor.pushColumn(new IntLiteral(successorInt, Type.fromPrimitiveType(type)), type); + return successor; + case LARGEINT: + BigInteger maxValue = BigInteger.ONE.shiftLeft(127).subtract(BigInteger.ONE); + BigInteger successorLargeInt = (BigInteger) literal.getRealValue(); + successorLargeInt = successorLargeInt.add( + successorLargeInt.compareTo(maxValue) < 0 ? 
BigInteger.ONE : BigInteger.ZERO + ); + successor.pushColumn(new LargeIntLiteral(successorLargeInt), type); + return successor; + case DATE: + case DATEV2: + case DATETIME: + case DATETIMEV2: + DateLiteral dateLiteral = (DateLiteral) literal; + LocalDateTime successorDateTime = LocalDateTime.of( + (int) dateLiteral.getYear(), + (int) dateLiteral.getMonth(), + (int) dateLiteral.getDay(), + (int) dateLiteral.getHour(), + (int) dateLiteral.getMinute(), + (int) dateLiteral.getSecond(), + (int) dateLiteral.getMicrosecond() * 1000 + ); + if (type == PrimitiveType.DATE || type == PrimitiveType.DATEV2) { + successorDateTime = successorDateTime.plusDays(1); + } else if (type == PrimitiveType.DATETIME) { + successorDateTime = successorDateTime.plusSeconds(1); + } else { + int scale = Math.min(6, Math.max(0, ((ScalarType) literal.getType()).getScalarScale())); + long nanoSeconds = BigInteger.TEN.pow(9 - scale).longValue(); + successorDateTime = successorDateTime.plusNanos(nanoSeconds); + } + successor.pushColumn(new DateLiteral(successorDateTime, literal.getType()), type); + return successor; + default: + throw new AnalysisException("Unsupported type: " + type); + } + } + // return: ("100", "200", "300") public String toSql() { StringBuilder sb = new StringBuilder("("); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java index 87e1d5d19b438b..a3138253d4162b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java @@ -39,6 +39,7 @@ import java.io.DataInput; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -66,6 +67,21 @@ public RangePartitionInfo(boolean isAutoCreatePartitions, ArrayList exprs, } } + public Map getPartitionItems(Collection partitionIds) { + Map columnRanges = Maps.newLinkedHashMapWithExpectedSize(partitionIds.size()); + for (Long partitionId : partitionIds) { + PartitionItem partitionItem = idToItem.get(partitionId); + if (partitionItem == null) { + partitionItem = idToTempItem.get(partitionId); + } + if (partitionItem == null) { + throw new IllegalStateException("Can not found partition item: " + partitionId); + } + columnRanges.put(partitionId, partitionItem); + } + return columnRanges; + } + @Override public PartitionItem createAndCheckPartitionItem(SinglePartitionDesc desc, boolean isTemp) throws DdlException { Range newRange = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 9d32af433b5849..a304fe36062c80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -27,6 +27,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.mysql.FieldInfo; import org.apache.doris.nereids.CascadesContext.Lock; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -65,16 +66,20 @@ import org.apache.doris.planner.Planner; import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.normalize.QueryCacheNormalizer; import 
org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ResultSet; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.TQueryCacheParam; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.commons.codec.binary.Hex; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -343,10 +348,11 @@ private void splitFragments(PhysicalPlan resultPlan) { if (statementContext.getConnectContext().getExecutor() != null) { statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsTranslateTime(); } - if (cascadesContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) { + SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable(); + if (sessionVariable.isEnableNereidsTrace()) { CounterEvent.clearCounter(); } - if (cascadesContext.getConnectContext().getSessionVariable().isPlayNereidsDump()) { + if (sessionVariable.isPlayNereidsDump()) { return; } PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan); @@ -355,8 +361,31 @@ private void splitFragments(PhysicalPlan resultPlan) { physicalRelations.addAll(planTranslatorContext.getPhysicalRelations()); descTable = planTranslatorContext.getDescTable(); fragments = new ArrayList<>(planTranslatorContext.getPlanFragments()); + + boolean enableQueryCache = sessionVariable.getEnableQueryCache(); + String queryId = DebugUtil.printId(cascadesContext.getConnectContext().queryId()); for (int seq = 0; seq < fragments.size(); seq++) { - fragments.get(seq).setFragmentSequenceNum(seq); + PlanFragment fragment = fragments.get(seq); + fragment.setFragmentSequenceNum(seq); + if (enableQueryCache) { + try { + QueryCacheNormalizer normalizer = new QueryCacheNormalizer(fragment, descTable); + Optional queryCacheParam = + normalizer.normalize(cascadesContext.getConnectContext()); + if (queryCacheParam.isPresent()) { + fragment.queryCacheParam = queryCacheParam.get(); + // after commons-codec 1.14 (include), Hex.encodeHexString will change ByteBuffer.pos, + // so we should copy a new byte buffer to print it + ByteBuffer digestCopy = fragment.queryCacheParam.digest.duplicate(); + LOG.info("Use query cache for fragment {}, node id: {}, digest: {}, queryId: {}", + seq, + fragment.queryCacheParam.node_id, + Hex.encodeHexString(digestCopy), queryId); + } + } catch (Throwable t) { + // do nothing + } + } } // set output exprs logicalPlanAdapter.setResultExprs(root.getOutputExprs()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java index ac098014cb65d1..42b99f6effdb84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java @@ -327,6 +327,71 @@ private static List> doAllCombinations(List> lists) { ).collect(ImmutableList.toImmutableList()); } + /** getAllCombinations */ + public static List> getAllCombinations(List list, int itemNum) { + List> result = Lists.newArrayList(); + generateCombinations(list, itemNum, 0, Lists.newArrayList(), result); + return result; + } + + private static void generateCombinations( + List list, int n, int start, List current, List> result) { + if (current.size() == n) { + result.add(new ArrayList<>(current)); + return; + } + + for 
(int i = start; i < list.size(); i++) { + current.add(list.get(i)); + generateCombinations(list, n, i + 1, current, result); + current.remove(current.size() - 1); + } + } + + public static List> allPermutations(List list) { + List> result = new ArrayList<>(); + generatePermutations(new ArrayList<>(list), new ArrayList<>(), result); + return result; + } + + private static void generatePermutations(List list, List current, List> result) { + if (!current.isEmpty()) { + result.add(new ArrayList<>(current)); + } + + for (int i = 0; i < list.size(); i++) { + T element = list.remove(i); + current.add(element); + generatePermutations(list, current, result); + current.remove(current.size() - 1); + list.add(i, element); + } + } + + /** permutations */ + public static List> permutations(List list) { + list = new ArrayList<>(list); + List> result = new ArrayList<>(); + if (list.isEmpty()) { + result.add(new ArrayList<>()); + return result; + } + + T firstElement = list.get(0); + List rest = list.subList(1, list.size()); + List> recursivePermutations = permutations(rest); + + for (List smallerPermutated : recursivePermutations) { + for (int index = 0; index <= smallerPermutated.size(); index++) { + List temp = new ArrayList<>(smallerPermutated); + temp.add(index, firstElement); + result.add(temp); + } + } + + return result; + } + public static List copyRequiredList(List list) { return ImmutableList.copyOf(Objects.requireNonNull(list, "non-null list is required")); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index e6ea3e9f263175..4dca9384d65794 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -26,21 +26,26 @@ import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SortInfo; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; +import org.apache.doris.planner.normalize.Normalizer; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TAggregationNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TNormalizedAggregateNode; +import org.apache.doris.thrift.TNormalizedPlanNode; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TSortInfo; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; @@ -50,6 +55,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * Aggregation computation. 
@@ -300,6 +306,73 @@ protected void toThrift(TPlanNode msg) { } } + @Override + public void normalize(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) { + TNormalizedAggregateNode normalizedAggregateNode = new TNormalizedAggregateNode(); + + // if (aggInfo.getGroupingExprs().size() > 3) { + // throw new IllegalStateException("Too many grouping expressions, not use query cache"); + // } + + normalizedAggregateNode.setIntermediateTupleId( + normalizer.normalizeTupleId(aggInfo.getIntermediateTupleId().asInt())); + normalizedAggregateNode.setOutputTupleId( + normalizer.normalizeTupleId(aggInfo.getOutputTupleId().asInt())); + normalizedAggregateNode.setGroupingExprs(normalizeExprs(aggInfo.getGroupingExprs(), normalizer)); + normalizedAggregateNode.setAggregateFunctions(normalizeExprs(aggInfo.getAggregateExprs(), normalizer)); + normalizedAggregateNode.setIsFinalize(needsFinalize); + normalizedAggregateNode.setUseStreamingPreaggregation(useStreamingPreagg); + + normalizeAggIntermediateProjects(normalizedAggregateNode, normalizer); + normalizeAggOutputProjects(normalizedAggregateNode, normalizer); + + normalizedPlan.setNodeType(TPlanNodeType.AGGREGATION_NODE); + normalizedPlan.setAggregationNode(normalizedAggregateNode); + } + + @Override + protected void normalizeProjects(TNormalizedPlanNode normalizedPlanNode, Normalizer normalizer) { + List outputSlots = + getOutputTupleIds() + .stream() + .flatMap(tupleId -> normalizer.getDescriptorTable().getTupleDesc(tupleId).getSlots().stream()) + .collect(Collectors.toList()); + + List projectList = this.projectList; + if (projectList == null) { + projectList = this.aggInfo.getOutputTupleDesc() + .getSlots() + .stream() + .map(SlotRef::new) + .collect(Collectors.toList()); + } + + List projectThrift = normalizeProjects(outputSlots, projectList, normalizer); + normalizedPlanNode.setProjects(projectThrift); + } + + private void normalizeAggIntermediateProjects(TNormalizedAggregateNode aggregateNode, Normalizer normalizer) { + List projectToIntermediateTuple = ImmutableList.builder() + .addAll(aggInfo.getGroupingExprs()) + .addAll(aggInfo.getAggregateExprs()) + .build(); + + List intermediateSlots = aggInfo.getIntermediateTupleDesc().getSlots(); + List projects = normalizeProjects(intermediateSlots, projectToIntermediateTuple, normalizer); + aggregateNode.setProjectToAggIntermediateTuple(projects); + } + + private void normalizeAggOutputProjects(TNormalizedAggregateNode aggregateNode, Normalizer normalizer) { + List projectToIntermediateTuple = ImmutableList.builder() + .addAll(aggInfo.getGroupingExprs()) + .addAll(aggInfo.getAggregateExprs()) + .build(); + + List intermediateSlots = aggInfo.getOutputTupleDesc().getSlots(); + List projects = normalizeProjects(intermediateSlots, projectToIntermediateTuple, normalizer); + aggregateNode.setProjectToAggOutputTuple(projects); + } + protected String getDisplayLabelDetail() { if (useStreamingPreagg) { return "STREAMING"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 1763c99efcf145..1fc7155d64b9bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -61,10 +61,13 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import 
org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.Util; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.normalize.Normalizer; +import org.apache.doris.planner.normalize.PartitionRangePredicateNormalizer; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; import org.apache.doris.statistics.StatisticalType; @@ -74,7 +77,10 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TNormalizedOlapScanNode; +import org.apache.doris.thrift.TNormalizedPlanNode; import org.apache.doris.thrift.TOlapScanNode; import org.apache.doris.thrift.TOlapTableIndex; import org.apache.doris.thrift.TPaloScanRange; @@ -756,15 +762,20 @@ private void addScanRangeLocations(Partition partition, int useFixReplica = -1; boolean needCheckTags = false; boolean skipMissingVersion = false; - if (ConnectContext.get() != null) { - allowedTags = ConnectContext.get().getResourceTags(); - needCheckTags = ConnectContext.get().isResourceTagsSet(); - useFixReplica = ConnectContext.get().getSessionVariable().useFixReplica; + ConnectContext context = ConnectContext.get(); + if (context != null) { + allowedTags = context.getResourceTags(); + needCheckTags = context.isResourceTagsSet(); + useFixReplica = context.getSessionVariable().useFixReplica; + if (useFixReplica == -1 + && context.getState().isNereids() && context.getSessionVariable().getEnableQueryCache()) { + useFixReplica = 0; + } // if use_fix_replica is set to true, set skip_missing_version to false - skipMissingVersion = useFixReplica == -1 && ConnectContext.get().getSessionVariable().skipMissingVersion; + skipMissingVersion = useFixReplica == -1 && context.getSessionVariable().skipMissingVersion; if (LOG.isDebugEnabled()) { LOG.debug("query id: {}, partition id:{} visibleVersion: {}", - DebugUtil.printId(ConnectContext.get().queryId()), partition.getId(), visibleVersion); + DebugUtil.printId(context.queryId()), partition.getId(), visibleVersion); } } for (Tablet tablet : tablets) { @@ -798,7 +809,7 @@ private void addScanRangeLocations(Partition partition, List replicas = tablet.getQueryableReplicas(visibleVersion, backendAlivePathHashs, skipMissingVersion); if (replicas.isEmpty()) { - if (ConnectContext.get().getSessionVariable().skipBadTablet) { + if (context.getSessionVariable().skipBadTablet) { continue; } LOG.warn("no queryable replica found in tablet {}. visible version {}", tabletId, visibleVersion); @@ -827,7 +838,7 @@ private void addScanRangeLocations(Partition partition, // sort by replica id replicas.sort(Replica.ID_COMPARATOR); Replica replica = replicas.get(useFixReplica >= replicas.size() ? 
replicas.size() - 1 : useFixReplica); - if (ConnectContext.get().getSessionVariable().fallbackOtherReplicaWhenFixedCorrupt) { + if (context.getSessionVariable().fallbackOtherReplicaWhenFixedCorrupt) { Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendId()); // If the fixed replica is bad, then not clear the replicas using random replica if (backend == null || !backend.isAlive()) { @@ -1541,6 +1552,101 @@ protected void toThrift(TPlanNode msg) { super.toThrift(msg); } + @Override + public void normalize(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) { + TNormalizedOlapScanNode normalizedOlapScanNode = new TNormalizedOlapScanNode(); + normalizedOlapScanNode.setTableId(olapTable.getId()); + + long selectIndexId = selectedIndexId == -1 ? olapTable.getBaseIndexId() : selectedIndexId; + normalizedOlapScanNode.setIndexId(selectIndexId); + normalizedOlapScanNode.setIsPreaggregation(isPreAggregation); + normalizedOlapScanNode.setSortColumn(sortColumn); + normalizedOlapScanNode.setRollupName(olapTable.getIndexNameById(selectIndexId)); + + normalizeSchema(normalizedOlapScanNode); + normalizeSelectColumns(normalizedOlapScanNode, normalizer); + + normalizedPlan.setNodeType(TPlanNodeType.OLAP_SCAN_NODE); + normalizedPlan.setOlapScanNode(normalizedOlapScanNode); + } + + private void normalizeSelectColumns(TNormalizedOlapScanNode normalizedOlapScanNode, Normalizer normalizer) { + List slots = tupleIds + .stream() + .flatMap(tupleId -> normalizer.getDescriptorTable().getTupleDesc(tupleId).getSlots().stream()) + .collect(Collectors.toList()); + List> selectColumns = slots.stream() + .map(slot -> Pair.of(slot.getId(), slot.getColumn().getName())) + .collect(Collectors.toList()); + for (Column partitionColumn : olapTable.getPartitionInfo().getPartitionColumns()) { + boolean selectPartitionColumn = false; + String partitionColumnName = partitionColumn.getName(); + for (Pair selectColumn : selectColumns) { + if (selectColumn.second.equalsIgnoreCase(partitionColumnName)) { + selectPartitionColumn = true; + break; + } + } + if (!selectPartitionColumn) { + selectColumns.add(Pair.of(new SlotId(-1), partitionColumnName)); + } + } + + selectColumns.sort(Comparator.comparing(Pair::value)); + + for (Pair selectColumn : selectColumns) { + normalizer.normalizeSlotId(selectColumn.first.asInt()); + } + + normalizedOlapScanNode.setSelectColumns( + selectColumns.stream().map(Pair::value).collect(Collectors.toList()) + ); + } + + private void normalizeSchema(TNormalizedOlapScanNode normalizedOlapScanNode) { + List columns = selectedIndexId == -1 + ? 
olapTable.getBaseSchema() : olapTable.getSchemaByIndexId(selectedIndexId); + List keyColumns = columns.stream().filter(Column::isKey).collect(Collectors.toList()); + + normalizedOlapScanNode.setKeyColumnNames( + keyColumns.stream() + .map(Column::getName) + .collect(Collectors.toList()) + ); + + normalizedOlapScanNode.setKeyColumnTypes( + keyColumns.stream() + .map(column -> column.getDataType().toThrift()) + .collect(Collectors.toList()) + ); + } + + @Override + protected void normalizeConjuncts(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) { + List normalizedPredicates = new PartitionRangePredicateNormalizer(normalizer, this) + .normalize(); + + List normalizedConjuncts = normalizeExprs(normalizedPredicates, normalizer); + normalizedPlan.setConjuncts(normalizedConjuncts); + } + + @Override + protected void normalizeProjects(TNormalizedPlanNode normalizedPlanNode, Normalizer normalizer) { + List outputSlots = + getOutputTupleIds() + .stream() + .flatMap(tupleId -> normalizer.getDescriptorTable().getTupleDesc(tupleId).getSlots().stream()) + .collect(Collectors.toList()); + + List projectList = this.projectList; + if (projectList == null) { + projectList = outputSlots.stream().map(SlotRef::new).collect(Collectors.toList()); + } + + List projectThrift = normalizeProjects(outputSlots, projectList, normalizer); + normalizedPlanNode.setProjects(projectThrift); + } + public void collectColumns(Analyzer analyzer, Set equivalenceColumns, Set unequivalenceColumns) { // 1. Get columns which has predicate on it. for (Expr expr : conjuncts) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 8a26ec4af66ac1..bb41761cfd550e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -33,11 +33,13 @@ import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanFragment; +import org.apache.doris.thrift.TQueryCacheParam; import org.apache.doris.thrift.TResultSinkType; import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; import com.google.common.collect.Lists; +import org.apache.commons.codec.binary.Hex; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -159,6 +161,8 @@ public class PlanFragment extends TreeNode { public Optional> specifyInstances = Optional.empty(); + public TQueryCacheParam queryCacheParam; + /** * C'tor for fragment with specific partition; the output is by default broadcast. 
*/ @@ -350,6 +354,13 @@ public String getExplainString(TExplainLevel explainLevel) { str.append("\n"); str.append(" PARTITION: " + dataPartition.getExplainString(explainLevel) + "\n"); str.append(" HAS_COLO_PLAN_NODE: " + hasColocatePlanNode + "\n"); + if (queryCacheParam != null) { + str.append("\n"); + str.append(" QUERY_CACHE:\n"); + str.append(" CACHE_NODE_ID: " + queryCacheParam.getNodeId() + "\n"); + str.append(" DIGEST: " + Hex.encodeHexString(queryCacheParam.getDigest()) + "\n"); + } + str.append("\n"); if (sink != null) { str.append(sink.getExplainString(" ", explainLevel) + "\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 37f121a5c23d98..1e9d5646939896 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -27,6 +27,7 @@ import org.apache.doris.analysis.ExprId; import org.apache.doris.analysis.ExprSubstitutionMap; import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; @@ -34,13 +35,18 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Id; import org.apache.doris.common.NotImplementedException; +import org.apache.doris.common.Pair; import org.apache.doris.common.TreeNode; import org.apache.doris.common.UserException; +import org.apache.doris.planner.normalize.Normalizer; import org.apache.doris.statistics.PlanStats; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsDeriveResult; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TNormalizedPlanNode; import org.apache.doris.thrift.TPlan; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPushAggOp; @@ -49,15 +55,18 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; @@ -884,6 +893,90 @@ public void getMaterializedIds(Analyzer analyzer, List ids) { // the node type and the node-specific field. 
protected abstract void toThrift(TPlanNode msg); + public TNormalizedPlanNode normalize(Normalizer normalizer) { + TNormalizedPlanNode normalizedPlan = new TNormalizedPlanNode(); + normalizedPlan.setNodeId(normalizer.normalizePlanId(id.asInt())); + normalizedPlan.setNumChildren(children.size()); + Set tupleIds = this.tupleIds + .stream() + .map(Id::asInt) + .collect(Collectors.toSet()); + normalizedPlan.setTupleIds( + tupleIds.stream() + .map(normalizer::normalizeTupleId) + .collect(Collectors.toSet()) + ); + normalizedPlan.setNullableTuples( + nullableTupleIds + .stream() + .map(Id::asInt) + .filter(tupleIds::contains) + .map(normalizer::normalizeTupleId) + .collect(Collectors.toSet()) + ); + normalize(normalizedPlan, normalizer); + normalizeConjuncts(normalizedPlan, normalizer); + normalizeProjects(normalizedPlan, normalizer); + normalizedPlan.setLimit(limit); + return normalizedPlan; + } + + public void normalize(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) { + throw new IllegalStateException("Unsupported normalization"); + } + + protected void normalizeProjects(TNormalizedPlanNode normalizedPlanNode, Normalizer normalizer) { + throw new IllegalStateException("Unsupported normalize project for " + getClass().getSimpleName()); + } + + public List normalizeProjects( + List outputSlotDescs, List projects, Normalizer normalizer) { + Map outputSlotToProject = Maps.newLinkedHashMap(); + for (int i = 0; i < outputSlotDescs.size(); i++) { + SlotId slotId = outputSlotDescs.get(i).getId(); + Expr projectExpr = projects.get(i); + if (projectExpr instanceof SlotRef) { + int outputSlotId = slotId.asInt(); + int refId = ((SlotRef) projectExpr).getSlotId().asInt(); + normalizer.setSlotIdToNormalizeId(outputSlotId, normalizer.normalizeSlotId(refId)); + } + outputSlotToProject.put(slotId, projectExpr); + } + return normalizeProjects(outputSlotToProject, normalizer); + } + + protected void normalizeConjuncts(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) { + normalizedPlan.setConjuncts(normalizeExprs(getConjuncts(), normalizer)); + } + + protected List normalizeProjects(Map project, Normalizer normalizer) { + List> sortByTExpr = Lists.newArrayListWithCapacity(project.size()); + for (Entry kv : project.entrySet()) { + SlotId slotId = kv.getKey(); + Expr expr = kv.getValue(); + TExpr thriftExpr = expr.normalize(normalizer); + sortByTExpr.add(Pair.of(slotId, thriftExpr)); + } + sortByTExpr.sort(Comparator.comparing(Pair::value)); + + // we should normalize slot id by fix order, then the upper nodes can reference the same normalized slot id + for (Pair pair : sortByTExpr) { + int originOutputSlotId = pair.first.asInt(); + normalizer.normalizeSlotId(originOutputSlotId); + } + + return sortByTExpr.stream().map(Pair::value).collect(Collectors.toList()); + } + + public static List normalizeExprs(Collection exprs, Normalizer normalizer) { + List normalizedWithoutSort = Lists.newArrayListWithCapacity(exprs.size()); + for (Expr expr : exprs) { + normalizedWithoutSort.add(expr.normalize(normalizer)); + } + normalizedWithoutSort.sort(Comparator.naturalOrder()); + return normalizedWithoutSort; + } + protected String debugString() { // not using Objects.toStrHelper because StringBuilder output = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/NormalizedPartitionPredicates.java b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/NormalizedPartitionPredicates.java new file mode 100644 index 00000000000000..c6741fb5d55d54 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/NormalizedPartitionPredicates.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.normalize; + +import org.apache.doris.analysis.Expr; + +import java.util.List; +import java.util.Map; + +/** NormalizedPartitionPredicates */ +public class NormalizedPartitionPredicates { + // the predicates which can not compute intersect range with the partitions + // case 1: no partition columns, say `non_partition_column = 1` + // case 2: partition column compute with non partition column, say `non_partition_column + partition_column = 1` + // case 3: the partition predicate contains some function which currently not supported convert to range, + // say `date(partition_column) = '2024-08-14'` + public final List remainedPredicates; + + // the partitionId to partition range string, + // for example: + // partitions is: + // P1 values [('2024-08-01'), ('2024-08-10')), the partition id is 10001 + // P2 values [('2024-08-10'), ('2024-08-20')), the partition id is 10002 + // + // predicate is: part_column between '2024-08-08' and '2024-08-12' + // + // the intersectPartitionRanges like is: + // { + // 10001: "[('2024-08-08'), ('2024-08-10'))", + // 10002: "[('2024-08-10'), ('2024-08-13'))" + // } + // + // we should normalize the intersect range to closeOpened range to maintain the same format + public final Map intersectPartitionRanges; + + public NormalizedPartitionPredicates( + List remainedPredicates, Map intersectPartitionRanges) { + this.remainedPredicates = remainedPredicates; + this.intersectPartitionRanges = intersectPartitionRanges; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/Normalizer.java b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/Normalizer.java new file mode 100644 index 00000000000000..2d76182a451a94 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/Normalizer.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/PlanFragment.java +// and modified by Doris + + +package org.apache.doris.planner.normalize; + +import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.planner.OlapScanNode; + +/** Normalizer */ +public interface Normalizer { + int normalizeSlotId(int slotId); + + void setSlotIdToNormalizeId(int slotId, int normalizedId); + + int normalizeTupleId(int tupleId); + + int normalizePlanId(int planId); + + DescriptorTable getDescriptorTable(); + + void setNormalizedPartitionPredicates(OlapScanNode olapScanNode, NormalizedPartitionPredicates predicates); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/PartitionRangePredicateNormalizer.java b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/PartitionRangePredicateNormalizer.java new file mode 100644 index 00000000000000..eceaf9eddd72ec --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/PartitionRangePredicateNormalizer.java @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/PlanFragment.java +// and modified by Doris + +package org.apache.doris.planner.normalize; + +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.CompoundPredicate.Operator; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotId; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.common.Pair; +import org.apache.doris.planner.OlapScanNode; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** PartitionRangePredicateNormalizer */ +public class PartitionRangePredicateNormalizer { + private final Normalizer normalizer; + private final OlapScanNode olapScanNode; + + public PartitionRangePredicateNormalizer(Normalizer normalizer, OlapScanNode olapScanNode) { + this.normalizer = Objects.requireNonNull(normalizer, "normalizer can not be null"); + this.olapScanNode = Objects.requireNonNull(olapScanNode, "olapScanNode can not be null"); + } + + public List normalize() { + NormalizedPartitionPredicates predicates = normalizePredicates(); + normalizer.setNormalizedPartitionPredicates(olapScanNode, predicates); + return predicates.remainedPredicates; + } + + private NormalizedPartitionPredicates normalizePredicates() { + OlapTable olapTable = olapScanNode.getOlapTable(); + List partitionColumns = olapTable.getPartitionColumns(); + + if (partitionColumns.isEmpty()) { + return cannotIntersectPartitionRange(); + } + + if (partitionColumns.size() > 1) { + return cannotIntersectPartitionRange(); + } + + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + if (partitionInfo.getType() != PartitionType.RANGE) { + return cannotIntersectPartitionRange(); + } + + return normalizeSingleRangePartitionColumnPredicates( + partitionColumns.get(0), (RangePartitionInfo) partitionInfo); + } + + private NormalizedPartitionPredicates normalizeSingleRangePartitionColumnPredicates( + Column partitionColumn, RangePartitionInfo rangePartitionInfo) { + + List>> partitionItemRanges + = rangePartitionInfo.getPartitionItems(olapScanNode.getSelectedPartitionIds()) + .entrySet() + .stream() + .map(entry -> { + RangeSet rangeSet = TreeRangeSet.create(); + rangeSet.add(entry.getValue().getItems()); + return Pair.of(entry.getKey(), rangeSet); + }) + .collect(Collectors.toList()); + + List conjuncts = extractConjuncts(olapScanNode.getConjuncts()); + + ToRangePredicatesExtractor extractor = ToRangePredicatesExtractor.extract( + conjuncts, olapScanNode, partitionColumn); + + PredicateToRange predicateToRange = new PredicateToRange(partitionColumn); + + for (Expr partitionPredicate : extractor.supportedToRangePredicates) { + RangeSet predicateRanges = predicateToRange.exprToRange(partitionPredicate); + partitionItemRanges = partitionItemRanges.stream() + .map(kv -> { + RangeSet partitionRangeSet = kv.second; + RangeSet intersect = 
TreeRangeSet.create(); + for (Range predicateRange : predicateRanges.asRanges()) { + intersect.addAll(partitionRangeSet.subRangeSet(predicateRange)); + } + return Pair.of(kv.first, intersect); + }).filter(kv -> !kv.second.isEmpty()) + .collect(Collectors.toList()); + } + + Map partitionToIntersectRange = partitionItemRanges.stream() + .map(pair -> Pair.of(pair.first, normalizeRangeSet(pair.second).toString())) + .collect(Collectors.toMap(Pair::key, Pair::value)); + + return new NormalizedPartitionPredicates(extractor.notSupportedToRangePredicates, partitionToIntersectRange); + } + + private RangeSet normalizeRangeSet(RangeSet rangeSet) { + // normalize range to closeOpened range, the between predicate and less than predicate + // maybe reuse the same cache to save memory + RangeSet normalized = TreeRangeSet.create(); + for (Range range : rangeSet.asRanges()) { + PartitionKey lowerEndpoint = range.lowerEndpoint(); + PartitionKey upperEndpoint = range.upperEndpoint(); + + try { + if (!lowerEndpoint.isMinValue() && !range.contains(lowerEndpoint)) { + lowerEndpoint = lowerEndpoint.successor(); + } + if (!upperEndpoint.isMaxValue() && range.contains(upperEndpoint)) { + upperEndpoint = upperEndpoint.successor(); + } + normalized.add(Range.closedOpen(lowerEndpoint, upperEndpoint)); + } catch (Throwable t) { + throw new IllegalStateException("Can not normalize range: " + t.getMessage(), t); + } + } + return normalized; + } + + private NormalizedPartitionPredicates cannotIntersectPartitionRange() { + return new NormalizedPartitionPredicates( + // conjuncts will be used as the part of the digest + olapScanNode.getConjuncts(), + // can not compute intersect range + ImmutableMap.of(olapScanNode.getSelectedPartitionIds().iterator().next(), "") + ); + } + + private List extractConjuncts(List conjuncts) { + List flattenedConjuncts = Lists.newArrayListWithCapacity(conjuncts.size()); + for (Expr conjunct : conjuncts) { + boolean findChildren = true; + conjunct.foreachDown(expr -> { + if (expr instanceof CompoundPredicate && ((CompoundPredicate) expr).getOp() == Operator.AND) { + return findChildren; + } else { + flattenedConjuncts.add((Expr) expr); + return !findChildren; + } + }); + } + return flattenedConjuncts; + } + + private static class ToRangePredicatesExtractor { + public final List supportedToRangePredicates; + public final List notSupportedToRangePredicates; + + private ToRangePredicatesExtractor( + List simplePartitionPredicates, List notSupportedToRangePredicates) { + this.supportedToRangePredicates = simplePartitionPredicates; + this.notSupportedToRangePredicates = notSupportedToRangePredicates; + } + + public static ToRangePredicatesExtractor extract( + List conjuncts, OlapScanNode olapScanNode, Column partitionColumn) { + List supportedPartitionPredicates = Lists.newArrayList(); + List otherPredicates = Lists.newArrayList(); + + Optional optPartitionId = findPartitionColumnSlotId(olapScanNode, partitionColumn); + + for (Expr conjunct : conjuncts) { + if (optPartitionId.isPresent() + && conjunct.isBound(optPartitionId.get()) + && PredicateToRange.supportedToRange(conjunct)) { + supportedPartitionPredicates.add(conjunct); + } else { + otherPredicates.add(conjunct); + } + } + + return new ToRangePredicatesExtractor(supportedPartitionPredicates, otherPredicates); + } + + private static Optional findPartitionColumnSlotId(OlapScanNode olapScanNode, Column partitionColumn) { + if (partitionColumn == null) { + return Optional.empty(); + } + + for (SlotDescriptor slot : 
olapScanNode.getTupleDesc().getSlots()) { + Column column = slot.getColumn(); + if (column.getName().equalsIgnoreCase(partitionColumn.getName())) { + return Optional.of(slot.getId()); + } + } + return Optional.empty(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/PredicateToRange.java b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/PredicateToRange.java new file mode 100644 index 00000000000000..44fee8efb39d5b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/PredicateToRange.java @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.normalize; + +import org.apache.doris.analysis.BetweenPredicate; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.BinaryPredicate.Operator; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionKey; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; + +/** PredicateToRange */ +public class PredicateToRange { + private final Column partitionColumn; + private final PartitionKey minKey; + private final PartitionKey maxKey; + + public PredicateToRange(Column partitionColumn) { + this.partitionColumn = partitionColumn; + + try { + minKey = PartitionKey.createInfinityPartitionKey(ImmutableList.of(partitionColumn), false); + maxKey = PartitionKey.createInfinityPartitionKey(ImmutableList.of(partitionColumn), true); + } catch (Throwable t) { + throw new IllegalStateException( + "Can not create min/max partition key by the column: " + partitionColumn, t); + } + } + + /** supportedPartitionPredicate */ + public static boolean supportedToRange(Expr conjunct) { + if (conjunct instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) conjunct; + switch (binaryPredicate.getOp()) { + case EQ: + case LT: + case LE: + case GT: + case GE: + case NE: + break; + default: + return false; + } + return conjunct.getChild(0) instanceof SlotRef && conjunct.getChild(1) instanceof LiteralExpr; + } else if (conjunct instanceof BetweenPredicate) { + return conjunct.getChild(0) instanceof SlotRef + && conjunct.getChild(1) instanceof LiteralExpr + && conjunct.getChild(2) instanceof LiteralExpr; + } else if (conjunct instanceof InPredicate) { + if (!(conjunct.getChild(0) instanceof SlotRef)) { + return false; + } + for (int i = 1; i < conjunct.getChildren().size(); i++) { + if (!(conjunct.getChild(i) instanceof 
LiteralExpr)) { + return false; + } + } + return true; + } else { + return false; + } + } + + /** exprToRange */ + public RangeSet exprToRange(Expr predicate) { + if (predicate instanceof BinaryPredicate) { + return compareToRangeSet((BinaryPredicate) predicate); + } else if (predicate instanceof BetweenPredicate) { + return betweenToRange((BetweenPredicate) predicate); + } else if (predicate instanceof InPredicate) { + return inToRange((InPredicate) predicate); + } else { + throw new IllegalStateException("Unsupported to Range: " + predicate); + } + } + + private RangeSet compareToRangeSet(BinaryPredicate predicate) { + Operator op = predicate.getOp(); + Expr rightChild = predicate.getChild(1); + PartitionKey rightPartitionKey = toPartitionKey(rightChild); + + TreeRangeSet rangeSet = TreeRangeSet.create(); + switch (op) { + case EQ: + rangeSet.add(Range.closed(rightPartitionKey, rightPartitionKey)); + break; + case NE: + rangeSet.add(Range.open(minKey, rightPartitionKey)); + rangeSet.add(Range.open(rightPartitionKey, maxKey)); + break; + case LT: + rangeSet.add(Range.open(minKey, rightPartitionKey)); + break; + case LE: + rangeSet.add(Range.openClosed(minKey, rightPartitionKey)); + break; + case GT: + rangeSet.add(Range.open(rightPartitionKey, maxKey)); + break; + case GE: + rangeSet.add(Range.closedOpen(rightPartitionKey, maxKey)); + break; + default: + } + return rangeSet; + } + + private RangeSet betweenToRange(BetweenPredicate predicate) { + PartitionKey lowerBound = toPartitionKey(predicate.getChild(0)); + PartitionKey upperBound = toPartitionKey(predicate.getChild(1)); + TreeRangeSet rangeSet = TreeRangeSet.create(); + if (predicate.isNotBetween()) { + rangeSet.add(Range.open(minKey, lowerBound)); + rangeSet.add(Range.open(upperBound, maxKey)); + } else { + rangeSet.add(Range.closed(lowerBound, upperBound)); + } + return rangeSet; + } + + private RangeSet inToRange(InPredicate predicate) { + boolean isNotIn = predicate.isNotIn(); + RangeSet rangeSet = TreeRangeSet.create(); + for (Expr item : predicate.getListChildren()) { + PartitionKey itemKey = toPartitionKey(item); + if (isNotIn) { + RangeSet completeRangeSet = TreeRangeSet.create(); + completeRangeSet.add(Range.open(minKey, itemKey)); + completeRangeSet.add(Range.open(itemKey, maxKey)); + + RangeSet intersect = TreeRangeSet.create(); + for (Range completeRange : completeRangeSet.asRanges()) { + intersect.addAll(rangeSet.subRangeSet(completeRange)); + } + rangeSet = intersect; + } else { + rangeSet.add(Range.closed(itemKey, itemKey)); + } + } + + if (isNotIn) { + rangeSet = rangeSet.complement(); + } + return rangeSet; + } + + private PartitionKey toPartitionKey(Expr expr) { + // try to cast to literal, if wrong, query cache will be skipped + LiteralExpr literalExpr = (LiteralExpr) expr; + PartitionKey partitionKey = new PartitionKey(); + partitionKey.pushColumn(literalExpr, partitionColumn.getDataType()); + return partitionKey; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java new file mode 100644 index 00000000000000..14fa5746fc56fe --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java @@ -0,0 +1,230 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/PlanFragment.java +// and modified by Doris + +package org.apache.doris.planner.normalize; + +import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.Pair; +import org.apache.doris.planner.AggregationNode; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanNode; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TNormalizedPlanNode; +import org.apache.doris.thrift.TQueryCacheParam; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TCompactProtocol; + +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** QueryCacheNormalizer */ +public class QueryCacheNormalizer implements Normalizer { + private final PlanFragment fragment; + private final DescriptorTable descriptorTable; + private final NormalizedIdGenerator normalizedPlanIds = new NormalizedIdGenerator(); + private final NormalizedIdGenerator normalizedTupleIds = new NormalizedIdGenerator(); + private final NormalizedIdGenerator normalizedSlotIds = new NormalizedIdGenerator(); + + // result + private final TQueryCacheParam queryCacheParam = new TQueryCacheParam(); + + public QueryCacheNormalizer(PlanFragment fragment, DescriptorTable descriptorTable) { + this.fragment = Objects.requireNonNull(fragment, "fragment can not be null"); + this.descriptorTable = Objects.requireNonNull(descriptorTable, "descriptorTable can not be null"); + } + + public Optional normalize(ConnectContext context) { + try { + Optional cachePoint = computeCachePoint(); + if (!cachePoint.isPresent()) { + return Optional.empty(); + } + List normalizedDigestPlans = normalizePlanTree(context, cachePoint.get()); + byte[] digest = computeDigest(normalizedDigestPlans); + return setQueryCacheParam(cachePoint.get(), digest, context); + } catch (Throwable t) { + return Optional.empty(); + } + } + + @VisibleForTesting + public List normalizePlans(ConnectContext context) { + Optional cachePoint = computeCachePoint(); + if (!cachePoint.isPresent()) { + return ImmutableList.of(); + } + return normalizePlanTree(context, cachePoint.get()); + } + + private Optional setQueryCacheParam( + CachePoint cachePoint, byte[] digest, ConnectContext context) { + queryCacheParam.setNodeId(cachePoint.cacheRoot.getId().asInt()); + queryCacheParam.setDigest(digest); + 
queryCacheParam.setForceRefreshQueryCache(context.getSessionVariable().isQueryCacheForceRefresh()); + queryCacheParam.setEntryMaxBytes(context.getSessionVariable().getQueryCacheEntryMaxBytes()); + queryCacheParam.setEntryMaxRows(context.getSessionVariable().getQueryCacheEntryMaxRows()); + + queryCacheParam.setOutputSlotMapping( + cachePoint.cacheRoot.getOutputTupleIds() + .stream() + .flatMap(tupleId -> descriptorTable.getTupleDesc(tupleId).getSlots().stream()) + .map(slot -> { + int slotId = slot.getId().asInt(); + return Pair.of(slotId, normalizeSlotId(slotId)); + }) + .collect(Collectors.toMap(Pair::key, Pair::value)) + ); + + return Optional.of(queryCacheParam); + } + + private Optional computeCachePoint() { + if (!fragment.getTargetRuntimeFilterIds().isEmpty()) { + return Optional.empty(); + } + PlanNode planRoot = fragment.getPlanRoot(); + return doComputeCachePoint(planRoot); + } + + private Optional doComputeCachePoint(PlanNode planRoot) { + if (planRoot instanceof AggregationNode) { + PlanNode child = planRoot.getChild(0); + if (child instanceof OlapScanNode) { + return Optional.of(new CachePoint(planRoot, planRoot)); + } else if (child instanceof AggregationNode) { + Optional childCachePoint = doComputeCachePoint(child); + if (childCachePoint.isPresent()) { + return Optional.of(new CachePoint(planRoot, planRoot)); + } + } + } + return Optional.empty(); + } + + private List normalizePlanTree(ConnectContext context, CachePoint cachePoint) { + List normalizedPlans = new ArrayList<>(); + doNormalizePlanTree(context, cachePoint.digestRoot, normalizedPlans); + return normalizedPlans; + } + + private void doNormalizePlanTree( + ConnectContext context, PlanNode plan, List normalizedPlans) { + for (PlanNode child : plan.getChildren()) { + doNormalizePlanTree(context, child, normalizedPlans); + } + normalizedPlans.add(plan.normalize(this)); + } + + public static byte[] computeDigest(List normalizedDigestPlans) throws Exception { + TSerializer serializer = new TSerializer(new TCompactProtocol.Factory()); + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + + for (TNormalizedPlanNode node : normalizedDigestPlans) { + digest.update(serializer.serialize(node)); + } + return digest.digest(); + } + + @Override + public int normalizeSlotId(int slotId) { + return normalizedSlotIds.normalize(slotId); + } + + @Override + public void setSlotIdToNormalizeId(int slotId, int normalizedId) { + normalizedSlotIds.set(slotId, normalizedId); + } + + @Override + public int normalizeTupleId(int tupleId) { + return normalizedTupleIds.normalize(tupleId); + } + + @Override + public int normalizePlanId(int planId) { + return normalizedPlanIds.normalize(planId); + } + + @Override + public DescriptorTable getDescriptorTable() { + return descriptorTable; + } + + @Override + public void setNormalizedPartitionPredicates(OlapScanNode olapScanNode, NormalizedPartitionPredicates predicates) { + OlapTable olapTable = olapScanNode.getOlapTable(); + long selectIndexId = olapScanNode.getSelectedIndexId() == -1 + ? 
olapTable.getBaseIndexId() + : olapScanNode.getSelectedIndexId(); + + Map tabletToRange = Maps.newLinkedHashMap(); + for (Long partitionId : olapScanNode.getSelectedPartitionIds()) { + Set tabletIds = olapTable.getPartition(partitionId) + .getIndex(selectIndexId) + .getTablets() + .stream() + .map(Tablet::getId) + .collect(Collectors.toSet()); + + String filterRange = predicates.intersectPartitionRanges.get(partitionId); + for (Long tabletId : tabletIds) { + tabletToRange.put(tabletId, filterRange); + } + } + queryCacheParam.setTabletToRange(tabletToRange); + } + + private static class CachePoint { + PlanNode digestRoot; + PlanNode cacheRoot; + + public CachePoint(PlanNode digestRoot, PlanNode cacheRoot) { + this.digestRoot = digestRoot; + this.cacheRoot = cacheRoot; + } + } + + private static class NormalizedIdGenerator { + private final AtomicInteger idGenerator = new AtomicInteger(0); + private final Map originIdToNormalizedId = Maps.newLinkedHashMap(); + + public Integer normalize(Integer originId) { + return originIdToNormalizedId.computeIfAbsent(originId, id -> idGenerator.getAndIncrement()); + } + + public void set(int originId, Integer normalizedId) { + originIdToNormalizedId.put(originId, normalizedId); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0cbe3eebbab9f1..d675441d165d8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1884,9 +1884,9 @@ protected void computeFragmentHosts() throws Exception { boolean forceToLocalShuffle = context != null && context.getSessionVariable().isForceToLocalShuffle() && !fragment.hasNullAwareLeftAntiJoin() && useNereids; - boolean ignoreStorageDataDistribution = forceToLocalShuffle || (node.isPresent() + boolean ignoreStorageDataDistribution = (forceToLocalShuffle || (node.isPresent() && node.get().ignoreStorageDataDistribution(context, addressToBackendID.size()) - && useNereids); + && useNereids)) && fragment.queryCacheParam == null; if (node.isPresent() && ignoreStorageDataDistribution) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); // if have limit and no conjuncts, only need 1 instance to save cpu and @@ -1907,6 +1907,9 @@ protected void computeFragmentHosts() throws Exception { if (node.get().shouldUseOneInstance(context)) { expectedInstanceNum = 1; } + if (fragment.queryCacheParam != null) { + expectedInstanceNum = perNodeScanRanges.size(); + } perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum); @@ -2733,12 +2736,12 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc */ boolean forceToLocalShuffle = context != null && context.getSessionVariable().isForceToLocalShuffle() && !hasNullAwareLeftAntiJoin && useNereids; - boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() + boolean ignoreStorageDataDistribution = (forceToLocalShuffle || (scanNodes.stream() .allMatch(node -> node.ignoreStorageDataDistribution(context, addressToBackendID.size())) && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> { return addressScanRange.getValue().size() < parallelExecInstanceNum; - }) && useNereids); + }) && useNereids)) && params.fragment.queryCacheParam == null; FragmentScanRangeAssignment assignment = params.scanRangeAssignment; for (Map.Entry>>>> addressScanRange @@ -3102,6 +3105,7 @@ Map toThrift(int backendNum) { Map res = new 
HashMap(); Map instanceIdx = new HashMap(); TPlanFragment fragmentThrift = fragment.toThrift(); + fragmentThrift.query_cache_param = fragment.queryCacheParam; for (int i = 0; i < instanceExecParams.size(); ++i) { final FInstanceExecParam instanceExecParam = instanceExecParams.get(i); Map> scanRanges = instanceExecParam.perNodeScanRanges; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 4140835b657004..44244256de5df9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -150,6 +150,10 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_REWRITE_ELEMENT_AT_TO_SLOT = "enable_rewrite_element_at_to_slot"; public static final String ENABLE_ODBC_TRANSCATION = "enable_odbc_transcation"; public static final String ENABLE_SQL_CACHE = "enable_sql_cache"; + public static final String ENABLE_QUERY_CACHE = "enable_query_cache"; + public static final String QUERY_CACHE_FORCE_REFRESH = "query_cache_force_refresh"; + public static final String QUERY_CACHE_ENTRY_MAX_BYTES = "query_cache_entry_max_bytes"; + public static final String QUERY_CACHE_ENTRY_MAX_ROWS = "query_cache_entry_max_rows"; public static final String ENABLE_COST_BASED_JOIN_REORDER = "enable_cost_based_join_reorder"; @@ -939,6 +943,18 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_SQL_CACHE) public boolean enableSqlCache = false; + @VariableMgr.VarAttr(name = ENABLE_QUERY_CACHE) + public boolean enableQueryCache = false; + + @VarAttr(name = QUERY_CACHE_FORCE_REFRESH) + private boolean queryCacheForceRefresh = false; + + @VarAttr(name = QUERY_CACHE_ENTRY_MAX_BYTES) + private long queryCacheEntryMaxBytes = 5242880; + + @VarAttr(name = QUERY_CACHE_ENTRY_MAX_ROWS) + private long queryCacheEntryMaxRows = 500000; + @VariableMgr.VarAttr(name = FORWARD_TO_MASTER) public boolean forwardToMaster = true; @@ -2798,6 +2814,38 @@ public void setEnableSqlCache(boolean enableSqlCache) { this.enableSqlCache = enableSqlCache; } + public boolean getEnableQueryCache() { + return enableQueryCache; + } + + public void setEnableQueryCache(boolean enableQueryCache) { + this.enableQueryCache = enableQueryCache; + } + + public boolean isQueryCacheForceRefresh() { + return queryCacheForceRefresh; + } + + public void setQueryCacheForceRefresh(boolean queryCacheForceRefresh) { + this.queryCacheForceRefresh = queryCacheForceRefresh; + } + + public long getQueryCacheEntryMaxBytes() { + return queryCacheEntryMaxBytes; + } + + public void setQueryCacheEntryMaxBytes(long queryCacheEntryMaxBytes) { + this.queryCacheEntryMaxBytes = queryCacheEntryMaxBytes; + } + + public long getQueryCacheEntryMaxRows() { + return queryCacheEntryMaxRows; + } + + public void setQueryCacheEntryMaxRows(long queryCacheEntryMaxRows) { + this.queryCacheEntryMaxRows = queryCacheEntryMaxRows; + } + public int getPartitionedHashJoinRowsThreshold() { return partitionedHashJoinRowsThreshold; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java new file mode 100644 index 00000000000000..829d1b0d1e507c --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java @@ -0,0 +1,515 @@ +// Licensed to the Apache Software Foundation (ASF) under one 
+// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.planner.normalize.QueryCacheNormalizer; +import org.apache.doris.thrift.TNormalizedPlanNode; +import org.apache.doris.thrift.TQueryCacheParam; +import org.apache.doris.thrift.TRuntimeFilterMode; +import org.apache.doris.thrift.TRuntimeFilterType; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public class QueryCacheNormalizerTest extends TestWithFeService { + private static final Logger LOG = LogManager.getLogger(QueryCacheNormalizerTest.class); + + @Override + protected void runBeforeAll() throws Exception { + connectContext.getSessionVariable().setEnableNereidsPlanner(true); + + // Create database `db1`. + createDatabase("db1"); + + useDatabase("db1"); + + // Create tables. 
+ String nonPart = "create table db1.non_part(" + + " k1 varchar(32),\n" + + " k2 varchar(32),\n" + + " k3 varchar(32),\n" + + " v1 int,\n" + + " v2 int)\n" + + "DUPLICATE KEY(k1, k2, k3)\n" + + "distributed by hash(k1) buckets 3\n" + + "properties('replication_num' = '1')"; + + String part1 = "create table db1.part1(" + + " dt date,\n" + + " k1 varchar(32),\n" + + " k2 varchar(32),\n" + + " k3 varchar(32),\n" + + " v1 int,\n" + + " v2 int)\n" + + "DUPLICATE KEY(dt, k1, k2, k3)\n" + + "PARTITION BY RANGE(dt)\n" + + "(\n" + + " PARTITION p202403 VALUES [('2024-03-01'), ('2024-04-01')),\n" + + " PARTITION p202404 VALUES [('2024-04-01'), ('2024-05-01')),\n" + + " PARTITION p202405 VALUES [('2024-05-01'), ('2024-06-01'))\n" + + ")\n" + + "distributed by hash(k1) buckets 3\n" + + "properties('replication_num' = '1')"; + + String part2 = "create table db1.part2(" + + " dt date,\n" + + " k1 varchar(32),\n" + + " k2 varchar(32),\n" + + " k3 varchar(32),\n" + + " v1 int,\n" + + " v2 int)\n" + + "DUPLICATE KEY(dt, k1, k2, k3)\n" + + "PARTITION BY RANGE(dt)\n" + + "(\n" + + " PARTITION p202403 VALUES [('2024-03-01'), ('2024-04-01')),\n" + + " PARTITION p202404 VALUES [('2024-04-01'), ('2024-05-01')),\n" + + " PARTITION p202405 VALUES [('2024-05-01'), ('2024-06-01'))\n" + + ")\n" + + "distributed by hash(k1) buckets 3\n" + + "properties('replication_num' = '1')"; + + String multiLeveParts = "create table db1.multi_level_parts(" + + " k1 int,\n" + + " dt date,\n" + + " hour int,\n" + + " v1 int,\n" + + " v2 int)\n" + + "DUPLICATE KEY(k1)\n" + + "PARTITION BY RANGE(dt, hour)\n" + + "(\n" + + " PARTITION p202403 VALUES [('2024-03-01', '0'), ('2024-03-01', '1')),\n" + + " PARTITION p202404 VALUES [('2024-03-01', '1'), ('2024-03-01', '2')),\n" + + " PARTITION p202405 VALUES [('2024-03-01', '2'), ('2024-03-01', '3'))\n" + + ")\n" + + "distributed by hash(k1) buckets 3\n" + + "properties('replication_num' = '1')"; + + createTables(nonPart, part1, part2, multiLeveParts); + + connectContext.getSessionVariable().setEnableNereidsPlanner(true); + connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + + @Test + public void testNormalize() throws Exception { + String digest1 = getDigest("select k1 as k, sum(v1) as v from db1.non_part group by 1"); + String digest2 = getDigest("select sum(v1) as v1, k1 as k from db1.non_part group by 2"); + Assertions.assertEquals(64, digest1.length()); + Assertions.assertEquals(digest1, digest2); + + String digest3 = getDigest("select k1 as k, sum(v1) as v from db1.non_part where v1 between 1 and 10 group by 1"); + Assertions.assertNotEquals(digest1, digest3); + + String digest4 = getDigest("select k1 as k, sum(v1) as v from db1.non_part where v1 >= 1 and v1 <= 10 group by 1"); + Assertions.assertEquals(digest3, digest4); + Assertions.assertEquals(64, digest3.length()); + + String digest5 = getDigest("select k1 as k, sum(v1) as v from db1.non_part where v1 >= 1 and v1 < 11 group by 1"); + Assertions.assertNotEquals(digest3, digest5); + Assertions.assertEquals(64, digest5.length()); + } + + @Test + public void testProjectOnOlapScan() throws Exception { + String digest1 = getDigest("select k1 + 1, k2, sum(v1), sum(v2) as v from db1.non_part group by 1, 2"); + String digest2 = getDigest("select sum(v2), k2, sum(v1), k1 + 1 from db1.non_part group by 2, 4"); + Assertions.assertEquals(digest1, digest2); + Assertions.assertEquals(64, digest1.length()); + + String digest3 = getDigest("select k1 + 1, k2, sum(v1 + 1), sum(v2) as v from db1.non_part group 
by 1, 2"); + Assertions.assertNotEquals(digest1, digest3); + Assertions.assertEquals(64, digest3.length()); + } + + @Test + public void testProjectOnAggregate() throws Exception { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION,TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT"); + try { + String digest1 = getDigest( + "select k1 + 1, k2 + 2, sum(v1) + 3, sum(v2) + 4 as v from db1.non_part group by k1, k2" + ); + String digest2 = getDigest( + "select sum(v2) + 4, k2 + 2, sum(v1) + 3, k1 + 1 as v from db1.non_part group by k2, k1" + ); + Assertions.assertEquals(digest1, digest2); + Assertions.assertEquals(64, digest1.length()); + } finally { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + } + + @Test + public void testPartitionTable() throws Throwable { + TQueryCacheParam queryCacheParam1 = getQueryCacheParam( + "select k1 as k, sum(v1) as v from db1.part1 group by 1"); + TQueryCacheParam queryCacheParam2 = getQueryCacheParam( + "select k1 as k, sum(v1) as v from db1.part1 where dt < '2025-01-01' group by 1"); + Assertions.assertEquals(queryCacheParam1.digest, queryCacheParam2.digest); + Assertions.assertEquals(32, queryCacheParam1.digest.remaining()); + Assertions.assertEquals(queryCacheParam1.tablet_to_range, queryCacheParam2.tablet_to_range); + + TQueryCacheParam queryCacheParam3 = getQueryCacheParam( + "select k1 as k, sum(v1) as v from db1.part1 where dt < '2024-05-20' group by 1"); + Assertions.assertEquals(queryCacheParam1.digest, queryCacheParam3.digest); + Assertions.assertNotEquals( + Lists.newArrayList(queryCacheParam1.tablet_to_range.values()), + Lists.newArrayList(queryCacheParam3.tablet_to_range.values()) + ); + Assertions.assertEquals(32, queryCacheParam3.digest.remaining()); + + TQueryCacheParam queryCacheParam4 = getQueryCacheParam( + "select k1 as k, sum(v1) as v from db1.part1 where dt < '2024-04-20' group by 1"); + Assertions.assertEquals(queryCacheParam1.digest, queryCacheParam4.digest); + Assertions.assertNotEquals( + Lists.newArrayList(queryCacheParam1.tablet_to_range.values()), + Lists.newArrayList(queryCacheParam4.tablet_to_range.values()) + ); + Assertions.assertEquals(32, queryCacheParam4.digest.remaining()); + } + + @Test + public void testMultiLevelPartitionTable() throws Throwable { + List queryCacheParams = normalize( + "select k1, sum(v1) as v from db1.multi_level_parts group by 1"); + Assertions.assertEquals(1, queryCacheParams.size()); + } + + @Test + public void testHaving() throws Throwable { + List normalizedPlanNodes = onePhaseAggWithoutDistinct(() -> normalizePlans( + "select k1, sum(v1) as v from db1.part1 where dt='2024-05-01' group by 1 having v > 10")); + Assertions.assertEquals(2, normalizedPlanNodes.size()); + Assertions.assertTrue( + normalizedPlanNodes.get(0).getOlapScanNode() != null + && CollectionUtils.isEmpty(normalizedPlanNodes.get(0).getConjuncts()) + ); + Assertions.assertTrue( + normalizedPlanNodes.get(1).getAggregationNode() != null + && !CollectionUtils.isEmpty(normalizedPlanNodes.get(1).getConjuncts()) + ); + } + + @Test + public void testRuntimeFilter() throws Throwable { + connectContext.getSessionVariable().setRuntimeFilterMode(TRuntimeFilterMode.GLOBAL.toString()); + connectContext.getSessionVariable().setRuntimeFilterType(TRuntimeFilterType.IN_OR_BLOOM.getValue()); + List queryCacheParams = normalize( + "select * from (select k1, count(*) from db1.part1 where k1 < 15 group by k1)a\n" + + "join (select k1, count(*) from db1.part1 where k1 < 10 group by 
k1)b\n" + + "on a.k1 = b.k1"); + + // only non target side can use query cache + Assertions.assertEquals(1, queryCacheParams.size()); + } + + @Test + public void testSelectFromWhereNoGroupBy() throws Throwable { + List plans1 = normalizePlans("select sum(v1) from db1.part1"); + List plans2 = normalizePlans("select sum(v1) as v from db1.part1"); + Assertions.assertEquals(plans1, plans2); + + List plans3 = normalizePlans("select sum(v1) from db1.part1 where dt < '2025-01-01'"); + Assertions.assertEquals(plans1, plans3); + + List plans4 = normalizePlans("select sum(v1) from db1.part1 where dt < '2024-05-01'"); + Assertions.assertEquals(plans1, plans4); + + TQueryCacheParam queryCacheParam1 = getQueryCacheParam("select sum(v1) from db1.part1"); + TQueryCacheParam queryCacheParam2 = getQueryCacheParam("select sum(v1) from db1.part1 where dt <= '2024-04-20'"); + Assertions.assertEquals(queryCacheParam1.digest, queryCacheParam2.digest); + Assertions.assertEquals(32, queryCacheParam1.digest.remaining()); + Assertions.assertNotEquals( + Lists.newArrayList(queryCacheParam1.tablet_to_range.values()), + Lists.newArrayList(queryCacheParam2.tablet_to_range.values()) + ); + + Assertions.assertEquals( + queryCacheParam1.tablet_to_range.values().stream().sorted().collect(Collectors.toList()), + ImmutableList.of( + "[[types: [DATEV2]; keys: [2024-03-01]; ..types: [DATEV2]; keys: [2024-04-01]; )]", + "[[types: [DATEV2]; keys: [2024-03-01]; ..types: [DATEV2]; keys: [2024-04-01]; )]", + "[[types: [DATEV2]; keys: [2024-03-01]; ..types: [DATEV2]; keys: [2024-04-01]; )]", + "[[types: [DATEV2]; keys: [2024-04-01]; ..types: [DATEV2]; keys: [2024-05-01]; )]", + "[[types: [DATEV2]; keys: [2024-04-01]; ..types: [DATEV2]; keys: [2024-05-01]; )]", + "[[types: [DATEV2]; keys: [2024-04-01]; ..types: [DATEV2]; keys: [2024-05-01]; )]", + "[[types: [DATEV2]; keys: [2024-05-01]; ..types: [DATEV2]; keys: [2024-06-01]; )]", + "[[types: [DATEV2]; keys: [2024-05-01]; ..types: [DATEV2]; keys: [2024-06-01]; )]", + "[[types: [DATEV2]; keys: [2024-05-01]; ..types: [DATEV2]; keys: [2024-06-01]; )]" + ) + ); + + Assertions.assertEquals( + queryCacheParam2.tablet_to_range.values().stream().sorted().collect(Collectors.toList()), + ImmutableList.of( + "[[types: [DATEV2]; keys: [2024-03-01]; ..types: [DATEV2]; keys: [2024-04-01]; )]", + "[[types: [DATEV2]; keys: [2024-03-01]; ..types: [DATEV2]; keys: [2024-04-01]; )]", + "[[types: [DATEV2]; keys: [2024-03-01]; ..types: [DATEV2]; keys: [2024-04-01]; )]", + "[[types: [DATEV2]; keys: [2024-04-01]; ..types: [DATEV2]; keys: [2024-04-21]; )]", + "[[types: [DATEV2]; keys: [2024-04-01]; ..types: [DATEV2]; keys: [2024-04-21]; )]", + "[[types: [DATEV2]; keys: [2024-04-01]; ..types: [DATEV2]; keys: [2024-04-21]; )]" + ) + ); + + TQueryCacheParam queryCacheParam3 = getQueryCacheParam( + "select sum(v1), sum(v1 + 1), sum(v2), sum(v2 + 2) + 2 from db1.part1" + ); + TQueryCacheParam queryCacheParam4 = getQueryCacheParam( + "select sum(v2 + 2) + 2, sum(v2), sum(v1 + 1), sum(v1) from db1.part1" + ); + Assertions.assertEquals(queryCacheParam3.digest, queryCacheParam4.digest); + Assertions.assertEquals(32, queryCacheParam3.digest.remaining()); + Assertions.assertEquals(queryCacheParam3.tablet_to_range, queryCacheParam4.tablet_to_range); + + TQueryCacheParam queryCacheParam5 = getQueryCacheParam( + "select sum(v1) from db1.part1 where dt between '2024-04-15' and '2024-04-20' and k1 = 1" + ); + TQueryCacheParam queryCacheParam6 = getQueryCacheParam( + "select sum(v1) from db1.part1 where dt between 
'2024-04-15' and '2024-04-20' and k1 = 2" + ); + Assertions.assertNotEquals(queryCacheParam5.digest, queryCacheParam6.digest); + Assertions.assertEquals(32, queryCacheParam5.digest.remaining()); + Assertions.assertEquals(32, queryCacheParam6.digest.remaining()); + Assertions.assertEquals(queryCacheParam5.tablet_to_range, queryCacheParam6.tablet_to_range); + } + + @Test + public void testSelectFromGroupBy() { + twoPhaseAggWithoutDistinct(() -> { + TQueryCacheParam queryCacheParam1 = getQueryCacheParam("select k1, sum(v1) from db1.part1 group by k1"); + TQueryCacheParam queryCacheParam2 = getQueryCacheParam("select k1, sum(v1) from db1.part1 group by k1 limit 10"); + Assertions.assertNotEquals(queryCacheParam1.digest, queryCacheParam2.digest); + Assertions.assertEquals(32, queryCacheParam1.digest.remaining()); + Assertions.assertEquals(32, queryCacheParam2.digest.remaining()); + + TQueryCacheParam queryCacheParam3 = getQueryCacheParam("select sum(v1), k1 from db1.part1 group by k1"); + Assertions.assertEquals(queryCacheParam1.digest, queryCacheParam3.digest); + Assertions.assertEquals( + Lists.newArrayList(queryCacheParam1.tablet_to_range.values()), + Lists.newArrayList(queryCacheParam3.tablet_to_range.values()) + ); + + List plans1 = normalizePlans("select k1, sum(v1) from db1.part1 group by k1"); + List plans2 = normalizePlans( + "select sum(v1), k1 from db1.part1 where dt between '2024-04-10' and '2024-05-06' group by k1"); + Assertions.assertEquals(plans1, plans2); + }); + } + + @Test + public void phasesAgg() { + List onePhaseAggPlans = onePhaseAggWithoutDistinct(() -> normalizePlans( + "select sum(v1), k1 from db1.part1 where dt = '2024-04-10' group by k1")); + List onePhaseAggPlans2 = onePhaseAggWithoutDistinct(() -> normalizePlans( + "select k1, sum(v1) from db1.part1 where dt = '2024-04-10' group by k1")); + Assertions.assertEquals(onePhaseAggPlans, onePhaseAggPlans2); + + List twoPhaseAggPlans = twoPhaseAggWithoutDistinct(() -> normalizePlans( + "select sum(v1), k1 from db1.part1 where dt = '2024-04-10' group by k1")); + Assertions.assertNotEquals(onePhaseAggPlans, twoPhaseAggPlans); + } + + @Test + public void phasesDistinctAgg() { + List noDistinctPlans = onePhaseAggWithoutDistinct(() -> normalizePlans( + "select k1 from db1.part1 where dt = '2024-04-10' group by k1")); + List onePhaseAggPlans = onePhaseAggWithDistinct(() -> normalizePlans( + "select distinct k1 from db1.part1 where dt = '2024-04-10'")); + Assertions.assertEquals(noDistinctPlans, onePhaseAggPlans); + List twoPhaseAggPlans = twoPhaseAggWithDistinct(() -> normalizePlans( + "select distinct k1 from db1.part1 where dt = '2024-04-10'")); + Assertions.assertEquals(onePhaseAggPlans, twoPhaseAggPlans); + + List threePhaseAggPlans = threePhaseAggWithDistinct(() -> normalizePlans( + "select sum(distinct v1), k1 from db1.part1 where dt = '2024-04-10' group by k1")); + List fourPhaseAggPlans = fourPhaseAggWithDistinct(() -> normalizePlans( + "select sum(distinct v1), k1 from db1.part1 where dt = '2024-04-10' group by k1")); + Assertions.assertNotEquals(fourPhaseAggPlans, threePhaseAggPlans); + } + + private String getDigest(String sql) throws Exception { + return Hex.encodeHexString(getQueryCacheParam(sql).digest); + } + + private TQueryCacheParam getQueryCacheParam(String sql) throws Exception { + List queryCaches = normalize(sql); + Assertions.assertEquals(1, queryCaches.size()); + return queryCaches.get(0); + } + + private List normalize(String sql) throws Exception { + Planner planner = getSqlStmtExecutor(sql).planner(); 
+ DescriptorTable descTable = planner.getDescTable(); + List fragments = planner.getFragments(); + List queryCacheParams = new ArrayList<>(); + for (PlanFragment fragment : fragments) { + QueryCacheNormalizer normalizer = new QueryCacheNormalizer(fragment, descTable); + Optional queryCacheParam = normalizer.normalize(connectContext); + if (queryCacheParam.isPresent()) { + queryCacheParams.add(queryCacheParam.get()); + } + } + return queryCacheParams; + } + + private List normalizePlans(String sql) throws Exception { + Planner planner = getSqlStmtExecutor(sql).planner(); + DescriptorTable descTable = planner.getDescTable(); + List fragments = planner.getFragments(); + List normalizedPlans = new ArrayList<>(); + for (PlanFragment fragment : fragments) { + QueryCacheNormalizer normalizer = new QueryCacheNormalizer(fragment, descTable); + normalizedPlans.addAll(normalizer.normalizePlans(connectContext)); + } + return normalizedPlans; + } + + private void onePhaseAggWithoutDistinct(Callback callback) { + onePhaseAggWithoutDistinct(() -> { + callback.run(); + return null; + }); + } + + private T onePhaseAggWithDistinct(ResultCallback callback) { + try { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION," + + "TWO_PHASE_AGGREGATE_WITH_DISTINCT," + + "THREE_PHASE_AGGREGATE_WITH_DISTINCT," + + "FOUR_PHASE_AGGREGATE_WITH_DISTINCT," + + "FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE," + + "TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI"); + return callback.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + } + + private T twoPhaseAggWithDistinct(ResultCallback callback) { + try { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION," + + "ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI," + + "THREE_PHASE_AGGREGATE_WITH_DISTINCT," + + "FOUR_PHASE_AGGREGATE_WITH_DISTINCT," + + "FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE," + + "ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI"); + return callback.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + } + + private T threePhaseAggWithDistinct(ResultCallback callback) { + try { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION," + + "ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI," + + "FOUR_PHASE_AGGREGATE_WITH_DISTINCT," + + "FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE," + + "ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI," + + "TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI"); + return callback.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + } + + private T fourPhaseAggWithDistinct(ResultCallback callback) { + try { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION," + + "ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI," + + "THREE_PHASE_AGGREGATE_WITH_DISTINCT," + + "ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI," + + "TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI"); + return callback.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + } + + + private T onePhaseAggWithoutDistinct(ResultCallback callback) { 
+ try { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION,TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT"); + return callback.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + } + + private void twoPhaseAggWithoutDistinct(Callback callback) { + twoPhaseAggWithoutDistinct(() -> { + callback.run(); + return null; + }); + } + + private T twoPhaseAggWithoutDistinct(ResultCallback callback) { + try { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION,ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT"); + return callback.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + connectContext.getSessionVariable() + .setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + } + + private interface Callback { + void run() throws Throwable; + } + + private interface ResultCallback { + T run() throws Throwable; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 510b2ba00eb159..407c1544a4b00b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -605,6 +605,7 @@ public StmtExecutor getSqlStmtExecutor(String queryStr) throws Exception { && connectContext.getState().getErrorCode() == null) { return stmtExecutor; } else { + // throw new IllegalStateException(connectContext.getState().getErrorMessage()); return null; } } diff --git a/gensrc/thrift/Normalization.thrift b/gensrc/thrift/Normalization.thrift new file mode 100644 index 00000000000000..1eedfef6fbee97 --- /dev/null +++ b/gensrc/thrift/Normalization.thrift @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +namespace java org.apache.doris.thrift + +include "Exprs.thrift" +include "Types.thrift" +include "Opcodes.thrift" +include "Descriptors.thrift" +include "Partitions.thrift" +include "PlanNodes.thrift" + +struct TNormalizedOlapScanNode { + 1: optional i64 table_id + 2: optional i64 index_id + 3: optional bool is_preaggregation + 4: optional list key_column_names + 5: optional list key_column_types + 6: optional string rollup_name + 7: optional string sort_column + 8: optional list select_columns +} + +struct TNormalizedAggregateNode { + 1: optional list grouping_exprs + 2: optional list aggregate_functions + 3: optional Types.TTupleId intermediate_tuple_id + 4: optional Types.TTupleId output_tuple_id + 5: optional bool is_finalize + 6: optional bool use_streaming_preaggregation + 7: optional list projectToAggIntermediateTuple + 8: optional list projectToAggOutputTuple +} + +struct TNormalizedPlanNode { + 1: optional Types.TPlanNodeId node_id + 2: optional PlanNodes.TPlanNodeType node_type + 3: optional i32 num_children + 5: optional set tuple_ids + 6: optional set nullable_tuples + 7: optional list conjuncts + 8: optional list projects + 9: optional i64 limit + + 10: optional TNormalizedOlapScanNode olap_scan_node + 11: optional TNormalizedAggregateNode aggregation_node +} \ No newline at end of file diff --git a/gensrc/thrift/Planner.thrift b/gensrc/thrift/Planner.thrift index 08205b9399e5af..866d8d45320243 100644 --- a/gensrc/thrift/Planner.thrift +++ b/gensrc/thrift/Planner.thrift @@ -23,6 +23,7 @@ include "Exprs.thrift" include "DataSinks.thrift" include "PlanNodes.thrift" include "Partitions.thrift" +include "QueryCache.thrift" // TPlanFragment encapsulates info needed to execute a particular // plan fragment, including how to produce and how to partition its output. @@ -61,6 +62,8 @@ struct TPlanFragment { // sink) in a single instance of this fragment. This is used for an optimization in // InitialReservation. Measured in bytes. required in V1 8: optional i64 initial_reservation_total_claims + + 9: optional QueryCache.TQueryCacheParam query_cache_param } // location information for a single scan range @@ -79,4 +82,4 @@ struct TScanRangeLocations { 1: required PlanNodes.TScanRange scan_range // non-empty list 2: list locations -} +} \ No newline at end of file diff --git a/gensrc/thrift/QueryCache.thrift b/gensrc/thrift/QueryCache.thrift new file mode 100644 index 00000000000000..048b27f4572dd1 --- /dev/null +++ b/gensrc/thrift/QueryCache.thrift @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+namespace cpp doris
+namespace java org.apache.doris.thrift
+
+struct TQueryCacheParam {
+    1: optional i32 node_id
+
+    2: optional binary digest
+
+    // The slot order of the current query can differ from the slot order of the cached query,
+    // so we map each slot id in the plan node to a normalized slot id.
+    // For example:
+    // SQL1: select id, count(*) cnt, sum(value) s from tbl group by id
+    // SQL2: select sum(value) s, count(*) cnt, id from tbl group by id
+    // id always gets normalized slot id 0,
+    // cnt always gets normalized slot id 1,
+    // s always gets normalized slot id 2.
+    // In SQL1, id, cnt and s may have slot ids 5, 6 and 7;
+    // in SQL2, s, cnt and id may have slot ids 10, 11 and 12.
+    // If the cache entry is generated by SQL1, its output_slot_mapping is {5: 0, 6: 1, 7: 2};
+    // when SQL2 reads the cache entry, its output_slot_mapping is {10: 2, 11: 1, 12: 0}.
+    // Even though the select order differs, the normalized slot ids are the same:
+    // id is always 0, cnt is always 1, s is always 2.
+    // The backend can therefore map the current slots in the tuple to the cached slots.
+    3: optional map<i32, i32> output_slot_mapping
+
+    // Mapping from tablet id to filter range.
+    // BE uses the range string as part of the key when searching the query cache.
+    // Note that BE does not care what the filter range contains; it is only part of the key.
+    4: optional map<i64, string> tablet_to_range
+
+    5: optional bool force_refresh_query_cache
+
+    6: optional i64 entry_max_bytes
+
+    7: optional i64 entry_max_rows
+}
\ No newline at end of file
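To make the output_slot_mapping comment above concrete, here is a small, self-contained sketch. It is illustrative only: plain Java using java.util, with hypothetical slot ids (5/6/7 and 10/11/12) and a made-up class name, not Doris classes. It builds the two mappings described for SQL1 and SQL2 and shows that a row cached under normalized slot ids can be read back by either query, regardless of select-list order.

import java.util.LinkedHashMap;
import java.util.Map;

public class OutputSlotMappingExample {
    public static void main(String[] args) {
        // SQL1: select id, count(*) cnt, sum(value) s from tbl group by id
        // Suppose its plan assigns slot ids 5 (id), 6 (cnt), 7 (s).
        Map<Integer, Integer> sql1Mapping = new LinkedHashMap<>();
        sql1Mapping.put(5, 0);  // id  -> normalized 0
        sql1Mapping.put(6, 1);  // cnt -> normalized 1
        sql1Mapping.put(7, 2);  // s   -> normalized 2

        // SQL2: select sum(value) s, count(*) cnt, id from tbl group by id
        // Suppose its plan assigns slot ids 10 (s), 11 (cnt), 12 (id).
        Map<Integer, Integer> sql2Mapping = new LinkedHashMap<>();
        sql2Mapping.put(10, 2);
        sql2Mapping.put(11, 1);
        sql2Mapping.put(12, 0);

        // A cached row produced by SQL1, stored under normalized slot ids.
        Map<Integer, Object> cachedRow = new LinkedHashMap<>();
        sql1Mapping.forEach((slotId, normalizedId) ->
                cachedRow.put(normalizedId, "value-from-slot-" + slotId));

        // SQL2 reads the same cached row: each of its slot ids resolves through
        // its own mapping to the shared normalized id, so the columns line up.
        sql2Mapping.forEach((slotId, normalizedId) ->
                System.out.println("SQL2 slot " + slotId + " reads " + cachedRow.get(normalizedId)));
    }
}

This is the same idea that lets the backend treat two differently-ordered but otherwise identical queries as hits on one cache entry keyed by the normalized plan digest.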