diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index cb6a3dfe5c2cac..66f0b94aa845cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -41,6 +41,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.ThreadPoolManager.BlockedPolicy; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.Util; @@ -59,6 +60,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; @@ -82,6 +84,9 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; @@ -738,12 +743,23 @@ private void syncExecute(Collection tasks) { ConnectContext ctx = ConnectContext.get(); try { ctxToSyncTask.put(ctx, syncTaskCollection); - syncTaskCollection.execute(); + ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze(); + syncTaskCollection.execute(syncExecPool); } finally { ctxToSyncTask.remove(ctx); } } + private ThreadPoolExecutor createThreadPoolForSyncAnalyze() { + String poolName = "SYNC ANALYZE THREAD POOL"; + return new ThreadPoolExecutor(0, 64, + 0, TimeUnit.SECONDS, + new SynchronousQueue(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC ANALYZE" + "-%d") + .build(), new BlockedPolicy(poolName, + (int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours))); + } + public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException { if (dropStatsStmt.dropExpired) { Env.getCurrentEnv().getStatisticsCleaner().clear(); @@ -844,28 +860,38 @@ public void cancel() { tasks.forEach(BaseAnalysisTask::cancel); } - public void execute() { - List colNames = new ArrayList<>(); - List errorMessages = new ArrayList<>(); + public void execute(ThreadPoolExecutor executor) { + List colNames = Collections.synchronizedList(new ArrayList<>()); + List errorMessages = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); for (BaseAnalysisTask task : tasks) { - if (cancelled) { - colNames.add(task.info.colName); - errorMessages.add("Cancelled"); - continue; - } - try { - task.doExecute(); - updateSyncTaskStatus(task, AnalysisState.FINISHED); - } catch (Throwable t) { - colNames.add(task.info.colName); - errorMessages.add(Util.getRootCauseMessage(t)); - updateSyncTaskStatus(task, AnalysisState.FAILED); - LOG.warn("Failed to analyze, info: {}", task, t); - } + executor.submit(() -> { + try { + if (cancelled) { + return; + } + try { + task.doExecute(); + updateSyncTaskStatus(task, AnalysisState.FINISHED); + } catch (Throwable t) { + colNames.add(task.info.colName); + errorMessages.add(Util.getRootCauseMessage(t)); + updateSyncTaskStatus(task, AnalysisState.FAILED); + LOG.warn("Failed to analyze, info: {}", task, t); + } + } finally { + countDownLatch.countDown(); + } + }); + } + try { + countDownLatch.await(); + } catch (InterruptedException t) { + LOG.warn("Thread got interrupted when waiting sync analyze task execution finished", t); } if (!colNames.isEmpty()) { throw new RuntimeException("Failed to analyze following columns:[" + String.join(",", colNames) - + "] Reasons: " + String.join(",", errorMessages)); + + "] Reasons: " + String.join(",", errorMessages)); } }