diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 9de006cebb5c46..5c25fd4aded023 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -530,12 +530,6 @@ public void readFields(DataInput in) throws IOException {
         this.createTime = in.readLong();
     }
 
-    // return if this table is partitioned.
-    // For OlapTable, return true only if its partition type is RANGE or HASH
-    public boolean isPartitionedTable() {
-        return false;
-    }
-
     // return if this table is partitioned, for planner.
     // For OlapTable ture when is partitioned, or distributed by hash when no partition
     public boolean isPartitionDistributed() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index c5039660e6eeae..22c8f7106abecd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -560,4 +560,12 @@ default boolean isPartitionColumn(String columnName) {
     default Set<String> getDistributionColumnNames() {
         return Sets.newHashSet();
     }
+
+    /**
+     * Returns whether this table is partitioned. The default implementation returns
+     * {@code false}; table types that can be partitioned override it.
+     */
+    default boolean isPartitionedTable() {
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index e29bafd5dc7a0a..6e648b25d674b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -1068,4 +1068,13 @@ private List getFilesForPartitions(
         String bindBrokerName = catalog.bindBrokerName();
         return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName);
     }
+
+    /**
+     * Returns {@code true} when the remote Hive table defines at least one partition key.
+     */
+    @Override
+    public boolean isPartitionedTable() {
+        makeSureInitialized();
+        return remoteTable.getPartitionKeysSize() > 0;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 5bafaf4b469926..1ef5cee4f1778f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -485,6 +485,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG = "enable_auto_analyze_internal_catalog";
 
+    public static final String ENABLE_PARTITION_ANALYZE = "enable_partition_analyze";
+
     public static final String AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD = "auto_analyze_table_width_threshold";
 
     public static final String FASTER_FLOAT_CONVERT = "faster_float_convert";
@@ -1569,6 +1571,11 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
             flag = VariableMgr.GLOBAL)
     public boolean enableAutoAnalyzeInternalCatalog = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_PARTITION_ANALYZE,
+            description = {"临时参数,是否收集分区级别统计信息", "Temp variable, enable to collect partition level statistics."},
+            flag = VariableMgr.GLOBAL)
+    public boolean enablePartitionAnalyze = false;
+
     @VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD,
             description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
                 "Maximum table width to enable auto analyze, "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 368112c26c99a1..c78415c674cc67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -24,7 +24,9 @@
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.qe.AuditLogHelper;
 import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
@@ -46,6 +48,8 @@ public abstract class BaseAnalysisTask {
 
     public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
     public static final double LIMIT_FACTOR = 1.2;
+    // Maximum number of per-partition SELECTs combined into one INSERT statement.
+    public static final int PARTITION_BATCH_SIZE = 100;
 
     protected static final String FULL_ANALYZE_TEMPLATE =
             "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
@@ -144,6 +148,48 @@ public abstract class BaseAnalysisTask {
             + "${data_size} AS `data_size`, "
             + "NOW() ";
 
+    // Per-partition stats query; ndv is kept as an HLL state so partitions can be merged later.
+    protected static final String PARTITION_ANALYZE_TEMPLATE = " SELECT "
+            + "${catalogId} AS `catalog_id`, "
+            + "${dbId} AS `db_id`, "
+            + "${tblId} AS `tbl_id`, "
+            + "${idxId} AS `idx_id`, "
+            + "${partId} AS `part_id`, "
+            + "'${colId}' AS `col_id`, "
+            + "COUNT(1) AS `row_count`, "
+            + "HLL_UNION(HLL_HASH(`${colName}`)) as ndv, "
+            + "COUNT(1) - COUNT(`${colName}`) AS `null_count`, "
+            + "SUBSTRING(CAST(MIN(`${colName}`) AS STRING), 1, 1024) AS `min`, "
+            + "SUBSTRING(CAST(MAX(`${colName}`) AS STRING), 1, 1024) AS `max`, "
+            + "${dataSizeFunction} AS `data_size`, "
+            + "NOW() AS `update_time` "
+            + " FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${partitionInfo}";
+
+    // Aggregates the per-partition rows of one column into a single table-level stats row.
+    // NOTE(review): min/max are stored as strings by PARTITION_ANALYZE_TEMPLATE, so MIN/MAX below
+    // presumably compare lexicographically; confirm this is intended for numeric columns.
+    protected static final String MERGE_PARTITION_TEMPLATE =
+            "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+            + "${catalogId} AS `catalog_id`, "
+            + "${dbId} AS `db_id`, "
+            + "${tblId} AS `tbl_id`, "
+            + "${idxId} AS `idx_id`, "
+            + "'${colId}' AS `col_id`, "
+            + "NULL AS `part_id`, "
+            + "SUM(count) AS `row_count`, "
+            + "HLL_CARDINALITY(HLL_UNION(ndv)) AS `ndv`, "
+            + "SUM(null_count) AS `null_count`, "
+            + "MIN(min) AS `min`, "
+            + "MAX(max) AS `max`, "
+            + "SUM(data_size_in_bytes) AS `data_size`, "
+            + "NOW() AS `update_time` FROM "
+            + StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+            + " WHERE `catalog_id` = ${catalogId} "
+            + " AND `db_id` = ${dbId} "
+            + " AND `tbl_id` = ${tblId} "
+            + " AND `idx_id` = ${idxId} "
+            + " AND `col_id` = '${colId}'";
+
     protected AnalysisInfo info;
 
     protected CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog;
@@ -320,4 +366,31 @@ protected void runQuery(String sql) {
         }
     }
 
+    /**
+     * Executes an INSERT statement in a connect context obtained from
+     * {@code StatisticsUtil.buildConnectContext()} and writes an audit log entry afterwards,
+     * whether or not the execution succeeded.
+     *
+     * @param sql the INSERT statement to run
+     * @throws Exception if execution throws; a RuntimeException is raised when the
+     *         statement finishes in the ERR state
+     */
+    protected void runInsert(String sql) throws Exception {
+        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+            stmtExecutor = new StmtExecutor(r.connectContext, sql);
+            try {
+                stmtExecutor.execute();
+                QueryState queryState = stmtExecutor.getContext().getState();
+                if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
+                    throw new RuntimeException(
+                        "Failed to insert : " + stmtExecutor.getOriginStmt().originStmt + ". Error msg: "
+                            + queryState.getErrorMessage());
+                }
+            } finally {
+                AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
+                        stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(), true);
+            }
+        }
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 302a861ad1ee4d..f85542511b82d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -33,6 +33,8 @@
 import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import org.apache.commons.text.StringSubstitutor;
 
 import java.security.SecureRandom;
@@ -206,15 +208,78 @@ protected ResultRow collectBasicStat(AutoCloseConnectContext context) {
         return resultRow;
     }
 
+    /**
+     * Collects full statistics for the column: partition by partition when partition-level
+     * analysis is enabled and the table is partitioned, otherwise with a single full-table query.
+     *
+     * @throws Exception if the partition-level collection fails
+     */
+    protected void doFull() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Will do full collection for column {}", col.getName());
+        }
+        if (StatisticsUtil.enablePartitionAnalyze() && tbl.isPartitionedTable()) {
+            doPartitionTable();
+        } else {
+            doNonPartitionTable();
+        }
+    }
+
     /**
      * 1. Get stats of each partition
      * 2. insert partition in batch
      * 3. calculate column stats based on partition stats
      */
-    protected void doFull() {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Will do full collection for column {}", col.getName());
+    protected void doPartitionTable() throws Exception {
+        Map<String, String> params = buildSqlParams();
+        Set<String> partitionNames = tbl.getPartitionNames();
+        List<String> sqls = Lists.newArrayList();
+        for (String part : partitionNames) {
+            // NOTE(review): escapeColumnName is the identifier escaper used for `${colName}`; here the
+            // value goes into a single-quoted literal, so a string-literal escaper may be needed instead.
+            params.put("partId", "'" + StatisticsUtil.escapeColumnName(part) + "'");
+            // Backtick-quote the partition name so names that are not plain identifiers
+            // (reserved words, special characters) still parse in the PARTITION clause.
+            params.put("partitionInfo", "partition `" + StatisticsUtil.escapeColumnName(part) + "`");
+            StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+            sqls.add(stringSubstitutor.replace(PARTITION_ANALYZE_TEMPLATE));
+            if (sqls.size() == PARTITION_BATCH_SIZE) {
+                insertPartitionStats(sqls);
+                sqls.clear();
+            }
+        }
+        if (!sqls.isEmpty()) {
+            insertPartitionStats(sqls);
         }
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(buildSqlParams());
+        runQuery(stringSubstitutor.replace(MERGE_PARTITION_TEMPLATE));
+    }
+
+    /**
+     * Inserts one batch of per-partition stats by joining the given SELECTs with UNION ALL.
+     *
+     * @param partitionSelects non-empty list of SELECT statements, one per partition
+     * @throws Exception if the INSERT fails
+     */
+    private void insertPartitionStats(List<String> partitionSelects) throws Exception {
+        runInsert("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+                + Joiner.on(" UNION ALL ").join(partitionSelects));
+    }
+
+    /**
+     * Collects table-level stats with a single query built from FULL_ANALYZE_TEMPLATE.
+     */
+    protected void doNonPartitionTable() {
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(buildSqlParams());
+        runQuery(stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE));
+    }
+
+    /**
+     * Builds the placeholder values shared by the analyze SQL templates.
+     *
+     * @return a new mutable map from placeholder name to value
+     */
+    protected Map<String, String> buildSqlParams() {
         Map<String, String> params = new HashMap<>();
         params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
         params.put("columnStatTbl", StatisticConstants.TABLE_STATISTIC_TBL_NAME);
@@ -229,8 +294,7 @@ protected void doFull() {
         params.put("colName", StatisticsUtil.escapeColumnName(String.valueOf(info.colName)));
         params.put("tblName", String.valueOf(tbl.getName()));
         params.put("index", getIndex());
-        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-        runQuery(stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE));
+        return params;
     }
 
     protected String getIndex() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index 8dea5d515a60e6..67de69c57dac87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -70,6 +70,10 @@ public class StatisticConstants {
     public static final String FULL_QUALIFIED_STATS_TBL_NAME = InternalCatalog.INTERNAL_CATALOG_NAME
             + "." + FeConstants.INTERNAL_DB_NAME + "." + TABLE_STATISTIC_TBL_NAME;
 
+    // Fully qualified name (catalog.db.table) of the internal partition-level statistics table.
+    public static final String FULL_QUALIFIED_PARTITION_STATS_TBL_NAME = InternalCatalog.INTERNAL_CATALOG_NAME
+            + "." + FeConstants.INTERNAL_DB_NAME + "." + PARTITION_STATISTIC_TBL_NAME;
+
     public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3;
 
     public static final int RETRY_LOAD_QUEUE_SIZE = 1000;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 955ddea04b1c8c..a90932941f7560 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -813,6 +813,21 @@ public static boolean enableAutoAnalyzeInternalCatalog() {
         return true;
     }
 
+    /**
+     * Reads the global session variable {@code enable_partition_analyze}.
+     *
+     * @return the configured value, or {@code false} if it cannot be read
+     */
+    public static boolean enablePartitionAnalyze() {
+        try {
+            return findConfigFromGlobalSessionVar(
+                    SessionVariable.ENABLE_PARTITION_ANALYZE).enablePartitionAnalyze;
+        } catch (Exception e) {
+            LOG.warn("Fail to get value of enable partition analyze, return false by default", e);
+        }
+        return false;
+    }
+
     public static int getInsertMergeCount() {
         try {
             return findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)