Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,9 @@ public class Config extends ConfigBase {
@ConfField
public static int full_auto_analyze_simultaneously_running_task_num = 1;

@ConfField
public static final int period_analyze_simultaneously_running_task_num = 1;

@ConfField
public static int cpu_resource_limit_per_analyze_task = 1;

Expand Down
21 changes: 12 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,10 @@
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.StatisticsAutoAnalyzer;
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
import org.apache.doris.statistics.StatisticsPeriodCollector;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
Expand Down Expand Up @@ -474,7 +475,9 @@ public class Env {
*/
private final LoadManagerAdapter loadManagerAdapter;

private StatisticsAutoAnalyzer statisticsAutoAnalyzer;
private StatisticsAutoCollector statisticsAutoCollector;

private StatisticsPeriodCollector statisticsPeriodCollector;

private HiveTransactionMgr hiveTransactionMgr;

Expand Down Expand Up @@ -699,7 +702,8 @@ private Env(boolean isCheckpointCatalog) {
this.extMetaCacheMgr = new ExternalMetaCacheMgr();
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoAnalyzer = new StatisticsAutoAnalyzer();
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.statisticsPeriodCollector = new StatisticsPeriodCollector();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.queryStats = new QueryStats();
Expand Down Expand Up @@ -945,8 +949,11 @@ public void initialize(String[] args) throws Exception {
if (statisticsCleaner != null) {
statisticsCleaner.start();
}
if (statisticsAutoAnalyzer != null) {
statisticsAutoAnalyzer.start();
if (statisticsAutoCollector != null) {
statisticsAutoCollector.start();
}
if (statisticsPeriodCollector != null) {
statisticsPeriodCollector.start();
}
}

Expand Down Expand Up @@ -5567,10 +5574,6 @@ public LoadManagerAdapter getLoadManagerAdapter() {
return loadManagerAdapter;
}

public StatisticsAutoAnalyzer getStatisticsAutoAnalyzer() {
return statisticsAutoAnalyzer;
}

public QueryStats getQueryStats() {
return queryStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,8 @@ public boolean needReAnalyzeTable(TableStats tblStats) {
}

@Override
public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
public Set<String> findReAnalyzeNeededPartitions() {
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId());
if (tableStats == null) {
return getPartitionNames().stream().map(this::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ public boolean needReAnalyzeTable(TableStats tblStats) {
}

@Override
public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
public Set<String> findReAnalyzeNeededPartitions() {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ default int getBaseColumnIdxByName(String colName) {

boolean needReAnalyzeTable(TableStats tblStats);

Set<String> findReAnalyzeNeededPartitions(TableStats tableStats);
Set<String> findReAnalyzeNeededPartitions();

void write(DataOutput out) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public boolean needReAnalyzeTable(TableStats tblStats) {
}

@Override
public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
public Set<String> findReAnalyzeNeededPartitions() {
HashSet<String> partitions = Sets.newHashSet();
// TODO: Find a way to collect external table partitions that need to be analyzed.
partitions.add("Dummy Partition");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public enum ScheduleType {
@SerializedName("tblName")
public final String tblName;

// TODO: Map here is weird, List is enough
@SerializedName("colToPartitions")
public final Map<String, Set<String>> colToPartitions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
Expand Down Expand Up @@ -363,6 +364,9 @@ private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
ShowResultSetMetaData commonResultSetMetaData = new ShowResultSetMetaData(columns);
List<List<String>> resultRows = new ArrayList<>();
for (AnalysisInfo analysisInfo : analysisInfos) {
if (analysisInfo == null) {
continue;
}
List<String> row = new ArrayList<>();
row.add(analysisInfo.catalogName);
row.add(analysisInfo.dbName);
Expand Down Expand Up @@ -442,23 +446,9 @@ private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<Str
StatisticsRepository.dropStatistics(invalidPartIds);
}

if (analysisMode == AnalysisMode.INCREMENTAL && analysisType == AnalysisType.FUNDAMENTALS) {
existColAndPartsForStats.values().forEach(partIds -> partIds.removeAll(invalidPartIds));
// In incremental collection mode, just collect the uncollected partition statistics
existColAndPartsForStats.forEach((columnName, partitionIds) -> {
Set<String> existPartitions = partitionIds.stream()
.map(idToPartition::get)
.collect(Collectors.toSet());
columnToPartitions.computeIfPresent(columnName, (colName, partNames) -> {
partNames.removeAll(existPartitions);
return partNames;
});
});
if (invalidPartIds.isEmpty()) {
// There is no invalid statistics, so there is no need to update table statistics,
// remove columns that do not require re-collection of statistics
columnToPartitions.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
if (analysisType == AnalysisType.FUNDAMENTALS) {
Set<String> reAnalyzeNeededPartitions = findReAnalyzeNeededPartitions(table);
columnToPartitions.replaceAll((k, v) -> reAnalyzeNeededPartitions);
}

return columnToPartitions;
Expand Down Expand Up @@ -692,6 +682,12 @@ public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
}
Set<String> cols = dropStatsStmt.getColumnNames();
long tblId = dropStatsStmt.getTblId();
TableStats tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats == null) {
return;
}
tableStats.updatedTime = 0;
replayUpdateTableStatsStatus(tableStats);
StatisticsRepository.dropStatistics(tblId, cols);
for (String col : cols) {
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col);
Expand Down Expand Up @@ -951,4 +947,20 @@ public void registerSysJob(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> tas
systemJobInfoMap.put(jobInfo.jobId, jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
}

@VisibleForTesting
protected Set<String> findReAnalyzeNeededPartitions(TableIf table) {
    // A partition needs re-analysis when it holds data and, if stats were
    // recorded for this table before, it became visible at or after the last
    // stats update. When no stats status exists yet, every non-empty
    // partition is considered stale.
    TableStats recorded = findTableStatsStatus(table.getId());
    return table.getPartitionNames().stream()
            .map(table::getPartition)
            .filter(Partition::hasData)
            .filter(part -> recorded == null
                    || part.getVisibleVersionTime() >= recorded.updatedTime)
            .map(Partition::getName)
            .collect(Collectors.toSet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
Expand All @@ -42,16 +43,23 @@ public class AnalysisTaskExecutor extends Thread {
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));

public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
executors = ThreadPoolManager.newDaemonThreadPool(
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(),
new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
"Analysis Job Executor", true);
if (!Env.isCheckpointThread()) {
executors = ThreadPoolManager.newDaemonThreadPool(
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(),
new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
"Analysis Job Executor", true);
} else {
executors = null;
}
}

@Override
public void run() {
    // Checkpoint threads must not do task maintenance; only regular threads
    // perform the sweep that cancels analysis tasks past their deadline.
    if (!Env.isCheckpointThread()) {
        cancelExpiredTask();
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public OlapAnalysisTask(AnalysisInfo info) {
}

public void doExecute() throws Exception {
Set<String> partitionNames = info.colToPartitions.get(info.colName);
if (partitionNames.isEmpty()) {
return;
}
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
Expand All @@ -90,7 +94,7 @@ public void doExecute() throws Exception {
List<String> partitionAnalysisSQLs = new ArrayList<>();
try {
tbl.readLock();
Set<String> partitionNames = info.colToPartitions.get(info.colName);

for (String partitionName : partitionNames) {
Partition part = tbl.getPartition(partitionName);
if (part == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.statistics.AnalysisInfo.JobType;
Expand All @@ -41,43 +39,27 @@
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class StatisticsAutoAnalyzer extends MasterDaemon {
public class StatisticsAutoCollector extends StatisticsCollector {

private static final Logger LOG = LogManager.getLogger(StatisticsAutoAnalyzer.class);
private static final Logger LOG = LogManager.getLogger(StatisticsAutoCollector.class);

private final AnalysisTaskExecutor analysisTaskExecutor;

public StatisticsAutoAnalyzer() {
public StatisticsAutoCollector() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2);
analysisTaskExecutor = new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num);
analysisTaskExecutor.start();
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
}

@Override
protected void runAfterCatalogReady() {
if (!Env.getCurrentEnv().isMaster()) {
return;
}
if (!StatisticsUtil.statsTblAvailable()) {
return;
}
analyzePeriodically();
protected void collect() {
if (!checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
return;
}

if (!analysisTaskExecutor.idle()) {
return;
}

if (Config.enable_full_auto_analyze) {
analyzeAll();
}
Expand Down Expand Up @@ -141,29 +123,18 @@ public List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends TableIf> db
return analysisInfos;
}

/**
 * Runs the user-scheduled (periodic) analysis jobs: fetches from the
 * AnalysisManager every job whose period has elapsed and creates a system
 * analysis job for each. Any failure is logged and swallowed so the daemon
 * loop that calls this keeps running.
 */
private void analyzePeriodically() {
    try {
        AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
        List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
        for (AnalysisInfo jobInfo : jobInfos) {
            createSystemAnalysisJob(jobInfo);
        }
    } catch (Exception e) {
        // Pass the exception as the throwable argument instead of
        // concatenating it into the message; concatenation only kept
        // e.toString() and dropped the stack trace.
        LOG.warn("Failed to periodically analyze the statistics.", e);
    }
}

@VisibleForTesting
protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
TableIf table = StatisticsUtil
.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
TableStats tblStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager();
TableStats tblStats = analysisManager.findTableStatsStatus(table.getId());

if (!(tblStats == null || table.needReAnalyzeTable(tblStats))) {
return null;
}

Set<String> needRunPartitions = table.findReAnalyzeNeededPartitions(tblStats);
Set<String> needRunPartitions = table.findReAnalyzeNeededPartitions();

if (needRunPartitions.isEmpty()) {
return null;
Expand All @@ -173,7 +144,7 @@ protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
}

@VisibleForTesting
public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
protected AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
Set<String> needRunPartitions) {
Map<String, Set<String>> newColToPartitions = Maps.newHashMap();
Map<String, Set<String>> colToPartitions = jobInfo.colToPartitions;
Expand Down Expand Up @@ -209,24 +180,4 @@ private boolean checkAnalyzeTime(LocalTime now) {
return true;
}
}


// Analysis job created by the system
/**
 * Creates the per-column tasks for a system-generated analysis job and
 * submits them all to {@code analysisTaskExecutor}.
 *
 * <p>No-op when the job carries no column/partition work. For external
 * tables an additional table-level task is created alongside the per-column
 * tasks. The job is registered with the AnalysisManager before its tasks
 * are submitted.
 *
 * @param jobInfo description of the analysis job to run
 * @throws DdlException if task creation fails
 */
@VisibleForTesting
protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
throws DdlException {
if (jobInfo.colToPartitions.isEmpty()) {
// No statistics need to be collected or updated
return;
}

// One BaseAnalysisTask per column, keyed by task id.
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
// External tables also get a table-level task (e.g. for row-count style
// stats that are not tied to a single column).
if (StatisticsUtil.isExternalTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName)) {
analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, false);
}
// Register first so the job is visible/trackable before any task runs.
Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTaskInfos);
analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
}
}
Loading