From fd82747f287f882b2d17f3fab0f4353a68e4ac0f Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 31 Oct 2025 11:40:52 +0800 Subject: [PATCH 1/2] add pause code and message in job --- .../extensions/insert/streaming/StreamingInsertJob.java | 6 ++++++ .../main/java/org/apache/doris/job/manager/JobManager.java | 7 ++++++- .../nereids/trees/plans/commands/PauseJobCommand.java | 5 ++++- .../nereids/trees/plans/commands/ResumeJobCommand.java | 4 ++-- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index a21a3c854fa1cf..5359b60b187d09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -262,6 +262,12 @@ public void updateJobStatus(JobStatus status) throws JobException { } } + public void afterManualStatusChange(FailureReason reason) { + this.setFailureReason(reason); + // Currently, only delayMsg is present here, which needs to be cleared when the status changes. 
+ this.setJobRuntimeMsg(""); + } + @Override public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException { lock.writeLock().lock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 95c38e0fa06963..5e851ed7d7d06e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -36,6 +36,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.common.FailureReason; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; @@ -290,12 +291,16 @@ public void alterJob(AlterJobCommand alterJobCommand) throws JobException, Analy log.info("update job success, jobId: {}", job.getJobId()); } - public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobException { + + public void alterJobStatus(String jobName, JobStatus jobStatus, FailureReason reason) throws JobException { for (T a : jobMap.values()) { if (a.getJobName().equals(jobName)) { try { checkSameStatus(a, jobStatus); alterJobStatus(a.getJobId(), jobStatus); + if (a instanceof StreamingInsertJob) { + ((StreamingInsertJob) a).afterManualStatusChange(reason); + } } catch (JobException e) { throw new JobException("Alter job status error, jobName is %s, errorMsg is %s", jobName, e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java index 2b22a527293166..7d71302fcf8827 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java @@ -18,6 +18,8 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.analysis.StmtType; +import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.job.common.FailureReason; import org.apache.doris.job.common.JobStatus; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -39,7 +41,8 @@ public R accept(PlanVisitor visitor, C context) { @Override public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { - ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PAUSED); + ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PAUSED, + new FailureReason(InternalErrorCode.MANUAL_PAUSE_ERR, "Job paused by user " + ctx.getQualifiedUser())); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java index 906f1c9d159cc3..40e25dbdc12b45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java @@ -43,9 +43,9 @@ public R accept(PlanVisitor visitor, C context) { public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { AbstractJob job = ctx.getEnv().getJobManager().getJobByName(super.getJobName()); if (job instanceof StreamingInsertJob) { - ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PENDING); + ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PENDING, null); } else { - ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.RUNNING); + ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.RUNNING, null); 
} } From 912becf5c0a7b8d1c4fd4513011a7c23c299eefe Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 31 Oct 2025 15:30:23 +0800 Subject: [PATCH 2/2] rename afterManualStatusChange to resetFailureInfo, record fetch-meta and db-id failure reasons, clear failure info on auto resume --- .../job/extensions/insert/streaming/StreamingInsertJob.java | 6 +++++- .../insert/streaming/StreamingJobSchedulerTask.java | 1 + .../main/java/org/apache/doris/job/manager/JobManager.java | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 5359b60b187d09..57eab5cb816666 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -26,6 +26,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; @@ -262,7 +263,7 @@ public void updateJobStatus(JobStatus status) throws JobException { } } - public void afterManualStatusChange(FailureReason reason) { + public void resetFailureInfo(FailureReason reason) { this.setFailureReason(reason); // Currently, only delayMsg is present here, which needs to be cleared when the status changes. 
this.setJobRuntimeMsg(""); @@ -361,6 +362,8 @@ protected void fetchMeta() { offsetProvider.fetchRemoteMeta(originTvfProps); } catch (Exception ex) { log.warn("fetch remote meta failed, job id: {}", getJobId(), ex); + failureReason = new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR, + "Failed to fetch meta, " + ex.getMessage()); } } @@ -715,6 +718,7 @@ public long getDbId() { } catch (AnalysisException e) { log.warn("failed to get db id for streaming insert job {}, db name: {}, msg: {}", getJobId(), getCurrentDbName(), e.getMessage()); + failureReason = new FailureReason(InternalErrorCode.DB_ERR, "Failed to get db id, " + e.getMessage()); } } return dbId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java index 7f483a8f587134..c3655e6697eb9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java @@ -88,6 +88,7 @@ private void autoResumeHandler() throws JobException { if (autoResumeCount < Long.MAX_VALUE) { streamingInsertJob.setAutoResumeCount(autoResumeCount + 1); } + streamingInsertJob.resetFailureInfo(null); streamingInsertJob.updateJobStatus(JobStatus.PENDING); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 5e851ed7d7d06e..c62e94f8d646fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -299,7 +299,7 @@ public void alterJobStatus(String jobName, JobStatus jobStatus, FailureReason re checkSameStatus(a, jobStatus); alterJobStatus(a.getJobId(), jobStatus); if (a instanceof 
StreamingInsertJob) { - ((StreamingInsertJob) a).afterManualStatusChange(reason); + ((StreamingInsertJob) a).resetFailureInfo(reason); } } catch (JobException e) { throw new JobException("Alter job status error, jobName is %s, errorMsg is %s",