diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 0683e54082a401..54ccc40e9e2b28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -28,6 +28,7 @@ import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.InPredicate; import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; @@ -199,6 +200,11 @@ public class OlapScanNode extends ScanNode { private boolean shouldColoScan = false; + // cached for prepared statement to quickly prune partition + // only used in short circuit plan at present + private final PartitionPruneV2ForShortCircuitPlan cachedPartitionPruner = + new PartitionPruneV2ForShortCircuitPlan(); + // Constructs node to scan given data files of table 'tbl'. public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE); @@ -674,8 +680,17 @@ private Collection partitionPrune(PartitionInfo partitionInfo, } else { keyItemMap = partitionInfo.getIdToItem(false); } - if (partitionInfo.getType() == PartitionType.RANGE) { + if (isPointQuery() && partitionInfo.getPartitionColumns().size() == 1) { + // short circuit, a quick path to find partition + ColumnRange filterRange = columnNameToRange.get(partitionInfo.getPartitionColumns().get(0).getName()); + LiteralExpr lowerBound = filterRange.getRangeSet().get().asRanges().stream() + .findFirst().get().lowerEndpoint().getValue(); + LiteralExpr upperBound = filterRange.getRangeSet().get().asRanges().stream() + .findFirst().get().upperEndpoint().getValue(); + cachedPartitionPruner.update(keyItemMap); + return cachedPartitionPruner.prune(lowerBound, upperBound); + } partitionPruner = new RangePartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(), columnNameToRange); } else if (partitionInfo.getType() == PartitionType.LIST) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java new file mode 100644 index 00000000000000..8d39ed4d4fcbd6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.common.AnalysisException; + +import com.google.common.collect.Range; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; + +public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base { + private static final Logger LOG = LogManager.getLogger(PartitionPruneV2ForShortCircuitPlan.class); + // map to record literal range to find specific partition + private RangeMap partitionRangeMapByLiteral = new RangeMap<>(); + // last timestamp partitionRangeMapByLiteral updated + private long lastPartitionRangeMapUpdateTimestampMs = 0; + + PartitionPruneV2ForShortCircuitPlan() { + super(); + } + + public boolean update(Map keyItemMap) { + // interval to update partitionRangeMapByLiteral + long partitionRangeMapUpdateIntervalS = 10; + if (System.currentTimeMillis() - lastPartitionRangeMapUpdateTimestampMs + > partitionRangeMapUpdateIntervalS * 1000) { + partitionRangeMapByLiteral = new RangeMap<>(); + // recalculate map + for (Entry entry : keyItemMap.entrySet()) { + Range range = entry.getValue().getItems(); + LiteralExpr partitionLowerBound = (LiteralExpr) range.lowerEndpoint().getKeys().get(0); + LiteralExpr partitionUpperBound = (LiteralExpr) range.upperEndpoint().getKeys().get(0); + Range partitionRange = Range.closedOpen(partitionLowerBound, partitionUpperBound); + partitionRangeMapByLiteral.put(partitionRange, entry.getKey()); + } + LOG.debug("update partitionRangeMapByLiteral"); + this.lastPartitionRangeMapUpdateTimestampMs = System.currentTimeMillis(); + return true; + } + return false; + } + + public Collection prune(LiteralExpr lowerBound, LiteralExpr upperBound) throws AnalysisException { + Range filterRangeValue = Range.closed(lowerBound, upperBound); + return partitionRangeMapByLiteral.getOverlappingRangeValues(filterRangeValue); + } + + @Override + public Collection prune() throws AnalysisException { + throw new AnalysisException("Not implemented"); + } + + @Override + void genSingleColumnRangeMap() { + } + + @Override + FinalFilters getFinalFilters(ColumnRange columnRange, + Column column) throws AnalysisException { + throw new AnalysisException("Not implemented"); + } + + @Override + Collection pruneMultipleColumnPartition(Map columnToFilters) throws AnalysisException { + throw new AnalysisException("Not implemented"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java index 376e2a4c7f064c..1d9f163ca804e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java @@ -48,6 +48,13 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { // currently only used for list partition private Map.Entry defaultPartition; + // Only called in PartitionPruneV2ByShortCircuitPlan constructor + PartitionPrunerV2Base() { + this.idToPartitionItem = null; + this.partitionColumns = null; + this.columnNameToRange = null; + } + public PartitionPrunerV2Base(Map idToPartitionItem, List partitionColumns, Map columnNameToRange) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RangeMap.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RangeMap.java new file mode 100644 index 00000000000000..c8fd07f940d1b6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RangeMap.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import com.google.common.collect.Range; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.stream.Collectors; + +public class RangeMap, V> { + + private final NavigableMap, V> rangeMap = new TreeMap<>(new RangeComparator()); + + public void put(Range range, V value) { + rangeMap.put(range, value); + } + + public List getOverlappingRangeValues(Range searchRange) { + return getOverlappingRanges(searchRange).stream() + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + } + + public List, V>> getOverlappingRanges(Range searchRange) { + List, V>> overlappingRanges = new ArrayList<>(); + + // Find the possible starting point for the search + Map.Entry, V> floorEntry = rangeMap.floorEntry(searchRange); + Map.Entry, V> ceilingEntry = rangeMap.ceilingEntry(searchRange); + + // Start iterating from the earlier of the floor or ceiling entry + Map.Entry, V> startEntry = (floorEntry != null) ? floorEntry : ceilingEntry; + if (startEntry == null) { + return overlappingRanges; + } + + for (Map.Entry, V> entry : rangeMap.tailMap(startEntry.getKey()).entrySet()) { + if (entry.getKey().lowerEndpoint().compareTo(searchRange.upperEndpoint()) > 0) { + break; // No more overlapping ranges possible + } + if (entry.getKey().isConnected(searchRange) && !entry.getKey().intersection(searchRange).isEmpty()) { + overlappingRanges.add(entry); + } + } + return overlappingRanges; + } + + private static class RangeComparator> implements java.util.Comparator> { + @Override + public int compare(Range r1, Range r2) { + return r1.lowerEndpoint().compareTo(r2.lowerEndpoint()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java index 50c422a1979340..06308fd6341987 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java @@ -121,6 +121,9 @@ void setScanRangeLocations() throws Exception { OlapScanNode planRoot = getPlanRoot(); // compute scan range List locations = planRoot.lazyEvaluateRangeLocations(); + if (planRoot.getScanTabletIds().isEmpty()) { + return; + } Preconditions.checkState(planRoot.getScanTabletIds().size() == 1); this.tabletID = planRoot.getScanTabletIds().get(0); @@ -167,6 +170,10 @@ public void cancel(Types.PPlanFragmentCancelReason cancelReason) { @Override public RowBatch getNext() throws Exception { setScanRangeLocations(); + // No partition/tablet found return emtpy row batch + if (candidateBackends == null || candidateBackends.isEmpty()) { + return new RowBatch(); + } Iterator backendIter = candidateBackends.iterator(); RowBatch rowBatch = null; int tryCount = 0; diff --git a/regression-test/data/point_query_p0/test_point_query_partition.out b/regression-test/data/point_query_p0/test_point_query_partition.out new file mode 100644 index 00000000000000..cd22e6c93ec4bb --- /dev/null +++ b/regression-test/data/point_query_p0/test_point_query_partition.out @@ -0,0 +1,33 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !point_select -- +1 a + +-- !point_select -- +2 b + +-- !point_select -- +11 d + +-- !point_select -- +-1 c + +-- !point_select -- +11 d + +-- !point_select -- + +-- !point_select -- + +-- !point_select -- +33 f + +-- !point_select -- +45 g + +-- !point_select -- + +-- !point_select -- +999 h + +-- !point_select -- + diff --git a/regression-test/suites/point_query_p0/test_point_query_partition.groovy b/regression-test/suites/point_query_p0/test_point_query_partition.groovy new file mode 100644 index 00000000000000..7b5966db0c1915 --- /dev/null +++ b/regression-test/suites/point_query_p0/test_point_query_partition.groovy @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.math.BigDecimal; + +suite("test_point_query_partition") { + def user = context.config.jdbcUser + def password = context.config.jdbcPassword + def realDb = "regression_test_serving_p0" + def tableName = realDb + ".tbl_point_query_partition" + sql "CREATE DATABASE IF NOT EXISTS ${realDb}" + + // Parse url + String jdbcUrl = context.config.jdbcUrl + String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port + if (urlWithoutSchema.indexOf("/") >= 0) { + // e.g: jdbc:mysql://locahost:8080/?a=b + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) + } else { + // e.g: jdbc:mysql://locahost:8080 + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) + } + // set server side prepared statement url + def prepare_url = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb + "?&useServerPrepStmts=true" + + def generateString = {len -> + def str = "" + for (int i = 0; i < len; i++) { + str += "a" + } + return str + } + + def nprep_sql = { sql_str -> + def url_without_prep = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb + connect(user = user, password = password, url = url_without_prep) { + sql sql_str + } + } + + sql """DROP TABLE IF EXISTS ${tableName}""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(11) NULL COMMENT "", + `value` text NULL COMMENT "" + ) ENGINE=OLAP + UNIQUE KEY(`k1`) + PARTITION BY RANGE(`k1`) + ( + PARTITION `p1` VALUES LESS THAN ("1"), + PARTITION `p2` VALUES LESS THAN ("10"), + PARTITION `p3` VALUES LESS THAN ("30"), + PARTITION `p4` VALUES LESS THAN ("40"), + PARTITION `p5` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "storage_format" = "V2") + """ + + sql """INSERT INTO ${tableName} VALUES (1, 'a')""" + sql """INSERT INTO ${tableName} VALUES (2, 'b')""" + sql """INSERT INTO ${tableName} VALUES (-1, 'c')""" + sql """INSERT INTO ${tableName} VALUES (11, 'd')""" + sql """INSERT INTO ${tableName} VALUES (15, 'e')""" + sql """INSERT INTO ${tableName} VALUES (33, 'f')""" + sql """INSERT INTO ${tableName} VALUES (45, 'g')""" + sql """INSERT INTO ${tableName} VALUES (999, 'h')""" + def result1 = connect(user=user, password=password, url=prepare_url) { + def stmt = prepareStatement "select * from ${tableName} where k1 = ?" + assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement); + stmt.setInt(1, 1) + qe_point_select stmt + stmt.setInt(1, 2) + qe_point_select stmt + stmt.setInt(1, 11) + qe_point_select stmt + stmt.setInt(1, -1) + qe_point_select stmt + stmt.setInt(1, 11) + qe_point_select stmt + stmt.setInt(1, 12) + qe_point_select stmt + stmt.setInt(1, 34) + qe_point_select stmt + stmt.setInt(1, 33) + qe_point_select stmt + stmt.setInt(1, 45) + qe_point_select stmt + stmt.setInt(1, 666) + qe_point_select stmt + stmt.setInt(1, 999) + qe_point_select stmt + stmt.setInt(1, 1000) + qe_point_select stmt + } +} \ No newline at end of file