6 changes: 0 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -530,12 +530,6 @@ public void readFields(DataInput in) throws IOException {
this.createTime = in.readLong();
}

// return if this table is partitioned.
// For OlapTable, return true only if its partition type is RANGE or HASH
public boolean isPartitionedTable() {
return false;
}

// return if this table is partitioned, for planner.
// For OlapTable, true when it is partitioned, or distributed by hash when there is no partition
public boolean isPartitionDistributed() {
@@ -560,4 +560,8 @@ default boolean isPartitionColumn(String columnName) {
default Set<String> getDistributionColumnNames() {
return Sets.newHashSet();
}

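// Whether this table is partitioned. The default is false; implementations that
// track partitions (for example, the HMSExternalTable override below) return true.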
default boolean isPartitionedTable() {
return false;
}
}
@@ -1068,4 +1068,10 @@ private List<HiveMetaStoreCache.FileCacheValue> getFilesForPartitions(
String bindBrokerName = catalog.bindBrokerName();
return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName);
}

@Override
public boolean isPartitionedTable() {
makeSureInitialized();
return remoteTable.getPartitionKeysSize() > 0;
}
}
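
For context (not part of this PR): remoteTable in the override above is assumed here to be the Hive metastore thrift Table, whose partition columns are exposed as a list of FieldSchema. The sketch below, with an invented class name and values, shows how getPartitionKeysSize() drives the result.

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.Arrays;

public class PartitionKeySketch {
    public static void main(String[] args) {
        // A table partitioned by (dt, region) carries two partition keys in its
        // metastore definition, so getPartitionKeysSize() > 0 and the override
        // above would report it as partitioned.
        Table remoteTable = new Table();
        remoteTable.setPartitionKeys(Arrays.asList(
                new FieldSchema("dt", "string", "partition date"),
                new FieldSchema("region", "string", "partition region")));
        System.out.println(remoteTable.getPartitionKeysSize() > 0); // true
    }
}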
@@ -485,6 +485,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG = "enable_auto_analyze_internal_catalog";

public static final String ENABLE_PARTITION_ANALYZE = "enable_partition_analyze";

public static final String AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD = "auto_analyze_table_width_threshold";

public static final String FASTER_FLOAT_CONVERT = "faster_float_convert";
@@ -1569,6 +1571,11 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
flag = VariableMgr.GLOBAL)
public boolean enableAutoAnalyzeInternalCatalog = true;

@VariableMgr.VarAttr(name = ENABLE_PARTITION_ANALYZE,
description = {"临时参数,收否收集分区级别统计信息", "Temp variable, enable to collect partition level statistics."},
flag = VariableMgr.GLOBAL)
public boolean enablePartitionAnalyze = false;

@VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD,
description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
"Maximum table width to enable auto analyze, "
@@ -24,7 +24,9 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
@@ -46,6 +48,7 @@ public abstract class BaseAnalysisTask {

public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
public static final double LIMIT_FACTOR = 1.2;
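// Number of per-partition SELECTs that are UNION ALL-ed into a single INSERT when
// collecting partition statistics (see doPartitionTable below).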
public static final int PARTITION_BATCH_SIZE = 100;

protected static final String FULL_ANALYZE_TEMPLATE =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
@@ -144,6 +147,44 @@ public abstract class BaseAnalysisTask {
+ "${data_size} AS `data_size`, "
+ "NOW() ";

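// Collects statistics for one column of one partition. The ndv column stores the
// HLL sketch itself (HLL_UNION of hashed values), so per-partition results can later
// be merged into a table-level NDV without double counting.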
protected static final String PARTITION_ANALYZE_TEMPLATE = " SELECT "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "${partId} AS `part_id`, "
+ "'${colId}' AS `col_id`, "
+ "COUNT(1) AS `row_count`, "
+ "HLL_UNION(HLL_HASH(`${colName}`)) as ndv, "
+ "COUNT(1) - COUNT(`${colName}`) AS `null_count`, "
+ "SUBSTRING(CAST(MIN(`${colName}`) AS STRING), 1, 1024) AS `min`, "
+ "SUBSTRING(CAST(MAX(`${colName}`) AS STRING), 1, 1024) AS `max`, "
+ "${dataSizeFunction} AS `data_size`, "
+ "NOW() AS `update_time` "
+ " FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${partitionInfo}";

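// Folds the per-partition rows written by PARTITION_ANALYZE_TEMPLATE into one
// table-level statistics row: counts are summed, min/max are re-aggregated, and
// NDV is taken as the cardinality of the union of the stored HLL sketches.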
protected static final String MERGE_PARTITION_TEMPLATE =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "SUM(count) AS `row_count`, "
+ "HLL_CARDINALITY(HLL_UNION(ndv)) AS `ndv`, "
+ "SUM(null_count) AS `null_count`, "
+ "MIN(min) AS `min`, "
+ "MAX(max) AS `max`, "
+ "SUM(data_size_in_bytes) AS `data_size`, "
+ "NOW() AS `update_time` FROM "
+ StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ " WHERE `catalog_id` = ${catalogId} "
+ " AND `db_id` = ${dbId} "
+ " AND `tbl_id` = ${tblId} "
+ " AND `idx_id` = ${idxId} "
+ " AND `col_id` = '${colId}'";

protected AnalysisInfo info;

protected CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog;
@@ -320,4 +361,22 @@ protected void runQuery(String sql) {
}
}

protected void runInsert(String sql) throws Exception {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
stmtExecutor = new StmtExecutor(r.connectContext, sql);
try {
stmtExecutor.execute();
QueryState queryState = stmtExecutor.getContext().getState();
if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
throw new RuntimeException(
"Failed to insert : " + stmtExecutor.getOriginStmt().originStmt + "Error msg: "
+ queryState.getErrorMessage());
}
} finally {
AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(), true);
}
}
}

}
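
A side note on the templates above (not part of this PR): the ${...} placeholders are filled with Apache Commons Text's StringSubstitutor, as buildSqlParams() and doPartitionTable() in the following snippet do. A minimal self-contained sketch with invented values:

import org.apache.commons.text.StringSubstitutor;

import java.util.HashMap;
import java.util.Map;

public class TemplateSketch {
    public static void main(String[] args) {
        String template = "SELECT ${catalogId} AS `catalog_id`, "
                + "COUNT(1) - COUNT(`${colName}`) AS `null_count` "
                + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${partitionInfo}";
        Map<String, String> params = new HashMap<>();
        params.put("catalogId", "0");
        params.put("colName", "c1");
        params.put("catalogName", "internal");
        params.put("dbName", "db1");
        params.put("tblName", "t1");
        params.put("partitionInfo", "partition p_20240101");
        // Placeholders without a mapping are left untouched by default.
        System.out.println(new StringSubstitutor(params).replace(template));
    }
}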
@@ -33,6 +33,8 @@
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.commons.text.StringSubstitutor;

import java.security.SecureRandom;
@@ -206,15 +208,56 @@ protected ResultRow collectBasicStat(AutoCloseConnectContext context) {
return resultRow;
}

protected void doFull() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Will do full collection for column {}", col.getName());
}
if (StatisticsUtil.enablePartitionAnalyze() && tbl.isPartitionedTable()) {
doPartitionTable();
} else {
doNonPartitionTable();
}
}

/**
* 1. Get stats of each partition
* 2. Insert partition stats in batches
* 3. Calculate column stats based on partition stats
*/
protected void doPartitionTable() throws Exception {
Map<String, String> params = buildSqlParams();
Set<String> partitionNames = tbl.getPartitionNames();
List<String> sqls = Lists.newArrayList();
int count = 0;
for (String part : partitionNames) {
params.put("partId", "'" + StatisticsUtil.escapeColumnName(part) + "'");
params.put("partitionInfo", "partition " + part);
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
sqls.add(stringSubstitutor.replace(PARTITION_ANALYZE_TEMPLATE));
count++;
if (count == PARTITION_BATCH_SIZE) {
String sql = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ Joiner.on(" UNION ALL ").join(sqls);
runInsert(sql);
sqls.clear();
count = 0;
}
}
if (count > 0) {
String sql = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ Joiner.on(" UNION ALL ").join(sqls);
runInsert(sql);
}
StringSubstitutor stringSubstitutor = new StringSubstitutor(buildSqlParams());
runQuery(stringSubstitutor.replace(MERGE_PARTITION_TEMPLATE));
}
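// Batching in doPartitionTable above, by way of example: a table with 250 partitions
// produces two INSERTs of 100 UNION ALL-ed SELECTs each, one INSERT with the remaining
// 50, and then a single MERGE_PARTITION_TEMPLATE query that derives the table-level
// column statistics from the per-partition rows.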

protected void doNonPartitionTable() {
StringSubstitutor stringSubstitutor = new StringSubstitutor(buildSqlParams());
runQuery(stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE));
}

protected Map<String, String> buildSqlParams() {
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.TABLE_STATISTIC_TBL_NAME);
@@ -229,8 +272,7 @@ protected void doFull() {
params.put("colName", StatisticsUtil.escapeColumnName(String.valueOf(info.colName)));
params.put("tblName", String.valueOf(tbl.getName()));
params.put("index", getIndex());
return params;
}

protected String getIndex() {
@@ -70,6 +70,9 @@ public class StatisticConstants {
public static final String FULL_QUALIFIED_STATS_TBL_NAME = InternalCatalog.INTERNAL_CATALOG_NAME
+ "." + FeConstants.INTERNAL_DB_NAME + "." + TABLE_STATISTIC_TBL_NAME;

public static final String FULL_QUALIFIED_PARTITION_STATS_TBL_NAME = InternalCatalog.INTERNAL_CATALOG_NAME
+ "." + FeConstants.INTERNAL_DB_NAME + "." + PARTITION_STATISTIC_TBL_NAME;

public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3;

public static final int RETRY_LOAD_QUEUE_SIZE = 1000;
@@ -813,6 +813,16 @@ public static boolean enableAutoAnalyzeInternalCatalog() {
return true;
}

public static boolean enablePartitionAnalyze() {
try {
return findConfigFromGlobalSessionVar(
SessionVariable.ENABLE_PARTITION_ANALYZE).enablePartitionAnalyze;
} catch (Exception e) {
LOG.warn("Fail to get value of enable partition analyze, return false by default", e);
}
return false;
}

public static int getInsertMergeCount() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)