diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index d5ebcf26341951..4db4292c04763e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -477,4 +477,8 @@ public void onReplayEnd(AbstractJob replayJob) throws JobException { public boolean needPersist() { return true; } + + public boolean isFinalStatus() { + return jobStatus.equals(JobStatus.STOPPED) || jobStatus.equals(JobStatus.FINISHED); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java index 94cb06db6200fe..25d207110d9cb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java @@ -19,7 +19,10 @@ public enum JobStatus { - + /** + * For streaming job, if the task has not been created, the job will be in pending state. + */ + PENDING, /** * When the task is not started, the initial state will be triggered. * The initial state can be started diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java new file mode 100644 index 00000000000000..49a46327b32756 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.common; + +import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataOutput; +import java.io.IOException; + +public class PauseReason implements Writable { + @SerializedName(value = "code") + private InternalErrorCode code; + @SerializedName(value = "msg") + private String msg; + + public PauseReason(InternalErrorCode errCode, String msg) { + this.code = errCode; + this.msg = msg; + } + + public InternalErrorCode getCode() { + return code; + } + + public void setCode(InternalErrorCode code) { + this.code = code; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + @Override + public String toString() { + return "ErrorReason{" + "code=" + code + ", msg='" + msg + '\'' + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index 56222fd3e1fe60..77078896735a9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -18,6 +18,7 @@ package org.apache.doris.job.executor; import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; @@ -55,7 +56,7 @@ public void onEvent(TimerJobEvent event) { log.info("job is null,may be job is deleted, ignore"); return; } - if (event.getJob().isReadyForScheduling(null) && event.getJob().getJobStatus() == JobStatus.RUNNING) { + if (event.getJob().isReadyForScheduling(null) && checkStatus(event)) { List tasks = event.getJob().commonCreateTasks(TaskType.SCHEDULED, null); if (CollectionUtils.isEmpty(tasks)) { log.warn("job is ready for scheduling, but create task is empty, skip scheduler," @@ -77,4 +78,11 @@ public void onEvent(TimerJobEvent event) { log.warn("dispatch timer job error, task id is {}", event.getJob().getJobId(), e); } } + + private boolean checkStatus(TimerJobEvent event) { + if (event.getJob().getJobConfig().getExecuteType() == JobExecuteType.STREAMING) { + return !event.getJob().isFinalStatus(); + } + return event.getJob().getJobStatus() == JobStatus.RUNNING; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java new file mode 100644 index 00000000000000..9e5baccd13450c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.extensions.insert.streaming; + +import org.apache.doris.common.io.Text; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.PauseReason; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; +import lombok.Getter; +import lombok.Setter; + +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class StreamingInsertJob extends AbstractJob> { + + @Getter + @SerializedName("st") + protected JobStatus status; + + @Getter + protected PauseReason pauseReason; + + @Getter + @Setter + protected long latestAutoResumeTimestamp; + + @Getter + @Setter + protected long autoResumeCount; + + @Override + public void updateJobStatus(JobStatus status) throws JobException { + super.updateJobStatus(status); + } + + protected void createStreamingInsertTask() { + } + + protected void fetchMeta() { + } + + @Override + public JobType getJobType() { + return JobType.INSERT; + } + + @Override + protected void checkJobParamsInternal() { + } + + @Override + public boolean isReadyForScheduling(Map taskContext) { + return true; + } + + @Override + public java.util.List createTasks(org.apache.doris.job.common.TaskType taskType, + Map taskContext) { + return java.util.Collections.emptyList(); + } + + @Override + public org.apache.doris.qe.ShowResultSetMetaData getTaskMetaData() { + return org.apache.doris.qe.ShowResultSetMetaData.builder().build(); + } + + @Override + public String formatMsgWhenExecuteQueueFull(Long taskId) { + return commonFormatMsgWhenExecuteQueueFull(taskId, "streaming_task_queue_size", + "job_streaming_task_consumer_thread_num"); + } + + @Override + public List queryTasks() { + return new ArrayList<>(); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java new file mode 100644 index 00000000000000..51ab96b3b178a7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.extensions.insert.streaming; + +import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.PauseReason; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.thrift.TRow; + +public class StreamingJobSchedulerTask extends AbstractTask { + + private static final long BACK_OFF_BASIC_TIME_SEC = 10L; + private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5; + + private StreamingInsertJob streamingInsertJob; + + public StreamingJobSchedulerTask(StreamingInsertJob streamingInsertJob) { + this.streamingInsertJob = streamingInsertJob; + } + + @Override + public void run() throws JobException { + switch (streamingInsertJob.getStatus()) { + case PENDING: + streamingInsertJob.createStreamingInsertTask(); + streamingInsertJob.updateJobStatus(JobStatus.RUNNING); + streamingInsertJob.setAutoResumeCount(0); + break; + case RUNNING: + streamingInsertJob.fetchMeta(); + break; + case PAUSED: + autoResumeHandler(); + break; + default: + break; + } + } + + private void autoResumeHandler() throws JobException { + final PauseReason pauseReason = streamingInsertJob.getPauseReason(); + final long latestAutoResumeTimestamp = streamingInsertJob.getLatestAutoResumeTimestamp(); + final long autoResumeCount = streamingInsertJob.getAutoResumeCount(); + final long current = System.currentTimeMillis(); + + if (pauseReason != null + && pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR + && pauseReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR + && pauseReason.getCode() != InternalErrorCode.CANNOT_RESUME_ERR) { + long autoResumeIntervalTimeSec = autoResumeCount < 5 + ? Math.min((long) Math.pow(2, autoResumeCount) * BACK_OFF_BASIC_TIME_SEC, + MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC; + if (current - latestAutoResumeTimestamp > autoResumeIntervalTimeSec * 1000L) { + streamingInsertJob.setLatestAutoResumeTimestamp(current); + if (autoResumeCount < Long.MAX_VALUE) { + streamingInsertJob.setAutoResumeCount(autoResumeCount + 1); + } + streamingInsertJob.updateJobStatus(JobStatus.RUNNING); + return; + } + } + } + + @Override + protected void closeOrReleaseResources() { + } + + @Override + protected void executeCancelLogic(boolean needWaitCancelComplete) throws Exception { + } + + @Override + public TRow getTvfInfo(String jobName) { + return null; + } +}