diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index e84fdc34cc2003..967de0ae0e7e23 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -78,7 +78,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number"; // kafka type properties - public static final String KAFKA_ENDPOINT_PROPERTY = "kafka_endpoint"; + public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list"; public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic"; // optional public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions"; @@ -93,7 +93,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .build(); private static final ImmutableSet KAFKA_PROPERTIES_SET = new ImmutableSet.Builder() - .add(KAFKA_ENDPOINT_PROPERTY) + .add(KAFKA_BROKER_LIST_PROPERTY) .add(KAFKA_TOPIC_PROPERTY) .add(KAFKA_PARTITIONS_PROPERTY) .build(); @@ -110,7 +110,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private RoutineLoadDesc routineLoadDesc; private int desiredConcurrentNum; private int maxErrorNum; - private String kafkaEndpoint; + private String kafkaBrokerList; private String kafkaTopic; private List kafkaPartitions; @@ -121,7 +121,7 @@ public CreateRoutineLoadStmt(String name, TableName dbTableName, List this.dbTableName = dbTableName; this.loadPropertyList = loadPropertyList; this.properties = properties; - this.typeName = typeName; + this.typeName = typeName.toUpperCase(); this.customProperties = customProperties; } @@ -145,6 +145,7 @@ public Map getCustomProperties() { return customProperties; } + // nullable public RoutineLoadDesc getRoutineLoadDesc() { return routineLoadDesc; } @@ -157,8 +158,8 @@ public int getMaxErrorNum() { return maxErrorNum; } - public String getKafkaEndpoint() { - return kafkaEndpoint; + public String getKafkaBrokerList() { + return kafkaBrokerList; } public String getKafkaTopic() { @@ -176,6 +177,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { FeNameFormat.checkCommonName(NAME_TYPE, name); // check dbName and tableName checkDBTableName(); + dbTableName.analyze(analyzer); // check load properties include column separator etc. checkLoadProperties(analyzer); // check routine load properties include desired concurrent number etc. 
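Note on the renamed property: kafka_endpoint becomes kafka_broker_list, which now accepts a comma-separated list of host:port pairs that are validated one by one (see the checkKafkaCustomProperties hunk below). The following is a minimal, self-contained sketch of that per-broker validation; the regex shown is an assumption for illustration only and is not the actual ENDPOINT_REGEX defined elsewhere in CreateRoutineLoadStmt.

    import java.util.regex.Pattern;

    public class BrokerListCheckSketch {
        // Hypothetical endpoint pattern (host:port); the real ENDPOINT_REGEX lives in CreateRoutineLoadStmt.
        private static final String ENDPOINT_REGEX = "[^,:]+:\\d+";

        // Mirrors the validation added in checkKafkaCustomProperties(): the property is required,
        // and every comma-separated broker must individually match the endpoint pattern.
        static void checkBrokerList(String kafkaBrokerList) {
            if (kafkaBrokerList == null || kafkaBrokerList.isEmpty()) {
                throw new IllegalArgumentException("kafka_broker_list is a required property");
            }
            for (String broker : kafkaBrokerList.split(",")) {
                if (!Pattern.matches(ENDPOINT_REGEX, broker)) {
                    throw new IllegalArgumentException(
                            "kafka_broker_list:" + broker + " not match pattern " + ENDPOINT_REGEX);
                }
            }
        }

        public static void main(String[] args) {
            checkBrokerList("192.168.1.1:9092,192.168.1.2:9092"); // passes
        }
    }

A value such as "192.168.1.1:9092,not-a-broker" would be rejected on the second element, which is the behavior the hunk below introduces for multi-broker lists.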
@@ -291,12 +293,16 @@ private void checkKafkaCustomProperties() throws AnalysisException { throw new AnalysisException(optional.get() + " is invalid kafka custom property"); } // check endpoint - kafkaEndpoint = customProperties.get(KAFKA_ENDPOINT_PROPERTY); - if (Strings.isNullOrEmpty(kafkaEndpoint)) { - throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + " is required property"); + kafkaBrokerList = customProperties.get(KAFKA_BROKER_LIST_PROPERTY); + if (Strings.isNullOrEmpty(kafkaBrokerList)) { + throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + " is required property"); } - if (!Pattern.matches(ENDPOINT_REGEX, kafkaEndpoint)) { - throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + " not match pattern " + ENDPOINT_REGEX); + String[] kafkaBrokerList = this.kafkaBrokerList.split(","); + for (String broker : kafkaBrokerList) { + if (!Pattern.matches(ENDPOINT_REGEX, broker)) { + throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + ":" + broker + + " not match pattern " + ENDPOINT_REGEX); + } } // check topic kafkaTopic = customProperties.get(KAFKA_TOPIC_PROPERTY); diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 84ab3f4e919bcb..081edb6e5e48d4 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -100,6 +100,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; @@ -133,6 +134,8 @@ import org.apache.doris.load.LoadJob; import org.apache.doris.load.LoadJob.JobState; import org.apache.doris.load.routineload.RoutineLoadManager; +import org.apache.doris.load.routineload.RoutineLoadScheduler; +import org.apache.doris.load.routineload.RoutineLoadTaskScheduler; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; import org.apache.doris.meta.MetaContext; @@ -349,6 +352,10 @@ public class Catalog { private TabletChecker tabletChecker; + private RoutineLoadScheduler routineLoadScheduler; + + private RoutineLoadTaskScheduler routineLoadTaskScheduler; + public List getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -466,6 +473,9 @@ private Catalog() { this.stat = new TabletSchedulerStat(); this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); + + this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager); + this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager); } public static void destroyCheckpoint() { @@ -650,6 +660,10 @@ public void initialize(String[] args) throws Exception { txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second, Config.stream_load_default_timeout_second) * 100L); + // 8. 
start routine load scheduler + routineLoadScheduler.start(); + routineLoadTaskScheduler.start(); + } private void getClusterIdAndRole() throws IOException { diff --git a/fe/src/main/java/org/apache/doris/load/RoutineLoadDesc.java b/fe/src/main/java/org/apache/doris/load/RoutineLoadDesc.java index c0b8e8b8b19261..4faaca6d2a736d 100644 --- a/fe/src/main/java/org/apache/doris/load/RoutineLoadDesc.java +++ b/fe/src/main/java/org/apache/doris/load/RoutineLoadDesc.java @@ -28,6 +28,7 @@ public class RoutineLoadDesc { private final ColumnSeparator columnSeparator; private final LoadColumnsInfo columnsInfo; private final Expr wherePredicate; + // nullable private final List partitionNames; public RoutineLoadDesc(ColumnSeparator columnSeparator, LoadColumnsInfo columnsInfo, @@ -50,6 +51,7 @@ public Expr getWherePredicate() { return wherePredicate; } + // nullable public List getPartitionNames() { return partitionNames; } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index fa1415f4bf3738..44790073575b1e 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -53,7 +53,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private static final String FE_GROUP_ID = "fe_fetch_partitions"; private static final int FETCH_PARTITIONS_TIMEOUT = 10; - private String serverAddress; + private String brokerList; private String topic; // optional, user want to load partitions. private List customKafkaPartitions; @@ -63,9 +63,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { // this is the kafka consumer which is used to fetch the number of partitions private KafkaConsumer consumer; - public KafkaRoutineLoadJob(String name, long dbId, long tableId, String serverAddress, String topic) { + public KafkaRoutineLoadJob(String name, long dbId, long tableId, String brokerList, String topic) { super(name, dbId, tableId, LoadDataSourceType.KAFKA); - this.serverAddress = serverAddress; + this.brokerList = brokerList; this.topic = topic; this.progress = new KafkaProgress(); this.customKafkaPartitions = new ArrayList<>(); @@ -78,11 +78,11 @@ public KafkaRoutineLoadJob(String name, long dbId, long tableId, String serverAd public KafkaRoutineLoadJob(long id, String name, long dbId, long tableId, RoutineLoadDesc routineLoadDesc, int desireTaskConcurrentNum, int maxErrorNum, - String serverAddress, String topic, KafkaProgress kafkaProgress) { + String brokerList, String topic, KafkaProgress kafkaProgress) { super(id, name, dbId, tableId, routineLoadDesc, desireTaskConcurrentNum, LoadDataSourceType.KAFKA, maxErrorNum); - this.serverAddress = serverAddress; + this.brokerList = brokerList; this.topic = topic; this.progress = kafkaProgress; this.customKafkaPartitions = new ArrayList<>(); @@ -94,32 +94,28 @@ public String getTopic() { return topic; } - public String getServerAddress() { - return serverAddress; + public String getBrokerList() { + return brokerList; } + // this is a unprotected method which is called in the initialization function private void setCustomKafkaPartitions(List kafkaPartitions) throws LoadException { - writeLock(); - try { - if (this.customKafkaPartitions.size() != 0) { - throw new LoadException("Kafka partitions have been initialized"); - } - // check if custom kafka partition is valid - List allKafkaPartitions = getAllKafkaPartitions(); - outter: - 
for (Integer customkafkaPartition : kafkaPartitions) { - for (Integer kafkaPartition : allKafkaPartitions) { - if (kafkaPartition.equals(customkafkaPartition)) { - continue outter; - } + if (this.customKafkaPartitions.size() != 0) { + throw new LoadException("Kafka partitions have been initialized"); + } + // check if custom kafka partition is valid + List allKafkaPartitions = getAllKafkaPartitions(); + outter: + for (Integer customkafkaPartition : kafkaPartitions) { + for (Integer kafkaPartition : allKafkaPartitions) { + if (kafkaPartition.equals(customkafkaPartition)) { + continue outter; } - throw new LoadException("there is a custom kafka partition " + customkafkaPartition - + " which is invalid for topic " + topic); } - this.customKafkaPartitions = kafkaPartitions; - } finally { - writeUnlock(); + throw new LoadException("there is a custom kafka partition " + customkafkaPartition + + " which is invalid for topic " + topic); } + this.customKafkaPartitions = kafkaPartitions; } @Override @@ -130,17 +126,10 @@ public List divideRoutineLoadJob(int currentConcurrentTaskN if (state == JobState.NEED_SCHEDULE) { // divide kafkaPartitions into tasks for (int i = 0; i < currentConcurrentTaskNum; i++) { - try { - KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id); - routineLoadTaskInfoList.add(kafkaTaskInfo); - needScheduleTaskInfoList.add(kafkaTaskInfo); - result.add(kafkaTaskInfo); - } catch (UserException e) { - LOG.error("failed to begin txn for kafka routine load task, change job state to failed"); - state = JobState.CANCELLED; - // TODO(ml): edit log - break; - } + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id); + routineLoadTaskInfoList.add(kafkaTaskInfo); + needScheduleTaskInfoList.add(kafkaTaskInfo); + result.add(kafkaTaskInfo); } if (result.size() != 0) { for (int i = 0; i < currentKafkaPartitions.size(); i++) { @@ -154,6 +143,8 @@ public List divideRoutineLoadJob(int currentConcurrentTaskN } else { LOG.debug("Ignore to divide routine load job while job state {}", state); } + // save task into queue of needScheduleTasks + Catalog.getCurrentCatalog().getRoutineLoadManager().addTasksToNeedScheduleQueue(result); } finally { writeUnlock(); } @@ -162,14 +153,8 @@ public List divideRoutineLoadJob(int currentConcurrentTaskN @Override public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { - updateCurrentKafkaPartitions(); SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo(); - Database db = Catalog.getCurrentCatalog().getDb(dbId); - if (db == null) { - LOG.warn("db {} is not exists from job {}", dbId, id); - throw new MetaNotFoundException("db " + dbId + " is not exists from job " + id); - } - int aliveBeNum = systemInfoService.getBackendIds(true).size(); + int aliveBeNum = systemInfoService.getClusterBackendIds(getClusterName(), true).size(); int partitionNum = currentKafkaPartitions.size(); if (desireTaskConcurrentNum == 0) { desireTaskConcurrentNum = partitionNum; @@ -178,16 +163,17 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { LOG.info("current concurrent task number is min " + "(current size of partition {}, desire task concurrent num {}, alive be num {})", partitionNum, desireTaskConcurrentNum, aliveBeNum); - return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)); + return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM); } @Override - protected void updateProgress(RoutineLoadProgress 
progress) { - this.progress.update(progress); + protected void updateProgress(RLTaskTxnCommitAttachment attachment) { + super.updateProgress(attachment); + this.progress.update(attachment.getProgress()); } @Override - protected RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, + protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException { // remove old task routineLoadTaskInfoList.remove(routineLoadTaskInfo); @@ -198,22 +184,38 @@ protected RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo routineLoadTaskInfo) return kafkaTaskInfo; } + @Override + protected void executeUpdate() { + updateNewPartitionProgress(); + } + // if customKafkaPartition is not null, then return false immediately // else if kafka partitions of topic has been changed, return true. // else return false + // update current kafka partition at the same time + // current kafka partitions = customKafkaPartitions == 0 ? all of partition of kafka topic : customKafkaPartitions @Override protected boolean needReschedule() { if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) { + currentKafkaPartitions = customKafkaPartitions; return false; } else { - List newCurrentKafkaPartition = getAllKafkaPartitions(); + List newCurrentKafkaPartition; + try { + newCurrentKafkaPartition = getAllKafkaPartitions(); + } catch (Exception e) { + LOG.warn("Job {} failed to fetch all current partition", id); + return false; + } if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) { if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) { + currentKafkaPartitions = newCurrentKafkaPartition; return true; } else { return false; } } else { + currentKafkaPartitions = newCurrentKafkaPartition; return true; } @@ -232,12 +234,15 @@ private List getAllKafkaPartitions() { public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) throws AnalysisException, LoadException { - checkCreate(stmt); - // find dbId + // check db and table Database database = Catalog.getCurrentCatalog().getDb(stmt.getDBTableName().getDb()); - Table table; + if (database == null) { + throw new AnalysisException("There is no database named " + stmt.getDBTableName().getDb()); + } database.readLock(); + Table table; try { + unprotectCheckCreate(stmt); table = database.getTable(stmt.getDBTableName().getTbl()); } finally { database.readUnlock(); @@ -246,27 +251,19 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr // init kafka routine load job KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(stmt.getName(), database.getId(), table.getId(), - stmt.getKafkaEndpoint(), + stmt.getKafkaBrokerList(), stmt.getKafkaTopic()); kafkaRoutineLoadJob.setOptional(stmt); return kafkaRoutineLoadJob; } - // current kafka partitions = customKafkaPartitions == 0 ? 
all of partition of kafka topic : customKafkaPartitions - private void updateCurrentKafkaPartitions() { - if (customKafkaPartitions == null || customKafkaPartitions.size() == 0) { - LOG.debug("All of partitions which belong to topic will be loaded for {} routine load job", name); - // fetch all of kafkaPartitions in topic - currentKafkaPartitions.addAll(getAllKafkaPartitions()); - } else { - currentKafkaPartitions = customKafkaPartitions; - } + private void updateNewPartitionProgress() { // update the progress of new partitions for (Integer kafkaPartition : currentKafkaPartitions) { - try { + if (((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) { ((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition); - } catch (NullPointerException e) { + } else { ((KafkaProgress) progress).getPartitionIdToOffset().put(kafkaPartition, 0L); } } @@ -274,7 +271,7 @@ private void updateCurrentKafkaPartitions() { private void setConsumer() { Properties props = new Properties(); - props.put("bootstrap.servers", this.serverAddress); + props.put("bootstrap.servers", this.brokerList); props.put("group.id", FE_GROUP_ID); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 5ffd7c3b3dd8e6..d3ea5f5ae06ce6 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -20,22 +20,15 @@ import com.google.common.base.Joiner; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; -import org.apache.doris.planner.StreamLoadPlanner; -import org.apache.doris.task.KafkaRoutineLoadTask; -import org.apache.doris.task.RoutineLoadTask; -import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TKafkaLoadInfo; import org.apache.doris.thrift.TLoadSourceType; +import org.apache.doris.thrift.TPlanFragment; import org.apache.doris.thrift.TRoutineLoadTask; -import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; @@ -52,8 +45,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { private List partitions; - public KafkaTaskInfo(UUID id, long jobId) throws LabelAlreadyUsedException, - BeginTransactionException, AnalysisException { + public KafkaTaskInfo(UUID id, long jobId) { super(id, jobId); this.partitions = new ArrayList<>(); } @@ -74,7 +66,7 @@ public List getPartitions() { // todo: reuse plan fragment of stream load @Override - public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException { + public TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserException { KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId); Map partitionIdToOffset = Maps.newHashMap(); for (Integer 
partitionId : partitions) { @@ -96,7 +88,7 @@ public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, U tRoutineLoadTask.setTbl(database.getTable(routineLoadJob.getTableId()).getName()); StringBuilder stringBuilder = new StringBuilder(); // label = (serviceAddress_topic_partition1:offset_partition2:offset).hashcode() - String label = String.valueOf(stringBuilder.append(routineLoadJob.getServerAddress()).append("_") + String label = String.valueOf(stringBuilder.append(routineLoadJob.getBrokerList()).append("_") .append(routineLoadJob.getTopic()).append("_") .append(Joiner.on("_").withKeyValueSeparator(":") .join(partitionIdToOffset)).toString().hashCode()); @@ -104,21 +96,19 @@ public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, U tRoutineLoadTask.setAuth_code(routineLoadJob.getAuthCode()); TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo(); tKafkaLoadInfo.setTopic((routineLoadJob).getTopic()); - tKafkaLoadInfo.setBrokers((routineLoadJob).getServerAddress()); + tKafkaLoadInfo.setBrokers((routineLoadJob).getBrokerList()); tKafkaLoadInfo.setPartition_begin_offset(partitionIdToOffset); tRoutineLoadTask.setKafka_load_info(tKafkaLoadInfo); tRoutineLoadTask.setType(TLoadSourceType.KAFKA); - tRoutineLoadTask.setParams(createTExecPlanFragmentParams(routineLoadJob)); + tRoutineLoadTask.setParams(updateTExecPlanFragmentParams(routineLoadJob)); return tRoutineLoadTask; } - private TExecPlanFragmentParams createTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException { - StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadTaskInfo(this); - Database database = Catalog.getCurrentCatalog().getDb(routineLoadJob.getDbId()); - StreamLoadPlanner planner = new StreamLoadPlanner(database, - (OlapTable) database.getTable(routineLoadJob.getTableId()), - streamLoadTask); - return planner.plan(); + private TExecPlanFragmentParams updateTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException { + TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.gettExecPlanFragmentParams(); + TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); + tPlanFragment.getOutput_sink().getOlap_table_sink().setTxn_id(this.txnId); + return tExecPlanFragmentParams; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 886428d98a37b0..524c577bbfb8b9 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -24,7 +24,6 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.IdGenerator; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -32,16 +31,14 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.load.TxnStateChangeListener; -import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendServiceImpl; +import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; -import org.apache.doris.thrift.TFileFormatType; -import 
org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TLoadTxnCommitRequest; import org.apache.doris.thrift.TResourceInfo; -import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.transaction.AbortTransactionException; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; @@ -57,9 +54,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.NoSuchElementException; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -76,6 +71,7 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10; private static final int BASE_OF_ERROR_RATE = 10000; private static final String STAR_STRING = "*"; + protected static final int DEFAULT_TASK_MAX_CONCURRENT_NUM = 3; /** * +-----------------+ @@ -132,6 +128,7 @@ public boolean isFinalState() { protected RoutineLoadProgress progress; protected String pausedReason; + protected String cancelReason; // currentErrorNum and currentTotalNum will be update // when currentTotalNum is more then ten thousand or currentErrorNum is more then maxErrorNum @@ -213,17 +210,28 @@ public long getDbId() { return dbId; } - public String getDbFullName() { + public String getDbFullName() throws MetaNotFoundException { Database database = Catalog.getCurrentCatalog().getDb(dbId); - return database.getFullName(); + if (database == null) { + throw new MetaNotFoundException("Database " + dbId + " has been deleted"); + } + database.readLock(); + try { + return database.getFullName(); + } finally { + database.readUnlock(); + } } public long getTableId() { return tableId; } - public String getTableName() { + public String getTableName() throws MetaNotFoundException { Database database = Catalog.getCurrentCatalog().getDb(dbId); + if (database == null) { + throw new MetaNotFoundException("Database " + dbId + " has been deleted"); + } database.readLock(); try { Table table = database.getTable(tableId); @@ -237,24 +245,16 @@ public JobState getState() { return state; } - public void setState(JobState state) { - this.state = state; - } - public long getAuthCode() { return authCode; } + // this is an unprotected method which is called in the initialization function protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException { - writeLock(); - try { - if (this.routineLoadDesc != null) { - throw new LoadException("Routine load desc has been initialized"); - } - this.routineLoadDesc = routineLoadDesc; - } finally { - writeUnlock(); + if (this.routineLoadDesc != null) { + throw new LoadException("Routine load desc has been initialized"); } + this.routineLoadDesc = routineLoadDesc; } public RoutineLoadDesc getRoutineLoadDesc() { @@ -270,13 +270,28 @@ public RoutineLoadProgress getProgress() { } public String getPartitions() { - if (routineLoadDesc.getPartitionNames() == null || routineLoadDesc.getPartitionNames().size() == 0) { + if (routineLoadDesc == null + || routineLoadDesc.getPartitionNames() == null + || routineLoadDesc.getPartitionNames().size() == 0) { return STAR_STRING; } else { return String.join(",", routineLoadDesc.getPartitionNames()); } } + public String getClusterName() throws MetaNotFoundException { + Database database = Catalog.getCurrentCatalog().getDb(dbId); + if (database == null) { + throw new MetaNotFoundException("Database " + dbId + " has been deleted"); + } + 
database.readLock(); + try { + return database.getClusterName(); + } finally { + database.readUnlock(); + } + } + protected void setDesireTaskConcurrentNum(int desireTaskConcurrentNum) throws LoadException { writeLock(); try { @@ -323,13 +338,8 @@ public List getNeedScheduleTaskInfoList() { return needScheduleTaskInfoList; } - public void updateState(JobState jobState) { - writeLock(); - try { - state = jobState; - } finally { - writeUnlock(); - } + public TExecPlanFragmentParams gettExecPlanFragmentParams() { + return tExecPlanFragmentParams; } public List processTimeoutTasks() { @@ -355,7 +365,7 @@ public List processTimeoutTasks() { } try { - result.add(reNewTask(routineLoadTaskInfo)); + result.add(unprotectRenewTask(routineLoadTaskInfo)); LOG.debug("Task {} was ran more then {} minutes. It was removed and rescheduled", oldSignature, DEFAULT_TASK_TIMEOUT_SECONDS); } catch (UserException e) { @@ -397,7 +407,10 @@ public void removeNeedScheduleTask(RoutineLoadTaskInfo routineLoadTaskInfo) { } } - abstract void updateProgress(RoutineLoadProgress progress); + // if rate of error data is more then max_filter_ratio, pause job + protected void updateProgress(RLTaskTxnCommitAttachment attachment) { + updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows()); + } public boolean containsTask(String taskId) { readLock(); @@ -413,11 +426,6 @@ public boolean containsTask(String taskId) { private void checkStateTransform(RoutineLoadJob.JobState desireState) throws UnsupportedOperationException { switch (state) { - case RUNNING: - if (desireState == JobState.NEED_SCHEDULE) { - throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState); - } - break; case PAUSED: if (desireState == JobState.PAUSED) { throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState); @@ -461,9 +469,24 @@ private void updateNumOfData(long numOfErrorData, long numOfTotalData) { } } - abstract RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, + abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException; + public void plan() throws UserException { + StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadJob(this); + Database database = Catalog.getCurrentCatalog().getDb(this.getDbId()); + + database.readLock(); + try { + StreamLoadPlanner planner = new StreamLoadPlanner(database, + (OlapTable) database.getTable(this.tableId), + streamLoadTask); + tExecPlanFragmentParams = planner.plan(); + } finally { + database.readUnlock(); + } + } + @Override public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) throws AbortTransactionException { @@ -472,6 +495,7 @@ public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusC if (txnStatusChangeReason != null) { switch (txnStatusChangeReason) { case TIMEOUT: + default: String taskId = txnState.getLabel(); if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().equals(taskId))) { throw new AbortTransactionException( @@ -497,29 +521,28 @@ public void onCommitted(TransactionState txnState) { Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream() .filter(entity -> entity.getId().equals(txnState.getLabel())).findFirst(); - RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); - - // step2: update job 
progress - updateProgress(rlTaskTxnCommitAttachment.getProgress()); + if (routineLoadTaskInfoOptional.isPresent()) { + RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); - // step4: if rate of error data is more then max_filter_ratio, pause job - updateNumOfData(rlTaskTxnCommitAttachment.getFilteredRows(), rlTaskTxnCommitAttachment.getLoadedRows()); + // step2: update job progress + updateProgress(rlTaskTxnCommitAttachment); - if (state == JobState.RUNNING) { - // step5: create a new task for partitions - RoutineLoadTaskInfo newRoutineLoadTaskInfo = reNewTask(routineLoadTaskInfo); - Catalog.getCurrentCatalog().getRoutineLoadManager() - .getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo); + if (state == JobState.RUNNING) { + // step3: create a new task for partitions + RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); + Catalog.getCurrentCatalog().getRoutineLoadManager() + .getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo); + } + } else { + LOG.debug("There is no {} task in task info list. Maybe task has been renew or job state has changed. " + + " Transaction {} will not be committed", + txnState.getLabel(), txnState.getTransactionId()); } - } catch (NoSuchElementException e) { - LOG.debug("There is no {} task in task info list. Maybe task has been renew or job state has changed. " - + " Transaction {} will not be committed", - txnState.getLabel(), txnState.getTransactionId()); } catch (Throwable e) { LOG.error("failed to update offset in routine load task {} when transaction {} has been committed. " + "change job to paused", rlTaskTxnCommitAttachment.getTaskId(), txnState.getTransactionId(), e); - executePause("failed to update offset when transaction " + updateState(JobState.PAUSED, "failed to update offset when transaction " + txnState.getTransactionId() + " has been committed"); } finally { writeUnlock(); @@ -528,12 +551,12 @@ public void onCommitted(TransactionState txnState) { @Override public void onAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) { - pause(txnStatusChangeReason.name()); + updateState(JobState.PAUSED, txnStatusChangeReason.name()); LOG.debug("job {} need to be pause while txn {} abort with reason {}", id, txnState.getTransactionId(), txnStatusChangeReason.name()); } - protected static void checkCreate(CreateRoutineLoadStmt stmt) throws AnalysisException { + protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException { // check table belong to db, partitions belong to table if (stmt.getRoutineLoadDesc() == null) { checkDBSemantics(stmt.getDBTableName(), null); @@ -546,44 +569,54 @@ private static void checkDBSemantics(TableName dbTableName, List partiti throws AnalysisException { String tableName = dbTableName.getTbl(); String dbName = dbTableName.getDb(); - // check database + + // check table belong to database Database database = Catalog.getCurrentCatalog().getDb(dbName); - if (database == null) { - throw new AnalysisException("There is no database named " + dbName); + Table table = database.getTable(tableName); + if (table == null) { + throw new AnalysisException("There is no table named " + tableName + " in " + dbName); + } + // check table type + if (table.getType() != Table.TableType.OLAP) { + throw new AnalysisException("Only doris table support routine load"); } - database.readLock(); - try { - Table table = database.getTable(tableName); - // check table belong to database - if (table == null) { - throw 
new AnalysisException("There is no table named " + tableName + " in " + dbName); - } - // check table type - if (table.getType() != Table.TableType.OLAP) { - throw new AnalysisException("Only doris table support routine load"); - } - - if (partitionNames == null || partitionNames.size() == 0) { - return; - } - // check partitions belong to table - Optional partitionNotInTable = partitionNames.parallelStream() - .filter(entity -> ((OlapTable) table).getPartition(entity) == null).findFirst(); - if (partitionNotInTable != null && partitionNotInTable.isPresent()) { - throw new AnalysisException("Partition " + partitionNotInTable.get() - + " does not belong to table " + tableName); - } - } finally { - database.readUnlock(); + if (partitionNames == null || partitionNames.size() == 0) { + return; + } + // check partitions belong to table + Optional partitionNotInTable = partitionNames.parallelStream() + .filter(entity -> ((OlapTable) table).getPartition(entity) == null).findFirst(); + if (partitionNotInTable != null && partitionNotInTable.isPresent()) { + throw new AnalysisException("Partition " + partitionNotInTable.get() + + " does not belong to table " + tableName); } } - public void pause(String reason) { + public void updateState(JobState jobState) { + updateState(jobState, null); + } + + public void updateState(JobState jobState, String reason) { writeLock(); try { - checkStateTransform(JobState.PAUSED); - executePause(reason); + checkStateTransform(jobState); + switch (jobState) { + case PAUSED: + executePause(reason); + break; + case NEED_SCHEDULE: + executeNeedSchedule(); + break; + case STOPPED: + executeStop(); + break; + case CANCELLED: + executeCancel(reason); + break; + default: + break; + } } finally { writeUnlock(); } @@ -598,45 +631,56 @@ private void executePause(String reason) { needScheduleTaskInfoList.clear(); } - public void resume() { + private void executeNeedSchedule() { // TODO(ml): edit log - writeLock(); - try { - checkStateTransform(JobState.NEED_SCHEDULE); - state = JobState.NEED_SCHEDULE; - } finally { - writeUnlock(); - } + state = JobState.NEED_SCHEDULE; + routineLoadTaskInfoList.clear(); + needScheduleTaskInfoList.clear(); } - public void stop() { + private void executeStop() { // TODO(ml): edit log - writeLock(); + state = JobState.STOPPED; + routineLoadTaskInfoList.clear(); + needScheduleTaskInfoList.clear(); + } + + private void executeCancel(String reason) { + cancelReason = reason; + state = JobState.CANCELLED; + routineLoadTaskInfoList.clear(); + needScheduleTaskInfoList.clear(); + } + + public void update() { + // check if db and table exist + Database database = Catalog.getCurrentCatalog().getDb(dbId); + if (database == null) { + LOG.info("The database {} has been deleted. Change {} job state to stopped", dbId, id); + updateState(JobState.STOPPED); + } + database.readLock(); try { - checkStateTransform(JobState.STOPPED); - state = JobState.STOPPED; - routineLoadTaskInfoList.clear(); - needScheduleTaskInfoList.clear(); + Table table = database.getTable(tableId); + // check table belong to database + if (table == null) { + LOG.info("The table {} has been deleted. 
Change {} job state to stopped", tableId, id); + updateState(JobState.STOPPED); + } } finally { - writeUnlock(); + database.readUnlock(); } - } - public void reschedule() { + // check if partition has been changed if (needReschedule()) { - writeLock(); - try { - if (state == JobState.RUNNING) { - state = JobState.NEED_SCHEDULE; - routineLoadTaskInfoList.clear(); - needScheduleTaskInfoList.clear(); - } - } finally { - writeUnlock(); - } + executeUpdate(); + updateState(JobState.NEED_SCHEDULE); } } + protected void executeUpdate() { + } + protected boolean needReschedule() { return false; } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 8f9b4f52a588df..4c5b938446d7ed 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -92,6 +92,10 @@ public Queue getNeedScheduleTasksQueue() { return needScheduleTasksQueue; } + public void addTasksToNeedScheduleQueue(List routineLoadTaskInfoList) { + needScheduleTasksQueue.addAll(routineLoadTaskInfoList); + } + private void updateBeIdToMaxConcurrentTasks() { beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); @@ -172,7 +176,7 @@ public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) throws DdlException writeLock(); try { // check if db.routineLoadName has been used - if (isNameUsed(routineLoadJob.dbId, routineLoadJob.getName())) { + if (isNameUsed(routineLoadJob.getDbId(), routineLoadJob.getName())) { throw new DdlException("Name " + routineLoadJob.getName() + " already used in db " + routineLoadJob.getDbId()); } @@ -228,17 +232,26 @@ public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throw throw new DdlException("There is not routine load job with name " + pauseRoutineLoadStmt.getName()); } // check auth + String dbFullName; + String tableName; + try { + dbFullName = routineLoadJob.getDbFullName(); + tableName = routineLoadJob.getTableName(); + } catch (MetaNotFoundException e) { + throw new DdlException("The metadata of job has been changed. 
The job will be cancelled automatically", e); + } if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - routineLoadJob.getDbFullName(), - routineLoadJob.getTableName(), + dbFullName, + tableName, PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - routineLoadJob.getTableName()); + tableName); } - routineLoadJob.pause("User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job"); + routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, + "User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job"); } public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws DdlException, @@ -248,16 +261,24 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th throw new DdlException("There is not routine load job with name " + resumeRoutineLoadStmt.getName()); } // check auth + String dbFullName; + String tableName; + try { + dbFullName = routineLoadJob.getDbFullName(); + tableName = routineLoadJob.getTableName(); + } catch (MetaNotFoundException e) { + throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e); + } if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - routineLoadJob.getDbFullName(), - routineLoadJob.getTableName(), + dbFullName, + tableName, PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - routineLoadJob.getTableName()); + tableName); } - routineLoadJob.resume(); + routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE); } public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws DdlException, AnalysisException { @@ -266,16 +287,24 @@ public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws D throw new DdlException("There is not routine load job with name " + stopRoutineLoadStmt.getName()); } // check auth + String dbFullName; + String tableName; + try { + dbFullName = routineLoadJob.getDbFullName(); + tableName = routineLoadJob.getTableName(); + } catch (MetaNotFoundException e) { + throw new DdlException("The metadata of job has been changed. 
The job will be cancelled automatically", e); + } if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - routineLoadJob.getDbFullName(), - routineLoadJob.getTableName(), + dbFullName, + tableName, PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - routineLoadJob.getTableName()); + tableName); } - routineLoadJob.stop(); + routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED); } public int getSizeOfIdToRoutineLoadTask() { @@ -304,21 +333,26 @@ public int getClusterIdleSlotNum() { } } - public long getMinTaskBeId() throws LoadException { + public long getMinTaskBeId(String clusterName) throws LoadException { + List beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName); + if (beIdsInCluster == null) { + throw new LoadException("The " + clusterName + " has been deleted"); + } + readLock(); try { long result = -1L; int maxIdleSlotNum = 0; updateBeIdToMaxConcurrentTasks(); - for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { - if (beIdToConcurrentTasks.get(entry.getKey()) == null) { - result = maxIdleSlotNum < entry.getValue() ? entry.getKey() : result; - maxIdleSlotNum = Math.max(maxIdleSlotNum, entry.getValue()); - } else { - int idelTaskNum = entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); - result = maxIdleSlotNum < idelTaskNum ? entry.getKey() : result; - maxIdleSlotNum = Math.max(maxIdleSlotNum, idelTaskNum); - } + for (Long beId : beIdsInCluster) { + int idleTaskNum = 0; + if (beIdToConcurrentTasks.containsKey(beId)) { + idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId); + } else { + idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM; + } + result = maxIdleSlotNum < idleTaskNum ? 
beId : result; + maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum); } if (result < 0) { throw new LoadException("There is no empty slot in cluster"); @@ -369,17 +403,12 @@ public RoutineLoadJob getJobByTaskId(String taskId) throws MetaNotFoundException throw new MetaNotFoundException("could not found task by id " + taskId); } - public List getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) throws LoadException { - List jobs = new ArrayList<>(); - Collection stateJobs = null; + public List getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) { LOG.debug("begin to get routine load job by state {}", jobState.name()); - stateJobs = idToRoutineLoadJob.values().stream() + List stateJobs = idToRoutineLoadJob.values().stream() .filter(entity -> entity.getState() == jobState).collect(Collectors.toList()); - if (stateJobs != null) { - jobs.addAll(stateJobs); - LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name()); - } - return jobs; + LOG.debug("got {} routine load jobs by state {}", stateJobs.size(), jobState.name()); + return stateJobs; } public List processTimeoutTasks() { @@ -397,9 +426,9 @@ public void removeOldRoutineLoadJobs() { // TODO(ml): remove old routine load job } - public void rescheduleRoutineLoadJob() { + public void updateRoutineLoadJob() { for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { - routineLoadJob.reschedule(); + routineLoadJob.update(); } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 0757376f58cb78..e8e744d22848cf 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -17,6 +17,7 @@ package org.apache.doris.load.routineload; +import com.google.common.annotations.VisibleForTesting; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -30,8 +31,20 @@ public class RoutineLoadScheduler extends Daemon { private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class); + private static final int DEFAULT_INTERVAL_SECONDS = 10; - private RoutineLoadManager routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); + private RoutineLoadManager routineLoadManager; + + @VisibleForTesting + public RoutineLoadScheduler() { + super(); + routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); + } + + public RoutineLoadScheduler(RoutineLoadManager routineLoadManager) { + super("Routine load", DEFAULT_INTERVAL_SECONDS * 1000); + this.routineLoadManager = routineLoadManager; + } @Override protected void runOneCycle() { @@ -44,7 +57,7 @@ protected void runOneCycle() { private void process() { // update - routineLoadManager.rescheduleRoutineLoadJob(); + routineLoadManager.updateRoutineLoadJob(); // get need schedule routine jobs List routineLoadJobList = null; try { @@ -53,9 +66,11 @@ private void process() { LOG.error("failed to get need schedule routine jobs"); } - LOG.debug("there are {} job need schedule", routineLoadJobList.size()); + LOG.info("there are {} job need schedule", routineLoadJobList.size()); for (RoutineLoadJob routineLoadJob : routineLoadJobList) { try { + // create plan of routine load job + routineLoadJob.plan(); // judge nums of tasks more then max concurrent tasks of cluster int currentConcurrentTaskNum = 
routineLoadJob.calculateCurrentConcurrentTaskNum(); int totalTaskNum = currentConcurrentTaskNum + routineLoadManager.getSizeOfIdToRoutineLoadTask(); @@ -68,20 +83,21 @@ private void process() { totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum()); break; } - // divide job into tasks - List needScheduleTasksList = - routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); - // save task into queue of needScheduleTasks - routineLoadManager.getNeedScheduleTasksQueue().addAll(needScheduleTasksList); + // check state and divide job into tasks + routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); } catch (MetaNotFoundException e) { - routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED); + routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED, e.getMessage()); + } catch (Throwable e) { + LOG.warn("failed to scheduler job, change job state to paused", e); + routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, e.getMessage()); + continue; } } LOG.debug("begin to check timeout tasks"); // check timeout tasks List rescheduleTasksList = routineLoadManager.processTimeoutTasks(); - routineLoadManager.getNeedScheduleTasksQueue().addAll(rescheduleTasksList); + routineLoadManager.addTasksToNeedScheduleQueue(rescheduleTasksList); } private List getNeedScheduleRoutineJobs() throws LoadException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index aae26a7fb87fb0..3fa4ce1efccda5 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -47,17 +47,11 @@ public abstract class RoutineLoadTaskInfo { private long createTimeMs; private long loadStartTimeMs; private TExecPlanFragmentParams tExecPlanFragmentParams; - - public RoutineLoadTaskInfo(UUID id, long jobId) throws BeginTransactionException, - LabelAlreadyUsedException, AnalysisException { + + public RoutineLoadTaskInfo(UUID id, long jobId) { this.id = id; this.jobId = jobId; this.createTimeMs = System.currentTimeMillis(); - // begin a txn for task - RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId); - txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( - routineLoadJob.getDbId(), id.toString(), -1, "streamLoad", - TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob); } public UUID getId() { @@ -79,8 +73,16 @@ public long getLoadStartTimeMs() { public long getTxnId() { return txnId; } - - abstract TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException; + + abstract TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserException; + + public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { + // begin a txn for task + RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId); + txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( + routineLoadJob.getDbId(), id.toString(), -1, "streamLoad", + TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob); + } @Override public boolean equals(Object obj) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 3c380043895e47..9141c2e8f3ed7f 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ 
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -17,6 +17,9 @@ package org.apache.doris.load.routineload; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.ClientPool; import org.apache.doris.common.LoadException; @@ -28,9 +31,6 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TRoutineLoadTask; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,12 +52,16 @@ public class RoutineLoadTaskScheduler extends Daemon { private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class); private RoutineLoadManager routineLoadManager; - private LinkedBlockingQueue needScheduleTasksQueue; + @VisibleForTesting public RoutineLoadTaskScheduler() { super("routine load task", 0); - routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); - needScheduleTasksQueue = (LinkedBlockingQueue) routineLoadManager.getNeedScheduleTasksQueue(); + this.routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); + } + + public RoutineLoadTaskScheduler(RoutineLoadManager routineLoadManager) { + super("routine load task", 0); + this.routineLoadManager = routineLoadManager; } @Override @@ -70,7 +74,9 @@ protected void runOneCycle() { } } - private void process() throws LoadException, UserException { + private void process() throws LoadException, UserException, InterruptedException { + LinkedBlockingQueue needScheduleTasksQueue = + (LinkedBlockingQueue) routineLoadManager.getNeedScheduleTasksQueue(); // update current beIdMaps for tasks routineLoadManager.updateBeIdTaskMaps(); @@ -83,17 +89,13 @@ private void process() throws LoadException, UserException { int scheduledTaskNum = 0; // get idle be task num // allocate task to be + if (needScheduleTaskNum == 0) { + Thread.sleep(1000); + return; + } while (needScheduleTaskNum > 0) { - RoutineLoadTaskInfo routineLoadTaskInfo = null; - try { - routineLoadTaskInfo = needScheduleTasksQueue.take(); - } catch (InterruptedException e) { - LOG.warn("Taking routine load task from queue has been interrupted with error msg {}", - e.getMessage()); - return; - } - - long beId = routineLoadManager.getMinTaskBeId(); + // allocate be to task and begin transaction for task + RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.peek(); RoutineLoadJob routineLoadJob = null; try { routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId().toString()); @@ -101,7 +103,27 @@ private void process() throws LoadException, UserException { LOG.warn("task {} has been abandoned", routineLoadTaskInfo.getId()); return; } - TRoutineLoadTask tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask(beId); + long beId; + try { + beId = routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName()); + routineLoadTaskInfo.beginTxn(); + } catch (Exception e) { + LOG.warn("put task to the rear of queue with error " + e.getMessage()); + needScheduleTasksQueue.take(); + needScheduleTasksQueue.put(routineLoadTaskInfo); + needScheduleTaskNum--; + continue; + } + + // task to thrift + try { + routineLoadTaskInfo = needScheduleTasksQueue.take(); + } catch (InterruptedException e) { + LOG.warn("Taking routine load task from queue has been interrupted with error msg {}", + e.getMessage()); + return; 
+ } + TRoutineLoadTask tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask(); // remove task for needScheduleTasksList in job routineLoadJob.removeNeedScheduleTask(routineLoadTaskInfo); routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis()); @@ -123,7 +145,6 @@ private void process() throws LoadException, UserException { LOG.info("{} tasks have bean allocated to be.", scheduledTaskNum); } - // todo: change to batch submit and reuse client private void submitBatchTask(Map> beIdToRoutineLoadTask) { for (Map.Entry> entry : beIdToRoutineLoadTask.entrySet()) { Backend backend = Catalog.getCurrentSystemInfo().getBackend(entry.getKey()); @@ -133,7 +154,6 @@ private void submitBatchTask(Map> beIdToRoutineLoad try { client = ClientPool.backendPool.borrowObject(address); client.submit_routine_load_task(entry.getValue()); - ok = true; } catch (Exception e) { LOG.warn("task exec error. backend[{}]", backend.getId(), e); diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java index 570d718fdcbcaa..0bc7a8c74ae18d 100644 --- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -80,6 +80,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.proc.BackendsProcDir; import org.apache.doris.common.proc.FrontendsProcNode; @@ -801,14 +802,22 @@ private void handleShowRoutineLoad() throws AnalysisException { } // check auth + String dbFullName; + String tableName; + try { + dbFullName = routineLoadJob.getDbFullName(); + tableName = routineLoadJob.getTableName(); + } catch (MetaNotFoundException e) { + throw new AnalysisException("The metadata of job has been changed. 
The job will be cancelled automatically", e); + } if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - routineLoadJob.getDbFullName(), - routineLoadJob.getTableName(), + dbFullName, + tableName, PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - routineLoadJob.getTableName()); + tableName); } // get routine load info diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java index 30d9a4896b628f..b37d16b2ae19e2 100644 --- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -45,6 +45,7 @@ import java.io.StringReader; import java.util.Map; +import java.util.UUID; public class StreamLoadTask { @@ -131,19 +132,18 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws } } - public static StreamLoadTask fromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo) { - TUniqueId queryId = new TUniqueId(routineLoadTaskInfo.getId().getMostSignificantBits(), - routineLoadTaskInfo.getId().getLeastSignificantBits()); - StreamLoadTask streamLoadTask = new StreamLoadTask(queryId, routineLoadTaskInfo.getTxnId(), + // the taskId and txnId is faked + public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) { + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), + taskId.getLeastSignificantBits()); + StreamLoadTask streamLoadTask = new StreamLoadTask(queryId, -1L, TFileType.FILE_STREAM, TFileFormatType.FORMAT_CSV_PLAIN); - RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager(); - streamLoadTask.setOptionalFromRoutineLoadTaskInfo(routineLoadTaskInfo, - routineLoadManager.getJob(routineLoadTaskInfo.getJobId())); + streamLoadTask.setOptionalFromRoutineLoadJob(routineLoadJob); return streamLoadTask; } - private void setOptionalFromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo, - RoutineLoadJob routineLoadJob) { + private void setOptionalFromRoutineLoadJob(RoutineLoadJob routineLoadJob) { if (routineLoadJob.getRoutineLoadDesc() != null) { RoutineLoadDesc routineLoadDesc = routineLoadJob.getRoutineLoadDesc(); if (routineLoadDesc.getColumnsInfo() != null) { diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index d39c0e085c7677..b7e814ba48a647 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -317,6 +317,8 @@ public void setTransactionStatus(TransactionStatus transactionStatus, TxnStatusC case ABORTED: txnStateChangeListener.beforeAborted(this, txnStatusChangeReason); break; + default: + break; } } @@ -426,10 +428,6 @@ public boolean isPublishTimeout() { return System.currentTimeMillis() - publishVersionTime > timeoutMillis; } - public void setTxnStateChangeListener(TxnStateChangeListener txnStateChangeListener) { - this.txnStateChangeListener = txnStateChangeListener; - } - public TxnStateChangeListener getTxnStateChangeListener() { return txnStateChangeListener; } diff --git a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java index 
diff --git a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
index 6b210b1d9c4b15..dc17df77deee1c 100644
--- a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
+++ b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
@@ -63,7 +63,7 @@ public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) thro
         Map customProperties = Maps.newHashMap();
 
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
@@ -109,7 +109,7 @@ public void testAnalyze(@Injectable Analyzer analyzer) throws UserException {
         Map customProperties = Maps.newHashMap();
 
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
@@ -129,7 +129,7 @@ public void analyze(Analyzer analyzer1) {
         Assert.assertEquals(partitionNames.getPartitionNames(), createRoutineLoadStmt.getRoutineLoadDesc().getPartitionNames());
         Assert.assertEquals(2, createRoutineLoadStmt.getDesiredConcurrentNum());
         Assert.assertEquals(0, createRoutineLoadStmt.getMaxErrorNum());
-        Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaEndpoint());
+        Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaBrokerList());
         Assert.assertEquals(topicName, createRoutineLoadStmt.getKafkaTopic());
         Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(createRoutineLoadStmt.getKafkaPartitions()));
     }
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 76a29589fd544f..366567f4a9dcea 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -367,7 +367,7 @@ private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
         Map customProperties = Maps.newHashMap();
 
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
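Reviewer note: the renamed property suggests a list of brokers, but the tests above only exercise a single address. A sketch of what a multi-broker value could look like is below; the addresses, topic and partition values are placeholders I made up, only the property constants come from the patch.

```java
// Sketch: custom properties for a Kafka routine load with more than one broker.
// All literal values below are placeholders, not part of this patch.
import java.util.HashMap;
import java.util.Map;

import org.apache.doris.analysis.CreateRoutineLoadStmt;

final class MultiBrokerPropsExample {
    static Map<String, String> buildCustomProperties() {
        Map<String, String> customProperties = new HashMap<>();
        customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "topic1");
        // comma-separated broker addresses, one entry per broker
        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
                "http://127.0.0.1:9092,http://127.0.0.2:9092");
        customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
        return customProperties;
    }
}
```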
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 8681411a857c7d..c78908b697892f 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -76,7 +76,7 @@ public void testAddJobByStmt(@Injectable PaloAuth paloAuth,
         String topicName = "topic1";
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
         String serverAddress = "http://127.0.0.1:8080";
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
                 loadPropertyList, properties, typeName, customProperties);
@@ -142,7 +142,7 @@ public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth,
         String topicName = "topic1";
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
         String serverAddress = "http://127.0.0.1:8080";
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
                 loadPropertyList, properties, typeName, customProperties);
@@ -236,7 +236,7 @@ public void testGetMinTaskBeId() throws LoadException {
         new Expectations() {
             {
-                systemInfoService.getBackendIds(true);
+                systemInfoService.getClusterBackendIds(anyString, true);
                 result = beIds;
                 Catalog.getCurrentSystemInfo();
                 result = systemInfoService;
@@ -245,7 +245,7 @@ public void testGetMinTaskBeId() throws LoadException {
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
         routineLoadManager.addNumOfConcurrentTasksByBeId(1L);
-        Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId());
+        Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId("default"));
     }
 
     @Test
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index 98578f0e729c5d..ccd7023642592e 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -77,7 +77,7 @@ public void testNormalRunOneCycle(@Mocked KafkaConsumer consumer,
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L, 1L, routineLoadDesc ,3, 0, "", "", new KafkaProgress());
-        routineLoadJob.setState(RoutineLoadJob.JobState.NEED_SCHEDULE);
+        Deencapsulation.setField(routineLoadJob,"state", RoutineLoadJob.JobState.NEED_SCHEDULE);
         List routineLoadJobList = new ArrayList<>();
         routineLoadJobList.add(routineLoadJob);
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 05d26e31b28426..824cd29193d4be 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -113,7 +113,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
                 routineLoadManager.getNeedScheduleTasksQueue();
                 result = routineLoadTaskInfoQueue;
-                routineLoadManager.getMinTaskBeId();
+                routineLoadManager.getMinTaskBeId(anyString);
                 result = beId;
                 routineLoadManager.getJobByTaskId(anyString);
                 result = kafkaRoutineLoadJob1;
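Reviewer note on the manager tests above: getMinTaskBeId is now cluster-aware. As testGetMinTaskBeId encodes it, the expected behaviour is to pick the backend with the fewest concurrent routine load tasks among the backends of the given cluster. The sketch below is my own restatement of that selection logic, not the actual RoutineLoadManager code; the parameter shapes are assumptions.

```java
// Sketch: choose the cluster backend currently running the fewest routine load tasks.
// The parameter types are assumptions made for illustration only.
import java.util.List;
import java.util.Map;

final class MinTaskBeSketch {
    static long getMinTaskBeId(List<Long> clusterBackendIds, Map<Long, Integer> beIdToConcurrentTasks) {
        long result = -1L;
        int minTasks = Integer.MAX_VALUE;
        for (Long beId : clusterBackendIds) {
            // backends with no recorded tasks count as zero
            int tasks = beIdToConcurrentTasks.getOrDefault(beId, 0);
            if (tasks < minTasks) {
                minTasks = tasks;
                result = beId;
            }
        }
        return result;
    }
}
```

With backends [1, 2] and one task already counted on backend 1, this returns 2, which matches the assertion `getMinTaskBeId("default") == 2L` in the test.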
diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 5afdff7accdf5c..2768fabef2fe39 100644
--- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -321,7 +321,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1");
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
-        transactionState.setTxnStateChangeListener(routineLoadJob);
+        Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob);
         Map idToTransactionState = Maps.newHashMap();
         idToTransactionState.put(1L, transactionState);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
@@ -330,7 +330,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
         KafkaProgress oldkafkaProgress = new KafkaProgress();
         oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
-        routineLoadJob.setState(RoutineLoadJob.JobState.RUNNING);
+        Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
         TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment();
         rlTaskTxnCommitAttachment.setId(new TUniqueId());
@@ -395,7 +395,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1");
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
-        transactionState.setTxnStateChangeListener(routineLoadJob);
+        Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob);
         Map idToTransactionState = Maps.newHashMap();
         idToTransactionState.put(1L, transactionState);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
@@ -404,7 +404,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
         KafkaProgress oldkafkaProgress = new KafkaProgress();
         oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
-        routineLoadJob.setState(RoutineLoadJob.JobState.RUNNING);
+        Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
         TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment();
         rlTaskTxnCommitAttachment.setId(new TUniqueId());
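Reviewer note on the test changes in this patch: since the public setters (setState, setTxnStateChangeListener) were removed, the tests now write the private fields directly with JMockit's Deencapsulation. A minimal, self-contained illustration of that pattern is below; the Dummy class is mine, in the real tests the targets are TransactionState and RoutineLoadJob.

```java
// Sketch: setting private state in tests now that the public setters are gone.
// Requires JMockit on the test classpath; Dummy is an illustrative stand-in class.
import mockit.Deencapsulation;

final class DeencapsulationExample {
    static class Dummy {
        private String state = "PENDING";
        String getState() { return state; }
    }

    public static void main(String[] args) {
        Dummy dummy = new Dummy();
        // write the private field via reflection, as the updated tests do
        Deencapsulation.setField(dummy, "state", "RUNNING");
        System.out.println(dummy.getState()); // prints RUNNING
    }
}
```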