diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 4f2c3abe9a5c9f..3ab721238199fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; @@ -45,13 +46,17 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; public class AnalysisManager { @@ -91,13 +96,22 @@ public StatisticsCache getStatisticsCache() { // Each analyze stmt corresponding to an analysis job. public void createAnalysisJob(AnalyzeStmt stmt) throws DdlException { + Map> columnToPartitions = validateAndGetPartitions(stmt); + if (columnToPartitions.isEmpty()) { + // No statistics need to be collected or updated + return; + } + long jobId = Env.getCurrentEnv().getNextId(); + TableIf table = stmt.getTable(); + AnalysisType analysisType = stmt.getAnalysisType(); + AnalysisTaskInfoBuilder taskInfoBuilder = buildCommonTaskInfo(stmt, jobId); Map analysisTaskInfos = new HashMap<>(); // start build analysis tasks - createTaskForEachColumns(stmt.getColumnNames(), taskInfoBuilder, analysisTaskInfos); - createTaskForMVIdx(stmt.getTable(), taskInfoBuilder, analysisTaskInfos, stmt.getAnalysisType()); + createTaskForEachColumns(columnToPartitions, taskInfoBuilder, analysisTaskInfos, analysisType); + createTaskForMVIdx(table, taskInfoBuilder, analysisTaskInfos, analysisType); persistAnalysisJob(taskInfoBuilder); if (stmt.isSync()) { @@ -109,6 +123,83 @@ public void createAnalysisJob(AnalyzeStmt stmt) throws DdlException { analysisTaskInfos.values().forEach(taskScheduler::schedule); } + /** + * Gets the partitions for which statistics are to be collected. First verify that + * there are partitions that have been deleted but have historical statistics(invalid statistics), + * if there are these partitions, we need to delete them to avoid errors in summary table level statistics. + * Then get the partitions for which statistics need to be collected based on collection mode (incremental/full). + *
+     * Note:
+     * If there are no invalid statistics, there is no need to collect/update
+     * statistics when the following conditions are met:
+     * - in full collection mode, the partitioned table does not have partitions
+     * - in incremental collection mode, partition statistics already exist
+     * <p>
+ * TODO Supports incremental collection of statistics from materialized views + */ + private Map> validateAndGetPartitions(AnalyzeStmt stmt) throws DdlException { + TableIf table = stmt.getTable(); + long tableId = table.getId(); + Set columnNames = stmt.getColumnNames(); + Set partitionNames = table.getPartitionNames(); + + Map> columnToPartitions = Maps.newHashMap(); + stmt.getColumnNames().forEach(column -> columnToPartitions + .computeIfAbsent(column, partitions -> new HashSet<>()).addAll(partitionNames)); + + if (stmt.getAnalysisType() == AnalysisType.HISTOGRAM) { + // Collecting histograms does not need to support incremental collection, + // and will automatically cover historical statistics + return columnToPartitions; + } + + // Get the partition granularity statistics that have been collected + Map> existColAndPartsForStats = StatisticsRepository + .fetchColAndPartsForStats(tableId); + + if (existColAndPartsForStats.isEmpty()) { + // There is no historical statistical information, no need to do validation + return columnToPartitions; + } + + Set existPartIdsForStats = new HashSet<>(); + existColAndPartsForStats.values().forEach(existPartIdsForStats::addAll); + Map idToPartition = StatisticsUtil.getPartitionIdToName(table); + // Get an invalid set of partitions (those partitions were deleted) + Set invalidPartIds = existPartIdsForStats.stream() + .filter(id -> !idToPartition.containsKey(id)).collect(Collectors.toSet()); + + if (!invalidPartIds.isEmpty()) { + // Delete invalid partition statistics to avoid affecting table statistics + StatisticsRepository.dropStatistics(tableId, invalidPartIds, + columnNames, StatisticConstants.STATISTIC_TBL_NAME); + } + + if (stmt.isIncremental() && stmt.getAnalysisType() == AnalysisType.COLUMN) { + existColAndPartsForStats.values().forEach(partIds -> partIds.removeAll(invalidPartIds)); + // In incremental collection mode, just collect the uncollected partition statistics + existColAndPartsForStats.forEach((columnName, partitionIds) -> { + Set existPartitions = partitionIds.stream() + .map(idToPartition::get) + .collect(Collectors.toSet()); + columnToPartitions.computeIfPresent(columnName, (colName, partNames) -> { + partNames.removeAll(existPartitions); + return partNames; + }); + }); + } + + Collection> partitions = columnToPartitions.values(); + boolean isEmptyPartitions = partitions.stream().allMatch(Set::isEmpty); + + if (invalidPartIds.isEmpty() && isEmptyPartitions) { + // There is no invalid statistics, and there are no partition stats to be collected + return Collections.emptyMap(); + } + + return columnToPartitions; + } + private AnalysisTaskInfoBuilder buildCommonTaskInfo(AnalyzeStmt stmt, long jobId) { AnalysisTaskInfoBuilder taskInfoBuilder = new AnalysisTaskInfoBuilder(); String catalogName = stmt.getCatalogName(); @@ -194,14 +285,27 @@ private void createTaskForMVIdx(TableIf table, AnalysisTaskInfoBuilder taskInfoB } } - private void createTaskForEachColumns(Set colNames, AnalysisTaskInfoBuilder taskInfoBuilder, - Map analysisTaskInfos) throws DdlException { - for (String colName : colNames) { - AnalysisTaskInfoBuilder colTaskInfoBuilder = taskInfoBuilder.copy(); - long indexId = -1; + private void createTaskForEachColumns(Map> columnToPartitions, + AnalysisTaskInfoBuilder taskInfoBuilder, + Map analysisTaskInfos, AnalysisType analysisType) throws DdlException { + for (Entry> entry : columnToPartitions.entrySet()) { + Set partitionNames = entry.getValue(); + if (partitionNames.isEmpty()) { + continue; + } + AnalysisTaskInfoBuilder 
newTaskInfoBuilder = taskInfoBuilder.copy(); + if (analysisType != AnalysisType.HISTOGRAM) { + // Histograms do not need to specify partitions + newTaskInfoBuilder.setPartitionNames(partitionNames); + } long taskId = Env.getCurrentEnv().getNextId(); - AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setColName(colName) - .setIndexId(indexId).setTaskId(taskId).build(); + long indexId = -1; + String colName = entry.getKey(); + AnalysisTaskInfo analysisTaskInfo = newTaskInfoBuilder + .setTaskId(taskId) + .setIndexId(indexId) + .setColName(colName) + .build(); try { StatisticsRepository.persistAnalysisTask(analysisTaskInfo); } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java index c0f04b5eb53e8d..5a5d44eb8b29ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java @@ -17,9 +17,12 @@ package org.apache.doris.statistics; +import org.apache.doris.statistics.util.StatisticsUtil; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Set; import java.util.StringJoiner; public class AnalysisTaskInfo { @@ -60,6 +63,8 @@ public enum ScheduleType { public final String tblName; + public final Set partitionNames; + public final String colName; public final Long indexId; @@ -86,7 +91,7 @@ public enum ScheduleType { public final ScheduleType scheduleType; public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName, - String colName, Long indexId, JobType jobType, AnalysisMethod analysisMethod, + Set partitionNames, String colName, Long indexId, JobType jobType, AnalysisMethod analysisMethod, AnalysisType analysisType, int samplePercent, int sampleRows, int maxBucketNum, String message, int lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType) { this.jobId = jobId; @@ -94,6 +99,7 @@ public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbNa this.catalogName = catalogName; this.dbName = dbName; this.tblName = tblName; + this.partitionNames = partitionNames; this.colName = colName; this.indexId = indexId; this.jobType = jobType; @@ -115,6 +121,7 @@ public String toString() { sj.add("CatalogName: " + catalogName); sj.add("DBName: " + dbName); sj.add("TableName: " + tblName); + sj.add("PartitionNames: " + StatisticsUtil.joinElementsToString(partitionNames, ",")); sj.add("ColumnName: " + colName); sj.add("TaskType: " + analysisType.toString()); sj.add("TaskMethod: " + analysisMethod.toString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java index e03f4f8d63f49f..b2dd8fd593f005 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java @@ -22,12 +22,15 @@ import org.apache.doris.statistics.AnalysisTaskInfo.JobType; import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType; +import java.util.Set; + public class AnalysisTaskInfoBuilder { private long jobId; private long taskId; private String catalogName; private String dbName; private String tblName; + private Set partitionNames; private String colName; private Long indexId; private JobType jobType; @@ -66,6 +69,11 @@ public 
AnalysisTaskInfoBuilder setTblName(String tblName) { return this; } + public AnalysisTaskInfoBuilder setPartitionNames(Set partitionNames) { + this.partitionNames = partitionNames; + return this; + } + public AnalysisTaskInfoBuilder setColName(String colName) { this.colName = colName; return this; @@ -127,7 +135,7 @@ public AnalysisTaskInfoBuilder setScheduleType(ScheduleType scheduleType) { } public AnalysisTaskInfo build() { - return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, + return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, partitionNames, colName, indexId, jobType, analysisMethod, analysisType, samplePercent, sampleRows, maxBucketNum, message, lastExecTimeInMs, state, scheduleType); } @@ -139,6 +147,7 @@ public AnalysisTaskInfoBuilder copy() { .setCatalogName(catalogName) .setDbName(dbName) .setTblName(tblName) + .setPartitionNames(partitionNames) .setColName(colName) .setIndexId(indexId) .setJobType(jobType) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 24e30e2126a281..12d501ae9c8eb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -74,7 +74,7 @@ public void execute() throws Exception { List partitionAnalysisSQLs = new ArrayList<>(); try { tbl.readLock(); - Set partNames = tbl.getPartitionNames(); + Set partNames = info.partitionNames; for (String partName : partNames) { Partition part = tbl.getPartition(partName); if (part == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index 80187edec2b4b8..78ca525e86f821 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -30,6 +30,7 @@ import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.system.SystemInfoService; +import com.google.common.collect.Maps; import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -100,6 +101,11 @@ public class StatisticsRepository { + " ORDER BY update_time " + "LIMIT ${limit} OFFSET ${offset}"; + private static final String FETCH_STATS_PART_ID = "SELECT DISTINCT col_id, part_id FROM " + + FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.STATISTIC_TBL_NAME + + " WHERE tbl_id = ${tblId}" + + " AND part_id IS NOT NULL"; + public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) { ResultRow resultRow = queryColumnStatisticById(tableId, colName); if (resultRow == null) { @@ -173,14 +179,23 @@ private static String constructId(Object... 
params) { } public static void dropStatistics(long tblId, Set colNames) throws DdlException { - dropStatistics(tblId, colNames, StatisticConstants.STATISTIC_TBL_NAME); - dropStatistics(tblId, colNames, StatisticConstants.HISTOGRAM_TBL_NAME); + dropStatistics(tblId, Collections.emptySet(), colNames, StatisticConstants.STATISTIC_TBL_NAME); + dropStatistics(tblId, Collections.emptySet(), colNames, StatisticConstants.HISTOGRAM_TBL_NAME); } - public static void dropStatistics(long tblId, Set colNames, String statsTblName) throws DdlException { + public static void dropStatistics(long tblId, Set partNames, Set colNames, + String statsTblName) throws DdlException { Map params = new HashMap<>(); - String right = colNames.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")); - String inPredicate = String.format("tbl_id = %s AND %s IN (%s)", tblId, "col_id", right); + String inPredicate = String.format("tbl_id = %s", tblId); + + if (!colNames.isEmpty()) { + String right = StatisticsUtil.joinElementsToString(partNames, "', '"); + inPredicate += String.format(" AND col_id IN ('%s')", right); + } + if (!partNames.isEmpty()) { + String right = StatisticsUtil.joinElementsToString(partNames, ","); + inPredicate += String.format(" AND part_id IN (%s)", right); + } params.put("tblName", statsTblName); params.put("condition", inPredicate); try { @@ -190,14 +205,6 @@ public static void dropStatistics(long tblId, Set colNames, String stats } } - private static void buildPredicate(String fieldName, Set fieldValues, StringBuilder predicate) { - StringJoiner predicateBuilder = new StringJoiner(",", "(", ")"); - fieldValues.stream().map(value -> String.format("'%s'", value)) - .forEach(predicateBuilder::add); - String partPredicate = String.format(" AND %s IN %s", fieldName, predicateBuilder); - predicate.append(partPredicate); - } - public static void persistAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception { Map params = new HashMap<>(); params.put("jobId", String.valueOf(analysisTaskInfo.jobId)); @@ -282,4 +289,28 @@ public static List fetchStatsFullName(long limit, long offset) { params.put("offset", String.valueOf(offset)); return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(FETCH_STATS_FULL_NAME)); } + + public static Map> fetchColAndPartsForStats(long tblId) { + Map params = Maps.newHashMap(); + params.put("tblId", String.valueOf(tblId)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String partSql = stringSubstitutor.replace(FETCH_STATS_PART_ID); + List resultRows = StatisticsUtil.execStatisticQuery(partSql); + + Map> columnToPartitions = Maps.newHashMap(); + + resultRows.forEach(row -> { + try { + String colId = row.getColumnValue("col_id"); + String partId = row.getColumnValue("part_id"); + columnToPartitions.computeIfAbsent(colId, + k -> new HashSet<>()).add(Long.valueOf(partId)); + } catch (NumberFormatException | DdlException e) { + LOG.warn("Failed to obtain the column and partition for statistics.{}", + e.getMessage()); + } + }); + + return columnToPartitions; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 19bcb6f2c48b69..c86f4bc9daac7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.DatabaseIf; import 
org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf; @@ -61,10 +62,12 @@ import org.apache.thrift.TException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.StringJoiner; import java.util.UUID; import java.util.stream.Collectors; @@ -308,4 +311,19 @@ public static boolean isNullOrEmpty(String str) { .map(s -> "null".equalsIgnoreCase(s) || s.isEmpty()) .orElse(true); } + + public static Map getPartitionIdToName(TableIf table) { + return table.getPartitionNames().stream() + .map(table::getPartition) + .collect(Collectors.toMap( + Partition::getId, + Partition::getName + )); + } + + public static String joinElementsToString(Collection values, String delimiter) { + StringJoiner builder = new StringJoiner(delimiter); + values.forEach(v -> builder.add(String.valueOf(v))); + return builder.toString(); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index 09f64621ce6bb4..cee5d2f8afd04c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -35,6 +35,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Collections; + public class AnalysisJobTest extends TestWithFeService { @Override @@ -108,8 +110,9 @@ public void execUpdate(String sql) throws Exception { }; AnalysisTaskInfo analysisJobInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0) .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") - .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( - AnalysisType.COLUMN) + .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL) + .setAnalysisType(AnalysisType.COLUMN) + .setPartitionNames(Collections.singleton("t1")) .build(); new OlapAnalysisTask(analysisJobInfo).execute(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java index 10e16355603de7..60aa25564253f6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.concurrent.BlockingQueue; public class AnalysisTaskExecutorTest extends TestWithFeService { @@ -93,8 +94,9 @@ public void testTaskExecution() throws Exception { AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler); AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0) .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") - .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( - AnalysisType.COLUMN) + .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL) + 
.setAnalysisType(AnalysisType.COLUMN) + .setPartitionNames(Collections.singleton("t1")) .build(); OlapAnalysisTask task = new OlapAnalysisTask(analysisTaskInfo); new MockUp() { diff --git a/regression-test/data/statistics/incremental_stats_test.out b/regression-test/data/statistics/incremental_stats_test.out new file mode 100644 index 00000000000000..4522d962882af3 --- /dev/null +++ b/regression-test/data/statistics/incremental_stats_test.out @@ -0,0 +1,217 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +2 2 0 20 32 4 +3 3 0 20 35 6 +6 4 0 20 35 12 +1 1 0 35 35 2 +2 2 0 上海 广州 12 +6 4 0 上海 深圳 36 +3 2 0 北京 深圳 18 +1 1 0 深圳 深圳 6 +1 1 0 11 11 8 +3 3 0 2 100 24 +6 6 0 2 200 48 +2 2 0 30 200 16 +3 1 0 2017-10-01 2017-10-01 48 +6 3 0 2017-10-01 2017-10-03 96 +2 1 0 2017-10-02 2017-10-02 32 +1 1 0 2017-10-03 2017-10-03 16 +3 3 0 2017-10-01 06:00:00 2017-10-01 17:05:45 48 +6 6 0 2017-10-01 06:00:00 2017-10-03 10:20:22 96 +2 2 0 2017-10-02 11:20:00 2017-10-02 12:59:12 32 +1 1 0 2017-10-03 10:20:22 2017-10-03 10:20:22 16 +3 3 0 3 22 12 +6 6 0 3 22 24 +2 2 0 5 11 8 +1 1 0 6 6 4 +3 3 0 2 22 12 +6 6 0 2 22 24 +2 2 0 5 11 8 +1 1 0 6 6 4 +1 1 0 0 0 1 +2 2 0 0 1 2 +3 2 0 0 1 3 +6 2 0 0 1 6 +3 3 0 10000 10004 48 +6 5 0 10000 10004 96 +2 2 0 10002 10003 32 +1 1 0 10004 10004 16 + +-- !sql -- + +-- !sql -- +2 2 0 20 32 4 +3 3 0 20 35 6 +6 4 0 20 35 12 +1 1 0 35 35 2 +2 2 0 上海 广州 12 +6 4 0 上海 深圳 36 +3 2 0 北京 深圳 18 +1 1 0 深圳 深圳 6 +1 1 0 11 11 8 +3 3 0 2 100 24 +6 6 0 2 200 48 +2 2 0 30 200 16 +3 1 0 2017-10-01 2017-10-01 48 +6 3 0 2017-10-01 2017-10-03 96 +2 1 0 2017-10-02 2017-10-02 32 +1 1 0 2017-10-03 2017-10-03 16 +3 3 0 2017-10-01 06:00:00 2017-10-01 17:05:45 48 +6 6 0 2017-10-01 06:00:00 2017-10-03 10:20:22 96 +2 2 0 2017-10-02 11:20:00 2017-10-02 12:59:12 32 +1 1 0 2017-10-03 10:20:22 2017-10-03 10:20:22 16 +3 3 0 3 22 12 +6 6 0 3 22 24 +2 2 0 5 11 8 +1 1 0 6 6 4 +3 3 0 2 22 12 +6 6 0 2 22 24 +2 2 0 5 11 8 +1 1 0 6 6 4 +1 1 0 0 0 1 +2 2 0 0 1 2 +3 2 0 0 1 3 +6 2 0 0 1 6 +3 3 0 10000 10004 48 +6 5 0 10000 10004 96 +2 2 0 10002 10003 32 +1 1 0 10004 10004 16 + +-- !sql -- + +-- !sql -- +2 2 0 20 32 4 +3 3 0 20 35 6 +6 3 0 20 35 12 +1 1 0 35 35 2 +2 2 0 上海 广州 12 +6 3 0 上海 深圳 36 +3 2 0 北京 深圳 18 +1 1 0 深圳 深圳 6 +1 1 0 11 11 8 +3 3 0 2 100 24 +6 3 0 2 200 48 +2 2 0 30 200 16 +3 1 0 2017-10-01 2017-10-01 48 +6 2 0 2017-10-01 2017-10-03 96 +2 1 0 2017-10-02 2017-10-02 32 +1 1 0 2017-10-03 2017-10-03 16 +3 3 0 2017-10-01 06:00:00 2017-10-01 17:05:45 48 +6 3 0 2017-10-01 06:00:00 2017-10-03 10:20:22 96 +2 2 0 2017-10-02 11:20:00 2017-10-02 12:59:12 32 +1 1 0 2017-10-03 10:20:22 2017-10-03 10:20:22 16 +3 3 0 3 22 12 +6 3 0 3 22 24 +2 2 0 5 11 8 +1 1 0 6 6 4 +3 3 0 2 22 12 +6 3 0 2 22 24 +2 2 0 5 11 8 +1 1 0 6 6 4 +1 1 0 0 0 1 +2 2 0 0 1 2 +3 2 0 0 1 3 +6 2 0 0 1 6 +3 3 0 10000 10004 48 +6 3 0 10000 10004 96 +2 2 0 10002 10003 32 +1 1 0 10004 10004 16 + +-- !sql -- + +-- !sql -- +2 2 0 20 32 4 +3 3 0 20 35 6 +3 3 0 20 35 6 +9 4 0 20 35 18 +1 1 0 35 35 2 +2 2 0 上海 广州 12 +9 4 0 上海 深圳 54 +3 2 0 北京 深圳 18 +3 2 0 北京 深圳 18 +1 1 0 深圳 深圳 6 +1 1 0 11 11 8 +3 3 0 2 100 24 +3 3 0 2 100 24 +9 6 0 2 200 72 +2 2 0 30 200 16 +3 1 0 2017-10-01 2017-10-01 48 +3 1 0 2017-10-01 2017-10-01 48 +9 3 0 2017-10-01 2017-10-03 144 +2 1 0 2017-10-02 2017-10-02 32 +1 1 0 2017-10-03 2017-10-03 16 +3 3 0 2017-10-01 06:00:00 2017-10-01 17:05:45 48 +3 3 0 2017-10-01 06:00:00 2017-10-01 17:05:45 48 +9 6 0 2017-10-01 06:00:00 2017-10-03 10:20:22 144 +2 2 0 2017-10-02 11:20:00 2017-10-02 
12:59:12 32 +1 1 0 2017-10-03 10:20:22 2017-10-03 10:20:22 16 +3 3 0 3 22 12 +3 3 0 3 22 12 +9 6 0 3 22 36 +2 2 0 5 11 8 +1 1 0 6 6 4 +3 3 0 2 22 12 +3 3 0 2 22 12 +9 6 0 2 22 36 +2 2 0 5 11 8 +1 1 0 6 6 4 +1 1 0 0 0 1 +2 2 0 0 1 2 +3 2 0 0 1 3 +3 2 0 0 1 3 +9 2 0 0 1 9 +3 3 0 10000 10004 48 +3 3 0 10000 10004 48 +9 5 0 10000 10004 144 +2 2 0 10002 10003 32 +1 1 0 10004 10004 16 + +-- !sql -- + +-- !sql -- +2 2 0 20 32 4 +3 3 0 20 35 6 +3 3 0 20 35 6 +9 4 0 20 35 18 +1 1 0 35 35 2 +2 2 0 上海 广州 12 +9 4 0 上海 深圳 54 +3 2 0 北京 深圳 18 +3 2 0 北京 深圳 18 +1 1 0 深圳 深圳 6 +1 1 0 11 11 8 +3 3 0 2 100 24 +3 3 0 2 100 24 +9 6 0 2 200 72 +2 2 0 30 200 16 +3 1 0 2017-10-01 2017-10-01 48 +3 1 0 2017-10-01 2017-10-01 48 +9 3 0 2017-10-01 2017-10-03 144 +2 1 0 2017-10-02 2017-10-02 32 +1 1 0 2017-10-03 2017-10-03 16 +3 3 0 2017-10-01 06:00:00 2017-10-01 17:05:45 48 +3 3 0 2017-10-01 06:00:00 2017-10-01 17:05:45 48 +9 6 0 2017-10-01 06:00:00 2017-10-03 10:20:22 144 +2 2 0 2017-10-02 11:20:00 2017-10-02 12:59:12 32 +1 1 0 2017-10-03 10:20:22 2017-10-03 10:20:22 16 +3 3 0 3 22 12 +3 3 0 3 22 12 +9 6 0 3 22 36 +2 2 0 5 11 8 +1 1 0 6 6 4 +3 3 0 2 22 12 +3 3 0 2 22 12 +9 6 0 2 22 36 +2 2 0 5 11 8 +1 1 0 6 6 4 +1 1 0 0 0 1 +2 2 0 0 1 2 +3 2 0 0 1 3 +3 2 0 0 1 3 +9 2 0 0 1 9 +3 3 0 10000 10004 48 +3 3 0 10000 10004 48 +9 5 0 10000 10004 144 +2 2 0 10002 10003 32 +1 1 0 10004 10004 16 + diff --git a/regression-test/suites/statistics/incremental_stats_test.groovy b/regression-test/suites/statistics/incremental_stats_test.groovy new file mode 100644 index 00000000000000..3a61192cbc37bf --- /dev/null +++ b/regression-test/suites/statistics/incremental_stats_test.groovy @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_incremental_stats") { + def dbName = "test_incremental_stats" + def tblName = "${dbName}.example_tbl" + + def colStatisticsTblName = "__internal_schema.column_statistics" + def analysisJobsTblName = "__internal_schema.analysis_jobs" + + def columnNames = """ + ( + `t_1682176142000_user_id`, `t_1682176142000_date`, + `t_1682176142000_city`, `t_1682176142000_age`, `t_1682176142000_sex`, + `t_1682176142000_last_visit_date`, `t_1682176142000_cost`, + `t_1682176142000_max_dwell_time`, `t_1682176142000_min_dwell_time` + ) + """ + + def columnNameValues = """ + ( + 't_1682176142000_user_id', 't_1682176142000_date', 't_1682176142000_city', + 't_1682176142000_age', 't_1682176142000_sex', 't_1682176142000_last_visit_date', + 't_1682176142000_cost', 't_1682176142000_max_dwell_time', 't_1682176142000_min_dwell_time' + ) + """ + + def query_col_statistics_with_order_sql = """ + SELECT + count, + ndv, + null_count, + min, + max, + data_size_in_bytes + FROM + ${colStatisticsTblName} + WHERE + col_id IN ${columnNameValues} + ORDER BY + col_id, + min, + max, + count, + ndv, + null_count, + data_size_in_bytes; + """ + + def query_analysis_jobs_with_order_sql = """ + SELECT + catalog_name, + db_name, + tbl_name, + col_name, + index_id, + job_type, + analysis_type, + message, + state, + schedule_type + FROM + ${analysisJobsTblName} + WHERE + db_name = '${dbName}' + AND tbl_name = '${tblName}' + AND col_name IN ${columnNameValues} + ORDER BY + index_id, col_name; + """ + + sql "DROP DATABASE IF EXISTS ${dbName}" + + sql "CREATE DATABASE IF NOT EXISTS ${dbName};" + + sql "DROP TABLE IF EXISTS ${tblName}" + + sql """ + CREATE TABLE IF NOT EXISTS ${tblName} ( + `t_1682176142000_user_id` LARGEINT NOT NULL, + `t_1682176142000_date` DATE NOT NULL, + `t_1682176142000_city` VARCHAR(20), + `t_1682176142000_age` SMALLINT, + `t_1682176142000_sex` TINYINT, + `t_1682176142000_last_visit_date` DATETIME REPLACE, + `t_1682176142000_cost` BIGINT SUM, + `t_1682176142000_max_dwell_time` INT MAX, + `t_1682176142000_min_dwell_time` INT MIN + ) ENGINE=OLAP + AGGREGATE KEY(`t_1682176142000_user_id`, `t_1682176142000_date`, + `t_1682176142000_city`, `t_1682176142000_age`, `t_1682176142000_sex`) + PARTITION BY LIST(`t_1682176142000_date`) + ( + PARTITION `p_201701` VALUES IN ("2017-10-01"), + PARTITION `p_201702` VALUES IN ("2017-10-02"), + PARTITION `p_201703` VALUES IN ("2017-10-03"), + PARTITION `default` + ) + DISTRIBUTED BY HASH(`t_1682176142000_user_id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + INSERT INTO ${tblName} ${columnNames} + VALUES (10000, "2017-10-01", "北京", 20, 0, "2017-10-01 07:00:00", 15, 2, 2), + (10000, "2017-10-01", "北京", 20, 0, "2017-10-01 06:00:00", 20, 10, 10), + (10001, "2017-10-01", "北京", 30, 1, "2017-10-01 17:05:45", 2, 22, 22), + (10002, "2017-10-02", "上海", 20, 1, "2017-10-02 12:59:12", 200, 5, 5), + (10003, "2017-10-02", "广州", 32, 0, "2017-10-02 11:20:00", 30, 11, 11), + (10004, "2017-10-01", "深圳", 35, 0, "2017-10-01 10:00:15", 100, 3, 3), + (10004, "2017-10-03", "深圳", 35, 0, "2017-10-03 10:20:22", 11, 6, 6); + """ + + // Firstly do a full collection of statistics + sql "ANALYZE TABLE ${tblName} ${columnNames} WITH sync;" + + qt_sql query_col_statistics_with_order_sql + + // Incrementally collect statistics + sql "ANALYZE TABLE ${tblName} ${columnNames} WITH sync WITH incremental;" + + // The table data has not changed, and no new tasks should be generated + qt_sql query_analysis_jobs_with_order_sql + + // Statistics won't change either + qt_sql 
query_col_statistics_with_order_sql
+
+    // Drop a partition, then re-collect statistics
+    sql "ALTER TABLE ${tblName} DROP PARTITION `p_201701`;"
+
+    // Incrementally collect statistics
+    sql "ANALYZE TABLE ${tblName} ${columnNames} WITH sync WITH incremental;"
+
+    // Although the partition is deleted, no new partition collection tasks will be generated,
+    // but tasks to refresh table-level statistics will be generated
+    qt_sql query_analysis_jobs_with_order_sql
+
+    // Statistics will change as well
+    qt_sql query_col_statistics_with_order_sql
+
+    // Add a partition, then re-collect statistics
+    sql "ALTER TABLE ${tblName} ADD PARTITION `p_201701` VALUES IN ('2017-10-01');"
+
+    sql """
+        INSERT INTO ${tblName} ${columnNames}
+        VALUES (10000, "2017-10-01", "北京", 20, 0, "2017-10-01 07:00:00", 15, 2, 2),
+            (10000, "2017-10-01", "北京", 20, 0, "2017-10-01 06:00:00", 20, 10, 10),
+            (10001, "2017-10-01", "北京", 30, 1, "2017-10-01 17:05:45", 2, 22, 22),
+            (10004, "2017-10-01", "深圳", 35, 0, "2017-10-01 10:00:15", 100, 3, 3);
+    """
+
+    // Incrementally collect statistics
+    sql "ANALYZE TABLE ${tblName} ${columnNames} WITH sync WITH incremental;"
+
+    // Adding a new partition will generate new tasks to incrementally
+    // collect the corresponding partition statistics,
+    // and will also generate tasks to update table-level statistics
+    qt_sql query_analysis_jobs_with_order_sql
+
+    // Statistics will change as well
+    qt_sql query_col_statistics_with_order_sql
+
+    // Finally, collect statistics in full
+    sql "ANALYZE TABLE ${tblName} ${columnNames} WITH sync;"
+
+    // Will generate tasks to collect all partition statistics and update table statistics
+    qt_sql query_analysis_jobs_with_order_sql
+
+    // Compare statistics again
+    qt_sql query_col_statistics_with_order_sql
+
+    // TODO delete by database name and table name
+    sql "DELETE FROM __internal_schema.analysis_jobs WHERE job_id IS NOT NULL;"
+
+    // TODO use drop stats
+    sql """
+        DELETE FROM __internal_schema.column_statistics
+        WHERE col_id IN ${columnNameValues}
+    """
+    sql "DROP DATABASE IF EXISTS ${dbName}"
+}
diff --git a/regression-test/suites/statistics/show_stats_test.groovy b/regression-test/suites/statistics/show_stats_test.groovy
index fb65741c2171a1..819b4a24dc0197 100644
--- a/regression-test/suites/statistics/show_stats_test.groovy
+++ b/regression-test/suites/statistics/show_stats_test.groovy
@@ -16,7 +16,7 @@
 // under the License.
 
 suite("test_show_stats") {
-    def dbName = "stats_test"
+    def dbName = "test_show_stats"
     def tblName = "${dbName}.example_tbl"
 
     sql "DROP DATABASE IF EXISTS ${dbName}"
@@ -73,6 +73,4 @@ suite("test_show_stats") {
    qt_sql "SHOW COLUMN HISTOGRAM ${tblName}(city);"
 
    sql "DROP DATABASE IF EXISTS ${dbName}"
-
-    sql "CREATE DATABASE IF NOT EXISTS ${dbName};"
 }
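
For context, the incremental path in validateAndGetPartitions boils down to per-column set arithmetic: start from all current partition names, ignore stale partition ids whose partitions no longer exist (the patch also drops their stats rows), and keep only partitions that have no statistics yet. Below is a minimal, standalone sketch of that selection, not the patch's code; the parameter names (currentPartitions, statsPartitionIdsByColumn, idToName) are hypothetical stand-ins for table.getPartitionNames(), StatisticsRepository.fetchColAndPartsForStats(tableId), and StatisticsUtil.getPartitionIdToName(table).

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class IncrementalPartitionSelection {

    /**
     * Sketch of the incremental selection: for every column, return only the
     * partitions that currently exist and do not yet have statistics.
     * Partition ids that no longer map to a live partition are simply skipped
     * here (the real patch additionally deletes their statistics rows).
     */
    static Map<String, Set<String>> partitionsToAnalyze(
            Set<String> columns,
            Set<String> currentPartitions,                    // stand-in for table.getPartitionNames()
            Map<String, Set<Long>> statsPartitionIdsByColumn, // stand-in for existing stats per column
            Map<Long, String> idToName) {                     // stand-in for live partition id -> name
        Map<String, Set<String>> result = new HashMap<>();
        for (String column : columns) {
            Set<String> pending = new HashSet<>(currentPartitions);
            for (Long partId : statsPartitionIdsByColumn.getOrDefault(column, new HashSet<>())) {
                String name = idToName.get(partId);
                if (name != null) {
                    pending.remove(name); // stats already exist for this live partition
                }
            }
            result.put(column, pending);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Long>> existing = new HashMap<>();
        existing.put("city", new HashSet<>(Set.of(1L, 2L)));
        Map<Long, String> idToName = new HashMap<>();
        idToName.put(1L, "p_201701");
        idToName.put(2L, "p_201702");
        idToName.put(3L, "p_201703");
        // Only p_201703 still needs stats for column "city": prints {city=[p_201703]}
        System.out.println(partitionsToAnalyze(
                Set.of("city"),
                Set.of("p_201701", "p_201702", "p_201703"),
                existing,
                idToName));
    }
}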