From 0e72e4eb1010fcd7f7b0186f4d149ca1bf30ffb8 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 3 Nov 2025 10:36:53 +0800 Subject: [PATCH] [improve](job) Adding FailureReason after manually changing the status of a streaming job (#57551) ### What problem does this PR solve? Adding FailureReason after manually changing the status of a streaming job --- .../insert/streaming/StreamingInsertJob.java | 10 ++++++++++ .../insert/streaming/StreamingJobSchedulerTask.java | 1 + .../java/org/apache/doris/job/manager/JobManager.java | 7 ++++++- .../nereids/trees/plans/commands/PauseJobCommand.java | 5 ++++- .../nereids/trees/plans/commands/ResumeJobCommand.java | 4 ++-- 5 files changed, 23 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index b4accf1b15ecc6..f26294cc4757e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -26,6 +26,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; @@ -262,6 +263,12 @@ public void updateJobStatus(JobStatus status) throws JobException { } } + public void resetFailureInfo(FailureReason reason) { + this.setFailureReason(reason); + // Currently, only delayMsg is present here, which needs to be cleared when the status changes. + this.setJobRuntimeMsg(""); + } + @Override public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException { lock.writeLock().lock(); @@ -355,6 +362,8 @@ protected void fetchMeta() { offsetProvider.fetchRemoteMeta(originTvfProps); } catch (Exception ex) { log.warn("fetch remote meta failed, job id: {}", getJobId(), ex); + failureReason = new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR, + "Failed to fetch meta, " + ex.getMessage()); } } @@ -678,6 +687,7 @@ public long getDbId() { } catch (AnalysisException e) { log.warn("failed to get db id for streaming insert job {}, db name: {}, msg: {}", getJobId(), getCurrentDbName(), e.getMessage()); + failureReason = new FailureReason(InternalErrorCode.DB_ERR, "Failed to get db id, " + e.getMessage()); } } return dbId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java index 7f483a8f587134..c3655e6697eb9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java @@ -88,6 +88,7 @@ private void autoResumeHandler() throws JobException { if (autoResumeCount < Long.MAX_VALUE) { streamingInsertJob.setAutoResumeCount(autoResumeCount + 1); } + streamingInsertJob.resetFailureInfo(null); streamingInsertJob.updateJobStatus(JobStatus.PENDING); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 95c38e0fa06963..c62e94f8d646fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -36,6 +36,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.common.FailureReason; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; @@ -290,12 +291,16 @@ public void alterJob(AlterJobCommand alterJobCommand) throws JobException, Analy log.info("update job success, jobId: {}", job.getJobId()); } - public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobException { + + public void alterJobStatus(String jobName, JobStatus jobStatus, FailureReason reason) throws JobException { for (T a : jobMap.values()) { if (a.getJobName().equals(jobName)) { try { checkSameStatus(a, jobStatus); alterJobStatus(a.getJobId(), jobStatus); + if (a instanceof StreamingInsertJob) { + ((StreamingInsertJob) a).resetFailureInfo(reason); + } } catch (JobException e) { throw new JobException("Alter job status error, jobName is %s, errorMsg is %s", jobName, e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java index 2b22a527293166..7d71302fcf8827 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java @@ -18,6 +18,8 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.analysis.StmtType; +import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.job.common.FailureReason; import org.apache.doris.job.common.JobStatus; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -39,7 +41,8 @@ public R accept(PlanVisitor visitor, C context) { @Override public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { - ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PAUSED); + ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PAUSED, + new FailureReason(InternalErrorCode.MANUAL_PAUSE_ERR, "Job paused by user " + ctx.getQualifiedUser())); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java index 906f1c9d159cc3..40e25dbdc12b45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java @@ -43,9 +43,9 @@ public R accept(PlanVisitor visitor, C context) { public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { AbstractJob job = ctx.getEnv().getJobManager().getJobByName(super.getJobName()); if (job instanceof StreamingInsertJob) { - ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PENDING); + ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PENDING, null); } else { - ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.RUNNING); + ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.RUNNING, null); } }