From 9e9382df38cb7b6e3ed964f8e7eab90e6ee1d1bf Mon Sep 17 00:00:00 2001 From: laihui Date: Wed, 10 Sep 2025 15:52:41 +0800 Subject: [PATCH] introduce streaming task scheduler --- .../java/org/apache/doris/common/Config.java | 5 + .../insert/streaming/StreamingInsertJob.java | 20 +++- .../insert/streaming/StreamingInsertTask.java | 36 ++++++ .../streaming/StreamingJobSchedulerTask.java | 2 +- .../apache/doris/job/manager/JobManager.java | 6 +- .../job/scheduler/StreamingTaskScheduler.java | 110 ++++++++++++++++++ 6 files changed, 173 insertions(+), 6 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index e2c5b9eb8aec9f..7770318a5e928b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1955,6 +1955,11 @@ public class Config extends ConfigBase { + " greater than 0, otherwise it defaults to 3." }) public static int job_dictionary_task_consumer_thread_num = 3; + @ConfField(masterOnly = true, description = {"最大的 Streaming 作业数量,值应该大于0,否则默认为1024", + "The maximum number of Streaming jobs, " + + "the value should be greater than 0, if it is <=0, default is 1024."}) + public static int max_streaming_job_num = 1024; + /* job test config */ /** * If set to true, we will allow the interval unit to be set to second, when creating a recurring job. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index a2016c52cb9371..a4bbf51b8a40d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -42,9 +42,6 @@ public class StreamingInsertJob extends AbstractJob queryTasks() {
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
     }
+
+    /**
+     * Tells whether tasks of this job may still be scheduled.
+     *
+     * @return true while the job status is RUNNING or PENDING
+     */
+    public boolean needScheduleTask() {
+        // Read the status once so both comparisons see the same value.
+        JobStatus status = getJobStatus();
+        return JobStatus.RUNNING.equals(status) || JobStatus.PENDING.equals(status);
+    }
+
+    /**
+     * Tells whether the next task should be postponed. Once the consumer has reached EOF, delaying
+     * the next task appropriately avoids producing too many small transactions.
+     *
+     * <p>NOTE(review): this returns true once MORE than the max interval has passed since the last
+     * scheduling; confirm that this comparison direction matches the intended delay policy.
+     *
+     * @return true when the time since the last scheduled task exceeds the job's max interval
+     */
+    public boolean needDelayScheduleTask() {
+        // 1000L forces long arithmetic so a large interval in seconds cannot overflow int.
+        long maxIntervalMs = jobProperties.getMaxIntervalSecond() * 1000L;
+        return System.currentTimeMillis() - lastScheduleTaskTimestamp > maxIntervalMs;
+    }
+
+    /**
+     * Tells whether there is still data for this job to consume.
+     *
+     * @return always true for now; the real check is not implemented yet
+     */
+    public boolean hasMoreDataToConsume() {
+        // TODO: implement this
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
new file mode 100644
index 00000000000000..4e07ee01e305cf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import lombok.Getter;
+
+/**
+ * One streaming insert task, carrying the id of its job and its own task id.
+ */
+public class StreamingInsertTask {
+    @Getter
+    private final long jobId;
+
+    @Getter
+    private final long taskId;
+
+    /**
+     * Creates a task bound to the given job.
+     *
+     * @param jobId id of the job this task belongs to
+     * @param taskId id of this task
+     */
+    public StreamingInsertTask(long jobId, long taskId) {
+        this.jobId = jobId;
+        this.taskId = taskId;
+    }
+
+    /**
+     * Runs the task. Currently a placeholder that does nothing.
+     */
+    public void execute() {
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 51ab96b3b178a7..888669f428af90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -37,7 +37,7 @@ public StreamingJobSchedulerTask(StreamingInsertJob streamingInsertJob) {
 
     @Override
     public void run() throws JobException {
-        switch (streamingInsertJob.getStatus()) {
+        switch (streamingInsertJob.getJobStatus()) {
             case PENDING:
                 streamingInsertJob.createStreamingInsertTask();
                 streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index e763f8de590153..ae6bad07066d4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -40,6 +40,7 @@ import
org.apache.doris.job.extensions.insert.InsertJob;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.scheduler.JobScheduler;
+import org.apache.doris.job.scheduler.StreamingTaskScheduler;
 import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.expressions.And;
@@ -72,6 +73,8 @@ public class JobManager, C> implements Writable {
 
     private JobScheduler jobScheduler;
 
+    private StreamingTaskScheduler streamingTaskScheduler;
+
     // lock for job
     // lock is private and must use after db lock
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -95,9 +98,10 @@ private void writeUnlock() {
     public void start() {
         jobScheduler = new JobScheduler(jobMap);
         jobScheduler.start();
+        streamingTaskScheduler = new StreamingTaskScheduler();
+        streamingTaskScheduler.start();
     }
 
-
     /**
      * get running job
      *
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
new file mode 100644
index 00000000000000..d87a2b7833d402
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.scheduler;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
+
+import lombok.extern.log4j.Log4j2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link MasterDaemon} that takes registered {@link StreamingInsertTask}s off a queue and hands
+ * each one to a worker pool; tasks that are not ready to run are re-queued after a short delay.
+ */
+@Log4j2
+public class StreamingTaskScheduler extends MasterDaemon {
+    // Fallback for a non-positive Config.max_streaming_job_num, as that option's description states.
+    private static final int DEFAULT_MAX_STREAMING_JOB_NUM = 1024;
+    // Wait time before a task that cannot run yet is put back into the queue.
+    private static final long RESCHEDULE_DELAY_MS = 500L;
+
+    private final LinkedBlockingDeque<StreamingInsertTask> needScheduleTasksQueue = new LinkedBlockingDeque<>();
+    // NOTE(review): with corePoolSize 0 and a bounded work queue, ThreadPoolExecutor starts extra
+    // threads only once the queue is full, so queued tasks mostly run one at a time -- confirm intent.
+    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
+            0,
+            maxStreamingJobNum(),
+            60,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(maxStreamingJobNum()),
+            new CustomThreadFactory("streaming-task-execute"),
+            new ThreadPoolExecutor.AbortPolicy()
+    );
+    private final ScheduledThreadPoolExecutor delayScheduler
+            = new ScheduledThreadPoolExecutor(1, new CustomThreadFactory("streaming-task-delay-scheduler"));
+
+    public StreamingTaskScheduler() {
+        super("Streaming-task-scheduler", 1);
+    }
+
+    // Returns the configured limit, or the documented default when the configured value is <= 0
+    // (ThreadPoolExecutor and ArrayBlockingQueue both reject non-positive sizes).
+    private static int maxStreamingJobNum() {
+        int configured = Config.max_streaming_job_num;
+        return configured > 0 ? configured : DEFAULT_MAX_STREAMING_JOB_NUM;
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        try {
+            process();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag instead of silently swallowing the interruption.
+            Thread.currentThread().interrupt();
+            log.warn("Interrupted while waiting for streaming tasks to schedule", e);
+        } catch (Throwable e) {
+            log.warn("Failed to process one round of StreamingTaskScheduler", e);
+        }
+    }
+
+    /**
+     * Queues a task so that a later scheduling round picks it up.
+     *
+     * @param task the task to schedule
+     */
+    public void registerTask(StreamingInsertTask task) {
+        needScheduleTasksQueue.add(task);
+    }
+
+    // Blocks until at least one task is queued, then dispatches it with everything else queued so far.
+    private void process() throws InterruptedException {
+        List<StreamingInsertTask> tasks = new ArrayList<>();
+        tasks.add(needScheduleTasksQueue.take());
+        needScheduleTasksQueue.drainTo(tasks);
+        scheduleTasks(tasks);
+    }
+
+    private void scheduleTasks(List<StreamingInsertTask> tasks) {
+        for (StreamingInsertTask task : tasks) {
+            try {
+                threadPool.execute(() -> scheduleOneTaskSafely(task));
+            } catch (RejectedExecutionException e) {
+                // The pool is saturated: retry this task later rather than losing it and the
+                // remaining drained tasks to the propagating exception.
+                log.warn("Streaming task rejected by worker pool, will retry, task id: {}, job id: {}",
+                        task.getTaskId(), task.getJobId(), e);
+                scheduleTaskWithDelay(task, RESCHEDULE_DELAY_MS);
+            }
+        }
+    }
+
+    // Keeps a failure of one task from escaping the worker thread unlogged.
+    private void scheduleOneTaskSafely(StreamingInsertTask task) {
+        try {
+            scheduleOneTask(task);
+        } catch (Exception e) {
+            log.warn("Failed to schedule task, task id: {}, job id: {}", task.getTaskId(), task.getJobId(), e);
+        }
+    }
+
+    private void scheduleOneTask(StreamingInsertTask task) {
+        StreamingInsertJob job = (StreamingInsertJob) Env.getCurrentEnv().getJobManager().getJob(task.getJobId());
+        if (job == null) {
+            log.warn("Job not found, job id: {}", task.getJobId());
+            return;
+        }
+        if (!job.needScheduleTask()) {
+            log.info("do not need to schedule invalid task, task id: {}, job id: {}",
+                    task.getTaskId(), task.getJobId());
+            return;
+        }
+        // Back off while the job reports nothing left to consume, and check again later.
+        if (!job.hasMoreDataToConsume()) {
+            scheduleTaskWithDelay(task, RESCHEDULE_DELAY_MS);
+            return;
+        }
+        if (job.needDelayScheduleTask()) {
+            scheduleTaskWithDelay(task, RESCHEDULE_DELAY_MS);
+            return;
+        }
+        log.info("prepare to schedule task, task id: {}, job id: {}", task.getTaskId(), task.getJobId());
+        task.execute();
+    }
+
+    // Puts the task back into the scheduling queue after delayMs milliseconds.
+    private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs) {
+        delayScheduler.schedule(() -> {
+            needScheduleTasksQueue.add(task);
+        }, delayMs, TimeUnit.MILLISECONDS);
+    }
+}