Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
Expand Down Expand Up @@ -262,6 +263,12 @@ public void updateJobStatus(JobStatus status) throws JobException {
}
}

public void resetFailureInfo(FailureReason reason) {
this.setFailureReason(reason);
// Currently, only delayMsg is present here, which needs to be cleared when the status changes.
this.setJobRuntimeMsg("");
}

@Override
public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException {
lock.writeLock().lock();
Expand Down Expand Up @@ -355,6 +362,8 @@ protected void fetchMeta() {
offsetProvider.fetchRemoteMeta(originTvfProps);
} catch (Exception ex) {
log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
failureReason = new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
"Failed to fetch meta, " + ex.getMessage());
}
}

Expand Down Expand Up @@ -709,6 +718,7 @@ public long getDbId() {
} catch (AnalysisException e) {
log.warn("failed to get db id for streaming insert job {}, db name: {}, msg: {}",
getJobId(), getCurrentDbName(), e.getMessage());
failureReason = new FailureReason(InternalErrorCode.DB_ERR, "Failed to get db id, " + e.getMessage());
}
}
return dbId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private void autoResumeHandler() throws JobException {
if (autoResumeCount < Long.MAX_VALUE) {
streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
}
streamingInsertJob.resetFailureInfo(null);
streamingInsertJob.updateJobStatus(JobStatus.PENDING);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
Expand Down Expand Up @@ -290,12 +291,16 @@ public void alterJob(AlterJobCommand alterJobCommand) throws JobException, Analy
log.info("update job success, jobId: {}", job.getJobId());
}

public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobException {

public void alterJobStatus(String jobName, JobStatus jobStatus, FailureReason reason) throws JobException {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName)) {
try {
checkSameStatus(a, jobStatus);
alterJobStatus(a.getJobId(), jobStatus);
if (a instanceof StreamingInsertJob) {
((StreamingInsertJob) a).resetFailureInfo(reason);
}
} catch (JobException e) {
throw new JobException("Alter job status error, jobName is %s, errorMsg is %s",
jobName, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.StmtType;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand All @@ -39,7 +41,8 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {

@Override
public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PAUSED);
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PAUSED,
new FailureReason(InternalErrorCode.MANUAL_PAUSE_ERR, "Job paused by user " + ctx.getQualifiedUser()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
AbstractJob job = ctx.getEnv().getJobManager().getJobByName(super.getJobName());
if (job instanceof StreamingInsertJob) {
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PENDING);
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PENDING, null);
} else {
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.RUNNING);
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.RUNNING, null);
}
}

Expand Down
Loading