diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 4db4292c04763e..83b84348ff77eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -246,7 +246,7 @@ private boolean canCreateTask(TaskType taskType) { switch (taskType) { case SCHEDULED: - return currentJobStatus.equals(JobStatus.RUNNING); + return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PENDING); case MANUAL: return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PAUSED); default: @@ -294,7 +294,8 @@ public void updateJobStatus(JobStatus newJobStatus) throws JobException { } String errorMsg = String.format("Can't update job %s status to the %s status", jobStatus.name(), newJobStatus.name()); - if (newJobStatus.equals(JobStatus.RUNNING) && !jobStatus.equals(JobStatus.PAUSED)) { + if (newJobStatus.equals(JobStatus.RUNNING) + && (!jobStatus.equals(JobStatus.PAUSED) && !jobStatus.equals(JobStatus.PENDING))) { throw new IllegalArgumentException(errorMsg); } if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java new file mode 100644 index 00000000000000..93ff5f41480b38 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.common; + +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; + +public class JobUtils { + public static boolean checkNeedSchedule(AbstractJob job) { + if (job.getJobConfig().getExecuteType() == JobExecuteType.STREAMING) { + return !job.isFinalStatus(); + } + return job.getJobStatus() == JobStatus.RUNNING; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index 77078896735a9d..35b1f351f723d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -18,9 +18,8 @@ package org.apache.doris.job.executor; import org.apache.doris.job.base.AbstractJob; -import org.apache.doris.job.base.JobExecuteType; -import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.JobUtils; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.disruptor.TimerJobEvent; import org.apache.doris.job.task.AbstractTask; @@ -56,7 +55,7 @@ public void onEvent(TimerJobEvent event) { log.info("job is null,may be job is deleted, ignore"); return; } - if (event.getJob().isReadyForScheduling(null) && checkStatus(event)) { + if (event.getJob().isReadyForScheduling(null) && JobUtils.checkNeedSchedule(event.getJob())) { List tasks = event.getJob().commonCreateTasks(TaskType.SCHEDULED, null); if (CollectionUtils.isEmpty(tasks)) { log.warn("job is ready for scheduling, but create task is empty, skip scheduler," @@ -78,11 +77,4 @@ public void onEvent(TimerJobEvent event) { log.warn("dispatch timer job error, task id is {}", event.getJob().getJobId(), e); } } - - private boolean checkStatus(TimerJobEvent event) { - if (event.getJob().getJobConfig().getExecuteType() == JobExecuteType.STREAMING) { - return !event.getJob().isFinalStatus(); - } - return event.getJob().getJobStatus() == JobStatus.RUNNING; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java index 4269fa0d8f3c52..0f58452ff7b065 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java @@ -18,7 +18,7 @@ package org.apache.doris.job.executor; import org.apache.doris.job.base.AbstractJob; -import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobUtils; import org.apache.doris.job.disruptor.TaskDisruptor; import io.netty.util.Timeout; @@ -40,7 +40,7 @@ public TimerJobSchedulerTask(TaskDisruptor dispatchDisruptor, T job) { @Override public void run(Timeout timeout) { try { - if (!JobStatus.RUNNING.equals(job.getJobStatus())) { + if (!JobUtils.checkNeedSchedule(job)) { log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 84833604f84683..84e42c6e1f62bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -29,10 +29,10 @@ import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.PauseReason; +import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.insert.InsertJob; -import org.apache.doris.job.offset.Offset; import org.apache.doris.job.offset.SourceOffsetProvider; import org.apache.doris.job.offset.SourceOffsetProviderFactory; import org.apache.doris.job.task.AbstractTask; @@ -42,6 +42,7 @@ import org.apache.doris.nereids.analyzer.UnboundTVFRelation; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; @@ -65,7 +66,7 @@ import java.util.Map; public class StreamingInsertJob extends AbstractJob> implements - TxnStateChangeCallback { + TxnStateChangeCallback, GsonPostProcessable { private final long dbId; private StreamingJobStatistic jobStatistic = new StreamingJobStatistic(); @SerializedName("fm") @@ -79,10 +80,16 @@ public class StreamingInsertJob extends AbstractJob createTasks(TaskType taskType, Map rfiles = new ArrayList<>(); + String startFile = currentOffset == null ? null : currentOffset.endFile; try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) { - maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, currentOffset.endFile, + maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, startFile, jobProperties.getS3BatchFiles(), jobProperties.getS3BatchSize()); - offset.setStartFile(currentOffset.endFile); + offset.setStartFile(startFile); offset.setEndFile(rfiles.get(rfiles.size() - 1)); offset.setFileLists(rfiles); } catch (Exception e) { @@ -90,15 +92,14 @@ public Offset getCurrentOffset() { } @Override - public InsertIntoTableCommand rewriteTvfParams(Offset nextOffset) { - S3Offset offset = (S3Offset) nextOffset; + public InsertIntoTableCommand rewriteTvfParams(Offset runningOffset) { + S3Offset offset = (S3Offset) runningOffset; Map props = new HashMap<>(); - //todo: need to change file list to glob string - props.put("uri", offset.getFileLists().toString()); - + String finalUri = "{" + String.join(",", offset.getFileLists()) + "}"; + props.put("uri", finalUri); InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(executeSql); - - command.rewriteFirstTvfProperties(getSourceType(), props); + //todo: command query plan is immutable + //command.rewriteFirstTvfProperties(getSourceType(), props); return command; } @@ -114,6 +115,9 @@ public void fetchRemoteMeta() { @Override public boolean hasMoreDataToConsume() { + if (currentOffset == null) { + return true; + } if (currentOffset.endFile.compareTo(maxRemoteEndFile) < 0) { return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 49445c46e0df53..ab03cce7c9195c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -24,6 +24,7 @@ import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobUtils; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.disruptor.TaskDisruptor; import org.apache.doris.job.exception.JobException; @@ -109,7 +110,7 @@ private void batchSchedulerTimerJob() { } public void scheduleOneJob(T job) throws JobException { - if (!job.getJobStatus().equals(JobStatus.RUNNING)) { + if (!JobUtils.checkNeedSchedule(job)) { return; } // not-schedule task @@ -144,7 +145,7 @@ public void scheduleOneJob(T job) throws JobException { } public void cycleTimerJobScheduler(T job) { - if (!job.getJobStatus().equals(JobStatus.RUNNING)) { + if (!JobUtils.checkNeedSchedule(job)) { return; } if (!JobExecuteType.RECURRING.equals(job.getJobConfig().getExecuteType())) { @@ -225,7 +226,7 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() { clearEndJob(job); continue; } - if (job.getJobStatus().equals(JobStatus.RUNNING) && job.getJobConfig().checkIsTimerJob()) { + if (JobUtils.checkNeedSchedule(job) && job.getJobConfig().checkIsTimerJob()) { cycleTimerJobScheduler(job, lastTimeWindowMs); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java index bdc27e983e17d9..3562ed028d21b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java @@ -97,16 +97,17 @@ private void scheduleOneTask(StreamingInsertTask task) throws JobException { task.getTaskId(), task.getJobId()); return; } - if (job.hasMoreDataToConsume()) { + if (!job.hasMoreDataToConsume()) { scheduleTaskWithDelay(task, 500); return; } - if (job.needDelayScheduleTask()) { + if (job.getLastScheduleTaskTimestamp() != -1 && job.needDelayScheduleTask()) { scheduleTaskWithDelay(task, 500); return; } log.info("prepare to schedule task, task id: {}, job id: {}", task.getTaskId(), task.getJobId()); task.execute(); + job.setLastScheduleTaskTimestamp(System.currentTimeMillis()); } private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java index 61a8eb0a5c06ea..51d0dddc7cb664 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java @@ -130,6 +130,7 @@ public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws UserExcepti executeType = JobExecuteType.STREAMING; properties = new StreamingJobProperties(jobProperties); properties.validate(); + jobExecutionConfiguration.setImmediate(true); } jobExecutionConfiguration.setExecuteType(executeType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 8bf31d941df1d5..04a1db682c5e38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -565,9 +565,6 @@ private UnboundTVFRelation getFirstTvfInPlan(LogicalPlan plan) { public void rewriteFirstTvfProperties(String functionName, Map props) { AtomicBoolean found = new AtomicBoolean(false); rewriteFirstTvfInPlan(originLogicalQuery, functionName, props, found); - if (logicalQuery.isPresent() && !found.get()) { - rewriteFirstTvfInPlan(logicalQuery.get(), functionName, props, found); - } } private void rewriteFirstTvfInPlan(LogicalPlan plan,