Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2948,7 +2948,7 @@ public class Config extends ConfigBase {
"Columns that have not been collected within the specified interval will trigger automatic analyze. "
+ "0 means not trigger."
})
public static long auto_analyze_interval_seconds = 0;
public static long auto_analyze_interval_seconds = 86400; // 24 hours.

//==========================================================================
// begin of cloud config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* [TABLE]
* [
* WHERE
* [PRIORITY = ["HIGH"|"MID"|"LOW"]]
* [PRIORITY = ["HIGH"|"MID"|"LOW"|"VERY_LOW"]]
* ]
*/
public class ShowAutoAnalyzeJobsStmt extends ShowStmt implements NotFallbackInParser {
Expand Down Expand Up @@ -175,7 +175,7 @@ private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {

if (!valid) {
throw new AnalysisException("Where clause should looks like: "
+ "PRIORITY = \"HIGH|MID|LOW\"");
+ "PRIORITY = \"HIGH|MID|LOW|VERY_LOW\"");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class ShowColumnStatsStmt extends ShowStmt implements NotFallbackInParser
.add("updated_time")
.add("update_rows")
.add("last_analyze_row_count")
.add("last_analyze_version")
.build();

private static final ImmutableList<String> PARTITION_COLUMN_TITLE_NAMES =
Expand Down Expand Up @@ -185,6 +186,7 @@ public ShowResultSet constructResultSet(List<Pair<Pair<String, String>, ColumnSt
row.add(String.valueOf(p.second.updatedTime));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.updatedRows));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.rowCount));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.tableVersion));
result.add(row);
});
return new ShowResultSet(getMetaData(), result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ShowTableStatsStmt extends ShowStmt implements NotFallbackInParser
.add("new_partition")
.add("user_inject")
.add("enable_auto_analyze")
.add("last_analyze_time")
.build();

private static final ImmutableList<String> PARTITION_TITLE_NAMES =
Expand Down Expand Up @@ -230,6 +231,7 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
row.add("");
row.add("");
row.add(String.valueOf(table.autoAnalyzeEnabled()));
row.add("");
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
Expand All @@ -242,13 +244,16 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
LocalDateTime dateTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime),
java.time.ZoneId.systemDefault());
String formattedDateTime = dateTime.format(formatter);
row.add(formattedDateTime);
LocalDateTime lastAnalyzeTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime),
java.time.ZoneId.systemDefault());
row.add(dateTime.format(formatter));
row.add(tableStatistic.analyzeColumns().toString());
row.add(tableStatistic.jobType.toString());
row.add(String.valueOf(tableStatistic.partitionChanged.get()));
row.add(String.valueOf(tableStatistic.userInjected));
row.add(table == null ? "N/A" : String.valueOf(table.autoAnalyzeEnabled()));
row.add(lastAnalyzeTime.format(formatter));
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
Expand Down
11 changes: 0 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -969,17 +969,6 @@ public Set<Column> getSchemaAllIndexes(boolean full) {
return columns;
}

public List<Column> getMvColumns(boolean full) {
List<Column> columns = Lists.newArrayList();
for (Long indexId : indexIdToMeta.keySet()) {
if (indexId == baseIndexId) {
continue;
}
columns.addAll(getSchemaByIndexId(indexId, full));
}
return columns;
}

public List<Column> getBaseSchemaKeyColumns() {
return getKeyColumnsByIndexId(baseIndexId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ public enum ScheduleType {
@SerializedName("updateRows")
public final long updateRows;

@SerializedName("tv")
public final long tableVersion;

public final Map<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>();

@SerializedName("tblUpdateTime")
Expand All @@ -206,8 +209,8 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForExternalTable, long tblUpdateTime, long rowCount, boolean userInject,
long updateRows, JobPriority priority, Map<Long, Long> partitionUpdateRows, boolean enablePartition) {
boolean usingSqlForExternalTable, long tblUpdateTime, long rowCount, boolean userInject, long updateRows,
long tableVersion, JobPriority priority, Map<Long, Long> partitionUpdateRows, boolean enablePartition) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
Expand Down Expand Up @@ -244,6 +247,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
this.rowCount = rowCount;
this.userInject = userInject;
this.updateRows = updateRows;
this.tableVersion = tableVersion;
this.priority = priority;
if (partitionUpdateRows != null) {
this.partitionUpdateRows.putAll(partitionUpdateRows);
Expand Down Expand Up @@ -292,6 +296,7 @@ public String toString() {
sj.add("rowCount: " + rowCount);
sj.add("userInject: " + userInject);
sj.add("updateRows: " + updateRows);
sj.add("tableVersion: " + tableVersion);
sj.add("priority: " + priority.name());
sj.add("enablePartition: " + enablePartition);
return sj.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class AnalysisInfoBuilder {
private long rowCount;
private boolean userInject = false;
private long updateRows;
private long tableVersion;
private JobPriority priority;
private Map<Long, Long> partitionUpdateRows;
private boolean enablePartition;
Expand Down Expand Up @@ -104,6 +105,7 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
rowCount = info.rowCount;
userInject = info.userInject;
updateRows = info.updateRows;
tableVersion = info.tableVersion;
priority = info.priority;
partitionUpdateRows = info.partitionUpdateRows;
enablePartition = info.enablePartition;
Expand Down Expand Up @@ -274,6 +276,11 @@ public AnalysisInfoBuilder setUpdateRows(long updateRows) {
return this;
}

public AnalysisInfoBuilder setTableVersion(long tableVersion) {
this.tableVersion = tableVersion;
return this;
}

public AnalysisInfoBuilder setPriority(JobPriority priority) {
this.priority = priority;
return this;
Expand All @@ -295,7 +302,7 @@ public AnalysisInfo build() {
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull, usingSqlForExternalTable, tblUpdateTime, rowCount, userInject, updateRows,
priority, partitionUpdateRows, enablePartition);
tableVersion, priority, partitionUpdateRows, enablePartition);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public class AnalysisManager implements Writable {
public final Map<TableName, Set<Pair<String, String>>> highPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> midPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> lowPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> veryLowPriorityJobs = new LinkedHashMap<>();

// Tracking running manually submitted async tasks, keep in mem only
protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -381,14 +382,15 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) {
}
infoBuilder.setColName(stringJoiner.toString());
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(System.currentTimeMillis());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
// Empty table row count is 0. Call fetchRowCount() when getRowCount() returns <= 0,
// because getRowCount may return <= 0 if cached is not loaded. This is mainly for external table.
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 :
(table.getRowCount() <= 0 ? table.fetchRowCount() : table.getRowCount());
infoBuilder.setRowCount(rowCount);
TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId());
infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get());
infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0);
infoBuilder.setPriority(JobPriority.MANUAL);
infoBuilder.setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows);
infoBuilder.setEnablePartition(StatisticsUtil.enablePartitionAnalyze());
Expand Down Expand Up @@ -547,12 +549,15 @@ public List<AutoAnalysisPendingJob> showAutoPendingJobs(ShowAutoAnalyzeJobsStmt
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
result.addAll(getPendingJobs(veryLowPriorityJobs, JobPriority.VERY_LOW, tblName));
} else if (priority.equals(JobPriority.HIGH.name())) {
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
} else if (priority.equals(JobPriority.MID.name())) {
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
} else if (priority.equals(JobPriority.LOW.name())) {
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
} else if (priority.equals(JobPriority.VERY_LOW.name())) {
result.addAll(getPendingJobs(veryLowPriorityJobs, JobPriority.VERY_LOW, tblName));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,23 @@ public class ColStatsMeta {
@SerializedName("rowCount")
public long rowCount;

@SerializedName("tv")
public long tableVersion;

@SerializedName("pur")
public ConcurrentMap<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>();

public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, AnalysisType analysisType, JobType jobType,
long queriedTimes, long rowCount, long updatedRows, Map<Long, Long> partitionUpdateRows) {
public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, AnalysisType analysisType,
JobType jobType, long queriedTimes, long rowCount, long updatedRows,
long tableVersion, Map<Long, Long> partitionUpdateRows) {
this.updatedTime = updatedTime;
this.analysisMethod = analysisMethod;
this.analysisType = analysisType;
this.jobType = jobType;
this.queriedTimes.addAndGet(queriedTimes);
this.updatedRows = updatedRows;
this.rowCount = rowCount;
this.tableVersion = tableVersion;
if (partitionUpdateRows != null) {
this.partitionUpdateRows.putAll(partitionUpdateRows);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public enum JobPriority {
HIGH,
MID,
LOW,
VERY_LOW,
MANUAL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
Expand All @@ -37,7 +36,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -92,10 +90,11 @@ protected void runAfterCatalogReady() {
}

protected void collect() {
while (canCollect()) {
while (StatisticsUtil.canCollect()) {
Pair<Entry<TableName, Set<Pair<String, String>>>, JobPriority> job = getJob();
if (job == null) {
// No more job to process, break and sleep.
LOG.info("No auto analyze jobs to process.");
break;
}
try {
Expand All @@ -112,11 +111,6 @@ protected void collect() {
}
}

protected boolean canCollect() {
return StatisticsUtil.enableAutoAnalyze()
&& StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
}

protected Pair<Entry<TableName, Set<Pair<String, String>>>, JobPriority> getJob() {
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
Optional<Entry<TableName, Set<Pair<String, String>>>> job = fetchJobFromMap(manager.highPriorityJobs);
Expand All @@ -128,7 +122,11 @@ protected Pair<Entry<TableName, Set<Pair<String, String>>>, JobPriority> getJob(
return Pair.of(job.get(), JobPriority.MID);
}
job = fetchJobFromMap(manager.lowPriorityJobs);
return job.map(entry -> Pair.of(entry, JobPriority.LOW)).orElse(null);
if (job.isPresent()) {
return Pair.of(job.get(), JobPriority.LOW);
}
job = fetchJobFromMap(manager.veryLowPriorityJobs);
return job.map(tableNameSetEntry -> Pair.of(tableNameSetEntry, JobPriority.VERY_LOW)).orElse(null);
}

protected Optional<Map.Entry<TableName, Set<Pair<String, String>>>> fetchJobFromMap(
Expand All @@ -142,9 +140,13 @@ protected Optional<Map.Entry<TableName, Set<Pair<String, String>>>> fetchJobFrom

protected void processOneJob(TableIf table, Set<Pair<String, String>> columns,
JobPriority priority) throws DdlException {
// appendMvColumn(table, columns);
appendAllColumns(table, columns);
columns = columns.stream().filter(c -> StatisticsUtil.needAnalyzeColumn(table, c)).collect(Collectors.toSet());
columns = columns.stream().filter(
c -> StatisticsUtil.needAnalyzeColumn(table, c) || StatisticsUtil.isLongTimeColumn(table, c))
.collect(Collectors.toSet());
if (columns.isEmpty()) {
return;
}
AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns, priority);
if (analyzeJob == null) {
return;
Expand Down Expand Up @@ -178,15 +180,6 @@ protected void appendAllColumns(TableIf table, Set<Pair<String, String>> columns
}
}

protected void appendMvColumn(TableIf table, Set<String> columns) {
if (!(table instanceof OlapTable)) {
return;
}
OlapTable olapTable = (OlapTable) table;
Set<String> mvColumns = olapTable.getMvColumns(false).stream().map(Column::getName).collect(Collectors.toSet());
columns.addAll(mvColumns);
}

protected boolean supportAutoAnalyze(TableIf tableIf) {
if (tableIf == null) {
return false;
Expand Down Expand Up @@ -248,9 +241,10 @@ protected AnalysisInfo createAnalyzeJobForTbl(
.setTaskIds(new ArrayList<>())
.setLastExecTimeInMs(System.currentTimeMillis())
.setJobType(JobType.SYSTEM)
.setTblUpdateTime(System.currentTimeMillis())
.setTblUpdateTime(table.getUpdateTime())
.setRowCount(rowCount)
.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get())
.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0)
.setPriority(priority)
.setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows)
.setEnablePartition(StatisticsUtil.enablePartitionAnalyze())
Expand All @@ -275,4 +269,8 @@ protected void executeSystemAnalysisJob(AnalysisInfo jobInfo)
future.get();
}
}

public boolean isReady() {
return waited;
}
}
Loading
Loading