diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index cb4d9eb034c1f8..b5c6ebf6028d82 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -96,17 +96,23 @@ public class AnalysisManager extends Daemon implements Writable { private static final Logger LOG = LogManager.getLogger(AnalysisManager.class); - private ConcurrentMap> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); + // Tracking running manually submitted async tasks, keep in mem only + private final ConcurrentMap> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); private StatisticsCache statisticsCache; private AnalysisTaskExecutor taskExecutor; + // Store task information in metadata. private final Map analysisTaskInfoMap = Collections.synchronizedMap(new TreeMap<>()); + + // Store job information in metadata private final Map analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>()); + // Tracking system submitted job, keep in mem only private final Map systemJobInfoMap = new ConcurrentHashMap<>(); + // Tracking and control sync analyze tasks, keep in mem only private final ConcurrentMap ctxToSyncTask = new ConcurrentHashMap<>(); private final Function userJobStatusUpdater = w -> { @@ -127,6 +133,10 @@ public class AnalysisManager extends Daemon implements Writable { } info.lastExecTimeInMs = time; AnalysisInfo job = analysisJobInfoMap.get(info.jobId); + // Job may get deleted during execution. + if (job == null) { + return null; + } // Synchronize the job state change in job level. synchronized (job) { job.lastExecTimeInMs = time; @@ -333,8 +343,6 @@ private AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlException if (!isSync) { persistAnalysisJob(jobInfo); analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos); - } - if (!isSync) { try { updateTableStats(jobInfo); } catch (Throwable e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 257708de54f78b..71b11915650a88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -67,7 +67,7 @@ public void doExecute() throws Exception { params.put("colName", String.valueOf(info.colName)); params.put("tblName", String.valueOf(info.tblName)); params.put("sampleExpr", getSampleExpression()); - List partitionAnalysisSQLs = new ArrayList<>(); + List sqls = new ArrayList<>(); try { tbl.readLock(); Set partNames = info.colToPartitions.get(info.colName); @@ -80,46 +80,40 @@ public void doExecute() throws Exception { // Avoid error when get the default partition params.put("partName", "`" + partName + "`"); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); + sqls.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); } } finally { tbl.readUnlock(); } - execSQLs(partitionAnalysisSQLs); params.remove("partId"); params.put("type", col.getType().toString()); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE); - execSQL(sql); + sqls.add(sql); + execSQLs(sqls); } @VisibleForTesting - public void execSQLs(List partitionAnalysisSQLs) throws Exception { - for (String sql : partitionAnalysisSQLs) { - execSQL(sql); - } - } - - @VisibleForTesting - public void execSQL(String sql) throws Exception { - if (killed) { - return; - } + public void execSQLs(List sqls) throws Exception { long startTime = System.currentTimeMillis(); - LOG.info("ANALYZE SQL : " + sql + " start at " + startTime); try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - stmtExecutor = new StmtExecutor(r.connectContext, sql); - r.connectContext.setExecutor(stmtExecutor); - stmtExecutor.execute(); - QueryState queryState = r.connectContext.getState(); - if (queryState.getStateType().equals(MysqlStateType.ERR)) { - throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s", - info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage())); + for (String sql : sqls) { + if (killed) { + return; + } + LOG.info("ANALYZE SQL : " + sql + " start at " + startTime); + stmtExecutor = new StmtExecutor(r.connectContext, sql); + r.connectContext.setExecutor(stmtExecutor); + stmtExecutor.execute(); + QueryState queryState = r.connectContext.getState(); + if (queryState.getStateType().equals(MysqlStateType.ERR)) { + throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s", + info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage())); + } } } finally { - LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms"); + LOG.debug("Analyze SQL: " + sqls + " cost time: " + (System.currentTimeMillis() - startTime) + "ms"); } } - } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java index 7bbaf9b9020095..574c96c73d3b4a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -89,7 +89,7 @@ public List executeInternalQuery() { new MockUp() { @Mock - public void execSQL(String sql) throws Exception { + public void execSQLs(List sqls) throws Exception { } };