diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index b18ffac380ba1a..24df73cbbac56e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -171,7 +171,7 @@ public List createTasks(TaskType taskType, Map, C> implements Writable { private JobScheduler jobScheduler; + @Getter + private final StreamingTaskManager streamingTaskManager = new StreamingTaskManager(); @Getter private StreamingTaskScheduler streamingTaskScheduler; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/StreamingTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/StreamingTaskManager.java new file mode 100644 index 00000000000000..7c6fecee14a179 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/StreamingTaskManager.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.manager; + +import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask; + +import lombok.Getter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.LinkedBlockingDeque; + +public class StreamingTaskManager { + @Getter + private final LinkedBlockingDeque needScheduleTasksQueue = new LinkedBlockingDeque<>(); + @Getter + private List runningTasks = Collections.synchronizedList(new ArrayList<>()); + + public void registerTask(StreamingInsertTask task) { + needScheduleTasksQueue.add(task); + } + + public StreamingInsertTask getStreamingInsertTaskById(long taskId) { + synchronized (runningTasks) { + return runningTasks.stream() + .filter(task -> task.getTaskId() == taskId) + .findFirst() + .orElse(null); + } + } + + public void addRunningTask(StreamingInsertTask task) { + runningTasks.add(task); + } + + public void removeRunningTask(StreamingInsertTask task) { + runningTasks.remove(task); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java index 3562ed028d21b2..3fbae399303c61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java @@ -37,7 +37,6 @@ @Log4j2 public class StreamingTaskScheduler extends MasterDaemon { - private final LinkedBlockingDeque needScheduleTasksQueue = new LinkedBlockingDeque<>(); private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 0, Config.max_streaming_job_num, @@ -63,12 +62,10 @@ protected void runAfterCatalogReady() { } } - public void registerTask(StreamingInsertTask task) { - needScheduleTasksQueue.add(task); - } - private void process() throws InterruptedException { List tasks = new ArrayList<>(); + LinkedBlockingDeque needScheduleTasksQueue = + Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getNeedScheduleTasksQueue(); tasks.add(needScheduleTasksQueue.take()); needScheduleTasksQueue.drainTo(tasks); scheduleTasks(tasks); @@ -106,13 +103,14 @@ private void scheduleOneTask(StreamingInsertTask task) throws JobException { return; } log.info("prepare to schedule task, task id: {}, job id: {}", task.getTaskId(), task.getJobId()); - task.execute(); job.setLastScheduleTaskTimestamp(System.currentTimeMillis()); + Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task); + task.execute(); } private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs) { delayScheduler.schedule(() -> { - needScheduleTasksQueue.add(task); + Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task); }, delayMs, TimeUnit.MILLISECONDS); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 47ef9cc3c2ebca..f41d8893f020e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -31,6 +31,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask; import org.apache.doris.load.EtlJobType; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -98,16 +99,26 @@ public void beginTransaction() { throw new BeginTransactionException("current running txns on db is larger than limit"); } this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - database.getId(), ImmutableList.of(table.getId()), labelName, + database.getId(), ImmutableList.of(table.getId()), labelName, null, new TxnCoordinator(TxnSourceType.FE, 0, FrontendOptions.getLocalHostAddress(), ExecuteEnv.getInstance().getStartupTime()), - LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeoutS()); + LoadJobSourceType.INSERT_STREAMING, getListenerId(), ctx.getExecTimeoutS()); } catch (Exception e) { throw new AnalysisException("begin transaction failed. " + e.getMessage(), e); } } + private long getListenerId() { + long listenerId = -1; + StreamingInsertTask streamingInsertTask = + Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId); + if (streamingInsertTask != null) { + listenerId = streamingInsertTask.getJobId(); + } + return listenerId; + } + @Override public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { OlapTableSink olapTableSink = (OlapTableSink) sink; diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 7c630f03353f98..aa6bcd28359734 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -368,7 +368,7 @@ message RoutineLoadJobStatisticPB { message StreamingTaskCommitAttachmentPB { optional int64 job_id = 1; - optional UniqueIdPB task_id = 2; + optional int64 task_id = 2; optional string offset = 3; optional int64 scanned_rows = 4; optional int64 load_bytes = 5;