From 470988687d66bd4af84420d1d0eef396b4a1c56d Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Wed, 3 Jul 2024 16:38:18 +0800 Subject: [PATCH 1/4] [Optimize] Add session variable `max_fetch_remote_schema_tablet_count` to limit tablets size for remote schema fetch Describing tables with many partitions and tablets can cause high CPU usage. To mitigate this, we estimate and pick sample tablets for schema fetch, reducing the overall cost. --- .../org/apache/doris/catalog/OlapTable.java | 45 +++++++++++++++++++ .../common/proc/RemoteIndexSchemaProcDir.java | 4 +- .../proc/RemoteIndexSchemaProcNode.java | 9 ++++ .../util/FetchRemoteTabletSchemaUtil.java | 2 + .../org/apache/doris/qe/SessionVariable.java | 5 +++ 5 files changed, 64 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index d5b1c258c5ddf7..df3fce47fb9d98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -101,6 +101,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -2761,6 +2762,50 @@ public List getAllTablets() throws AnalysisException { return tablets; } + // Get sample tablets for remote desc schema + // 1. Estimate tablets for a partition, 1 at least + // 2. Pick the partition sorted with id in desc order, greater id with the newest partition + // 3. Truncate to sampleSize + public List getSampleTablets(int sampleSize) throws AnalysisException { + List sampleTablets = new ArrayList<>(); + Collection partitions = getPartitions(); + if (partitions.isEmpty()) { + throw new AnalysisException("No partitions available."); + } + // 1. Estimate tablets for a partition, 1 at least + int estimatePartitionTablets = Math.max(sampleSize / partitions.size(), 1); + + // 2. Sort the partitions by id in descending order (greater id means the newest partition) + List sortedPartitions = partitions.stream().sorted(new Comparator() { + @Override + public int compare(Partition p1, Partition p2) { + // compare with desc order + return Long.compare(p2.getId(), p1.getId()); + } + }).collect(Collectors.toList()); + + // 3. Collect tablets from partitions + for (Partition partition : sortedPartitions) { + List targetTablets = new ArrayList<>(partition.getBaseIndex().getTablets()); + Collections.shuffle(targetTablets); + + // Ensure we do not exceed the available number of tablets + int tabletsToFetch = Math.min(targetTablets.size(), estimatePartitionTablets); + sampleTablets.addAll(targetTablets.subList(0, tabletsToFetch)); + + if (sampleTablets.size() >= sampleSize) { + break; + } + } + + // 4. Truncate to sample size if needed + if (sampleTablets.size() > sampleSize) { + sampleTablets = sampleTablets.subList(0, sampleSize); + } + + return sampleTablets; + } + // During `getNextVersion` and `updateVisibleVersionAndTime` period, // the write lock on the table should be held continuously public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersionTime) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java index 195e66b86e043a..f2531b7ec15fb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -62,7 +63,8 @@ public ProcResult fetchResult() throws AnalysisException { table.readLock(); try { OlapTable olapTable = (OlapTable) table; - tablets = olapTable.getAllTablets(); + // Get sample tablets for remote desc schema + tablets = olapTable.getSampleTablets(ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount); } finally { table.readUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java index 8176b09bbf778d..cdb1bbc133e356 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java @@ -23,11 +23,13 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -62,6 +64,13 @@ public ProcResult fetchResult() throws AnalysisException { tablets.add(tablet); } } + // Get the maximum number of Remote Tablets that can be fetched + int maxFetchCount = ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount; + // If the number of tablets is greater than the maximum fetch count, randomly select maxFetchCount tablets + if (tablets.size() > maxFetchCount) { + Collections.shuffle(tablets); + tablets = tablets.subList(0, maxFetchCount); + } List remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch(); this.schema.addAll(remoteSchema); return IndexSchemaProcNode.createResult(this.schema, this.bfColumns); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java index db9700f744870b..0e96dc8c5930d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java @@ -92,6 +92,8 @@ public List fetch() { Long backendId = entry.getKey(); Set tabletIds = entry.getValue(); Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId); + LOG.debug("fetch schema from coord backend {}, sample tablets count {}", + backend.getId(), tabletIds.size()); // only need alive be if (!backend.isAlive()) { continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 68c6505beb37ee..47529812577eb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -615,6 +615,8 @@ public class SessionVariable implements Serializable, Writable { public static final String FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS = "fetch_remote_schema_timeout_seconds"; + public static final String MAX_FETCH_REMOTE_TABLET_COUNT = "max_fetch_remote_schema_tablet_count"; + // CLOUD_VARIABLES_BEGIN public static final String CLOUD_CLUSTER = "cloud_cluster"; public static final String DISABLE_EMPTY_PARTITION_PRUNE = "disable_empty_partition_prune"; @@ -1911,6 +1913,9 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) { // fetch remote schema rpc timeout @VariableMgr.VarAttr(name = FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS, fuzzy = true) public long fetchRemoteSchemaTimeoutSeconds = 120; + // max tablet count for fetch remote schema + @VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true) + public int maxFetchRemoteTabletCount = 512; @VariableMgr.VarAttr( name = ENABLE_JOIN_SPILL, From 6be039c22ee0cde2af8bbc38381a0634afed1dbe Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Wed, 3 Jul 2024 16:41:31 +0800 Subject: [PATCH 2/4] add test --- regression-test/data/variant_p0/desc.out | 8 ++++++++ regression-test/suites/variant_p0/desc.groovy | 17 +++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/regression-test/data/variant_p0/desc.out b/regression-test/data/variant_p0/desc.out index b46b5f9b4b08d8..b3ebce2b887835 100644 --- a/regression-test/data/variant_p0/desc.out +++ b/regression-test/data/variant_p0/desc.out @@ -198,3 +198,11 @@ v.金额 SMALLINT Yes false \N NONE k BIGINT Yes true \N v VARIANT Yes false \N NONE +-- !sql15 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a TINYINT Yes false \N NONE +v.b TINYINT Yes false \N NONE +v.c TINYINT Yes false \N NONE +v.d TINYINT Yes false \N NONE + diff --git a/regression-test/suites/variant_p0/desc.groovy b/regression-test/suites/variant_p0/desc.groovy index f600496eae549e..b4d3020310ca33 100644 --- a/regression-test/suites/variant_p0/desc.groovy +++ b/regression-test/suites/variant_p0/desc.groovy @@ -238,6 +238,23 @@ suite("regression_test_variant_desc", "nonConcurrent"){ sql """ insert into ${table_name} values (0, '100')""" sql """set describe_extend_variant_column = true""" qt_sql_12 """desc ${table_name}""" + + + // desc with large tablets + table_name = "large_tablets" + create_table_partition.call(table_name, "200") + sql """insert into large_tablets values (1, '{"a" : 10}')""" + sql """insert into large_tablets values (3001, '{"b" : 10}')""" + sql """insert into large_tablets values (50001, '{"c" : 10}')""" + sql """insert into large_tablets values (99999, '{"d" : 10}')""" + sql """set max_fetch_remote_schema_tablet_count = 2""" + sql "desc large_tablets" + sql """set max_fetch_remote_schema_tablet_count = 128""" + sql "desc large_tablets" + sql """set max_fetch_remote_schema_tablet_count = 512""" + sql "desc large_tablets" + sql """set max_fetch_remote_schema_tablet_count = 2048""" + qt_sql15 "desc large_tablets" } finally { // reset flags set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") From 98bc206353eaa4b51698b4b8d490f4ae24779340 Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Wed, 3 Jul 2024 18:50:51 +0800 Subject: [PATCH 3/4] check partition not empty --- .../org/apache/doris/catalog/OlapTable.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index df3fce47fb9d98..1a23bbb2f3af03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2766,11 +2766,15 @@ public List getAllTablets() throws AnalysisException { // 1. Estimate tablets for a partition, 1 at least // 2. Pick the partition sorted with id in desc order, greater id with the newest partition // 3. Truncate to sampleSize - public List getSampleTablets(int sampleSize) throws AnalysisException { + public List getSampleTablets(int sampleSize) { List sampleTablets = new ArrayList<>(); - Collection partitions = getPartitions(); + // Filter partition with empty data + Collection partitions = getPartitions() + .stream() + .filter(partition -> partition.getVisibleVersion() > Partition.PARTITION_INIT_VERSION) + .collect(Collectors.toList()); if (partitions.isEmpty()) { - throw new AnalysisException("No partitions available."); + return sampleTablets; } // 1. Estimate tablets for a partition, 1 at least int estimatePartitionTablets = Math.max(sampleSize / partitions.size(), 1); @@ -2788,10 +2792,11 @@ public int compare(Partition p1, Partition p2) { for (Partition partition : sortedPartitions) { List targetTablets = new ArrayList<>(partition.getBaseIndex().getTablets()); Collections.shuffle(targetTablets); - - // Ensure we do not exceed the available number of tablets - int tabletsToFetch = Math.min(targetTablets.size(), estimatePartitionTablets); - sampleTablets.addAll(targetTablets.subList(0, tabletsToFetch)); + if (!targetTablets.isEmpty()) { + // Ensure we do not exceed the available number of tablets + int tabletsToFetch = Math.min(targetTablets.size(), estimatePartitionTablets); + sampleTablets.addAll(targetTablets.subList(0, tabletsToFetch)); + } if (sampleTablets.size() >= sampleSize) { break; From dc7f8fab5e1b7a3245edb58d5a2bdd327e719e47 Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Sun, 7 Jul 2024 00:11:36 +0800 Subject: [PATCH 4/4] [Fix](Variant Desc) fix desc with emtpy table --- regression-test/suites/variant_p0/desc.groovy | 3 +++ 1 file changed, 3 insertions(+) diff --git a/regression-test/suites/variant_p0/desc.groovy b/regression-test/suites/variant_p0/desc.groovy index b4d3020310ca33..dfb5b40794e7f0 100644 --- a/regression-test/suites/variant_p0/desc.groovy +++ b/regression-test/suites/variant_p0/desc.groovy @@ -255,6 +255,9 @@ suite("regression_test_variant_desc", "nonConcurrent"){ sql "desc large_tablets" sql """set max_fetch_remote_schema_tablet_count = 2048""" qt_sql15 "desc large_tablets" + + sql "truncate table large_tablets" + sql "desc large_tablets" } finally { // reset flags set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")