@@ -37,6 +37,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
@@ -45,13 +46,17 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class AnalysisManager {

@@ -91,13 +96,22 @@ public StatisticsCache getStatisticsCache() {

// Each analyze stmt corresponds to one analysis job.
public void createAnalysisJob(AnalyzeStmt stmt) throws DdlException {
Map<String, Set<String>> columnToPartitions = validateAndGetPartitions(stmt);
if (columnToPartitions.isEmpty()) {
// No statistics need to be collected or updated
return;
}

long jobId = Env.getCurrentEnv().getNextId();
TableIf table = stmt.getTable();
AnalysisType analysisType = stmt.getAnalysisType();

AnalysisTaskInfoBuilder taskInfoBuilder = buildCommonTaskInfo(stmt, jobId);
Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();

// Start building analysis tasks
createTaskForEachColumns(stmt.getColumnNames(), taskInfoBuilder, analysisTaskInfos);
createTaskForMVIdx(stmt.getTable(), taskInfoBuilder, analysisTaskInfos, stmt.getAnalysisType());
createTaskForEachColumns(columnToPartitions, taskInfoBuilder, analysisTaskInfos, analysisType);
createTaskForMVIdx(table, taskInfoBuilder, analysisTaskInfos, analysisType);
persistAnalysisJob(taskInfoBuilder);

if (stmt.isSync()) {
@@ -109,6 +123,83 @@ public void createAnalysisJob(AnalyzeStmt stmt) throws DdlException {
analysisTaskInfos.values().forEach(taskScheduler::schedule);
}

/**
* Gets the partitions for which statistics are to be collected. First, check whether there
* are partitions that have been deleted but still have historical (now invalid) statistics;
* if such partitions exist, delete their statistics to avoid errors in the table-level
* statistics summary. Then determine the partitions whose statistics need to be collected,
* based on the collection mode (incremental or full).
* <p>
* Note:
* If there are no invalid statistics, statistics do not need to be collected or updated
* when either of the following conditions is met:
* - in full collection mode, the partitioned table has no partitions
* - in incremental collection mode, partition statistics already exist
* <p>
* TODO: Support incremental collection of statistics for materialized views
*/
private Map<String, Set<String>> validateAndGetPartitions(AnalyzeStmt stmt) throws DdlException {
TableIf table = stmt.getTable();
long tableId = table.getId();
Set<String> columnNames = stmt.getColumnNames();
Set<String> partitionNames = table.getPartitionNames();

Map<String, Set<String>> columnToPartitions = Maps.newHashMap();
columnNames.forEach(colName -> columnToPartitions
.computeIfAbsent(colName, key -> new HashSet<>()).addAll(partitionNames));

if (stmt.getAnalysisType() == AnalysisType.HISTOGRAM) {
// Collecting histograms does not require incremental collection;
// existing histogram statistics are simply overwritten
return columnToPartitions;
}

// Get the partition granularity statistics that have been collected
Map<String, Set<Long>> existColAndPartsForStats = StatisticsRepository
.fetchColAndPartsForStats(tableId);

if (existColAndPartsForStats.isEmpty()) {
// There is no historical statistical information, no need to do validation
return columnToPartitions;
}

Set<Long> existPartIdsForStats = new HashSet<>();
existColAndPartsForStats.values().forEach(existPartIdsForStats::addAll);
Map<Long, String> idToPartition = StatisticsUtil.getPartitionIdToName(table);
// Get the set of invalid partitions (partitions that have been deleted)
Set<Long> invalidPartIds = existPartIdsForStats.stream()
.filter(id -> !idToPartition.containsKey(id)).collect(Collectors.toSet());

if (!invalidPartIds.isEmpty()) {
// Delete invalid partition statistics to avoid affecting table statistics
StatisticsRepository.dropStatistics(tableId, invalidPartIds,
columnNames, StatisticConstants.STATISTIC_TBL_NAME);
}

if (stmt.isIncremental() && stmt.getAnalysisType() == AnalysisType.COLUMN) {
existColAndPartsForStats.values().forEach(partIds -> partIds.removeAll(invalidPartIds));
// In incremental collection mode, just collect the uncollected partition statistics
existColAndPartsForStats.forEach((columnName, partitionIds) -> {
Set<String> existPartitions = partitionIds.stream()
.map(idToPartition::get)
.collect(Collectors.toSet());
columnToPartitions.computeIfPresent(columnName, (colName, partNames) -> {
partNames.removeAll(existPartitions);
return partNames;
});
});
}

Collection<Set<String>> partitions = columnToPartitions.values();
boolean isEmptyPartitions = partitions.stream().allMatch(Set::isEmpty);

if (invalidPartIds.isEmpty() && isEmptyPartitions) {
// There are no invalid statistics and no partition statistics left to collect
return Collections.emptyMap();
}

return columnToPartitions;
}
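To make the incremental pruning above concrete, here is a small self-contained sketch (hypothetical column and partition names; not part of this change) that mirrors the computeIfPresent pruning step:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class IncrementalPruningSketch {
    public static void main(String[] args) {
        // Hypothetical table with partitions p1..p3, analyzing column c1
        Map<String, Set<String>> columnToPartitions = new HashMap<>();
        columnToPartitions.put("c1", new HashSet<>(Arrays.asList("p1", "p2", "p3")));

        // Hypothetical: statistics for c1 already cover p1 and p2
        Map<String, Set<String>> existingStats = new HashMap<>();
        existingStats.put("c1", new HashSet<>(Arrays.asList("p1", "p2")));

        // Incremental mode: keep only the partitions that are not covered yet
        existingStats.forEach((colName, covered) ->
                columnToPartitions.computeIfPresent(colName, (key, partNames) -> {
                    partNames.removeAll(covered);
                    return partNames;
                }));

        System.out.println(columnToPartitions); // prints {c1=[p3]}
    }
}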

private AnalysisTaskInfoBuilder buildCommonTaskInfo(AnalyzeStmt stmt, long jobId) {
AnalysisTaskInfoBuilder taskInfoBuilder = new AnalysisTaskInfoBuilder();
String catalogName = stmt.getCatalogName();
@@ -194,14 +285,27 @@ private void createTaskForMVIdx(TableIf table, AnalysisTaskInfoBuilder taskInfoB
}
}

private void createTaskForEachColumns(Set<String> colNames, AnalysisTaskInfoBuilder taskInfoBuilder,
Map<Long, AnalysisTaskInfo> analysisTaskInfos) throws DdlException {
for (String colName : colNames) {
AnalysisTaskInfoBuilder colTaskInfoBuilder = taskInfoBuilder.copy();
long indexId = -1;
private void createTaskForEachColumns(Map<String, Set<String>> columnToPartitions,
AnalysisTaskInfoBuilder taskInfoBuilder,
Map<Long, AnalysisTaskInfo> analysisTaskInfos, AnalysisType analysisType) throws DdlException {
for (Entry<String, Set<String>> entry : columnToPartitions.entrySet()) {
Set<String> partitionNames = entry.getValue();
if (partitionNames.isEmpty()) {
continue;
}
AnalysisTaskInfoBuilder newTaskInfoBuilder = taskInfoBuilder.copy();
if (analysisType != AnalysisType.HISTOGRAM) {
// Histograms do not need to specify partitions
newTaskInfoBuilder.setPartitionNames(partitionNames);
}
long taskId = Env.getCurrentEnv().getNextId();
AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setColName(colName)
.setIndexId(indexId).setTaskId(taskId).build();
long indexId = -1;
String colName = entry.getKey();
AnalysisTaskInfo analysisTaskInfo = newTaskInfoBuilder
.setTaskId(taskId)
.setIndexId(indexId)
.setColName(colName)
.build();
try {
StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
} catch (Exception e) {
@@ -17,9 +17,12 @@

package org.apache.doris.statistics;

import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Set;
import java.util.StringJoiner;

public class AnalysisTaskInfo {
@@ -60,6 +63,8 @@ public enum ScheduleType {

public final String tblName;

public final Set<String> partitionNames;

public final String colName;

public final Long indexId;
@@ -86,14 +91,15 @@ public enum ScheduleType {
public final ScheduleType scheduleType;

public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
String colName, Long indexId, JobType jobType, AnalysisMethod analysisMethod,
Set<String> partitionNames, String colName, Long indexId, JobType jobType, AnalysisMethod analysisMethod,
AnalysisType analysisType, int samplePercent, int sampleRows, int maxBucketNum,
String message, int lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType) {
this.jobId = jobId;
this.taskId = taskId;
this.catalogName = catalogName;
this.dbName = dbName;
this.tblName = tblName;
this.partitionNames = partitionNames;
this.colName = colName;
this.indexId = indexId;
this.jobType = jobType;
@@ -115,6 +121,7 @@ public String toString() {
sj.add("CatalogName: " + catalogName);
sj.add("DBName: " + dbName);
sj.add("TableName: " + tblName);
sj.add("PartitionNames: " + StatisticsUtil.joinElementsToString(partitionNames, ","));
sj.add("ColumnName: " + colName);
sj.add("TaskType: " + analysisType.toString());
sj.add("TaskMethod: " + analysisMethod.toString());
@@ -22,12 +22,15 @@
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;

import java.util.Set;

public class AnalysisTaskInfoBuilder {
private long jobId;
private long taskId;
private String catalogName;
private String dbName;
private String tblName;
private Set<String> partitionNames;
private String colName;
private Long indexId;
private JobType jobType;
@@ -66,6 +69,11 @@ public AnalysisTaskInfoBuilder setTblName(String tblName) {
return this;
}

public AnalysisTaskInfoBuilder setPartitionNames(Set<String> partitionNames) {
this.partitionNames = partitionNames;
return this;
}

public AnalysisTaskInfoBuilder setColName(String colName) {
this.colName = colName;
return this;
@@ -127,7 +135,7 @@ public AnalysisTaskInfoBuilder setScheduleType(ScheduleType scheduleType) {
}

public AnalysisTaskInfo build() {
return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName,
return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, partitionNames,
colName, indexId, jobType, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, message, lastExecTimeInMs, state, scheduleType);
}
@@ -139,6 +147,7 @@ public AnalysisTaskInfoBuilder copy() {
.setCatalogName(catalogName)
.setDbName(dbName)
.setTblName(tblName)
.setPartitionNames(partitionNames)
.setColName(colName)
.setIndexId(indexId)
.setJobType(jobType)
@@ -74,7 +74,7 @@ public void execute() throws Exception {
List<String> partitionAnalysisSQLs = new ArrayList<>();
try {
tbl.readLock();
Set<String> partNames = tbl.getPartitionNames();
Set<String> partNames = info.partitionNames;
for (String partName : partNames) {
Partition part = tbl.getPartition(partName);
if (part == null) {
@@ -30,6 +30,7 @@
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.SystemInfoService;

import com.google.common.collect.Maps;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -100,6 +101,11 @@ public class StatisticsRepository {
+ " ORDER BY update_time "
+ "LIMIT ${limit} OFFSET ${offset}";

private static final String FETCH_STATS_PART_ID = "SELECT DISTINCT col_id, part_id FROM "
+ FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.STATISTIC_TBL_NAME
+ " WHERE tbl_id = ${tblId}"
+ " AND part_id IS NOT NULL";

public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) {
ResultRow resultRow = queryColumnStatisticById(tableId, colName);
if (resultRow == null) {
@@ -173,14 +179,23 @@ private static String constructId(Object... params) {
}

public static void dropStatistics(long tblId, Set<String> colNames) throws DdlException {
dropStatistics(tblId, colNames, StatisticConstants.STATISTIC_TBL_NAME);
dropStatistics(tblId, colNames, StatisticConstants.HISTOGRAM_TBL_NAME);
dropStatistics(tblId, Collections.emptySet(), colNames, StatisticConstants.STATISTIC_TBL_NAME);
dropStatistics(tblId, Collections.emptySet(), colNames, StatisticConstants.HISTOGRAM_TBL_NAME);
}

public static void dropStatistics(long tblId, Set<String> colNames, String statsTblName) throws DdlException {
public static void dropStatistics(long tblId, Set<Long> partNames, Set<String> colNames,
String statsTblName) throws DdlException {
Map<String, String> params = new HashMap<>();
String right = colNames.stream().map(s -> "'" + s + "'").collect(Collectors.joining(","));
String inPredicate = String.format("tbl_id = %s AND %s IN (%s)", tblId, "col_id", right);
String inPredicate = String.format("tbl_id = %s", tblId);

if (!colNames.isEmpty()) {
String right = StatisticsUtil.joinElementsToString(colNames, "', '");
inPredicate += String.format(" AND col_id IN ('%s')", right);
}
if (!partNames.isEmpty()) {
String right = StatisticsUtil.joinElementsToString(partNames, ",");
inPredicate += String.format(" AND part_id IN (%s)", right);
}
params.put("tblName", statsTblName);
params.put("condition", inPredicate);
try {
@@ -190,14 +205,6 @@ public static void dropStatistics(long tblId, Set<String> colNames, String stats
}
}
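For illustration only (hypothetical table id, column names, and partition ids), the condition assembled above would come out roughly as:

// tblId = 10001, colNames = {"c1", "c2"}, partNames = {101, 102}  (hypothetical)
//   tbl_id = 10001 AND col_id IN ('c1', 'c2') AND part_id IN (101,102)
// (set iteration order may vary)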

private static <T> void buildPredicate(String fieldName, Set<T> fieldValues, StringBuilder predicate) {
StringJoiner predicateBuilder = new StringJoiner(",", "(", ")");
fieldValues.stream().map(value -> String.format("'%s'", value))
.forEach(predicateBuilder::add);
String partPredicate = String.format(" AND %s IN %s", fieldName, predicateBuilder);
predicate.append(partPredicate);
}

public static void persistAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
Map<String, String> params = new HashMap<>();
params.put("jobId", String.valueOf(analysisTaskInfo.jobId));
@@ -282,4 +289,28 @@ public static List<ResultRow> fetchStatsFullName(long limit, long offset) {
params.put("offset", String.valueOf(offset));
return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(FETCH_STATS_FULL_NAME));
}

public static Map<String, Set<Long>> fetchColAndPartsForStats(long tblId) {
Map<String, String> params = Maps.newHashMap();
params.put("tblId", String.valueOf(tblId));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String partSql = stringSubstitutor.replace(FETCH_STATS_PART_ID);
List<ResultRow> resultRows = StatisticsUtil.execStatisticQuery(partSql);

Map<String, Set<Long>> columnToPartitions = Maps.newHashMap();

resultRows.forEach(row -> {
try {
String colId = row.getColumnValue("col_id");
String partId = row.getColumnValue("part_id");
columnToPartitions.computeIfAbsent(colId,
k -> new HashSet<>()).add(Long.valueOf(partId));
} catch (NumberFormatException | DdlException e) {
LOG.warn("Failed to obtain the column and partition for statistics.{}",
e.getMessage());
}
});

return columnToPartitions;
}
}