Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
Expand Down Expand Up @@ -77,9 +78,14 @@ public enum ScheduleType {
@SerializedName("jobId")
public final long jobId;

// When this AnalysisInfo represent a task, this is the task id for it.
@SerializedName("taskId")
public final long taskId;

// When this AnalysisInfo represent a job, this is the list of task ids belong to this job.
@SerializedName("taskIds")
public final List<Long> taskIds;

@SerializedName("catalogName")
public final String catalogName;

Expand Down Expand Up @@ -153,14 +159,19 @@ public enum ScheduleType {
@SerializedName("samplingPartition")
public boolean samplingPartition;

public AnalysisInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
// For serialize
@SerializedName("cronExpr")
public String cronExprStr;

public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, String catalogName, String dbName, String tblName,
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
this.catalogName = catalogName;
this.dbName = dbName;
this.tblName = tblName;
Expand Down Expand Up @@ -231,6 +242,10 @@ public boolean isJob() {
return taskId == -1;
}

public void addTaskId(long taskId) {
taskIds.add(taskId);
}

// TODO: use thrift
public static AnalysisInfo fromResultRow(ResultRow resultRow) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;

import java.util.List;
import java.util.Map;
import java.util.Set;

public class AnalysisInfoBuilder {
private long jobId;
private long taskId;
private List<Long> taskIds;
private String catalogName;
private String dbName;
private String tblName;
Expand Down Expand Up @@ -59,6 +61,7 @@ public AnalysisInfoBuilder() {
public AnalysisInfoBuilder(AnalysisInfo info) {
jobId = info.jobId;
taskId = info.taskId;
taskIds = info.taskIds;
catalogName = info.catalogName;
dbName = info.dbName;
tblName = info.tblName;
Expand Down Expand Up @@ -94,6 +97,11 @@ public AnalysisInfoBuilder setTaskId(long taskId) {
return this;
}

public AnalysisInfoBuilder setTaskIds(List<Long> taskIds) {
this.taskIds = taskIds;
return this;
}

public AnalysisInfoBuilder setCatalogName(String catalogName) {
this.catalogName = catalogName;
return this;
Expand Down Expand Up @@ -210,7 +218,7 @@ public AnalysisInfoBuilder setSamplingPartition(boolean samplingPartition) {
}

public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions, partitionNames,
return new AnalysisInfo(jobId, taskId, taskIds, catalogName, dbName, tblName, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition);
Expand All @@ -220,6 +228,7 @@ public AnalysisInfoBuilder copy() {
return new AnalysisInfoBuilder()
.setJobId(jobId)
.setTaskId(taskId)
.setTaskIds(taskIds)
.setCatalogName(catalogName)
.setDbName(dbName)
.setTblName(tblName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -439,6 +440,7 @@ private AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExcepti
Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table, columnNames,
partitionNames, analysisType, analysisMode);
taskInfoBuilder.setColToPartitions(colToPartitions);
taskInfoBuilder.setTaskIds(Lists.newArrayList());

return taskInfoBuilder.build();
}
Expand Down Expand Up @@ -511,6 +513,7 @@ private void createTaskForMVIdx(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask
AnalysisInfoBuilder indexTaskInfoBuilder = new AnalysisInfoBuilder(jobInfo);
AnalysisInfo analysisInfo = indexTaskInfoBuilder.setIndexId(indexId)
.setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build();
jobInfo.addTaskId(taskId);
if (isSync) {
return;
}
Expand All @@ -537,6 +540,7 @@ private void createTaskForEachColumns(AnalysisInfo jobInfo, Map<Long, BaseAnalys
AnalysisInfo analysisInfo = colTaskInfoBuilder.setColName(colName).setIndexId(indexId)
.setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build();
analysisTasks.put(taskId, createTask(analysisInfo));
jobInfo.addTaskId(taskId);
if (isSync) {
continue;
}
Expand Down Expand Up @@ -580,6 +584,7 @@ private void createTaskForExternalTable(AnalysisInfo jobInfo,
AnalysisInfo analysisInfo = colTaskInfoBuilder.setIndexId(-1L).setLastExecTimeInMs(System.currentTimeMillis())
.setTaskId(taskId).setColName("TableRowCount").setExternalTableLevelTask(true).build();
analysisTasks.put(taskId, createTask(analysisInfo));
jobInfo.addTaskId(taskId);
if (isSync) {
// For sync job, don't need to persist, return here and execute it immediately.
return;
Expand Down Expand Up @@ -708,7 +713,10 @@ public List<AnalysisInfo> showAnalysisJob(ShowAnalyzeStmt stmt) {
}

public String getJobProgress(long jobId) {
List<AnalysisInfo> tasks = findTasks(jobId);
List<AnalysisInfo> tasks = findTasksByTaskIds(jobId);
if (tasks == null) {
return "N/A";
}
int finished = 0;
int failed = 0;
int inProgress = 0;
Expand Down Expand Up @@ -921,6 +929,14 @@ public List<AnalysisInfo> findTasks(long jobId) {
}
}

public List<AnalysisInfo> findTasksByTaskIds(long jobId) {
AnalysisInfo jobInfo = analysisJobInfoMap.get(jobId);
if (jobInfo != null && jobInfo.taskIds != null) {
return jobInfo.taskIds.stream().map(id -> analysisTaskInfoMap.get(id)).collect(Collectors.toList());
}
return null;
}

public void removeAll(List<AnalysisInfo> analysisInfos) {
for (AnalysisInfo analysisInfo : analysisInfos) {
analysisTaskInfoMap.remove(analysisInfo.taskId);
Expand Down