Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ public ShowResultSetMetaData getMetaData() {
public long getJobId() {
return jobId;
}

@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
}
13 changes: 11 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
Expand Down Expand Up @@ -1070,11 +1071,19 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
break;
}
case OperationType.OP_CREATE_ANALYSIS_JOB: {
env.getAnalysisManager().replayCreateAnalysisJob((AnalysisInfo) journal.getData());
AnalysisInfo info = (AnalysisInfo) journal.getData();
if (AnalysisManager.needAbandon(info)) {
break;
}
env.getAnalysisManager().replayCreateAnalysisJob(info);
break;
}
case OperationType.OP_CREATE_ANALYSIS_TASK: {
env.getAnalysisManager().replayCreateAnalysisTask((AnalysisInfo) journal.getData());
AnalysisInfo info = (AnalysisInfo) journal.getData();
if (AnalysisManager.needAbandon(info)) {
break;
}
env.getAnalysisManager().replayCreateAnalysisTask(info);
break;
}
case OperationType.OP_DELETE_ANALYSIS_JOB: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public class AnalysisManager extends Daemon implements Writable {
// Set the job state to RUNNING when its first task becomes RUNNING.
if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
job.state = AnalysisState.RUNNING;
logCreateAnalysisJob(job);
replayCreateAnalysisJob(job);
}
boolean allFinished = true;
boolean hasFailure = false;
Expand Down Expand Up @@ -372,7 +372,7 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlExceptio
updateTableStats(jobInfo);
return null;
}
persistAnalysisJob(jobInfo);
recordAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
// TODO: maybe we should update table stats only when all task succeeded.
updateTableStats(jobInfo);
Expand Down Expand Up @@ -554,13 +554,13 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
}

@VisibleForTesting
public void persistAnalysisJob(AnalysisInfo jobInfo) throws DdlException {
public void recordAnalysisJob(AnalysisInfo jobInfo) throws DdlException {
if (jobInfo.scheduleType == ScheduleType.PERIOD && jobInfo.lastExecTimeInMs > 0) {
return;
}
AnalysisInfoBuilder jobInfoBuilder = new AnalysisInfoBuilder(jobInfo);
AnalysisInfo analysisInfo = jobInfoBuilder.setTaskId(-1).build();
logCreateAnalysisJob(analysisInfo);
replayCreateAnalysisJob(analysisInfo);
}

public void createTaskForEachColumns(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> analysisTasks,
Expand All @@ -584,7 +584,7 @@ public void createTaskForEachColumns(AnalysisInfo jobInfo, Map<Long, BaseAnalysi
}
try {
if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
logCreateAnalysisTask(analysisInfo);
replayCreateAnalysisTask(analysisInfo);
}
} catch (Exception e) {
throw new DdlException("Failed to create analysis task", e);
Expand Down Expand Up @@ -623,7 +623,7 @@ public void createTableLevelTaskForExternalTable(AnalysisInfo jobInfo,
return;
}
try {
logCreateAnalysisTask(analysisInfo);
replayCreateAnalysisTask(analysisInfo);
} catch (Exception e) {
throw new DdlException("Failed to create analysis task", e);
}
Expand Down Expand Up @@ -833,9 +833,8 @@ public void execute(ThreadPoolExecutor executor) {
executor.submit(() -> {
try {
if (cancelled) {
errorMessages.add("Cancelled since query timeout,"
+ "you could set could query_timeout or parallel_sync_analyze_task_num "
+ "to a bigger value and try again");
errorMessages.add("Query timeout or user cancelled."
+ "Could set analyze_timeout to a bigger value.");
return;
}
try {
Expand Down Expand Up @@ -927,10 +926,28 @@ private static void readAnalysisInfo(DataInput in, Map<Long, AnalysisInfo> map,
int size = in.readInt();
for (int i = 0; i < size; i++) {
AnalysisInfo analysisInfo = AnalysisInfo.read(in);
// Unfinished manual once job/tasks doesn't need to keep in memory anymore.
if (needAbandon(analysisInfo)) {
continue;
}
map.put(job ? analysisInfo.jobId : analysisInfo.taskId, analysisInfo);
}
}

// Need to abandon the unfinished manual once jobs/tasks while loading image and replay journal.
// Journal only store finished tasks and jobs.
public static boolean needAbandon(AnalysisInfo analysisInfo) {
if (analysisInfo == null) {
return true;
}
if ((AnalysisState.PENDING.equals(analysisInfo.state) || AnalysisState.RUNNING.equals(analysisInfo.state))
&& ScheduleType.ONCE.equals(analysisInfo.scheduleType)
&& JobType.MANUAL.equals(analysisInfo.jobType)) {
return true;
}
return false;
}

private static void readIdToTblStats(DataInput in, Map<Long, TableStats> map) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ protected void executeWithRetry() {
doExecute();
break;
} catch (Throwable t) {
if (killed) {
throw new RuntimeException(t);
}
LOG.warn("Failed to execute analysis task, retried times: {}", retriedTimes++, t);
if (retriedTimes > StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
throw new RuntimeException(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void logCreateAnalysisJob(AnalysisInfo analysisJob) {
analysisManager.buildAndAssignJob(analyzeTblStmt);
new Expectations() {
{
analysisManager.persistAnalysisJob(analysisInfo);
analysisManager.recordAnalysisJob(analysisInfo);
times = 1;
}
};
Expand Down