diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 888ac78991353d..fa4be5739bbcf2 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -78,8 +78,9 @@ public KafkaRoutineLoadJob() { super(-1, LoadDataSourceType.KAFKA); } - public KafkaRoutineLoadJob(Long id, String name, long dbId, long tableId, String brokerList, String topic) { - super(id, name, dbId, tableId, LoadDataSourceType.KAFKA); + public KafkaRoutineLoadJob(Long id, String name, String clusterName, long dbId, long tableId, String brokerList, + String topic) { + super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA); this.brokerList = brokerList; this.topic = topic; this.progress = new KafkaProgress(); @@ -119,7 +120,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) { if (state == JobState.NEED_SCHEDULE) { // divide kafkaPartitions into tasks for (int i = 0; i < currentConcurrentTaskNum; i++) { - KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id); + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName); routineLoadTaskInfoList.add(kafkaTaskInfo); result.add(kafkaTaskInfo); } @@ -145,7 +146,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) { @Override public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo(); - int aliveBeNum = systemInfoService.getClusterBackendIds(getClusterName(), true).size(); + int aliveBeNum = systemInfoService.getClusterBackendIds(clusterName, true).size(); int partitionNum = currentKafkaPartitions.size(); if (desireTaskConcurrentNum == 0) { desireTaskConcurrentNum = partitionNum; @@ -290,8 +291,8 @@ public static KafkaRoutineLoadJob 
fromCreateStmt(CreateRoutineLoadStmt stmt) thr // init kafka routine load job long id = Catalog.getInstance().getNextId(); - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(), db.getId(), - tableId, stmt.getKafkaBrokerList(), stmt.getKafkaTopic()); + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(), + db.getClusterName(), db.getId(), tableId, stmt.getKafkaBrokerList(), stmt.getKafkaTopic()); kafkaRoutineLoadJob.setOptional(stmt); return kafkaRoutineLoadJob; diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 550f645afe13b4..fda5b29cef21fc 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -45,14 +45,14 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { private List partitions; - public KafkaTaskInfo(UUID id, long jobId) { - super(id, jobId); + public KafkaTaskInfo(UUID id, long jobId, String clusterName) { + super(id, jobId, clusterName); this.partitions = new ArrayList<>(); } public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { - super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getBeId()); + super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getClusterName(), kafkaTaskInfo.getBeId()); this.partitions = kafkaTaskInfo.getPartitions(); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 4981ae6ad26cc9..7ebfb90e118d94 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -123,6 +123,7 @@ public boolean isFinalState() { protected long id; protected 
String name; + protected String clusterName; protected long dbId; protected long tableId; // this code is used to verify be task request @@ -177,12 +178,13 @@ public RoutineLoadJob(long id, LoadDataSourceType type) { this.dataSourceType = type; } - public RoutineLoadJob(Long id, String name, long dbId, long tableId, LoadDataSourceType dataSourceType) { - this.id = id; + public RoutineLoadJob(Long id, String name, String clusterName, long dbId, long tableId, + LoadDataSourceType dataSourceType) { + this(id, dataSourceType); this.name = name; + this.clusterName = clusterName; this.dbId = dbId; this.tableId = tableId; - this.dataSourceType = dataSourceType; this.endTimestamp = -1; this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser()) .append(ConnectContext.get().getRemoteIP()) @@ -323,19 +325,6 @@ public String getPartitions() { } } - public String getClusterName() throws MetaNotFoundException { - Database database = Catalog.getCurrentCatalog().getDb(dbId); - if (database == null) { - throw new MetaNotFoundException("Database " + dbId + "has been deleted"); - } - database.readLock(); - try { - return database.getClusterName(); - } finally { - database.readUnlock(); - } - } - public int getDesiredConcurrentNumber() { return desireTaskConcurrentNum; } @@ -912,6 +901,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(id); Text.writeString(out, name); + Text.writeString(out, clusterName); out.writeLong(dbId); out.writeLong(tableId); out.writeInt(desireTaskConcurrentNum); @@ -934,6 +924,7 @@ public void readFields(DataInput in) throws IOException { id = in.readLong(); name = Text.readString(in); + clusterName = Text.readString(in); dbId = in.readLong(); tableId = in.readLong(); desireTaskConcurrentNum = in.readInt(); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 50cb97f4968f4a..1d78459ac748c8 
100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -88,9 +88,9 @@ private void writeUnlock() { public RoutineLoadManager() { } - private void updateBeIdToMaxConcurrentTasks() { + public void updateBeIdToMaxConcurrentTasks() { beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) - .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); + .stream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); } // this is not real-time number @@ -98,31 +98,6 @@ public int getTotalMaxConcurrentTaskNum() { return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum(); } - public void updateBeIdTaskMaps() { - writeLock(); - try { - // step1: update backend number in all of cluster - updateBeIdToMaxConcurrentTasks(); - List beIds = Catalog.getCurrentSystemInfo().getBackendIds(true); - - // diff beIds and beIdToMaxConcurrentTasks.keys() - List newBeIds = beIds.parallelStream().filter(entity -> beIdToMaxConcurrentTasks.get(entity) == null) - .collect(Collectors.toList()); - List unavailableBeIds = beIdToMaxConcurrentTasks.keySet().parallelStream() - .filter(entity -> !beIds.contains(entity)) - .collect(Collectors.toList()); - newBeIds.parallelStream().forEach(entity -> beIdToMaxConcurrentTasks.put(entity, DEFAULT_BE_CONCURRENT_TASK_NUM)); - for (long beId : unavailableBeIds) { - beIdToMaxConcurrentTasks.remove(beId); - } - LOG.info("There are {} backends which participate in routine load scheduler. 
" - + "There are {} new backends and {} unavailable backends for routine load", - beIdToMaxConcurrentTasks.size(), newBeIds.size(), unavailableBeIds.size()); - } finally { - writeUnlock(); - } - } - private Map getBeIdConcurrentTaskMaps() { Map beIdToConcurrentTasks = Maps.newHashMap(); for (RoutineLoadJob routineLoadJob : getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING)) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index fec8fb731e0ee4..35608d8280e6b6 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -43,6 +43,8 @@ public abstract class RoutineLoadTaskInfo { protected UUID id; protected long txnId; protected long jobId; + protected String clusterName; + private long createTimeMs; private long loadStartTimeMs; // the be id of previous task @@ -50,16 +52,15 @@ public abstract class RoutineLoadTaskInfo { // the be id of this task protected long beId = -1L; - public RoutineLoadTaskInfo(UUID id, long jobId) { + public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName) { this.id = id; this.jobId = jobId; + this.clusterName = clusterName; this.createTimeMs = System.currentTimeMillis(); } - public RoutineLoadTaskInfo(UUID id, long jobId, long previousBeId) { - this.id = id; - this.jobId = jobId; - this.createTimeMs = System.currentTimeMillis(); + public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long previousBeId) { + this(id, jobId, clusterName); this.previousBeId = previousBeId; } @@ -71,6 +72,10 @@ public long getJobId() { return jobId; } + public String getClusterName() { + return clusterName; + } + public void setLoadStartTimeMs(long loadStartTimeMs) { this.loadStartTimeMs = loadStartTimeMs; } @@ -87,6 +92,10 @@ public long getBeId() { return beId; } + public long getCreateTimeMs() { + 
return createTimeMs; + } + public long getLoadStartTimeMs() { return loadStartTimeMs; } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 83a40f2c563f0a..66d26e51a45655 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -56,9 +56,13 @@ public class RoutineLoadTaskScheduler extends Daemon { private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class); + private static final long BACKEND_SLOT_UPDATE_INTERVAL_MS = 10000; // 10s + private RoutineLoadManager routineLoadManager; private LinkedBlockingQueue needScheduleTasksQueue; + private long lastBackendSlotUpdateTime = -1; + @VisibleForTesting public RoutineLoadTaskScheduler() { super("routine load task", 0); @@ -83,23 +87,17 @@ protected void runOneCycle() { } private void process() throws LoadException, UserException, InterruptedException { - // update current beIdMaps for tasks - routineLoadManager.updateBeIdTaskMaps(); + updateBackendSlotIfNecessary(); - LOG.info("There are {} need schedule task in queue when {}", - needScheduleTasksQueue.size(), System.currentTimeMillis()); - Map> beIdTobatchTask = Maps.newHashMap(); int sizeOfTasksQueue = needScheduleTasksQueue.size(); int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum(); int needScheduleTaskNum = sizeOfTasksQueue < clusterIdleSlotNum ? 
sizeOfTasksQueue : clusterIdleSlotNum; + + LOG.info("There are {} tasks need to be scheduled in queue", needScheduleTasksQueue.size()); + int scheduledTaskNum = 0; - // get idle be task num - // allocate task to be -// if (needScheduleTaskNum == 0) { -// Thread.sleep(1000); -// return; -// } - while (needScheduleTaskNum > 0) { + Map> beIdTobatchTask = Maps.newHashMap(); + while (needScheduleTaskNum-- > 0) { // allocate be to task and begin transaction for task RoutineLoadTaskInfo routineLoadTaskInfo = null; try { @@ -109,21 +107,17 @@ private void process() throws LoadException, UserException, InterruptedException e.getMessage(),e); return; } - RoutineLoadJob routineLoadJob = null; try { - routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId()); - allocateTaskToBe(routineLoadTaskInfo, routineLoadJob); + allocateTaskToBe(routineLoadTaskInfo); routineLoadTaskInfo.beginTxn(); } catch (MetaNotFoundException e) { // task has been abandoned while renew task has been added in queue // or database has been deleted - needScheduleTaskNum--; LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId()) .add("error_msg", "task has been abandoned with error " + e.getMessage()).build(), e); continue; } catch (LoadException e) { needScheduleTasksQueue.put(routineLoadTaskInfo); - needScheduleTaskNum--; LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId()) .add("error_msg", "put task to the rear of queue with error " + e.getMessage()) .build(), e); @@ -144,12 +138,21 @@ private void process() throws LoadException, UserException, InterruptedException } // count scheduledTaskNum++; - needScheduleTaskNum--; } submitBatchTask(beIdTobatchTask); LOG.info("{} tasks have been allocated to be.", scheduledTaskNum); } + // BUGFIX(review): must update immediately on the first call (timestamp still -1), then every interval. + // The original "!= -1 &&" guard meant the timestamp was never set, so backend slots were never populated + // and no routine-load task could ever be scheduled (old per-cycle updateBeIdTaskMaps() call was removed). + private void updateBackendSlotIfNecessary() { + long currentTime = System.currentTimeMillis(); + if (lastBackendSlotUpdateTime == -1 + || currentTime - lastBackendSlotUpdateTime >
BACKEND_SLOT_UPDATE_INTERVAL_MS) { + routineLoadManager.updateBeIdToMaxConcurrentTasks(); + lastBackendSlotUpdateTime = currentTime; + LOG.debug("update backend max slot for routine load task scheduling"); + } + } + public void addTaskInQueue(RoutineLoadTaskInfo routineLoadTaskInfo) { needScheduleTasksQueue.add(routineLoadTaskInfo); } @@ -184,12 +187,12 @@ private void submitBatchTask(Map> beIdToRoutineLoad // check if previous be has idle slot // true: allocate previous be to task // false: allocate the most idle be to task - private void allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo, RoutineLoadJob routineLoadJob) + private void allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws MetaNotFoundException, LoadException { if (routineLoadTaskInfo.getPreviousBeId() != -1L) { - if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadJob.getClusterName())) { + if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName())) { LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId()) - .add("job_id", routineLoadJob.getId()) + .add("job_id", routineLoadTaskInfo.getJobId()) .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId()) .add("msg", "task use the previous be id") .build()); @@ -197,9 +200,9 @@ private void allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo, RoutineLo return; } } - routineLoadTaskInfo.setBeId(routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName())); + routineLoadTaskInfo.setBeId(routineLoadManager.getMinTaskBeId(routineLoadTaskInfo.getClusterName())); LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId()) - .add("job_id", routineLoadJob.getId()) + .add("job_id", routineLoadTaskInfo.getJobId()) .add("be_id", routineLoadTaskInfo.getBeId()) .add("msg", "task has been allocated to be") .build()); diff --git 
a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 3c166aad31ef39..06c6d5bbb7acc3 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -219,7 +219,7 @@ public void testProcessTimeOutTasks(@Mocked KafkaConsumer kafkaConsumer, }; List routineLoadTaskInfoList = new ArrayList<>(); - KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L); + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster"); kafkaTaskInfo.addKafkaPartition(100); kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000); routineLoadTaskInfoList.add(kafkaTaskInfo); diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 1711d1cdc2ecd2..d1685efd757a88 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -85,9 +85,8 @@ public void testAddJobByStmt(@Injectable PaloAuth paloAuth, loadPropertyList, properties, typeName, customProperties); - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, 1L, 1L, serverAddress, - topicName); - + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, + serverAddress, topicName); new MockUp() { @Mock @@ -178,7 +177,8 @@ public void testCreateWithSameName(@Mocked ConnectContext connectContext) { String jobName = "job1"; String topicName = "topic1"; String serverAddress = "http://127.0.0.1:8080"; - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, 1L, 1L, serverAddress, + KafkaRoutineLoadJob 
kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, + serverAddress, topicName); RoutineLoadManager routineLoadManager = new RoutineLoadManager(); @@ -186,8 +186,8 @@ public void testCreateWithSameName(@Mocked ConnectContext connectContext) { Map>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); Map> nameToRoutineLoadJob = Maps.newConcurrentMap(); List routineLoadJobList = Lists.newArrayList(); - KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, 1L, 1L, - serverAddress, topicName); + KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", + 1L, 1L, serverAddress, topicName); routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); nameToRoutineLoadJob.put(jobName, routineLoadJobList); dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); @@ -206,16 +206,16 @@ public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext connectCon String jobName = "job1"; String topicName = "topic1"; String serverAddress = "http://127.0.0.1:8080"; - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, 1L, 1L, serverAddress, - topicName); + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, + serverAddress, topicName); RoutineLoadManager routineLoadManager = new RoutineLoadManager(); Map>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); Map> nameToRoutineLoadJob = Maps.newConcurrentMap(); List routineLoadJobList = Lists.newArrayList(); - KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, 1L, 1L, - serverAddress, topicName); + KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", + 1L, 1L, serverAddress, topicName); Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED); 
routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); nameToRoutineLoadJob.put(jobName, routineLoadJobList); @@ -298,7 +298,7 @@ public void testUpdateBeIdTaskMaps() { }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - routineLoadManager.updateBeIdTaskMaps(); + routineLoadManager.updateBeIdToMaxConcurrentTasks(); } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 975b091b58e3c9..dbe1ff6e8332e0 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -130,8 +130,8 @@ public void functionTest(@Mocked Catalog catalog, } }; - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", 1L, 1L, "10.74.167.16:8092", - "test"); + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, + "10.74.167.16:8092", "test"); RoutineLoadManager routineLoadManager = new RoutineLoadManager(); routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob); @@ -158,8 +158,8 @@ public void functionTest(@Mocked Catalog catalog, executorService.submit(routineLoadScheduler); executorService.submit(routineLoadTaskScheduler); - KafkaRoutineLoadJob kafkaRoutineLoadJob1 = new KafkaRoutineLoadJob(1L, "test_custom_partition", 1L, 1L, - "10.74.167.16:8092", "test_1"); + KafkaRoutineLoadJob kafkaRoutineLoadJob1 = new KafkaRoutineLoadJob(1L, "test_custom_partition", + "default_cluster", 1L, 1L, "10.74.167.16:8092", "test_1"); List customKafkaPartitions = new ArrayList<>(); customKafkaPartitions.add(2); Deencapsulation.setField(kafkaRoutineLoadJob1, "customKafkaPartitions", customKafkaPartitions); diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java 
b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java index 3fdb5bcc32be8d..80cbea8f222030 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -59,7 +59,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 long beId = 100L; Queue routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue(); - KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1l); + KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1l, "default_cluster"); routineLoadTaskInfo1.addKafkaPartition(1); routineLoadTaskInfo1.addKafkaPartition(2); routineLoadTaskInfoQueue.add(routineLoadTaskInfo1); diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index a7bc4da11da3ae..010575a55cdf46 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -316,7 +316,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet List tabletCommitInfoList = new ArrayList<>(); tabletCommitInfoList.add(tabletCommitInfo); - KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", 1L, 1L, "host:port", "topic"); + KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic"); List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1"); @@ -389,7 +389,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi List 
tabletCommitInfoList = new ArrayList<>(); tabletCommitInfoList.add(tabletCommitInfo); - KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", 1L, 1L, "host:port", "topic"); + KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic"); List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1");