Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.Util;
Expand All @@ -59,6 +60,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
Expand All @@ -82,6 +84,9 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -738,12 +743,23 @@ private void syncExecute(Collection<BaseAnalysisTask> tasks) {
ConnectContext ctx = ConnectContext.get();
try {
ctxToSyncTask.put(ctx, syncTaskCollection);
syncTaskCollection.execute();
ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
syncTaskCollection.execute(syncExecPool);
} finally {
ctxToSyncTask.remove(ctx);
}
}

private ThreadPoolExecutor createThreadPoolForSyncAnalyze() {
String poolName = "SYNC ANALYZE THREAD POOL";
return new ThreadPoolExecutor(0, 64,
0, TimeUnit.SECONDS,
new SynchronousQueue(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC ANALYZE" + "-%d")
.build(), new BlockedPolicy(poolName,
(int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
}

public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
if (dropStatsStmt.dropExpired) {
Env.getCurrentEnv().getStatisticsCleaner().clear();
Expand Down Expand Up @@ -844,28 +860,38 @@ public void cancel() {
tasks.forEach(BaseAnalysisTask::cancel);
}

public void execute() {
List<String> colNames = new ArrayList<>();
List<String> errorMessages = new ArrayList<>();
public void execute(ThreadPoolExecutor executor) {
List<String> colNames = Collections.synchronizedList(new ArrayList<>());
List<String> errorMessages = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
for (BaseAnalysisTask task : tasks) {
if (cancelled) {
colNames.add(task.info.colName);
errorMessages.add("Cancelled");
continue;
}
try {
task.doExecute();
updateSyncTaskStatus(task, AnalysisState.FINISHED);
} catch (Throwable t) {
colNames.add(task.info.colName);
errorMessages.add(Util.getRootCauseMessage(t));
updateSyncTaskStatus(task, AnalysisState.FAILED);
LOG.warn("Failed to analyze, info: {}", task, t);
}
executor.submit(() -> {
try {
if (cancelled) {
return;
}
try {
task.doExecute();
updateSyncTaskStatus(task, AnalysisState.FINISHED);
} catch (Throwable t) {
colNames.add(task.info.colName);
errorMessages.add(Util.getRootCauseMessage(t));
updateSyncTaskStatus(task, AnalysisState.FAILED);
LOG.warn("Failed to analyze, info: {}", task, t);
}
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException t) {
LOG.warn("Thread got interrupted when waiting sync analyze task execution finished", t);
}
if (!colNames.isEmpty()) {
throw new RuntimeException("Failed to analyze following columns:[" + String.join(",", colNames)
+ "] Reasons: " + String.join(",", errorMessages));
+ "] Reasons: " + String.join(",", errorMessages));
}
}

Expand Down