From fa5ec0884b8b186cd96e2dfc2437520d7d1a9a12 Mon Sep 17 00:00:00 2001 From: Jibing Li Date: Tue, 19 Sep 2023 11:30:48 +0800 Subject: [PATCH] Doesn't log create analyze job/task anymore, only log when they finish. Abandon unfinished jobs/tasks while loading image. Return without retry when analyze table cancelled by user. --- .../doris/analysis/ShowAnalyzeTaskStatus.java | 5 +++ .../org/apache/doris/persist/EditLog.java | 13 +++++-- .../doris/statistics/AnalysisManager.java | 35 ++++++++++++++----- .../doris/statistics/BaseAnalysisTask.java | 3 ++ .../doris/statistics/AnalysisManagerTest.java | 2 +- 5 files changed, 46 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java index 927a56d19d2c7b..7c6c5cf17feb2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java @@ -59,4 +59,9 @@ public ShowResultSetMetaData getMetaData() { public long getJobId() { return jobId; } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.FORWARD_NO_SYNC; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 25d6bd089dd937..6543173f8a9a98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -86,6 +86,7 @@ import org.apache.doris.scheduler.job.Job; import org.apache.doris.scheduler.job.JobTask; import org.apache.doris.statistics.AnalysisInfo; +import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.TableStats; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -1070,11 +1071,19 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { break; } case OperationType.OP_CREATE_ANALYSIS_JOB: { - env.getAnalysisManager().replayCreateAnalysisJob((AnalysisInfo) journal.getData()); + AnalysisInfo info = (AnalysisInfo) journal.getData(); + if (AnalysisManager.needAbandon(info)) { + break; + } + env.getAnalysisManager().replayCreateAnalysisJob(info); break; } case OperationType.OP_CREATE_ANALYSIS_TASK: { - env.getAnalysisManager().replayCreateAnalysisTask((AnalysisInfo) journal.getData()); + AnalysisInfo info = (AnalysisInfo) journal.getData(); + if (AnalysisManager.needAbandon(info)) { + break; + } + env.getAnalysisManager().replayCreateAnalysisTask(info); break; } case OperationType.OP_DELETE_ANALYSIS_JOB: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 83c3cc84e494bd..ce0c54667508fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -151,7 +151,7 @@ public class AnalysisManager extends Daemon implements Writable { // Set the job state to RUNNING when its first task becomes RUNNING. if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) { job.state = AnalysisState.RUNNING; - logCreateAnalysisJob(job); + replayCreateAnalysisJob(job); } boolean allFinished = true; boolean hasFailure = false; @@ -372,7 +372,7 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlExceptio updateTableStats(jobInfo); return null; } - persistAnalysisJob(jobInfo); + recordAnalysisJob(jobInfo); analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos); // TODO: maybe we should update table stats only when all task succeeded. updateTableStats(jobInfo); @@ -554,13 +554,13 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio } @VisibleForTesting - public void persistAnalysisJob(AnalysisInfo jobInfo) throws DdlException { + public void recordAnalysisJob(AnalysisInfo jobInfo) throws DdlException { if (jobInfo.scheduleType == ScheduleType.PERIOD && jobInfo.lastExecTimeInMs > 0) { return; } AnalysisInfoBuilder jobInfoBuilder = new AnalysisInfoBuilder(jobInfo); AnalysisInfo analysisInfo = jobInfoBuilder.setTaskId(-1).build(); - logCreateAnalysisJob(analysisInfo); + replayCreateAnalysisJob(analysisInfo); } public void createTaskForEachColumns(AnalysisInfo jobInfo, Map analysisTasks, @@ -584,7 +584,7 @@ public void createTaskForEachColumns(AnalysisInfo jobInfo, Map { try { if (cancelled) { - errorMessages.add("Cancelled since query timeout," - + "you could set could query_timeout or parallel_sync_analyze_task_num " - + "to a bigger value and try again"); + errorMessages.add("Query timeout or user cancelled." + + "Could set analyze_timeout to a bigger value."); return; } try { @@ -927,10 +926,28 @@ private static void readAnalysisInfo(DataInput in, Map map, int size = in.readInt(); for (int i = 0; i < size; i++) { AnalysisInfo analysisInfo = AnalysisInfo.read(in); + // Unfinished manual once job/tasks doesn't need to keep in memory anymore. + if (needAbandon(analysisInfo)) { + continue; + } map.put(job ? analysisInfo.jobId : analysisInfo.taskId, analysisInfo); } } + // Need to abandon the unfinished manual once jobs/tasks while loading image and replay journal. + // Journal only store finished tasks and jobs. + public static boolean needAbandon(AnalysisInfo analysisInfo) { + if (analysisInfo == null) { + return true; + } + if ((AnalysisState.PENDING.equals(analysisInfo.state) || AnalysisState.RUNNING.equals(analysisInfo.state)) + && ScheduleType.ONCE.equals(analysisInfo.scheduleType) + && JobType.MANUAL.equals(analysisInfo.jobType)) { + return true; + } + return false; + } + private static void readIdToTblStats(DataInput in, Map map) throws IOException { int size = in.readInt(); for (int i = 0; i < size; i++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index edadf4c17bf4a5..a563e6a6fef738 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -170,6 +170,9 @@ protected void executeWithRetry() { doExecute(); break; } catch (Throwable t) { + if (killed) { + throw new RuntimeException(t); + } LOG.warn("Failed to execute analysis task, retried times: {}", retriedTimes++, t); if (retriedTimes > StatisticConstants.ANALYZE_TASK_RETRY_TIMES) { throw new RuntimeException(t); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index 2146722db92abd..67d1a5dc446295 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -261,7 +261,7 @@ public void logCreateAnalysisJob(AnalysisInfo analysisJob) { analysisManager.buildAndAssignJob(analyzeTblStmt); new Expectations() { { - analysisManager.persistAnalysisJob(analysisInfo); + analysisManager.recordAnalysisJob(analysisInfo); times = 1; } };