diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 9ca4c77303820b..a253dbe919c8ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.RejectedExecutionException; /** * There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn. @@ -155,6 +156,13 @@ private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) { .build(), e); cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()), true, true); return; + } catch (RejectedExecutionException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("database_id", dbId) + .add("error_msg", "the task queque is full.") + .build(), e); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()), true, true); + return; } loadStartTimestamp = System.currentTimeMillis(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 04f3d938fc0d30..1d649c65627bc0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.RejectedExecutionException; /** * parent class of BrokerLoadJob and SparkLoadJob from load stmt @@ -233,8 +234,13 @@ public void onTaskFailed(long taskId, FailMsg failMsg) { loadTask.updateRetryInfo(); idToTasks.put(loadTask.getSignature(), loadTask); // load id will be added to loadStatistic when executing this task - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); - return; + try { + Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); + } catch (RejectedExecutionException e) { + unprotectedExecuteCancel(failMsg, true); + logFinalOperation(); + return; + } } } finally { writeUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java index 9a2b691983963b..33d46773d35071 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; /** * LoadScheduler will schedule the pending LoadJob which belongs to LoadManager. @@ -99,6 +100,12 @@ private void process() throws InterruptedException { .build(), e); needScheduleJobs.put(loadJob); return; + } catch (RejectedExecutionException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) + .add("error_msg", "Failed to submit etl job. Job queue is full.") + .build(), e); + loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()), + true, true); } } }