From 6b4dda9aeb4dd3345ac9f49c587bb9b06b5f59ce Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Tue, 5 Mar 2019 15:47:22 +0800 Subject: [PATCH 1/2] Put begin txn into task scheduler 1. Fix the nested locking of db and txn. 2. The txn of a task is now initialized in the task scheduler before the task is taken from the queue. (A sketch of the new scheduling flow is appended after the diff.) --- .../doris/analysis/CreateRoutineLoadStmt.java | 28 +- .../org/apache/doris/catalog/Catalog.java | 31 ++ .../apache/doris/load/RoutineLoadDesc.java | 2 + .../load/routineload/KafkaRoutineLoadJob.java | 125 ++++---- .../doris/load/routineload/KafkaTaskInfo.java | 32 +-- .../load/routineload/RoutineLoadJob.java | 270 ++++++++++-------- .../load/routineload/RoutineLoadManager.java | 117 ++++---- .../routineload/RoutineLoadScheduler.java | 36 ++- .../load/routineload/RoutineLoadTaskInfo.java | 22 +- .../routineload/RoutineLoadTaskScheduler.java | 57 ++-- .../org/apache/doris/qe/ShowExecutor.java | 21 +- .../org/apache/doris/task/StreamLoadTask.java | 18 +- .../doris/transaction/TransactionState.java | 6 +- .../analysis/CreateRoutineLoadStmtTest.java | 6 +- .../routineload/KafkaRoutineLoadJobTest.java | 2 +- .../routineload/RoutineLoadManagerTest.java | 8 +- .../routineload/RoutineLoadSchedulerTest.java | 2 +- .../RoutineLoadTaskSchedulerTest.java | 2 +- .../transaction/GlobalTransactionMgrTest.java | 8 +- 19 files changed, 463 insertions(+), 330 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index e84fdc34cc2003..967de0ae0e7e23 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -78,7 +78,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number"; // kafka type properties - public static final String KAFKA_ENDPOINT_PROPERTY = "kafka_endpoint"; + public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list"; public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic"; // optional public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions"; @@ -93,7 +93,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .build(); private static final ImmutableSet KAFKA_PROPERTIES_SET = new ImmutableSet.Builder() - .add(KAFKA_ENDPOINT_PROPERTY) + .add(KAFKA_BROKER_LIST_PROPERTY) .add(KAFKA_TOPIC_PROPERTY) .add(KAFKA_PARTITIONS_PROPERTY) .build(); @@ -110,7 +110,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private RoutineLoadDesc routineLoadDesc; private int desiredConcurrentNum; private int maxErrorNum; - private String kafkaEndpoint; + private String kafkaBrokerList; private String kafkaTopic; private List kafkaPartitions; @@ -121,7 +121,7 @@ public CreateRoutineLoadStmt(String name, TableName dbTableName, List this.dbTableName = dbTableName; this.loadPropertyList = loadPropertyList; this.properties = properties; - this.typeName = typeName; + this.typeName = typeName.toUpperCase(); this.customProperties = customProperties; } @@ -145,6 +145,7 @@ public Map getCustomProperties() { return customProperties; } + // nullable public RoutineLoadDesc getRoutineLoadDesc() { return routineLoadDesc; } @@ -157,8 +158,8 @@ public int getMaxErrorNum() { return maxErrorNum; } - public String getKafkaEndpoint() { - return kafkaEndpoint; + public String getKafkaBrokerList() { + return kafkaBrokerList; } public String getKafkaTopic() { @@ -176,6 +177,7 @@ public void
analyze(Analyzer analyzer) throws AnalysisException, UserException { FeNameFormat.checkCommonName(NAME_TYPE, name); // check dbName and tableName checkDBTableName(); + dbTableName.analyze(analyzer); // check load properties include column separator etc. checkLoadProperties(analyzer); // check routine load properties include desired concurrent number etc. @@ -291,12 +293,16 @@ private void checkKafkaCustomProperties() throws AnalysisException { throw new AnalysisException(optional.get() + " is invalid kafka custom property"); } // check endpoint - kafkaEndpoint = customProperties.get(KAFKA_ENDPOINT_PROPERTY); - if (Strings.isNullOrEmpty(kafkaEndpoint)) { - throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + " is required property"); + kafkaBrokerList = customProperties.get(KAFKA_BROKER_LIST_PROPERTY); + if (Strings.isNullOrEmpty(kafkaBrokerList)) { + throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + " is a required property"); } - if (!Pattern.matches(ENDPOINT_REGEX, kafkaEndpoint)) { - throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + " not match pattern " + ENDPOINT_REGEX); + String[] kafkaBrokerList = this.kafkaBrokerList.split(","); + for (String broker : kafkaBrokerList) { + if (!Pattern.matches(ENDPOINT_REGEX, broker)) { + throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + ":" + broker + + " does not match pattern " + ENDPOINT_REGEX); + } } // check topic kafkaTopic = customProperties.get(KAFKA_TOPIC_PROPERTY); diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 84ab3f4e919bcb..cb5f4f0ba264b9 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -100,6 +100,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; @@ -133,6 +134,8 @@ import org.apache.doris.load.LoadJob; import org.apache.doris.load.LoadJob.JobState; import org.apache.doris.load.routineload.RoutineLoadManager; +import org.apache.doris.load.routineload.RoutineLoadScheduler; +import org.apache.doris.load.routineload.RoutineLoadTaskScheduler; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; import org.apache.doris.meta.MetaContext; @@ -349,6 +352,10 @@ public class Catalog { private TabletChecker tabletChecker; + private RoutineLoadScheduler routineLoadScheduler; + + private RoutineLoadTaskScheduler routineLoadTaskScheduler; + public List getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -466,6 +473,9 @@ private Catalog() { this.stat = new TabletSchedulerStat(); this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); + + this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager); + this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager); } public static void destroyCheckpoint() { @@ -650,6 +660,10 @@ public void initialize(String[] args) throws Exception { txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second, Config.stream_load_default_timeout_second) * 100L); + // 8.
start routine load scheduler routineLoadScheduler.start(); routineLoadTaskScheduler.start(); + } private void getClusterIdAndRole() throws IOException { @@ -6065,5 +6079,22 @@ public void replayBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) { replica.setBad(backendTabletsInfo.isBad()); } } + + public List getBackendIdsByCluster(String clusterName) throws MetaNotFoundException { + if (nameToCluster.containsKey(clusterName)) { + Cluster cluster = nameToCluster.get(clusterName); + if (cluster == null) { + throw new MetaNotFoundException("Cluster " + clusterName + " has been deleted"); + } + tryLock(true); + try { + return cluster.getBackendIdList(); + } finally { + unlock(); + } + } else { + throw new MetaNotFoundException("Cluster " + clusterName + " has been deleted"); + } + } } diff --git a/fe/src/main/java/org/apache/doris/load/RoutineLoadDesc.java b/fe/src/main/java/org/apache/doris/load/RoutineLoadDesc.java index c0b8e8b8b19261..4faaca6d2a736d 100644 --- a/fe/src/main/java/org/apache/doris/load/RoutineLoadDesc.java +++ b/fe/src/main/java/org/apache/doris/load/RoutineLoadDesc.java @@ -28,6 +28,7 @@ public class RoutineLoadDesc { private final ColumnSeparator columnSeparator; private final LoadColumnsInfo columnsInfo; private final Expr wherePredicate; + // nullable private final List partitionNames; public RoutineLoadDesc(ColumnSeparator columnSeparator, LoadColumnsInfo columnsInfo, @@ -50,6 +51,7 @@ public Expr getWherePredicate() { return wherePredicate; } + // nullable public List getPartitionNames() { return partitionNames; } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index fa1415f4bf3738..44790073575b1e 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -53,7 +53,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private static final String FE_GROUP_ID = "fe_fetch_partitions"; private static final int FETCH_PARTITIONS_TIMEOUT = 10; - private String serverAddress; + private String brokerList; private String topic; // optional, user want to load partitions.
private List customKafkaPartitions; @@ -63,9 +63,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { // this is the kafka consumer which is used to fetch the number of partitions private KafkaConsumer consumer; - public KafkaRoutineLoadJob(String name, long dbId, long tableId, String serverAddress, String topic) { + public KafkaRoutineLoadJob(String name, long dbId, long tableId, String brokerList, String topic) { super(name, dbId, tableId, LoadDataSourceType.KAFKA); - this.serverAddress = serverAddress; + this.brokerList = brokerList; this.topic = topic; this.progress = new KafkaProgress(); this.customKafkaPartitions = new ArrayList<>(); @@ -78,11 +78,11 @@ public KafkaRoutineLoadJob(String name, long dbId, long tableId, String serverAd public KafkaRoutineLoadJob(long id, String name, long dbId, long tableId, RoutineLoadDesc routineLoadDesc, int desireTaskConcurrentNum, int maxErrorNum, - String serverAddress, String topic, KafkaProgress kafkaProgress) { + String brokerList, String topic, KafkaProgress kafkaProgress) { super(id, name, dbId, tableId, routineLoadDesc, desireTaskConcurrentNum, LoadDataSourceType.KAFKA, maxErrorNum); - this.serverAddress = serverAddress; + this.brokerList = brokerList; this.topic = topic; this.progress = kafkaProgress; this.customKafkaPartitions = new ArrayList<>(); @@ -94,32 +94,28 @@ public String getTopic() { return topic; } - public String getServerAddress() { - return serverAddress; + public String getBrokerList() { + return brokerList; } + // this is an unprotected method which is called in the initialization function private void setCustomKafkaPartitions(List kafkaPartitions) throws LoadException { - writeLock(); - try { - if (this.customKafkaPartitions.size() != 0) { - throw new LoadException("Kafka partitions have been initialized"); - } - // check if custom kafka partition is valid - List allKafkaPartitions = getAllKafkaPartitions(); - outter: - for (Integer customkafkaPartition : kafkaPartitions) { - for (Integer kafkaPartition : allKafkaPartitions) { - if (kafkaPartition.equals(customkafkaPartition)) { - continue outter; - } + if (this.customKafkaPartitions.size() != 0) { + throw new LoadException("Kafka partitions have been initialized"); + } + // check if custom kafka partition is valid + List allKafkaPartitions = getAllKafkaPartitions(); + outer: + for (Integer customkafkaPartition : kafkaPartitions) { + for (Integer kafkaPartition : allKafkaPartitions) { + if (kafkaPartition.equals(customkafkaPartition)) { + continue outer; } - throw new LoadException("there is a custom kafka partition " + customkafkaPartition - + " which is invalid for topic " + topic); } + throw new LoadException("there is a custom kafka partition " + customkafkaPartition + + " which is invalid for topic " + topic); } - this.customKafkaPartitions = kafkaPartitions; - } finally { - writeUnlock(); + this.customKafkaPartitions = kafkaPartitions; } @Override @@ -130,17 +126,10 @@ public List divideRoutineLoadJob(int currentConcurrentTaskN if (state == JobState.NEED_SCHEDULE) { // divide kafkaPartitions into tasks for (int i = 0; i < currentConcurrentTaskNum; i++) { - try { - KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id); - routineLoadTaskInfoList.add(kafkaTaskInfo); - needScheduleTaskInfoList.add(kafkaTaskInfo); - result.add(kafkaTaskInfo); - } catch (UserException e) { - LOG.error("failed to begin txn for kafka routine load task, change job state to failed"); - state = JobState.CANCELLED; - // TODO(ml): edit log - break; - } + KafkaTaskInfo
kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id); + routineLoadTaskInfoList.add(kafkaTaskInfo); + needScheduleTaskInfoList.add(kafkaTaskInfo); + result.add(kafkaTaskInfo); } if (result.size() != 0) { for (int i = 0; i < currentKafkaPartitions.size(); i++) { @@ -154,6 +143,8 @@ public List divideRoutineLoadJob(int currentConcurrentTaskN } else { LOG.debug("Ignore to divide routine load job while job state {}", state); } + // save task into queue of needScheduleTasks + Catalog.getCurrentCatalog().getRoutineLoadManager().addTasksToNeedScheduleQueue(result); } finally { writeUnlock(); } @@ -162,14 +153,8 @@ public List divideRoutineLoadJob(int currentConcurrentTaskN @Override public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { - updateCurrentKafkaPartitions(); SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo(); - Database db = Catalog.getCurrentCatalog().getDb(dbId); - if (db == null) { - LOG.warn("db {} is not exists from job {}", dbId, id); - throw new MetaNotFoundException("db " + dbId + " is not exists from job " + id); - } - int aliveBeNum = systemInfoService.getBackendIds(true).size(); + int aliveBeNum = systemInfoService.getClusterBackendIds(getClusterName(), true).size(); int partitionNum = currentKafkaPartitions.size(); if (desireTaskConcurrentNum == 0) { desireTaskConcurrentNum = partitionNum; @@ -178,16 +163,17 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { LOG.info("current concurrent task number is min " + "(current size of partition {}, desire task concurrent num {}, alive be num {})", partitionNum, desireTaskConcurrentNum, aliveBeNum); - return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)); + return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM); } @Override - protected void updateProgress(RoutineLoadProgress progress) { - this.progress.update(progress); + protected void updateProgress(RLTaskTxnCommitAttachment attachment) { + super.updateProgress(attachment); + this.progress.update(attachment.getProgress()); } @Override - protected RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, + protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException { // remove old task routineLoadTaskInfoList.remove(routineLoadTaskInfo); @@ -198,22 +184,38 @@ protected RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo routineLoadTaskInfo) return kafkaTaskInfo; } + @Override + protected void executeUpdate() { + updateNewPartitionProgress(); + } + // if customKafkaPartition is not null, then return false immediately // else if kafka partitions of topic has been changed, return true. // else return false + // update currentKafkaPartitions at the same time + // currentKafkaPartitions = customKafkaPartitions is empty ?
all partitions of the kafka topic : customKafkaPartitions @Override protected boolean needReschedule() { if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) { + currentKafkaPartitions = customKafkaPartitions; return false; } else { - List newCurrentKafkaPartition = getAllKafkaPartitions(); + List newCurrentKafkaPartition; + try { + newCurrentKafkaPartition = getAllKafkaPartitions(); + } catch (Exception e) { + LOG.warn("Job {} failed to fetch all current partitions", id); + return false; + } if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) { if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) { + currentKafkaPartitions = newCurrentKafkaPartition; return true; } else { return false; } } else { + currentKafkaPartitions = newCurrentKafkaPartition; return true; } @@ -232,12 +234,15 @@ private List getAllKafkaPartitions() { public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) throws AnalysisException, LoadException { - checkCreate(stmt); - // find dbId + // check db and table Database database = Catalog.getCurrentCatalog().getDb(stmt.getDBTableName().getDb()); - Table table; + if (database == null) { + throw new AnalysisException("There is no database named " + stmt.getDBTableName().getDb()); + } database.readLock(); + Table table; try { + unprotectCheckCreate(stmt); table = database.getTable(stmt.getDBTableName().getTbl()); } finally { database.readUnlock(); } @@ -246,27 +251,19 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr // init kafka routine load job KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(stmt.getName(), database.getId(), table.getId(), - stmt.getKafkaEndpoint(), + stmt.getKafkaBrokerList(), stmt.getKafkaTopic()); kafkaRoutineLoadJob.setOptional(stmt); return kafkaRoutineLoadJob; } - // current kafka partitions = customKafkaPartitions == 0 ?
all of partition of kafka topic : customKafkaPartitions - private void updateCurrentKafkaPartitions() { - if (customKafkaPartitions == null || customKafkaPartitions.size() == 0) { - LOG.debug("All of partitions which belong to topic will be loaded for {} routine load job", name); - // fetch all of kafkaPartitions in topic - currentKafkaPartitions.addAll(getAllKafkaPartitions()); - } else { - currentKafkaPartitions = customKafkaPartitions; - } + private void updateNewPartitionProgress() { // update the progress of new partitions for (Integer kafkaPartition : currentKafkaPartitions) { - try { + if (((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) { ((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition); - } catch (NullPointerException e) { + } else { ((KafkaProgress) progress).getPartitionIdToOffset().put(kafkaPartition, 0L); } } @@ -274,7 +271,7 @@ private void updateCurrentKafkaPartitions() { private void setConsumer() { Properties props = new Properties(); - props.put("bootstrap.servers", this.serverAddress); + props.put("bootstrap.servers", this.brokerList); props.put("group.id", FE_GROUP_ID); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 5ffd7c3b3dd8e6..d3ea5f5ae06ce6 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -20,22 +20,15 @@ import com.google.common.base.Joiner; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; -import org.apache.doris.planner.StreamLoadPlanner; -import org.apache.doris.task.KafkaRoutineLoadTask; -import org.apache.doris.task.RoutineLoadTask; -import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TKafkaLoadInfo; import org.apache.doris.thrift.TLoadSourceType; +import org.apache.doris.thrift.TPlanFragment; import org.apache.doris.thrift.TRoutineLoadTask; -import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; @@ -52,8 +45,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { private List partitions; - public KafkaTaskInfo(UUID id, long jobId) throws LabelAlreadyUsedException, - BeginTransactionException, AnalysisException { + public KafkaTaskInfo(UUID id, long jobId) { super(id, jobId); this.partitions = new ArrayList<>(); } @@ -74,7 +66,7 @@ public List getPartitions() { // todo: reuse plan fragment of stream load @Override - public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException { + public TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserException { KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId); Map partitionIdToOffset = Maps.newHashMap(); for (Integer 
partitionId : partitions) { @@ -96,7 +88,7 @@ public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, U tRoutineLoadTask.setTbl(database.getTable(routineLoadJob.getTableId()).getName()); StringBuilder stringBuilder = new StringBuilder(); // label = (serviceAddress_topic_partition1:offset_partition2:offset).hashcode() - String label = String.valueOf(stringBuilder.append(routineLoadJob.getServerAddress()).append("_") + String label = String.valueOf(stringBuilder.append(routineLoadJob.getBrokerList()).append("_") .append(routineLoadJob.getTopic()).append("_") .append(Joiner.on("_").withKeyValueSeparator(":") .join(partitionIdToOffset)).toString().hashCode()); @@ -104,21 +96,19 @@ public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, U tRoutineLoadTask.setAuth_code(routineLoadJob.getAuthCode()); TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo(); tKafkaLoadInfo.setTopic((routineLoadJob).getTopic()); - tKafkaLoadInfo.setBrokers((routineLoadJob).getServerAddress()); + tKafkaLoadInfo.setBrokers((routineLoadJob).getBrokerList()); tKafkaLoadInfo.setPartition_begin_offset(partitionIdToOffset); tRoutineLoadTask.setKafka_load_info(tKafkaLoadInfo); tRoutineLoadTask.setType(TLoadSourceType.KAFKA); - tRoutineLoadTask.setParams(createTExecPlanFragmentParams(routineLoadJob)); + tRoutineLoadTask.setParams(updateTExecPlanFragmentParams(routineLoadJob)); return tRoutineLoadTask; } - private TExecPlanFragmentParams createTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException { - StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadTaskInfo(this); - Database database = Catalog.getCurrentCatalog().getDb(routineLoadJob.getDbId()); - StreamLoadPlanner planner = new StreamLoadPlanner(database, - (OlapTable) database.getTable(routineLoadJob.getTableId()), - streamLoadTask); - return planner.plan(); + private TExecPlanFragmentParams updateTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException { + TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.gettExecPlanFragmentParams(); + TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); + tPlanFragment.getOutput_sink().getOlap_table_sink().setTxn_id(this.txnId); + return tExecPlanFragmentParams; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 886428d98a37b0..524c577bbfb8b9 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -24,7 +24,6 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.IdGenerator; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -32,16 +31,14 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.load.TxnStateChangeListener; -import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendServiceImpl; +import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; -import org.apache.doris.thrift.TFileFormatType; -import 
org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TLoadTxnCommitRequest; import org.apache.doris.thrift.TResourceInfo; -import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.transaction.AbortTransactionException; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; @@ -57,9 +54,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.NoSuchElementException; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -76,6 +71,7 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10; private static final int BASE_OF_ERROR_RATE = 10000; private static final String STAR_STRING = "*"; + protected static final int DEFAULT_TASK_MAX_CONCURRENT_NUM = 3; /** * +-----------------+ @@ -132,6 +128,7 @@ public boolean isFinalState() { protected RoutineLoadProgress progress; protected String pausedReason; + protected String cancelReason; // currentErrorNum and currentTotalNum will be update // when currentTotalNum is more then ten thousand or currentErrorNum is more then maxErrorNum @@ -213,17 +210,28 @@ public long getDbId() { return dbId; } - public String getDbFullName() { + public String getDbFullName() throws MetaNotFoundException { Database database = Catalog.getCurrentCatalog().getDb(dbId); - return database.getFullName(); + if (database == null) { + throw new MetaNotFoundException("Database " + dbId + " has been deleted"); + } + database.readLock(); + try { + return database.getFullName(); + } finally { + database.readUnlock(); + } } public long getTableId() { return tableId; } - public String getTableName() { + public String getTableName() throws MetaNotFoundException { Database database = Catalog.getCurrentCatalog().getDb(dbId); + if (database == null) { + throw new MetaNotFoundException("Database " + dbId + " has been deleted"); + } database.readLock(); try { Table table = database.getTable(tableId); @@ -237,24 +245,16 @@ public JobState getState() { return state; } - public void setState(JobState state) { - this.state = state; - } - public long getAuthCode() { return authCode; } + // this is an unprotected method which is called in the initialization function protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException { - writeLock(); - try { - if (this.routineLoadDesc != null) { - throw new LoadException("Routine load desc has been initialized"); - } - this.routineLoadDesc = routineLoadDesc; - } finally { - writeUnlock(); + if (this.routineLoadDesc != null) { + throw new LoadException("Routine load desc has been initialized"); } + this.routineLoadDesc = routineLoadDesc; } public RoutineLoadDesc getRoutineLoadDesc() { @@ -270,13 +270,28 @@ public RoutineLoadProgress getProgress() { } public String getPartitions() { - if (routineLoadDesc.getPartitionNames() == null || routineLoadDesc.getPartitionNames().size() == 0) { + if (routineLoadDesc == null + || routineLoadDesc.getPartitionNames() == null + || routineLoadDesc.getPartitionNames().size() == 0) { return STAR_STRING; } else { return String.join(",", routineLoadDesc.getPartitionNames()); } } + public String getClusterName() throws MetaNotFoundException { + Database database = Catalog.getCurrentCatalog().getDb(dbId); + if (database == null) { + throw new MetaNotFoundException("Database " + dbId + " has been deleted"); + } + 
database.readLock(); + try { + return database.getClusterName(); + } finally { + database.readUnlock(); + } + } + protected void setDesireTaskConcurrentNum(int desireTaskConcurrentNum) throws LoadException { writeLock(); try { @@ -323,13 +338,8 @@ public List getNeedScheduleTaskInfoList() { return needScheduleTaskInfoList; } - public void updateState(JobState jobState) { - writeLock(); - try { - state = jobState; - } finally { - writeUnlock(); - } + public TExecPlanFragmentParams gettExecPlanFragmentParams() { + return tExecPlanFragmentParams; } public List processTimeoutTasks() { @@ -355,7 +365,7 @@ public List processTimeoutTasks() { } try { - result.add(reNewTask(routineLoadTaskInfo)); + result.add(unprotectRenewTask(routineLoadTaskInfo)); LOG.debug("Task {} was ran more then {} minutes. It was removed and rescheduled", oldSignature, DEFAULT_TASK_TIMEOUT_SECONDS); } catch (UserException e) { @@ -397,7 +407,10 @@ public void removeNeedScheduleTask(RoutineLoadTaskInfo routineLoadTaskInfo) { } } - abstract void updateProgress(RoutineLoadProgress progress); + // if rate of error data is more than max_filter_ratio, pause job + protected void updateProgress(RLTaskTxnCommitAttachment attachment) { + updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows()); + } public boolean containsTask(String taskId) { readLock(); @@ -413,11 +426,6 @@ public boolean containsTask(String taskId) { private void checkStateTransform(RoutineLoadJob.JobState desireState) throws UnsupportedOperationException { switch (state) { - case RUNNING: - if (desireState == JobState.NEED_SCHEDULE) { - throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState); - } - break; case PAUSED: if (desireState == JobState.PAUSED) { throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState); } @@ -461,9 +469,24 @@ private void updateNumOfData(long numOfErrorData, long numOfTotalData) { } } - abstract RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, + abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException; + public void plan() throws UserException { + StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadJob(this); + Database database = Catalog.getCurrentCatalog().getDb(this.getDbId()); + + database.readLock(); + try { + StreamLoadPlanner planner = new StreamLoadPlanner(database, + (OlapTable) database.getTable(this.tableId), + streamLoadTask); + tExecPlanFragmentParams = planner.plan(); + } finally { + database.readUnlock(); + } + } + @Override public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) throws AbortTransactionException { @@ -472,6 +495,7 @@ public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusC if (txnStatusChangeReason != null) { switch (txnStatusChangeReason) { case TIMEOUT: + default: String taskId = txnState.getLabel(); if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().equals(taskId))) { throw new AbortTransactionException( @@ -497,29 +521,28 @@ public void onCommitted(TransactionState txnState) { Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream() .filter(entity -> entity.getId().equals(txnState.getLabel())).findFirst(); - RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); - - // step2: update job 
progress - updateProgress(rlTaskTxnCommitAttachment.getProgress()); + if (routineLoadTaskInfoOptional.isPresent()) { + RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); - // step4: if rate of error data is more then max_filter_ratio, pause job - updateNumOfData(rlTaskTxnCommitAttachment.getFilteredRows(), rlTaskTxnCommitAttachment.getLoadedRows()); + // step2: update job progress + updateProgress(rlTaskTxnCommitAttachment); - if (state == JobState.RUNNING) { - // step5: create a new task for partitions - RoutineLoadTaskInfo newRoutineLoadTaskInfo = reNewTask(routineLoadTaskInfo); - Catalog.getCurrentCatalog().getRoutineLoadManager() - .getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo); + if (state == JobState.RUNNING) { + // step3: create a new task for partitions + RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); + Catalog.getCurrentCatalog().getRoutineLoadManager() + .getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo); + } + } else { + LOG.debug("There is no {} task in task info list. Maybe task has been renewed or job state has changed. " + " Transaction {} will not be committed", + txnState.getLabel(), txnState.getTransactionId()); } - } catch (NoSuchElementException e) { - LOG.debug("There is no {} task in task info list. Maybe task has been renew or job state has changed. " + " Transaction {} will not be committed", - txnState.getLabel(), txnState.getTransactionId()); } catch (Throwable e) { LOG.error("failed to update offset in routine load task {} when transaction {} has been committed. " + "change job to paused", rlTaskTxnCommitAttachment.getTaskId(), txnState.getTransactionId(), e); - executePause("failed to update offset when transaction " + updateState(JobState.PAUSED, "failed to update offset when transaction " + txnState.getTransactionId() + " has been committed"); } finally { writeUnlock(); } @@ -528,12 +551,12 @@ public void onCommitted(TransactionState txnState) { @Override public void onAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) { - pause(txnStatusChangeReason.name()); + updateState(JobState.PAUSED, txnStatusChangeReason.name()); LOG.debug("job {} need to be pause while txn {} abort with reason {}", id, txnState.getTransactionId(), txnStatusChangeReason.name()); } - protected static void checkCreate(CreateRoutineLoadStmt stmt) throws AnalysisException { + protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException { // check table belong to db, partitions belong to table if (stmt.getRoutineLoadDesc() == null) { checkDBSemantics(stmt.getDBTableName(), null); @@ -546,44 +569,54 @@ private static void checkDBSemantics(TableName dbTableName, List partiti throws AnalysisException { String tableName = dbTableName.getTbl(); String dbName = dbTableName.getDb(); - // check database + + // check table belong to database Database database = Catalog.getCurrentCatalog().getDb(dbName); - if (database == null) { - throw new AnalysisException("There is no database named " + dbName); + Table table = database.getTable(tableName); + if (table == null) { + throw new AnalysisException("There is no table named " + tableName + " in " + dbName); + } + // check table type + if (table.getType() != Table.TableType.OLAP) { + throw new AnalysisException("Only doris table supports routine load"); } - database.readLock(); - try { - Table table = database.getTable(tableName); - // check table belong to database - if (table == null) { - throw 
new AnalysisException("There is no table named " + tableName + " in " + dbName); - } - // check table type - if (table.getType() != Table.TableType.OLAP) { - throw new AnalysisException("Only doris table support routine load"); - } - - if (partitionNames == null || partitionNames.size() == 0) { - return; - } - // check partitions belong to table - Optional partitionNotInTable = partitionNames.parallelStream() - .filter(entity -> ((OlapTable) table).getPartition(entity) == null).findFirst(); - if (partitionNotInTable != null && partitionNotInTable.isPresent()) { - throw new AnalysisException("Partition " + partitionNotInTable.get() - + " does not belong to table " + tableName); - } - } finally { - database.readUnlock(); + if (partitionNames == null || partitionNames.size() == 0) { + return; + } + // check partitions belong to table + Optional partitionNotInTable = partitionNames.parallelStream() + .filter(entity -> ((OlapTable) table).getPartition(entity) == null).findFirst(); + if (partitionNotInTable != null && partitionNotInTable.isPresent()) { + throw new AnalysisException("Partition " + partitionNotInTable.get() + + " does not belong to table " + tableName); } } - public void pause(String reason) { + public void updateState(JobState jobState) { + updateState(jobState, null); + } + + public void updateState(JobState jobState, String reason) { writeLock(); try { - checkStateTransform(JobState.PAUSED); - executePause(reason); + checkStateTransform(jobState); + switch (jobState) { + case PAUSED: + executePause(reason); + break; + case NEED_SCHEDULE: + executeNeedSchedule(); + break; + case STOPPED: + executeStop(); + break; + case CANCELLED: + executeCancel(reason); + break; + default: + break; + } } finally { writeUnlock(); } @@ -598,45 +631,56 @@ private void executePause(String reason) { needScheduleTaskInfoList.clear(); } - public void resume() { + private void executeNeedSchedule() { // TODO(ml): edit log - writeLock(); - try { - checkStateTransform(JobState.NEED_SCHEDULE); - state = JobState.NEED_SCHEDULE; - } finally { - writeUnlock(); - } + state = JobState.NEED_SCHEDULE; + routineLoadTaskInfoList.clear(); + needScheduleTaskInfoList.clear(); } - public void stop() { + private void executeStop() { // TODO(ml): edit log - writeLock(); + state = JobState.STOPPED; + routineLoadTaskInfoList.clear(); + needScheduleTaskInfoList.clear(); + } + + private void executeCancel(String reason) { + cancelReason = reason; + state = JobState.CANCELLED; + routineLoadTaskInfoList.clear(); + needScheduleTaskInfoList.clear(); + } + + public void update() { + // check if db and table exist + Database database = Catalog.getCurrentCatalog().getDb(dbId); + if (database == null) { + LOG.info("The database {} has been deleted. Change {} job state to stopped", dbId, id); + updateState(JobState.STOPPED); + } + database.readLock(); try { - checkStateTransform(JobState.STOPPED); - state = JobState.STOPPED; - routineLoadTaskInfoList.clear(); - needScheduleTaskInfoList.clear(); + Table table = database.getTable(tableId); + // check table belong to database + if (table == null) { + LOG.info("The table {} has been deleted. 
Change {} job state to stopeed", tableId, id); + updateState(JobState.STOPPED); + } } finally { - writeUnlock(); + database.readUnlock(); } - } - public void reschedule() { + // check if partition has been changed if (needReschedule()) { - writeLock(); - try { - if (state == JobState.RUNNING) { - state = JobState.NEED_SCHEDULE; - routineLoadTaskInfoList.clear(); - needScheduleTaskInfoList.clear(); - } - } finally { - writeUnlock(); - } + executeUpdate(); + updateState(JobState.NEED_SCHEDULE); } } + protected void executeUpdate() { + } + protected boolean needReschedule() { return false; } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 8f9b4f52a588df..baf048d01e5f28 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -92,6 +92,10 @@ public Queue getNeedScheduleTasksQueue() { return needScheduleTasksQueue; } + public void addTasksToNeedScheduleQueue(List routineLoadTaskInfoList) { + needScheduleTasksQueue.addAll(routineLoadTaskInfoList); + } + private void updateBeIdToMaxConcurrentTasks() { beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); @@ -172,7 +176,7 @@ public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) throws DdlException writeLock(); try { // check if db.routineLoadName has been used - if (isNameUsed(routineLoadJob.dbId, routineLoadJob.getName())) { + if (isNameUsed(routineLoadJob.getDbId(), routineLoadJob.getName())) { throw new DdlException("Name " + routineLoadJob.getName() + " already used in db " + routineLoadJob.getDbId()); } @@ -228,17 +232,22 @@ public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throw throw new DdlException("There is not routine load job with name " + pauseRoutineLoadStmt.getName()); } // check auth - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - routineLoadJob.getDbFullName(), - routineLoadJob.getTableName(), - PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - routineLoadJob.getTableName()); + try { + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), + routineLoadJob.getDbFullName(), + routineLoadJob.getTableName(), + PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + routineLoadJob.getTableName()); + } + } catch (MetaNotFoundException e) { + throw new DdlException(e.getMessage()); } - routineLoadJob.pause("User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job"); + routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, + "User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job"); } public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws DdlException, @@ -248,16 +257,20 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th throw new DdlException("There is not routine load job with name " + resumeRoutineLoadStmt.getName()); } // check auth - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - 
routineLoadJob.getDbFullName(), - routineLoadJob.getTableName(), - PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - routineLoadJob.getTableName()); + try { + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), + routineLoadJob.getDbFullName(), + routineLoadJob.getTableName(), + PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + routineLoadJob.getTableName()); + } + } catch (MetaNotFoundException e) { + throw new DdlException(e.getMessage()); } - routineLoadJob.resume(); + routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE); } public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws DdlException, AnalysisException { @@ -266,16 +279,20 @@ public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws D throw new DdlException("There is not routine load job with name " + stopRoutineLoadStmt.getName()); } // check auth - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - routineLoadJob.getDbFullName(), - routineLoadJob.getTableName(), - PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - routineLoadJob.getTableName()); + try { + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), + routineLoadJob.getDbFullName(), + routineLoadJob.getTableName(), + PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + routineLoadJob.getTableName()); + } + } catch (MetaNotFoundException e) { + throw new DdlException(e.getMessage()); } - routineLoadJob.stop(); + routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED); } public int getSizeOfIdToRoutineLoadTask() { @@ -304,21 +321,28 @@ public int getClusterIdleSlotNum() { } } - public long getMinTaskBeId() throws LoadException { + public long getMinTaskBeId(String clusterName) throws LoadException { + List beIdsInCluster = new ArrayList<>(); + try { + beIdsInCluster = Catalog.getCurrentCatalog().getBackendIdsByCluster(clusterName); + } catch (MetaNotFoundException e) { + throw new LoadException(e.getMessage()); + } + readLock(); try { long result = -1L; int maxIdleSlotNum = 0; updateBeIdToMaxConcurrentTasks(); - for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { - if (beIdToConcurrentTasks.get(entry.getKey()) == null) { - result = maxIdleSlotNum < entry.getValue() ? entry.getKey() : result; - maxIdleSlotNum = Math.max(maxIdleSlotNum, entry.getValue()); - } else { - int idelTaskNum = entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); - result = maxIdleSlotNum < idelTaskNum ? entry.getKey() : result; - maxIdleSlotNum = Math.max(maxIdleSlotNum, idelTaskNum); - } + for (Long beId : beIdsInCluster) { + int idleTaskNum = 0; + if (beIdToConcurrentTasks.containsKey(beId)) { + idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId); + } else { + idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM; + } + result = maxIdleSlotNum < idleTaskNum ? 
beId : result; + maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum); } if (result < 0) { throw new LoadException("There is no empty slot in cluster"); @@ -369,17 +393,12 @@ public RoutineLoadJob getJobByTaskId(String taskId) throws MetaNotFoundException throw new MetaNotFoundException("could not found task by id " + taskId); } - public List getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) throws LoadException { - List jobs = new ArrayList<>(); - Collection stateJobs = null; + public List getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) { LOG.debug("begin to get routine load job by state {}", jobState.name()); - stateJobs = idToRoutineLoadJob.values().stream() + List stateJobs = idToRoutineLoadJob.values().stream() .filter(entity -> entity.getState() == jobState).collect(Collectors.toList()); - if (stateJobs != null) { - jobs.addAll(stateJobs); - LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name()); - } - return jobs; + LOG.debug("got {} routine load jobs by state {}", stateJobs.size(), jobState.name()); + return stateJobs; } public List processTimeoutTasks() { @@ -397,9 +416,9 @@ public void removeOldRoutineLoadJobs() { // TODO(ml): remove old routine load job } - public void rescheduleRoutineLoadJob() { + public void updateRoutineLoadJob() { for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { - routineLoadJob.reschedule(); + routineLoadJob.update(); } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 0757376f58cb78..e8e744d22848cf 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -17,6 +17,7 @@ package org.apache.doris.load.routineload; +import com.google.common.annotations.VisibleForTesting; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -30,8 +31,20 @@ public class RoutineLoadScheduler extends Daemon { private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class); + private static final int DEFAULT_INTERVAL_SECONDS = 10; - private RoutineLoadManager routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); + private RoutineLoadManager routineLoadManager; + + @VisibleForTesting + public RoutineLoadScheduler() { + super(); + routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); + } + + public RoutineLoadScheduler(RoutineLoadManager routineLoadManager) { + super("Routine load", DEFAULT_INTERVAL_SECONDS * 1000); + this.routineLoadManager = routineLoadManager; + } @Override protected void runOneCycle() { @@ -44,7 +57,7 @@ protected void runOneCycle() { private void process() { // update - routineLoadManager.rescheduleRoutineLoadJob(); + routineLoadManager.updateRoutineLoadJob(); // get need schedule routine jobs List routineLoadJobList = null; try { @@ -53,9 +66,11 @@ private void process() { LOG.error("failed to get need schedule routine jobs"); } - LOG.debug("there are {} job need schedule", routineLoadJobList.size()); + LOG.info("there are {} jobs to schedule", routineLoadJobList.size()); for (RoutineLoadJob routineLoadJob : routineLoadJobList) { try { + // create plan of routine load job + routineLoadJob.plan(); // judge nums of tasks more then max concurrent tasks of cluster int currentConcurrentTaskNum = 
routineLoadJob.calculateCurrentConcurrentTaskNum(); int totalTaskNum = currentConcurrentTaskNum + routineLoadManager.getSizeOfIdToRoutineLoadTask(); @@ -68,20 +83,21 @@ private void process() { totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum()); break; } - // divide job into tasks - List needScheduleTasksList = - routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); - // save task into queue of needScheduleTasks - routineLoadManager.getNeedScheduleTasksQueue().addAll(needScheduleTasksList); + // check state and divide job into tasks + routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); } catch (MetaNotFoundException e) { - routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED); + routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED, e.getMessage()); + } catch (Throwable e) { + LOG.warn("failed to schedule job, change job state to paused", e); + routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, e.getMessage()); + continue; } } LOG.debug("begin to check timeout tasks"); // check timeout tasks List rescheduleTasksList = routineLoadManager.processTimeoutTasks(); - routineLoadManager.getNeedScheduleTasksQueue().addAll(rescheduleTasksList); + routineLoadManager.addTasksToNeedScheduleQueue(rescheduleTasksList); } private List getNeedScheduleRoutineJobs() throws LoadException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index aae26a7fb87fb0..c0fe2139f80c27 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -47,17 +47,11 @@ public abstract class RoutineLoadTaskInfo { private long createTimeMs; private long loadStartTimeMs; private TExecPlanFragmentParams tExecPlanFragmentParams; - - public RoutineLoadTaskInfo(UUID id, long jobId) throws BeginTransactionException, - LabelAlreadyUsedException, AnalysisException { + + public RoutineLoadTaskInfo(UUID id, long jobId) { this.id = id; this.jobId = jobId; this.createTimeMs = System.currentTimeMillis(); - // begin a txn for task - RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId); - txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( - routineLoadJob.getDbId(), id.toString(), -1, "streamLoad", - TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob); } public UUID getId() { @@ -79,8 +73,16 @@ public long getLoadStartTimeMs() { return loadStartTimeMs; } public long getTxnId() { return txnId; } - - abstract TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException; + + abstract TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserException; + + public void setTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { + // begin a txn for task + RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId); + txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( + routineLoadJob.getDbId(), id.toString(), -1, "streamLoad", + TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob); + } @Override public boolean equals(Object obj) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 3c380043895e47..088b4290b1d2e2 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ 
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -17,6 +17,9 @@ package org.apache.doris.load.routineload; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.ClientPool; import org.apache.doris.common.LoadException; @@ -52,12 +55,16 @@ public class RoutineLoadTaskScheduler extends Daemon { private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class); private RoutineLoadManager routineLoadManager; - private LinkedBlockingQueue needScheduleTasksQueue; + @VisibleForTesting public RoutineLoadTaskScheduler() { super("routine load task", 0); - routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); - needScheduleTasksQueue = (LinkedBlockingQueue) routineLoadManager.getNeedScheduleTasksQueue(); + this.routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); + } + + public RoutineLoadTaskScheduler(RoutineLoadManager routineLoadManager) { + super("routine load task", 0); + this.routineLoadManager = routineLoadManager; } @Override @@ -70,7 +77,9 @@ protected void runOneCycle() { } } - private void process() throws LoadException, UserException { + private void process() throws LoadException, UserException, InterruptedException { + LinkedBlockingQueue needScheduleTasksQueue = + (LinkedBlockingQueue) routineLoadManager.getNeedScheduleTasksQueue(); // update current beIdMaps for tasks routineLoadManager.updateBeIdTaskMaps(); @@ -83,17 +92,13 @@ private void process() throws LoadException, UserException { int scheduledTaskNum = 0; // get idle be task num // allocate task to be + if (needScheduleTaskNum == 0) { + Thread.sleep(1000); + return; + } while (needScheduleTaskNum > 0) { - RoutineLoadTaskInfo routineLoadTaskInfo = null; - try { - routineLoadTaskInfo = needScheduleTasksQueue.take(); - } catch (InterruptedException e) { - LOG.warn("Taking routine load task from queue has been interrupted with error msg {}", - e.getMessage()); - return; - } - - long beId = routineLoadManager.getMinTaskBeId(); + // allocate be to task and begin transaction for task + RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.peek(); RoutineLoadJob routineLoadJob = null; try { routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId().toString()); @@ -101,7 +106,27 @@ private void process() throws LoadException, UserException { LOG.warn("task {} has been abandoned", routineLoadTaskInfo.getId()); return; } - TRoutineLoadTask tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask(beId); + long beId; + try { + beId = routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName()); + routineLoadTaskInfo.setTxn(); + } catch (Exception e) { + LOG.warn("put task to the rear of queue with error " + e.getMessage()); + needScheduleTasksQueue.take(); + needScheduleTasksQueue.put(routineLoadTaskInfo); + needScheduleTaskNum--; + continue; + } + + // task to thrift + try { + routineLoadTaskInfo = needScheduleTasksQueue.take(); + } catch (InterruptedException e) { + LOG.warn("Taking routine load task from queue has been interrupted with error msg {}", + e.getMessage()); + return; + } + TRoutineLoadTask tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask(); // remove task for needScheduleTasksList in job routineLoadJob.removeNeedScheduleTask(routineLoadTaskInfo); routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis()); @@ -123,7 +148,6 @@ 
private void process() throws LoadException, UserException { LOG.info("{} tasks have bean allocated to be.", scheduledTaskNum); } - // todo: change to batch submit and reuse client private void submitBatchTask(Map> beIdToRoutineLoadTask) { for (Map.Entry> entry : beIdToRoutineLoadTask.entrySet()) { Backend backend = Catalog.getCurrentSystemInfo().getBackend(entry.getKey()); @@ -133,7 +157,6 @@ private void submitBatchTask(Map> beIdToRoutineLoad try { client = ClientPool.backendPool.borrowObject(address); client.submit_routine_load_task(entry.getValue()); - ok = true; } catch (Exception e) { LOG.warn("task exec error. backend[{}]", backend.getId(), e); diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java index 570d718fdcbcaa..44758b2c0e11d3 100644 --- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -80,6 +80,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.proc.BackendsProcDir; import org.apache.doris.common.proc.FrontendsProcNode; @@ -801,14 +802,18 @@ private void handleShowRoutineLoad() throws AnalysisException { } // check auth - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - routineLoadJob.getDbFullName(), - routineLoadJob.getTableName(), - PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - routineLoadJob.getTableName()); + try { + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), + routineLoadJob.getDbFullName(), + routineLoadJob.getTableName(), + PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + routineLoadJob.getTableName()); + } + } catch (MetaNotFoundException e) { + throw new AnalysisException(e.getMessage()); } // get routine load info diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java index 30d9a4896b628f..b37d16b2ae19e2 100644 --- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -45,6 +45,7 @@ import java.io.StringReader; import java.util.Map; +import java.util.UUID; public class StreamLoadTask { @@ -131,19 +132,18 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws } } - public static StreamLoadTask fromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo) { - TUniqueId queryId = new TUniqueId(routineLoadTaskInfo.getId().getMostSignificantBits(), - routineLoadTaskInfo.getId().getLeastSignificantBits()); - StreamLoadTask streamLoadTask = new StreamLoadTask(queryId, routineLoadTaskInfo.getTxnId(), + // the taskId and txnId are faked + public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) { + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), + taskId.getLeastSignificantBits()); + StreamLoadTask streamLoadTask = new StreamLoadTask(queryId, -1L, TFileType.FILE_STREAM, TFileFormatType.FORMAT_CSV_PLAIN); - 
-        RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();
-        streamLoadTask.setOptionalFromRoutineLoadTaskInfo(routineLoadTaskInfo,
-                routineLoadManager.getJob(routineLoadTaskInfo.getJobId()));
+        streamLoadTask.setOptionalFromRoutineLoadJob(routineLoadJob);
         return streamLoadTask;
     }
 
-    private void setOptionalFromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo,
-                                                    RoutineLoadJob routineLoadJob) {
+    private void setOptionalFromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
         if (routineLoadJob.getRoutineLoadDesc() != null) {
             RoutineLoadDesc routineLoadDesc = routineLoadJob.getRoutineLoadDesc();
             if (routineLoadDesc.getColumnsInfo() != null) {
diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index d39c0e085c7677..b7e814ba48a647 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -317,6 +317,8 @@ public void setTransactionStatus(TransactionStatus transactionStatus, TxnStatusC
             case ABORTED:
                 txnStateChangeListener.beforeAborted(this, txnStatusChangeReason);
                 break;
+            default:
+                break;
         }
     }
 
@@ -426,10 +428,6 @@ public boolean isPublishTimeout() {
         return System.currentTimeMillis() - publishVersionTime > timeoutMillis;
     }
 
-    public void setTxnStateChangeListener(TxnStateChangeListener txnStateChangeListener) {
-        this.txnStateChangeListener = txnStateChangeListener;
-    }
-
     public TxnStateChangeListener getTxnStateChangeListener() {
         return txnStateChangeListener;
     }
diff --git a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
index 6b210b1d9c4b15..dc17df77deee1c 100644
--- a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
+++ b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
@@ -63,7 +63,7 @@ public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) thro
         Map<String, String> customProperties = Maps.newHashMap();
 
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
@@ -109,7 +109,7 @@ public void testAnalyze(@Injectable Analyzer analyzer) throws UserException {
         Map<String, String> customProperties = Maps.newHashMap();
 
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
@@ -129,7 +129,7 @@ public void analyze(Analyzer analyzer1) {
         Assert.assertEquals(partitionNames.getPartitionNames(), createRoutineLoadStmt.getRoutineLoadDesc().getPartitionNames());
         Assert.assertEquals(2, createRoutineLoadStmt.getDesiredConcurrentNum());
         Assert.assertEquals(0, createRoutineLoadStmt.getMaxErrorNum());
-        Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaEndpoint());
+        Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaBrokerList());
         Assert.assertEquals(topicName, createRoutineLoadStmt.getKafkaTopic());
         Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(createRoutineLoadStmt.getKafkaPartitions()));
     }
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 76a29589fd544f..366567f4a9dcea 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -367,7 +367,7 @@ private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
         Map<String, String> customProperties = Maps.newHashMap();
 
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 8681411a857c7d..c78908b697892f 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -76,7 +76,7 @@ public void testAddJobByStmt(@Injectable PaloAuth paloAuth,
         String topicName = "topic1";
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
         String serverAddress = "http://127.0.0.1:8080";
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
                 loadPropertyList, properties, typeName, customProperties);
@@ -142,7 +142,7 @@ public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth,
         String topicName = "topic1";
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
         String serverAddress = "http://127.0.0.1:8080";
-        customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
+        customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
 
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
                 loadPropertyList, properties, typeName, customProperties);
@@ -236,7 +236,7 @@ public void testGetMinTaskBeId() throws LoadException {
         new Expectations() {
             {
-                systemInfoService.getBackendIds(true);
+                systemInfoService.getClusterBackendIds(anyString, true);
                 result = beIds;
                 Catalog.getCurrentSystemInfo();
                 result = systemInfoService;
@@ -245,7 +245,7 @@
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
         routineLoadManager.addNumOfConcurrentTasksByBeId(1L);
-        Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId());
+        Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId("default"));
     }
 
     @Test
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index 98578f0e729c5d..ccd7023642592e 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -77,7 +77,7 @@ public void testNormalRunOneCycle(@Mocked KafkaConsumer consumer,
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L, 1L, routineLoadDesc ,3, 0,
                         "", "", new KafkaProgress());
-        routineLoadJob.setState(RoutineLoadJob.JobState.NEED_SCHEDULE);
+        Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.NEED_SCHEDULE);
         List<RoutineLoadJob> routineLoadJobList = new ArrayList<>();
         routineLoadJobList.add(routineLoadJob);
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 05d26e31b28426..824cd29193d4be 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -113,7 +113,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
                 routineLoadManager.getNeedScheduleTasksQueue();
                 result = routineLoadTaskInfoQueue;
-                routineLoadManager.getMinTaskBeId();
+                routineLoadManager.getMinTaskBeId(anyString);
                 result = beId;
                 routineLoadManager.getJobByTaskId(anyString);
                 result = kafkaRoutineLoadJob1;
diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 5afdff7accdf5c..2768fabef2fe39 100644
--- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -321,7 +321,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1");
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
-        transactionState.setTxnStateChangeListener(routineLoadJob);
+        Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob);
         Map<Long, TransactionState> idToTransactionState = Maps.newHashMap();
         idToTransactionState.put(1L, transactionState);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
@@ -330,7 +330,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
         KafkaProgress oldkafkaProgress = new KafkaProgress();
         oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
-        routineLoadJob.setState(RoutineLoadJob.JobState.RUNNING);
+        Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
 
         TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment();
         rlTaskTxnCommitAttachment.setId(new TUniqueId());
@@ -395,7 +395,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1");
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
-        transactionState.setTxnStateChangeListener(routineLoadJob);
+        Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob);
         Map<Long, TransactionState> idToTransactionState = Maps.newHashMap();
         idToTransactionState.put(1L, transactionState);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
@@ -404,7 +404,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
         KafkaProgress oldkafkaProgress = new KafkaProgress();
         oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
-        routineLoadJob.setState(RoutineLoadJob.JobState.RUNNING);
+        Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
 
         TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment();
         rlTaskTxnCommitAttachment.setId(new TUniqueId());

From e01cc3fdbfd64a255e6dc6ac1e517e4bba9b6334 Mon Sep 17 00:00:00 2001
From: emmymiao87 <522274284@qq.com>
Date: Tue, 5 Mar 2019 16:34:10 +0800
Subject: [PATCH 2/2] Catch MetaNotFoundException before checking priv

---
 .../org/apache/doris/catalog/Catalog.java     | 17 ----
 .../load/routineload/RoutineLoadManager.java  | 80 +++++++++++--------
 .../load/routineload/RoutineLoadTaskInfo.java |  2 +-
 .../routineload/RoutineLoadTaskScheduler.java |  5 +-
 .../org/apache/doris/qe/ShowExecutor.java     | 24 +++---
 5 files changed, 61 insertions(+), 67 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index cb5f4f0ba264b9..081edb6e5e48d4 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -6079,22 +6079,5 @@ public void replayBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) {
                 replica.setBad(backendTabletsInfo.isBad());
             }
         }
     }
-
-    public List<Long> getBackendIdsByCluster(String clusterName) throws MetaNotFoundException {
-        if (nameToCluster.containsKey(clusterName)) {
-            Cluster cluster = nameToCluster.get(clusterName);
-            if (cluster == null) {
-                throw new MetaNotFoundException("Cluster " + clusterName + "has been deleted");
-            }
-            tryLock(true);
-            try {
-                return cluster.getBackendIdList();
-            } finally {
-                unlock();
-            }
-        } else {
-            throw new MetaNotFoundException("Cluster " + clusterName + "has been deleted");
-        }
-    }
 }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index baf048d01e5f28..4c5b938446d7ed 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -232,18 +232,22 @@ public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throw
             throw new DdlException("There is no routine load job with name " + pauseRoutineLoadStmt.getName());
         }
         // check auth
+        String dbFullName;
+        String tableName;
         try {
-            if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
-                    routineLoadJob.getDbFullName(),
-                    routineLoadJob.getTableName(),
-                    PrivPredicate.LOAD)) {
-                ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                        ConnectContext.get().getQualifiedUser(),
-                        ConnectContext.get().getRemoteIP(),
-                        routineLoadJob.getTableName());
-            }
+            dbFullName = routineLoadJob.getDbFullName();
+            tableName = routineLoadJob.getTableName();
         } catch (MetaNotFoundException e) {
-            throw new DdlException(e.getMessage());
+            throw new DdlException("The metadata of the job has been changed. The job will be cancelled automatically", e);
+        }
+        if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
+                dbFullName,
+                tableName,
+                PrivPredicate.LOAD)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    tableName);
         }
 
         routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
@@ -257,18 +261,22 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th
             throw new DdlException("There is no routine load job with name " + resumeRoutineLoadStmt.getName());
         }
         // check auth
+        String dbFullName;
+        String tableName;
         try {
-            if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
-                    routineLoadJob.getDbFullName(),
-                    routineLoadJob.getTableName(),
-                    PrivPredicate.LOAD)) {
-                ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                        ConnectContext.get().getQualifiedUser(),
-                        ConnectContext.get().getRemoteIP(),
-                        routineLoadJob.getTableName());
-            }
+            dbFullName = routineLoadJob.getDbFullName();
+            tableName = routineLoadJob.getTableName();
         } catch (MetaNotFoundException e) {
-            throw new DdlException(e.getMessage());
+            throw new DdlException("The metadata of the job has been changed. The job will be cancelled automatically", e);
+        }
+        if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
+                dbFullName,
+                tableName,
+                PrivPredicate.LOAD)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    tableName);
         }
         routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE);
     }
@@ -279,18 +287,22 @@ public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws D
             throw new DdlException("There is no routine load job with name " + stopRoutineLoadStmt.getName());
         }
         // check auth
+        String dbFullName;
+        String tableName;
         try {
-            if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
-                    routineLoadJob.getDbFullName(),
-                    routineLoadJob.getTableName(),
-                    PrivPredicate.LOAD)) {
-                ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                        ConnectContext.get().getQualifiedUser(),
-                        ConnectContext.get().getRemoteIP(),
-                        routineLoadJob.getTableName());
-            }
+            dbFullName = routineLoadJob.getDbFullName();
+            tableName = routineLoadJob.getTableName();
         } catch (MetaNotFoundException e) {
-            throw new DdlException(e.getMessage());
+            throw new DdlException("The metadata of the job has been changed. The job will be cancelled automatically", e);
+        }
+        if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
+                dbFullName,
+                tableName,
+                PrivPredicate.LOAD)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    tableName);
         }
         routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED);
     }
@@ -322,11 +334,9 @@ public int getClusterIdleSlotNum() {
     }
 
     public long getMinTaskBeId(String clusterName) throws LoadException {
-        List<Long> beIdsInCluster = new ArrayList<>();
-        try {
-            beIdsInCluster = Catalog.getCurrentCatalog().getBackendIdsByCluster(clusterName);
-        } catch (MetaNotFoundException e) {
-            throw new LoadException(e.getMessage());
+        List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName);
+        if (beIdsInCluster == null) {
+            throw new LoadException("Cluster " + clusterName + " has been deleted");
         }
 
         readLock();
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index c0fe2139f80c27..3fa4ce1efccda5 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -76,7 +76,7 @@ public long getTxnId() {
 
     abstract TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserException;
 
-    public void setTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
+    public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
         // begin a txn for task
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 088b4290b1d2e2..9141c2e8f3ed7f 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -31,9 +31,6 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TRoutineLoadTask;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -109,7 +106,7 @@ private void process() throws LoadException, UserException, InterruptedException
             long beId;
             try {
                 beId = routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName());
-                routineLoadTaskInfo.setTxn();
+                routineLoadTaskInfo.beginTxn();
             } catch (Exception e) {
                 LOG.warn("failed to schedule task, put it back to the rear of the queue. error: " + e.getMessage());
                 needScheduleTasksQueue.take();
diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 44758b2c0e11d3..0bc7a8c74ae18d 100644
--- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -802,18 +802,22 @@ private void handleShowRoutineLoad() throws AnalysisException {
             }
 
             // check auth
+            String dbFullName;
+            String tableName;
             try {
-                if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
-                        routineLoadJob.getDbFullName(),
-                        routineLoadJob.getTableName(),
-                        PrivPredicate.LOAD)) {
-                    ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                            ConnectContext.get().getQualifiedUser(),
-                            ConnectContext.get().getRemoteIP(),
-                            routineLoadJob.getTableName());
-                }
+                dbFullName = routineLoadJob.getDbFullName();
+                tableName = routineLoadJob.getTableName();
             } catch (MetaNotFoundException e) {
-                throw new AnalysisException(e.getMessage());
+                throw new AnalysisException("The metadata of the job has been changed. The job will be cancelled automatically", e);
+            }
+            if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
+                    dbFullName,
+                    tableName,
+                    PrivPredicate.LOAD)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
+                        ConnectContext.get().getQualifiedUser(),
+                        ConnectContext.get().getRemoteIP(),
+                        tableName);
             }
 
             // get routine load info