diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java index fd10a3bc467474..eff1c345e5a0ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java @@ -23,6 +23,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadManager; +import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Backend; @@ -63,4 +64,16 @@ protected List getAvailableBackendIds(long jobId) throws LoadException { .map(Backend::getId) .collect(Collectors.toList()); } + + @Override + public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) { + routineLoadJob.setCloudClusterById(); + super.replayCreateRoutineLoadJob(routineLoadJob); + } + + @Override + public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) { + getJob(operation.getId()).setCloudClusterById(); + super.replayChangeRoutineLoadJob(operation); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index b9985e95d08fdb..46c330dca87e0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1563,6 +1563,11 @@ public String getCloudClusterId() { return cloudClusterId; } + public void setCloudClusterById() { + this.cloudCluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getClusterNameByClusterId(cloudClusterId); + } + // check the correctness of commit info protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, TransactionState txnState,