Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,23 @@ public class AnalysisManager extends Daemon implements Writable {

private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);

private ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
// Tracking running manually submitted async tasks, keep in mem only
private final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();

private StatisticsCache statisticsCache;

private AnalysisTaskExecutor taskExecutor;

// Store task information in metadata.
private final Map<Long, AnalysisInfo> analysisTaskInfoMap = Collections.synchronizedMap(new TreeMap<>());

// Store job information in metadata
private final Map<Long, AnalysisInfo> analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>());

// Tracking system submitted job, keep in mem only
private final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();

// Tracking and control sync analyze tasks, keep in mem only
private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();

private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
Expand All @@ -127,6 +133,10 @@ public class AnalysisManager extends Daemon implements Writable {
}
info.lastExecTimeInMs = time;
AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
// Job may get deleted during execution.
if (job == null) {
return null;
}
// Synchronize the job state change in job level.
synchronized (job) {
job.lastExecTimeInMs = time;
Expand Down Expand Up @@ -333,8 +343,6 @@ private AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlException
if (!isSync) {
persistAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
}
if (!isSync) {
try {
updateTableStats(jobInfo);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void doExecute() throws Exception {
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(info.tblName));
params.put("sampleExpr", getSampleExpression());
List<String> partitionAnalysisSQLs = new ArrayList<>();
List<String> sqls = new ArrayList<>();
try {
tbl.readLock();
Set<String> partNames = info.colToPartitions.get(info.colName);
Expand All @@ -80,46 +80,40 @@ public void doExecute() throws Exception {
// Avoid error when get the default partition
params.put("partName", "`" + partName + "`");
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
sqls.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
}
} finally {
tbl.readUnlock();
}
execSQLs(partitionAnalysisSQLs);
params.remove("partId");
params.put("type", col.getType().toString());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
execSQL(sql);
sqls.add(sql);
execSQLs(sqls);
}

@VisibleForTesting
public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
for (String sql : partitionAnalysisSQLs) {
execSQL(sql);
}
}

@VisibleForTesting
public void execSQL(String sql) throws Exception {
if (killed) {
return;
}
public void execSQLs(List<String> sqls) throws Exception {
long startTime = System.currentTimeMillis();
LOG.info("ANALYZE SQL : " + sql + " start at " + startTime);
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
QueryState queryState = r.connectContext.getState();
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
for (String sql : sqls) {
if (killed) {
return;
}
LOG.info("ANALYZE SQL : " + sql + " start at " + startTime);
stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
QueryState queryState = r.connectContext.getState();
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
}
}
} finally {
LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
LOG.debug("Analyze SQL: " + sqls + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public List<ResultRow> executeInternalQuery() {

// Mock out the batched SQL executor so the test never runs real ANALYZE SQL.
// (The paste interleaved the removed execSQL(String) signature with the new
// execSQLs(List) one, yielding two overlapping method headers; only the
// post-merge execSQLs(List<String>) mock is kept.)
new MockUp<OlapAnalysisTask>() {
    @Mock
    public void execSQLs(List<String> sqls) throws Exception {
        // No-op: skip statement execution in tests.
    }
};

Expand Down