diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index 6febb436eafc76..ceab4cee7e1eda 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
@@ -206,7 +206,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A
     KW_JOIN, KW_KEY, KW_KILL,
     KW_LABEL, KW_LARGEINT, KW_LAST, KW_LEFT, KW_LESS, KW_LEVEL, KW_LIKE, KW_LIMIT, KW_LINK, KW_LOAD,
-    KW_ROUTINE, KW_PAUSE, KW_RESUME, KW_STOP,
+    KW_ROUTINE, KW_PAUSE, KW_RESUME, KW_STOP, KW_TASK,
     KW_LOCAL, KW_LOCATION,
     KW_MAX, KW_MAX_VALUE, KW_MERGE, KW_MIN, KW_MIGRATE, KW_MIGRATIONS, KW_MODIFY,
     KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS,
@@ -243,7 +243,8 @@ terminal String COMMENTED_PLAN_HINTS;
 
 // Statement that the result of this parser.
 nonterminal StatementBase query, stmt, show_stmt, show_param, help_stmt, load_stmt,
-    create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt, show_routine_load_stmt,
+    create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt,
+    show_routine_load_stmt, show_routine_load_task_stmt,
     describe_stmt, alter_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt,
     create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
     link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
     import_columns_stmt, import_where_stmt;
@@ -374,6 +375,7 @@ nonterminal AccessPrivilege privilege_type;
 nonterminal DataDescription data_desc;
 nonterminal List data_desc_list;
 nonterminal LabelName job_label;
+nonterminal LabelName opt_job_label;
 nonterminal String opt_system;
 nonterminal String opt_cluster;
 nonterminal BrokerDesc opt_broker;
@@ -529,6 +531,8 @@ stmt ::=
     {: RESULT = stmt; :}
     | show_routine_load_stmt : stmt
     {: RESULT = stmt; :}
+    | show_routine_load_task_stmt : stmt
+    {: RESULT = stmt; :}
     | cancel_stmt : stmt
     {: RESULT = stmt; :}
     | delete_stmt : stmt
@@ -1001,6 +1005,17 @@ load_stmt ::=
     :}
     ;
 
+opt_job_label ::=
+    /* Empty */
+    {:
+        RESULT = null;
+    :}
+    | job_label:jobLabel
+    {:
+        RESULT = jobLabel;
+    :}
+    ;
+
 job_label ::=
     ident:label
     {:
@@ -1147,12 +1162,12 @@ opt_cluster ::=
 
 // Routine load statement
 create_routine_load_stmt ::=
-    KW_CREATE KW_ROUTINE KW_LOAD ident:jobName KW_ON table_name:dbTableName
+    KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel KW_ON ident:tableName
     opt_load_property_list:loadPropertyList
     opt_properties:properties
     KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN
     {:
-        RESULT = new CreateRoutineLoadStmt(jobName, dbTableName, loadPropertyList, properties, type, customProperties);
+        RESULT = new CreateRoutineLoadStmt(jobLabel, tableName, loadPropertyList, properties, type, customProperties);
     :}
     ;
@@ -1191,30 +1206,41 @@ load_property ::=
     ;
 
 pause_routine_load_stmt ::=
-    KW_PAUSE KW_ROUTINE KW_LOAD ident:name
+    KW_PAUSE KW_ROUTINE KW_LOAD job_label:jobLabel
     {:
-        RESULT = new PauseRoutineLoadStmt(name);
+        RESULT = new PauseRoutineLoadStmt(jobLabel);
     :}
     ;
 
 resume_routine_load_stmt ::=
-    KW_RESUME KW_ROUTINE KW_LOAD ident:name
+    KW_RESUME KW_ROUTINE KW_LOAD job_label:jobLabel
     {:
-        RESULT = new ResumeRoutineLoadStmt(name);
+        RESULT = new ResumeRoutineLoadStmt(jobLabel);
     :}
     ;
 
 stop_routine_load_stmt ::=
-    KW_STOP KW_ROUTINE KW_LOAD ident:name
+    KW_STOP KW_ROUTINE KW_LOAD job_label:jobLabel
     {:
-        RESULT = new StopRoutineLoadStmt(name);
+        RESULT = new StopRoutineLoadStmt(jobLabel);
     :}
     ;
 
 show_routine_load_stmt ::=
-    KW_SHOW KW_ROUTINE KW_LOAD ident:name
+    KW_SHOW KW_ROUTINE KW_LOAD opt_job_label:jobLabel
+    {:
+        RESULT = new ShowRoutineLoadStmt(jobLabel, false);
+    :}
+    | KW_SHOW KW_ALL KW_ROUTINE KW_LOAD opt_job_label:jobLabel
+    {:
+        RESULT = new ShowRoutineLoadStmt(jobLabel, true);
+    :}
+    ;
+
+show_routine_load_task_stmt ::=
+    KW_SHOW KW_ROUTINE KW_LOAD KW_TASK opt_db:dbName opt_wild_where
     {:
-        RESULT = new ShowRoutineLoadStmt(name);
+        RESULT = new ShowRoutineLoadTaskStmt(dbName, parser.where);
     :}
    ;
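For quick reference, the statement shapes the revised grammar accepts look like the following. All database, table, job, and property names here are illustrative, not taken from the patch:

```sql
-- CREATE now takes a job label ([db.]name) plus a bare table name:
CREATE ROUTINE LOAD example_db.job1 ON table1
PROPERTIES ("desired_concurrent_number" = "1")
FROM KAFKA ("kafka_broker_list" = "broker1:9092", "kafka_topic" = "topic1");

-- PAUSE / RESUME / STOP accept the same [db.]name label:
PAUSE ROUTINE LOAD example_db.job1;
RESUME ROUTINE LOAD example_db.job1;
STOP ROUTINE LOAD example_db.job1;

-- The label is now optional for SHOW; ALL also includes jobs in a final state:
SHOW ROUTINE LOAD;
SHOW ALL ROUTINE LOAD example_db.job1;

-- New statement: list the tasks of one job, filtered through WHERE:
SHOW ROUTINE LOAD TASK FROM example_db WHERE JobName = "job1";
```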
diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index d8b5bc9ebc1334..ae81f0a44792d4 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -38,7 +38,7 @@
  Create routine Load statement, continually load data from a streaming app
 
  syntax:
-     CREATE ROUTINE LOAD name ON database.table
+     CREATE ROUTINE LOAD [database.]name ON table
      [load properties]
      [PROPERTIES
      (
@@ -108,8 +108,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(KAFKA_OFFSETS_PROPERTY)
             .build();
 
-    private final String name;
-    private final TableName dbTableName;
+    private final LabelName labelName;
+    private final String tableName;
     private final List<ParseNode> loadPropertyList;
     private final Map<String, String> jobProperties;
     private final String typeName;
@@ -117,6 +117,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
 
     // the following variables will be initialized after analyze
     // -1 as unset, the default value will set in RoutineLoadJob
+    private String name;
+    private String dbName;
     private RoutineLoadDesc routineLoadDesc;
     private int desiredConcurrentNum = 1;
     private int maxErrorNum = -1;
@@ -130,11 +132,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     // pair<partition id, offset>
     private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
 
-    public CreateRoutineLoadStmt(String name, TableName dbTableName, List<ParseNode> loadPropertyList,
+    public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNode> loadPropertyList,
                                  Map<String, String> jobProperties, String typeName,
                                  Map<String, String> dataSourceProperties) {
-        this.name = name;
-        this.dbTableName = dbTableName;
+        this.labelName = labelName;
+        this.tableName = tableName;
         this.loadPropertyList = loadPropertyList;
         this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties;
         this.typeName = typeName.toUpperCase();
@@ -145,8 +147,12 @@ public String getName() {
         return name;
     }
 
-    public TableName getDBTableName() {
-        return dbTableName;
+    public String getDBName() {
+        return dbName;
+    }
+
+    public String getTableName() {
+        return tableName;
     }
 
     public String getTypeName() {
@@ -192,10 +198,10 @@ public List<Pair<Integer, Long>> getKafkaPartitionOffsets() {
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
+        // check dbName and tableName
+        checkDBTable(analyzer);
         // check name
         FeNameFormat.checkCommonName(NAME_TYPE, name);
-        // check dbName and tableName
-        dbTableName.analyze(analyzer);
         // check load properties include column separator etc.
         checkLoadProperties(analyzer);
         // check routine load job properties include desired concurrent number etc.
@@ -204,6 +210,15 @@ public void analyze(Analyzer analyzer) throws UserException {
         checkDataSourceProperties();
     }
 
+    public void checkDBTable(Analyzer analyzer) throws AnalysisException {
+        labelName.analyze(analyzer);
+        dbName = labelName.getDbName();
+        name = labelName.getLabelName();
+        if (Strings.isNullOrEmpty(tableName)) {
+            throw new AnalysisException("Table name should not be null");
+        }
+    }
+
     public void checkLoadProperties(Analyzer analyzer) throws UserException {
         if (loadPropertyList == null) {
             return;
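Not part of the patch, but a self-contained sketch of the resolution rule checkDBTable() relies on: an explicit db prefix in the label wins, otherwise the session database is used. This assumes LabelName.analyze() fills in the session database when the prefix is omitted; all names below are hypothetical.

```java
import java.util.Optional;

// Hypothetical restatement of [database.]name resolution for routine load labels.
public class LabelResolutionSketch {
    static String resolveDb(String labelDbPart, Optional<String> sessionDb) {
        if (labelDbPart != null && !labelDbPart.isEmpty()) {
            return labelDbPart;                       // explicit "db." prefix wins
        }
        return sessionDb.orElseThrow(                 // otherwise fall back to USE db
                () -> new IllegalStateException("no database selected; use `USE db` or `db.name`"));
    }

    public static void main(String[] args) {
        System.out.println(resolveDb("db1", Optional.empty()));   // db1
        System.out.println(resolveDb(null, Optional.of("db2")));  // db2
    }
}
```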
diff --git a/fe/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
index a7227211ad428d..18f7633806c0c7 100644
--- a/fe/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
@@ -25,26 +25,27 @@
 
  Pause routine load by name
 
  syntax:
-     PAUSE ROUTINE LOAD name
+     PAUSE ROUTINE LOAD [database.]name
 */
 public class PauseRoutineLoadStmt extends DdlStmt {
 
-    private final String name;
+    private final LabelName labelName;
 
-    public PauseRoutineLoadStmt(String name) {
-        this.name = name;
+    public PauseRoutineLoadStmt(LabelName labelName) {
+        this.labelName = labelName;
     }
 
     public String getName() {
-        return name;
+        return labelName.getLabelName();
+    }
+
+    public String getDbFullName() {
+        return labelName.getDbName();
     }
 
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
         super.analyze(analyzer);
-
-        if (Strings.isNullOrEmpty(name)) {
-            throw new AnalysisException("routine load name could not be empty or null");
-        }
+        labelName.analyze(analyzer);
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java
index 28bd4ae6833840..9bd8b6e142ec71 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java
@@ -25,25 +25,27 @@
 
  Resume routine load job by name
 
  syntax:
-     RESUME ROUTINE LOAD name
+     RESUME ROUTINE LOAD [database.]name
 */
 public class ResumeRoutineLoadStmt extends DdlStmt {
 
-    private final String name;
+    private final LabelName labelName;
 
-    public ResumeRoutineLoadStmt(String name) {
-        this.name = name;
+    public ResumeRoutineLoadStmt(LabelName labelName) {
+        this.labelName = labelName;
     }
 
     public String getName() {
-        return name;
+        return labelName.getLabelName();
+    }
+
+    public String getDBFullName() {
+        return labelName.getDbName();
     }
 
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
         super.analyze(analyzer);
-        if (Strings.isNullOrEmpty(name)) {
-            throw new AnalysisException("routine load name could not be empty or null");
-        }
+        labelName.analyze(analyzer);
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java
index 3ff0d283ae2f50..71a9f871a8f050 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableList;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ShowResultSetMetaData;
@@ -30,40 +31,95 @@
 
  Show routine load progress by routine load name
 
  syntax:
-     SHOW ROUTINE LOAD name
+     SHOW [ALL] ROUTINE LOAD [database.][name]
+
+     without ALL: show only jobs which are not in a final state
+     with ALL: show all jobs, including history (final-state) jobs
+
+     without name: show all routine load jobs, whatever their names
+     with name: show only the jobs named ${name}
+
+     without db: show jobs in the database of the current connection;
+                 if no database has been chosen, an error is returned
+     with db: show jobs in ${db}
+
+     example:
+       show the routine load named test in database1
+         SHOW ROUTINE LOAD database1.test;
+
+       show the routine loads in database1
+         SHOW ROUTINE LOAD database1;
+
+       show the routine loads in database1, including history
+         use database1;
+         SHOW ALL ROUTINE LOAD;
+
+       to show routine loads across all databases, use SHOW PROC instead
 */
 public class ShowRoutineLoadStmt extends ShowStmt {
 
     private static final ImmutableList<String> TITLE_NAMES =
             new ImmutableList.Builder<String>()
-                    .add("id")
-                    .add("name")
-                    .add("db_id")
-                    .add("table_id")
-                    .add("partitions")
-                    .add("state")
-                    .add(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)
-                    .add("progress")
+                    .add("Id")
+                    .add("Name")
+                    .add("DBId")
+                    .add("TableId")
+                    .add("State")
+                    .add("DataSourceType")
+                    .add("JobProperties")
+                    .add("DataSourceProperties")
+                    .add("CurrentTaskConcurrentNumber")
+                    .add("TotalRows")
+                    .add("TotalErrorRows")
+                    .add("Progress")
+                    .add("ReasonOfStateChanged")
                     .build();
 
-    private final String name;
+    private final LabelName labelName;
+    private String dbFullName; // optional
+    private String name; // optional
+    private boolean includeHistory = false;
 
-    public ShowRoutineLoadStmt(String name) {
-        this.name = name;
+    public ShowRoutineLoadStmt(LabelName labelName, boolean includeHistory) {
+        this.labelName = labelName;
+        this.includeHistory = includeHistory;
+    }
+
+    public String getDbFullName() {
+        return dbFullName;
     }
 
     public String getName() {
         return name;
     }
 
+    public boolean isIncludeHistory() {
+        return includeHistory;
+    }
+
     @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+    public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
-        if (Strings.isNullOrEmpty(name)) {
-            throw new AnalysisException("routine load name could not be empty or null");
+        checkLabelName(analyzer);
+    }
+
+    private void checkLabelName(Analyzer analyzer) throws AnalysisException {
+        String dbName = labelName == null ? null : labelName.getDbName();
+        if (Strings.isNullOrEmpty(dbName)) {
+            dbFullName = analyzer.getContext().getDatabase();
+            if (Strings.isNullOrEmpty(dbFullName)) {
+                throw new AnalysisException("please choose a database first, "
+                        + "e.g. `use db` or `show routine load db.name`");
+            }
+        } else {
+            dbFullName = ClusterNamespace.getFullName(getClusterName(), dbName);
         }
+        name = labelName == null ? null : labelName.getLabelName();
     }
 
     @Override
     public ShowResultSetMetaData getMetaData() {
         ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java
new file mode 100644
index 00000000000000..0649ef2feb18fe
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.doris.analysis;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+import java.util.Arrays;
+import java.util.List;
+
+/*
+ show all tasks belonging to a routine load job
+
+ syntax:
+     SHOW ROUTINE LOAD TASK FROM db WHERE expr;
+
+ where expr must be of the form: JobName = "xxx"
+ */
+public class ShowRoutineLoadTaskStmt extends ShowStmt {
+    private static final List<String> SUPPORT_COLUMN = Arrays.asList("JobName");
+    private static final ImmutableList<String> TITLE_NAMES =
+            new ImmutableList.Builder<String>()
+                    .add("TaskId")
+                    .add("TxnId")
+                    .add("JobId")
+                    .add("CreateTimeMs")
+                    .add("LoadStartTimeMs")
+                    .add("BeId")
+                    .add("DataSourceProperties")
+                    .build();
+
+    private final String dbName;
+    private final Expr jobNameExpr;
+
+    private String jobName;
+    private String dbFullName;
+
+    public ShowRoutineLoadTaskStmt(String dbName, Expr jobNameExpr) {
+        this.dbName = dbName;
+        this.jobNameExpr = jobNameExpr;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public String getDbFullName() {
+        return dbFullName;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        checkDB(analyzer);
+        checkJobNameExpr(analyzer);
+    }
+
+    private void checkDB(Analyzer analyzer) throws AnalysisException {
+        if (Strings.isNullOrEmpty(dbName)) {
+            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+                throw new AnalysisException("please designate a database in the show stmt");
+            }
+            dbFullName = analyzer.getDefaultDb();
+        } else {
+            dbFullName = ClusterNamespace.getFullName(analyzer.getClusterName(), dbName);
+        }
+    }
+
+    private void checkJobNameExpr(Analyzer analyzer) throws AnalysisException {
+        if (jobNameExpr == null) {
+            throw new AnalysisException("please designate a job name in the where expr, such as JobName=\"xxx\"");
+        }
+
+        boolean valid = true;
+        CHECK:
+        {
+            // check predicate
+            if (!(jobNameExpr instanceof BinaryPredicate)) {
+                valid = false;
+                break CHECK;
+            }
+            BinaryPredicate binaryPredicate = (BinaryPredicate) jobNameExpr;
+            if (binaryPredicate.getOp() != BinaryPredicate.Operator.EQ) {
+                valid = false;
+                break CHECK;
+            }
+
+            // check child(0)
+            if (!(binaryPredicate.getChild(0) instanceof SlotRef)) {
+                valid = false;
+                break CHECK;
+            }
+            SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
+            if (SUPPORT_COLUMN.stream().noneMatch(entity -> entity.equals(slotRef.getColumnName()))) {
+                valid = false;
+                break CHECK;
+            }
+
+            // check child(1)
+            if (!(binaryPredicate.getChild(1) instanceof StringLiteral)) {
+                valid = false;
+                break CHECK;
+            }
+            StringLiteral stringLiteral = (StringLiteral) binaryPredicate.getChild(1);
+            jobName = stringLiteral.getValue();
+        }
+
+        if (!valid) {
+            throw new AnalysisException("show routine load task only supports a single equality predicate "
+                    + "of the form JobName=\"xxx\"");
+        }
+    }
+
+    @Override
+    public ShowResultSetMetaData getMetaData() {
+        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
+
+        for (String title : TITLE_NAMES) {
+            builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
+        }
+        return builder.build();
+    }
+}
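Example usage, derived from the checks above (database and job names are hypothetical):

```sql
-- Accepted: a single equality predicate on JobName.
SHOW ROUTINE LOAD TASK FROM example_db WHERE JobName = "job1";

-- Rejected by checkJobNameExpr: no predicate, wrong operator, or wrong column.
SHOW ROUTINE LOAD TASK FROM example_db;                        -- missing WHERE
SHOW ROUTINE LOAD TASK FROM example_db WHERE JobName != "j";   -- not EQ
SHOW ROUTINE LOAD TASK FROM example_db WHERE Id = "1";         -- unsupported column
```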
diff --git a/fe/src/main/java/org/apache/doris/analysis/StopRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/StopRoutineLoadStmt.java
index a8f15e4150082a..170d484f7f3ce5 100644
--- a/fe/src/main/java/org/apache/doris/analysis/StopRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/StopRoutineLoadStmt.java
@@ -26,25 +26,27 @@
 
  Stop routine load job by name
 
  syntax:
-     STOP ROUTINE LOAD name
+     STOP ROUTINE LOAD [database.]name
 */
 public class StopRoutineLoadStmt extends DdlStmt {
 
-    private final String name;
+    private final LabelName labelName;
 
-    public StopRoutineLoadStmt(String name) {
-        this.name = name;
+    public StopRoutineLoadStmt(LabelName labelName) {
+        this.labelName = labelName;
     }
 
     public String getName() {
-        return name;
+        return labelName.getLabelName();
+    }
+
+    public String getDBFullName() {
+        return labelName.getDbName();
     }
 
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
         super.analyze(analyzer);
-        if (Strings.isNullOrEmpty(name)) {
-            throw new AnalysisException("routine load name could not be empty or null");
-        }
+        labelName.analyze(analyzer);
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 3eb722ca04ee83..c43a263bbb436d 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.routineload;
 
+import com.google.gson.Gson;
 import org.apache.doris.common.Pair;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;
 
@@ -27,6 +28,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -37,7 +39,7 @@ public class KafkaProgress extends RoutineLoadProgress {
 
     // (partition id, begin offset)
-    private Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+    private Map<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();
 
     public KafkaProgress() {
         super(LoadDataSourceType.KAFKA);
@@ -52,12 +54,20 @@ public Map<Integer, Long> getPartitionIdToOffset() {
         return partitionIdToOffset;
     }
 
-    public void addPartitionOffset(Pair<Integer, Long> partitionOffset) {
-        partitionIdToOffset.put(partitionOffset.first, partitionOffset.second);
+    public Map<Integer, Long> getPartitionIdToOffset(List<Integer> partitionIds) {
+        Map<Integer, Long> result = Maps.newHashMap();
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            for (Integer partitionId : partitionIds) {
+                if (entry.getKey().equals(partitionId)) {
+                    result.put(partitionId, entry.getValue());
+                }
+            }
+        }
+        return result;
     }
 
-    public void setPartitionIdToOffset(Map<Integer, Long> partitionIdToOffset) {
-        this.partitionIdToOffset = partitionIdToOffset;
+    public void addPartitionOffset(Pair<Integer, Long> partitionOffset) {
+        partitionIdToOffset.put(partitionOffset.first, partitionOffset.second);
     }
 
     // (partition id, end offset)
@@ -79,6 +89,16 @@ public void update(RoutineLoadProgress progress) {
                 .forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
     }
 
+    @Override
+    public String toJsonString() {
+        // progress stores the next offset to consume; display the last consumed offset
+        Map<Integer, Long> showPartitionIdToOffset = new HashMap<>();
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1);
+        }
+        Gson gson = new Gson();
+        return gson.toJson(showPartitionIdToOffset);
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         super.write(out);
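One subtlety worth noting in KafkaProgress: update() stores lastConsumedOffset + 1 (the next offset to fetch), and toJsonString() subtracts 1 so that SHOW displays the last consumed offset again. A self-contained sketch of that round trip (Gson is the only dependency, as in the patch; values are made up):

```java
import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;

// Sketch of KafkaProgress offset bookkeeping: stored = lastConsumed + 1,
// displayed = stored - 1.
public class OffsetDisplaySketch {
    public static void main(String[] args) {
        Map<Integer, Long> progress = new HashMap<>();
        long lastConsumed = 41L;
        progress.put(0, lastConsumed + 1);                      // what update() records

        Map<Integer, Long> shown = new HashMap<>();
        progress.forEach((p, next) -> shown.put(p, next - 1));  // what toJsonString() emits
        System.out.println(new Gson().toJson(shown));           // {"0":41}
    }
}
```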
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 9b0710ca1c340e..47716b31cebdc1 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.load.routineload;
 
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
@@ -51,6 +53,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -120,18 +123,21 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) {
             if (state == JobState.NEED_SCHEDULE) {
                 // divide kafkaPartitions into tasks
                 for (int i = 0; i < currentConcurrentTaskNum; i++) {
-                    KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName);
+                    Map<Integer, Long> taskKafkaProgress = Maps.newHashMap();
+                    for (int j = 0; j < currentKafkaPartitions.size(); j++) {
+                        // round-robin: task i takes every partition whose index is congruent to i
+                        if (j % currentConcurrentTaskNum == i) {
+                            int kafkaPartition = currentKafkaPartitions.get(j);
+                            taskKafkaProgress.put(kafkaPartition,
+                                    ((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition));
+                        }
+                    }
+                    KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName, taskKafkaProgress);
                     routineLoadTaskInfoList.add(kafkaTaskInfo);
                     result.add(kafkaTaskInfo);
                 }
+                // change job state to running
                 if (result.size() != 0) {
-                    for (int i = 0; i < currentKafkaPartitions.size(); i++) {
-                        ((KafkaTaskInfo) routineLoadTaskInfoList.get(i % currentConcurrentTaskNum))
-                                .addKafkaPartition(currentKafkaPartitions.get(i));
-                    }
-                    // change job state to running
-                    // TODO(ml): edit log
-                    state = JobState.RUNNING;
+                    unprotectUpdateState(JobState.RUNNING, null, false);
                 }
             } else {
                 LOG.debug("Ignore to divide routine load job while job state {}", state);
@@ -155,7 +161,9 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
         LOG.info("current concurrent task number is min "
                 + "(current size of partition {}, desire task concurrent num {}, alive be num {})",
                 partitionNum, desireTaskConcurrentNum, aliveBeNum);
-        return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM);
+        currentTaskConcurrentNum =
+                Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM);
+        return currentTaskConcurrentNum;
     }
 
     // partitionIdToOffset must be not empty when loaded rows > 0
@@ -192,8 +200,10 @@ protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
     @Override
     protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException,
             LabelAlreadyUsedException, BeginTransactionException {
+        KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
         // add new task
-        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo((KafkaTaskInfo) routineLoadTaskInfo);
+        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
+                ((KafkaProgress) progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()));
         // remove old task
         routineLoadTaskInfoList.remove(routineLoadTaskInfo);
         // add new task
@@ -275,16 +285,16 @@ private List<Integer> getAllKafkaPartitions() {
 
     public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) throws UserException {
         // check db and table
-        Database db = Catalog.getCurrentCatalog().getDb(stmt.getDBTableName().getDb());
+        Database db = Catalog.getCurrentCatalog().getDb(stmt.getDBName());
         if (db == null) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, stmt.getDBTableName().getDb());
+            ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, stmt.getDBName());
         }
 
         long tableId = -1L;
         db.readLock();
         try {
-            unprotectedCheckMeta(db, stmt.getDBTableName().getTbl(), stmt.getRoutineLoadDesc());
-            tableId = db.getTable(stmt.getDBTableName().getTbl()).getId();
+            unprotectedCheckMeta(db, stmt.getTableName(), stmt.getRoutineLoadDesc());
+            tableId = db.getTable(stmt.getTableName()).getId();
         } finally {
             db.readUnlock();
         }
@@ -343,6 +353,16 @@ private void setCustomKafkaPartitions(List<Pair<Integer, Long>> kafkaPartitionOf
         }
     }
 
+    @Override
+    protected String dataSourcePropertiesJsonToString() {
+        Map<String, String> dataSourceProperties = Maps.newHashMap();
+        dataSourceProperties.put("brokerList", brokerList);
+        dataSourceProperties.put("topic", topic);
+        dataSourceProperties.put("currentKafkaPartitions", Joiner.on(",").join(currentKafkaPartitions));
+        Gson gson = new Gson();
+        return gson.toJson(dataSourceProperties);
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         super.write(out);
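The rewritten division loop assigns partitions to tasks round-robin by list index: task i owns every partition whose index is congruent to i modulo the task count, together with that partition's current consume offset. A standalone sketch of the assignment (all values made up):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of divideRoutineLoadJob()'s round-robin partition split.
public class DivideSketch {
    public static void main(String[] args) {
        List<Integer> partitions = Arrays.asList(0, 1, 2, 3, 4);
        int concurrentTaskNum = 2;

        List<Map<Integer, String>> tasks = new ArrayList<>();
        for (int i = 0; i < concurrentTaskNum; i++) {
            Map<Integer, String> taskPartitions = new HashMap<>();
            for (int j = 0; j < partitions.size(); j++) {
                if (j % concurrentTaskNum == i) {   // task i owns indices i, i+N, i+2N, ...
                    taskPartitions.put(partitions.get(j), "offset-for-" + partitions.get(j));
                }
            }
            tasks.add(taskPartitions);
        }
        System.out.println(tasks); // [{0=..., 2=..., 4=...}, {1=..., 3=...}]
    }
}
```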
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index fda5b29cef21fc..9f1dcd11bd8b30 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.load.routineload;
 
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.AnalysisException;
@@ -43,39 +45,27 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
 
     private RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();
 
-    private List<Integer> partitions;
+    // <partition id, begin offset>
+    private Map<Integer, Long> partitionIdToOffset;
 
-    public KafkaTaskInfo(UUID id, long jobId, String clusterName) {
+    public KafkaTaskInfo(UUID id, long jobId, String clusterName, Map<Integer, Long> partitionIdToOffset) {
         super(id, jobId, clusterName);
-        this.partitions = new ArrayList<>();
+        this.partitionIdToOffset = partitionIdToOffset;
     }
 
-    public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) throws LabelAlreadyUsedException,
+    public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionIdToOffset) throws LabelAlreadyUsedException,
             BeginTransactionException, AnalysisException {
         super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getClusterName(), kafkaTaskInfo.getBeId());
-        this.partitions = kafkaTaskInfo.getPartitions();
-    }
-
-    public void addKafkaPartition(int partition) {
-        partitions.add(partition);
+        this.partitionIdToOffset = partitionIdToOffset;
     }
 
     public List<Integer> getPartitions() {
-        return partitions;
+        return new ArrayList<>(partitionIdToOffset.keySet());
     }
 
-    // TODO: reuse plan fragment of stream load
     @Override
     public TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserException {
         KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId);
-        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
-        for (Integer partitionId : partitions) {
-            KafkaProgress kafkaProgress = (KafkaProgress) routineLoadJob.getProgress();
-            if (!kafkaProgress.getPartitionIdToOffset().containsKey(partitionId)) {
-                kafkaProgress.getPartitionIdToOffset().put(partitionId, 0L);
-            }
-            partitionIdToOffset.put(partitionId, kafkaProgress.getPartitionIdToOffset().get(partitionId));
-        }
 
         // init tRoutineLoadTask and create plan fragment
         TRoutineLoadTask tRoutineLoadTask = new TRoutineLoadTask();
@@ -107,6 +97,11 @@ public TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserExcept
         return tRoutineLoadTask;
     }
 
+    @Override
+    protected String getTaskDataSourceProperties() {
+        Gson gson = new Gson();
+        return gson.toJson(partitionIdToOffset);
+    }
 
     private TExecPlanFragmentParams updateTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException {
         TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.gettExecPlanFragmentParams().deepCopy();
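Task renewal now snapshots the job-level progress for only the partitions the old task owned, via the new getPartitionIdToOffset(List) overload, instead of each task re-deriving offsets in createRoutineLoadTask(). A minimal standalone restatement of that filtering step (values are made up):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of KafkaProgress.getPartitionIdToOffset(List): project the job-wide
// offset map down to the partitions a renewed task keeps consuming.
public class RenewTaskSketch {
    static Map<Integer, Long> filterOffsets(Map<Integer, Long> jobProgress, List<Integer> taskPartitions) {
        Map<Integer, Long> result = new HashMap<>();
        for (Integer p : taskPartitions) {
            Long offset = jobProgress.get(p);
            if (offset != null) {
                result.put(p, offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> jobProgress = new HashMap<>();
        jobProgress.put(0, 42L);
        jobProgress.put(1, 7L);
        jobProgress.put(2, 99L);
        System.out.println(filterOffsets(jobProgress, Arrays.asList(0, 2))); // {0=42, 2=99}
    }
}
```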
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 0607045ecfd475..ef167cfc05da0b 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -17,10 +17,15 @@
 
 package org.apache.doris.load.routineload;
 
+import com.google.common.base.Joiner;
+import com.google.gson.Gson;
+import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.ImportColumnsStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -31,7 +36,6 @@
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.LabelAlreadyUsedException;
-import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
@@ -86,7 +90,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
     private static final int DEFAULT_MAX_INTERVAL_SECOND = 5;
     private static final int DEFAULT_MAX_BATCH_ROWS = 100000;
     private static final int DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB
-    private static final String STAR_STRING = "*";
+    protected static final String STAR_STRING = "*";
 
     protected static final int DEFAULT_TASK_MAX_CONCURRENT_NUM = 3;
 
     /**
@@ -130,7 +134,11 @@ public boolean isFinalState() {
     protected long tableId;
     // this code is used to verify be task request
     protected long authCode;
-    protected RoutineLoadDesc routineLoadDesc; // optional
+    // protected RoutineLoadDesc routineLoadDesc; // optional
+    protected List<String> partitions; // optional
+    protected Map<String, Expr> columnToColumnExpr; // optional
+    protected Expr whereExpr; // optional
+    protected ColumnSeparator columnSeparator; // optional
     protected int desireTaskConcurrentNum; // optional
     protected JobState state = JobState.NEED_SCHEDULE;
     protected LoadDataSourceType dataSourceType;
@@ -142,6 +150,7 @@ public boolean isFinalState() {
     protected int maxBatchRows = DEFAULT_MAX_BATCH_ROWS;
     protected int maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;
 
+    protected int currentTaskConcurrentNum;
     protected RoutineLoadProgress progress;
     protected String pausedReason;
     protected String cancelReason;
@@ -202,14 +211,13 @@ public RoutineLoadJob(long id, String name, long dbId, long tableId,
         this.name = name;
         this.dbId = dbId;
         this.tableId = tableId;
-        this.routineLoadDesc = routineLoadDesc;
         this.desireTaskConcurrentNum = desireTaskConcurrentNum;
         this.dataSourceType = dataSourceType;
         this.maxErrorNum = maxErrorNum;
     }
 
     protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
-        this.routineLoadDesc = stmt.getRoutineLoadDesc();
+        setRoutineLoadDesc(stmt.getRoutineLoadDesc());
         if (stmt.getDesiredConcurrentNum() != -1) {
             this.desireTaskConcurrentNum = stmt.getDesiredConcurrentNum();
         }
@@ -227,6 +235,29 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
         }
     }
 
+    private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
+        if (routineLoadDesc != null) {
+            if (routineLoadDesc.getColumnsInfo() != null) {
+                ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
+                if (columnsStmt.getColumns() != null && columnsStmt.getColumns().size() != 0) {
+                    columnToColumnExpr = Maps.newHashMap();
+                    for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
+                        columnToColumnExpr.put(columnDesc.getColumn(), columnDesc.getExpr());
+                    }
+                }
+            }
+            if (routineLoadDesc.getWherePredicate() != null) {
+                whereExpr = routineLoadDesc.getWherePredicate().getExpr();
+            }
+            if (routineLoadDesc.getColumnSeparator() != null) {
+                columnSeparator = routineLoadDesc.getColumnSeparator();
+            }
+            if (routineLoadDesc.getPartitionNames() != null && routineLoadDesc.getPartitionNames().size() != 0) {
+                partitions = routineLoadDesc.getPartitionNames();
+            }
+        }
+    }
+
     @Override
     public long getId() {
         return id;
@@ -281,6 +312,9 @@ public String getTableName() throws MetaNotFoundException {
         database.readLock();
         try {
             Table table = database.getTable(tableId);
+            if (table == null) {
+                throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId);
+            }
             return table.getName();
         } finally {
             database.readUnlock();
@@ -299,38 +333,24 @@ public long getEndTimestamp() {
         return endTimestamp;
     }
 
-    // this is a unprotected method which is called in the initialization function
-    protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException {
-        if (this.routineLoadDesc != null) {
-            throw new LoadException("Routine load desc has been initialized");
-        }
-        this.routineLoadDesc = routineLoadDesc;
+    public List<String> getPartitions() {
+        return partitions;
     }
 
-    public RoutineLoadDesc getRoutineLoadDesc() {
-        return routineLoadDesc;
+    public Map<String, Expr> getColumnToColumnExpr() {
+        return columnToColumnExpr;
     }
 
-    public RoutineLoadProgress getProgress() {
-        return progress;
-    }
-
-    public String getPartitions() {
-        if (routineLoadDesc == null
-                || routineLoadDesc.getPartitionNames() == null
-                || routineLoadDesc.getPartitionNames().size() == 0) {
-            return STAR_STRING;
-        } else {
-            return String.join(",", routineLoadDesc.getPartitionNames());
-        }
+    public Expr getWhereExpr() {
+        return whereExpr;
     }
 
-    public int getDesiredConcurrentNumber() {
-        return desireTaskConcurrentNum;
+    public ColumnSeparator getColumnSeparator() {
+        return columnSeparator;
     }
 
-    public int getMaxErrorNum() {
-        return maxErrorNum;
+    public RoutineLoadProgress getProgress() {
+        return progress;
     }
 
     public int getMaxBatchIntervalS() {
@@ -463,41 +483,41 @@ private void updateNumOfData(long numOfErrorRows, long numOfTotalRows, boolean i
         if (currentTotalRows > ERROR_SAMPLE_NUM) {
             if (currentErrorRows > maxErrorNum) {
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                        .add("current_total_num", currentTotalRows)
-                        .add("current_error_num", currentErrorRows)
+                        .add("current_total_rows", currentTotalRows)
+                        .add("current_error_rows", currentErrorRows)
                         .add("max_error_num", maxErrorNum)
-                        .add("msg", "current error num is more then max error num, begin to pause job")
+                        .add("msg", "current error rows is more than max error num, begin to pause job")
                         .build());
                 // remove all of task in jobs and change job state to paused
-                updateState(JobState.PAUSED, "current error num of job is more then max error num", isReplay);
+                updateState(JobState.PAUSED, "current error rows of job is more than max error num", isReplay);
             }
             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("current_total_num", currentTotalRows)
-                    .add("current_error_num", currentErrorRows)
+                    .add("current_total_rows", currentTotalRows)
+                    .add("current_error_rows", currentErrorRows)
                     .add("max_error_num", maxErrorNum)
-                    .add("msg", "reset current total num and current error num when current total num is more then base")
+                    .add("msg", "reset current total rows and current error rows when current total rows is more than base")
                     .build());
             // reset currentTotalNum and currentErrorNum
             currentErrorRows = 0;
             currentTotalRows = 0;
         } else if (currentErrorRows > maxErrorNum) {
             LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("current_total_num", currentTotalRows)
-                    .add("current_error_num", currentErrorRows)
+                    .add("current_total_rows", currentTotalRows)
+                    .add("current_error_rows", currentErrorRows)
                     .add("max_error_num", maxErrorNum)
-                    .add("msg", "current error num is more then max error num, begin to pause job")
+                    .add("msg", "current error rows is more than max error rows, begin to pause job")
                    .build());
             // remove all of task in jobs and change job state to paused
-            updateState(JobState.PAUSED, "current error num is more then max error num", isReplay);
+            updateState(JobState.PAUSED, "current error rows is more than max error num", isReplay);
             // reset currentTotalNum and currentErrorNum
             currentErrorRows = 0;
             currentTotalRows = 0;
             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("current_total_num", currentTotalRows)
-                    .add("current_error_num", currentErrorRows)
+                    .add("current_total_rows", currentTotalRows)
+                    .add("current_error_rows", currentErrorRows)
                    .add("max_error_num", maxErrorNum)
-                    .add("msg", "reset current total num and current error num when current total num is more then max error num")
+                    .add("msg", "reset current total rows and current error rows when current total rows is more than max error num")
                     .build());
         }
     }
@@ -570,13 +590,16 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
     @Override
     public ListenResult onCommitted(TransactionState txnState) throws TransactionException {
         ListenResult result = ListenResult.UNCHANGED;
+        long taskBeId = -1L;
         writeLock();
         try {
             // find task in job
             Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter(
                     entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
             if (routineLoadTaskInfoOptional.isPresent()) {
-                executeCommitTask(routineLoadTaskInfoOptional.get(), txnState);
+                RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
+                taskBeId = routineLoadTaskInfo.getBeId();
+                executeCommitTask(routineLoadTaskInfo, txnState);
                 result = ListenResult.CHANGED;
             } else {
                 LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id",
@@ -591,8 +614,8 @@ public ListenResult onCommitted(TransactionState txnState) throws TransactionExc
             throw e;
         } catch (Throwable e) {
             LOG.warn(e.getMessage(), e);
-            updateState(JobState.PAUSED, "failed to update offset when transaction "
-                    + txnState.getTransactionId() + " has been committed", false /* not replay */);
+            updateState(JobState.PAUSED, "be " + taskBeId + " commit task failed " + txnState.getLabel() + " with error " + e.getMessage()
+                    + " while transaction " + txnState.getTransactionId() + " has been committed", false /* not replay */);
         } finally {
             writeUnlock();
         }
@@ -612,12 +635,15 @@ public void replayOnCommitted(TransactionState txnState) {
     @Override
     public ListenResult onAborted(TransactionState txnState, String txnStatusChangeReasonString) {
         ListenResult result = ListenResult.UNCHANGED;
+        long taskBeId = -1L;
         writeLock();
         try {
             // step0: find task in job
-            Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter(
+            Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
                    entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
             if (routineLoadTaskInfoOptional.isPresent()) {
+                RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
+                taskBeId = routineLoadTaskInfo.getBeId();
                 // step1: job state will be changed depending on txnStatusChangeReasonString
                 if (txnStatusChangeReasonString != null) {
                     LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id",
@@ -628,8 +654,8 @@ public ListenResult onAborted(TransactionState txnState, String txnStatusChangeR
                     if (txnStatusChangeReason != null) {
                         switch (txnStatusChangeReason) {
                             case OFFSET_OUT_OF_RANGE:
-                                updateState(JobState.CANCELLED, txnStatusChangeReason.toString(),
-                                        false /* not replay */);
+                                updateState(JobState.CANCELLED, "be " + taskBeId + " abort task "
+                                        + "with reason " + txnStatusChangeReason.toString(), false /* not replay */);
                                 return result;
                             default:
                                 break;
@@ -641,15 +667,16 @@ public ListenResult onAborted(TransactionState txnState, String txnStatusChangeR
                             txnState.getTransactionId()).add("msg", "txn abort").build());
                 }
                 // step2: commit task, update progress, maybe create a new task
-                executeCommitTask(routineLoadTaskInfoOptional.get(), txnState);
+                executeCommitTask(routineLoadTaskInfo, txnState);
                 result = ListenResult.CHANGED;
             }
         } catch (Exception e) {
-            updateState(JobState.PAUSED,
-                    "failed to renew task when txn has been aborted with error " + e.getMessage(),
-                    false /* not replay */);
-            // TODO(ml): edit log
-            LOG.warn("failed to renew a routine load task in job {} with error message {}", id, e.getMessage(), e);
+            updateState(JobState.PAUSED, "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage(),
+                    false /* not replay */);
+            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                    .add("task_id", txnState.getLabel())
+                    .add("error_msg", "change job state to paused when task has been aborted with error " + e.getMessage())
+                    .build());
         } finally {
             writeUnlock();
         }
@@ -721,43 +748,6 @@ protected static void unprotectedCheckMeta(Database db, String tblName, RoutineL
         // columns will be checked when planing
     }
 
-    protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException {
-        // check table belong to db, partitions belong to table
-        if (stmt.getRoutineLoadDesc() == null) {
-            checkDBSemantics(stmt.getDBTableName(), null);
-        } else {
-            checkDBSemantics(stmt.getDBTableName(), stmt.getRoutineLoadDesc().getPartitionNames());
-        }
-    }
-
-    private static void checkDBSemantics(TableName dbTableName, List<String> partitionNames)
-            throws AnalysisException {
-        String tableName = dbTableName.getTbl();
-        String dbName = dbTableName.getDb();
-
-        // check table belong to database
-        Database database = Catalog.getCurrentCatalog().getDb(dbName);
-        Table table = database.getTable(tableName);
-        if (table == null) {
-            throw new AnalysisException("There is no table named " + tableName + " in " + dbName);
-        }
-        // check table type
-        if (table.getType() != Table.TableType.OLAP) {
-            throw new AnalysisException("Only doris table support routine load");
-        }
-
-        if (partitionNames == null || partitionNames.size() == 0) {
-            return;
-        }
-        // check partitions belong to table
-        Optional<String> partitionNotInTable = partitionNames.parallelStream()
-                .filter(entity -> ((OlapTable) table).getPartition(entity) == null).findFirst();
-        if (partitionNotInTable != null && partitionNotInTable.isPresent()) {
-            throw new AnalysisException("Partition " + partitionNotInTable.get()
-                    + " does not belong to table " + tableName);
-        }
-    }
-
     public void updateState(JobState jobState, String reason, boolean isReplay) {
         writeLock();
         try {
@@ -775,6 +765,9 @@ protected void unprotectUpdateState(JobState jobState, String reason, boolean is
                 .build());
         checkStateTransform(jobState);
         switch (jobState) {
+            case RUNNING:
+                executeRunning();
+                break;
             case PAUSED:
                 executePause(reason);
                 break;
@@ -795,7 +788,7 @@ protected void unprotectUpdateState(JobState jobState, String reason, boolean is
             Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().unregister(id);
         }
 
-        if (!isReplay) {
+        if (!isReplay && jobState != JobState.RUNNING) {
             Catalog.getInstance().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState));
         }
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
@@ -805,6 +798,10 @@ protected void unprotectUpdateState(JobState jobState, String reason, boolean is
                 .build());
     }
 
+    private void executeRunning() {
+        state = JobState.RUNNING;
+    }
+
     private void executePause(String reason) {
         // remove all of task in jobs and change job state to paused
         pausedReason = reason;
@@ -836,22 +833,38 @@ public void update() {
         if (database == null) {
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                     .add("db_id", dbId)
                    .add("msg", "The database has been deleted. Change job state to cancelled").build());
            writeLock();
            try {
                if (!state.isFinalState()) {
                    unprotectUpdateState(JobState.CANCELLED, "db not exist", false /* not replay */);
                }
            } finally {
                writeUnlock();
            }
            // without this return, database.readLock() below would NPE
            return;
        }

        // check table belong to database
        database.readLock();
        Table table;
        try {
            table = database.getTable(tableId);
        } finally {
            database.readUnlock();
        }
        if (table == null) {
            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id).add("db_id", dbId)
                    .add("table_id", tableId)
                    .add("msg", "The table has been deleted. Change job state to cancelled").build());
            writeLock();
            try {
                if (!state.isFinalState()) {
                    unprotectUpdateState(JobState.CANCELLED, "table not exist", false /* not replay */);
                }
            } finally {
                writeUnlock();
            }
            return;
        }
 
         // check if partition has been changed
         writeLock();
@@ -879,13 +892,59 @@ public void setOrigStmt(String origStmt) {
         this.origStmt = origStmt;
     }
 
-    public String getOrigStmt() {
-        return origStmt;
-    }
-
     // check the correctness of commit info
     abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment);
 
+    public List<String> getShowInfo() {
+        List<String> row = Lists.newArrayList();
+        row.add(String.valueOf(id));
+        row.add(name);
+        row.add(String.valueOf(dbId));
+        row.add(String.valueOf(tableId));
+        row.add(getState().name());
+        row.add(dataSourceType.name());
+        row.add(jobPropertiesToJsonString());
+        row.add(dataSourcePropertiesJsonToString());
+        row.add(String.valueOf(currentTaskConcurrentNum));
+        row.add(String.valueOf(totalRows));
+        row.add(String.valueOf(errorRows));
+        row.add(getProgress().toJsonString());
+        switch (state) {
+            case PAUSED:
+                row.add(pausedReason);
+                break;
+            case CANCELLED:
+                row.add(cancelReason);
+                break;
+            default:
+                row.add("");
+        }
+        return row;
+    }
+
+    public List<List<String>> getTasksShowInfo() {
+        List<List<String>> rows = Lists.newArrayList();
+        routineLoadTaskInfoList.stream().forEach(entity -> rows.add(entity.getTaskShowInfo()));
+        return rows;
+    }
+
+    private String jobPropertiesToJsonString() {
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put("partitions", partitions == null ? STAR_STRING : Joiner.on(",").join(partitions));
+        jobProperties.put("columnToColumnExpr", columnToColumnExpr == null ?
+                STAR_STRING : Joiner.on(",").withKeyValueSeparator(":").join(columnToColumnExpr));
+        jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : whereExpr.toString());
+        jobProperties.put("columnSeparator", columnSeparator == null ? "\t" : columnSeparator.toString());
+        jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum));
+        jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS));
+        jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows));
+        jobProperties.put("maxBatchSizeBytes", String.valueOf(maxBatchSizeBytes));
+        Gson gson = new Gson();
+        return gson.toJson(jobProperties);
+    }
+
+    abstract String dataSourcePropertiesJsonToString();
+
     public static RoutineLoadJob read(DataInput in) throws IOException {
         RoutineLoadJob job = null;
         LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));
@@ -970,7 +1029,7 @@ public void readFields(DataInput in) throws IOException {
         try {
             stmt = (CreateRoutineLoadStmt) parser.parse().value;
             stmt.checkLoadProperties(null);
-            routineLoadDesc = stmt.getRoutineLoadDesc();
+            setRoutineLoadDesc(stmt.getRoutineLoadDesc());
         } catch (Throwable e) {
             throw new IOException("error happens when parsing create routine load stmt: " + origStmt, e);
         }
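For orientation, a standalone approximation of what the new JobProperties column of SHOW ROUTINE LOAD contains; field values here are made up, and unset optionals render as "*" exactly as in jobPropertiesToJsonString():

```java
import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;

// Approximates jobPropertiesToJsonString(); Gson is the only dependency, as in the patch.
public class JobPropertiesSketch {
    public static void main(String[] args) {
        Map<String, String> jobProperties = new HashMap<>();
        jobProperties.put("partitions", "*");          // no partition list given
        jobProperties.put("whereExpr", "*");           // no WHERE predicate
        jobProperties.put("columnSeparator", "\t");    // default separator
        jobProperties.put("maxErrorNum", String.valueOf(0));
        jobProperties.put("maxBatchIntervalS", String.valueOf(5));
        System.out.println(new Gson().toJson(jobProperties));
        // e.g. {"partitions":"*","whereExpr":"*","columnSeparator":"\t",...}
    }
}
```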
"\t" : columnSeparator.toString()); + jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum)); + jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS)); + jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows)); + jobProperties.put("maxBatchSizeBytes", String.valueOf(maxBatchSizeBytes)); + Gson gson = new Gson(); + return gson.toJson(jobProperties); + } + + abstract String dataSourcePropertiesJsonToString(); + public static RoutineLoadJob read(DataInput in) throws IOException { RoutineLoadJob job = null; LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in)); @@ -970,7 +1029,7 @@ public void readFields(DataInput in) throws IOException { try { stmt = (CreateRoutineLoadStmt) parser.parse().value; stmt.checkLoadProperties(null); - routineLoadDesc = stmt.getRoutineLoadDesc(); + setRoutineLoadDesc(stmt.getRoutineLoadDesc()); } catch (Throwable e) { throw new IOException("error happens when parsing create routine load stmt: " + origStmt, e); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 3f160c7caf1606..a3153ed115865f 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -47,6 +47,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -118,13 +119,14 @@ public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt, St throws UserException { // check load auth if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - createRoutineLoadStmt.getDBTableName().getDb(), - createRoutineLoadStmt.getDBTableName().getTbl(), + createRoutineLoadStmt.getDBName(), + createRoutineLoadStmt.getTableName(), PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - createRoutineLoadStmt.getDBTableName()); + createRoutineLoadStmt.getDBName(), + createRoutineLoadStmt.getTableName()); } RoutineLoadJob routineLoadJob = null; @@ -194,10 +196,11 @@ private boolean isNameUsed(Long dbId, String name) { return false; } - public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throws DdlException, AnalysisException { - RoutineLoadJob routineLoadJob = getJobByName(pauseRoutineLoadStmt.getName()); + public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) + throws DdlException, AnalysisException, MetaNotFoundException { + RoutineLoadJob routineLoadJob = getJobByName(pauseRoutineLoadStmt.getDbFullName(), pauseRoutineLoadStmt.getName()); if (routineLoadJob == null) { - throw new DdlException("There is not routine load job with name " + pauseRoutineLoadStmt.getName()); + throw new DdlException("There is not operable routine load job with name " + pauseRoutineLoadStmt.getName()); } // check auth String dbFullName; @@ -219,16 +222,20 @@ public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throw } routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, - "User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job", - false /* not replay */); - LOG.info("pause routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName()); + "User " + 
ConnectContext.get().getQualifiedUser() + " pauses routine load job", + false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("current_state", routineLoadJob.getState()) + .add("user", ConnectContext.get().getQualifiedUser()) + .add("msg", "routine load job has been paused by user") + .build()); } public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws DdlException, - AnalysisException { - RoutineLoadJob routineLoadJob = getJobByName(resumeRoutineLoadStmt.getName()); + AnalysisException, MetaNotFoundException { + RoutineLoadJob routineLoadJob = getJobByName(resumeRoutineLoadStmt.getDBFullName(), resumeRoutineLoadStmt.getName()); if (routineLoadJob == null) { - throw new DdlException("There is not routine load job with name " + resumeRoutineLoadStmt.getName()); + throw new DdlException("There is not operable routine load job with name " + resumeRoutineLoadStmt.getName() + "."); } // check auth String dbFullName; @@ -248,14 +255,19 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th ConnectContext.get().getRemoteIP(), tableName); } - routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, "user operation", false /* not replay */); - LOG.info("resume routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName()); + routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("current_state", routineLoadJob.getState()) + .add("user", ConnectContext.get().getQualifiedUser()) + .add("msg", "routine load job has been resumed by user") + .build()); } - public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws DdlException, AnalysisException { - RoutineLoadJob routineLoadJob = getJobByName(stopRoutineLoadStmt.getName()); + public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) + throws DdlException, AnalysisException, MetaNotFoundException { + RoutineLoadJob routineLoadJob = getJobByName(stopRoutineLoadStmt.getDBFullName(), stopRoutineLoadStmt.getName()); if (routineLoadJob == null) { - throw new DdlException("There is not routine load job with name " + stopRoutineLoadStmt.getName()); + throw new DdlException("There is not operable routine load job with name " + stopRoutineLoadStmt.getName()); } // check auth String dbFullName; @@ -275,8 +287,14 @@ public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws D ConnectContext.get().getRemoteIP(), tableName); } - routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED, "user operation", false /* not replay */); - LOG.info("stop routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName()); + routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED, + "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job", + false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("current_state", routineLoadJob.getState()) + .add("user", ConnectContext.get().getQualifiedUser()) + .add("msg", "routine load job has been stopped by user") + .build()); } public int getSizeOfIdToRoutineLoadTask() { @@ -373,31 +391,52 @@ public RoutineLoadJob getJob(long jobId) { return idToRoutineLoadJob.get(jobId); } - public RoutineLoadJob getJobByName(String jobName) { - String dbfullName = ConnectContext.get().getDatabase(); - Database database = 
Catalog.getCurrentCatalog().getDb(dbfullName); - if (database == null) { + public RoutineLoadJob getJobByName(String dbFullName, String jobName) throws MetaNotFoundException { + List routineLoadJobList = getJobByName(dbFullName, jobName, false); + if (routineLoadJobList == null || routineLoadJobList.size() == 0) { return null; + } else { + return routineLoadJobList.get(0); } - readLock(); - try { - Map> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(database.getId()); - if (nameToRoutineLoadJob == null) { - return null; + } + + public List getJobByName(String dbFullName, String jobName, boolean includeHistory) + throws MetaNotFoundException { + // return all of routine load job + List result; + RESULT: + { + if (dbFullName == null) { + result = new ArrayList<>(idToRoutineLoadJob.values()); + break RESULT; } - List routineLoadJobList = nameToRoutineLoadJob.get(jobName); - if (routineLoadJobList == null) { - return null; + + long dbId = 0L; + Database database = Catalog.getCurrentCatalog().getDb(dbFullName); + if (database == null) { + throw new MetaNotFoundException("failed to find database by dbFullName " + dbFullName); } - Optional optional = routineLoadJobList.stream() - .filter(entity -> !entity.getState().isFinalState()).findFirst(); - if (!optional.isPresent()) { - return null; + dbId = database.getId(); + if (!dbToNameToRoutineLoadJob.containsKey(dbId)) { + result = new ArrayList<>(); + break RESULT; } - return optional.get(); - } finally { - readUnlock(); + if (jobName == null) { + result = dbToNameToRoutineLoadJob.get(dbId).values().stream().flatMap(x -> x.stream()) + .collect(Collectors.toList()); + break RESULT; + } + if (dbToNameToRoutineLoadJob.get(dbId).containsKey(jobName)) { + result = new ArrayList<>(dbToNameToRoutineLoadJob.get(dbId).get(jobName)); + break RESULT; + } + return null; } + + if (!includeHistory) { + result = result.stream().filter(entity -> !entity.getState().isFinalState()).collect(Collectors.toList()); + } + return result; } public RoutineLoadJob getJobByTaskId(UUID taskId) throws MetaNotFoundException { @@ -468,19 +507,26 @@ public void replayRemoveOldRoutineLoad(RoutineLoadOperation operation) { public void updateRoutineLoadJob() { for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { - routineLoadJob.update(); + if (!routineLoadJob.state.isFinalState()) { + routineLoadJob.update(); + } } } public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) { unprotectedAddJob(routineLoadJob); - LOG.info("replay add routine load job: {}", routineLoadJob.getId()); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("msg", "replay create routine load job") + .build()); } public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) { RoutineLoadJob job = getJob(operation.getId()); - job.updateState(operation.getJobState(), "replay", true /* is replay */); - LOG.info("replay change routine load job: {}, state: {}", operation.getId(), operation.getJobState()); + job.updateState(operation.getJobState(), null, true /* is replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId()) + .add("current_state", operation.getJobState()) + .add("msg", "replay change routine load job") + .build()); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java index 0b0eb90fe9c87e..344fdc1569be28 100644 --- 
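The three-argument getJobByName() is a cascade of optional filters: a null db matches every job, a null name matches every job in the db, and includeHistory=false drops final-state jobs. A condensed, standalone restatement of that lookup contract (jobs modeled as plain strings; history filtering omitted for brevity):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Condensed restatement of RoutineLoadManager.getJobByName(db, name, includeHistory).
public class JobLookupSketch {
    static List<String> lookup(Map<String, Map<String, List<String>>> dbToNameToJobs,
                               String db, String name) {
        if (db == null) {                                        // no db filter: everything
            return dbToNameToJobs.values().stream()
                    .flatMap(m -> m.values().stream())
                    .flatMap(List::stream)
                    .collect(Collectors.toList());
        }
        Map<String, List<String>> nameToJobs = dbToNameToJobs.get(db);
        if (nameToJobs == null) {
            return new ArrayList<>();
        }
        if (name == null) {                                      // no name filter: whole db
            return nameToJobs.values().stream().flatMap(List::stream).collect(Collectors.toList());
        }
        return nameToJobs.getOrDefault(name, new ArrayList<>()); // exact-name match
    }

    public static void main(String[] args) {
        Map<String, Map<String, List<String>>> jobs = new HashMap<>();
        jobs.put("db1", new HashMap<>());
        jobs.get("db1").put("job1", Arrays.asList("job1@v1", "job1@v2"));
        System.out.println(lookup(jobs, null, null));    // every job
        System.out.println(lookup(jobs, "db1", null));   // every job in db1
        System.out.println(lookup(jobs, "db1", "job1")); // both versions of job1
    }
}
```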
+    public List<RoutineLoadJob> getJobByName(String dbFullName, String jobName, boolean includeHistory)
+            throws MetaNotFoundException {
+        // return all routine load jobs
+        List<RoutineLoadJob> result;
+        RESULT:
+        {
+            if (dbFullName == null) {
+                result = new ArrayList<>(idToRoutineLoadJob.values());
+                break RESULT;
             }
-            List<RoutineLoadJob> routineLoadJobList = nameToRoutineLoadJob.get(jobName);
-            if (routineLoadJobList == null) {
-                return null;
+
+            long dbId = 0L;
+            Database database = Catalog.getCurrentCatalog().getDb(dbFullName);
+            if (database == null) {
+                throw new MetaNotFoundException("failed to find database by dbFullName " + dbFullName);
             }
-            Optional<RoutineLoadJob> optional = routineLoadJobList.stream()
-                    .filter(entity -> !entity.getState().isFinalState()).findFirst();
-            if (!optional.isPresent()) {
-                return null;
+            dbId = database.getId();
+            if (!dbToNameToRoutineLoadJob.containsKey(dbId)) {
+                result = new ArrayList<>();
+                break RESULT;
             }
-            return optional.get();
-        } finally {
-            readUnlock();
+            if (jobName == null) {
+                result = dbToNameToRoutineLoadJob.get(dbId).values().stream().flatMap(x -> x.stream())
+                        .collect(Collectors.toList());
+                break RESULT;
+            }
+            if (dbToNameToRoutineLoadJob.get(dbId).containsKey(jobName)) {
+                result = new ArrayList<>(dbToNameToRoutineLoadJob.get(dbId).get(jobName));
+                break RESULT;
+            }
+            return null;
         }
+
+        if (!includeHistory) {
+            result = result.stream().filter(entity -> !entity.getState().isFinalState()).collect(Collectors.toList());
+        }
+        return result;
     }
 
     public RoutineLoadJob getJobByTaskId(UUID taskId) throws MetaNotFoundException {
@@ -468,19 +507,26 @@ public void replayRemoveOldRoutineLoad(RoutineLoadOperation operation) {
 
     public void updateRoutineLoadJob() {
         for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
-            routineLoadJob.update();
+            if (!routineLoadJob.state.isFinalState()) {
+                routineLoadJob.update();
+            }
         }
     }
 
     public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
         unprotectedAddJob(routineLoadJob);
-        LOG.info("replay add routine load job: {}", routineLoadJob.getId());
+        LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                .add("msg", "replay create routine load job")
+                .build());
     }
 
     public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
         RoutineLoadJob job = getJob(operation.getId());
-        job.updateState(operation.getJobState(), "replay", true /* is replay */);
-        LOG.info("replay change routine load job: {}, state: {}", operation.getId(), operation.getJobState());
+        job.updateState(operation.getJobState(), null, true /* is replay */);
+        LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId())
+                .add("current_state", operation.getJobState())
+                .add("msg", "replay change routine load job")
+                .build());
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index 0b0eb90fe9c87e..344fdc1569be28 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -39,6 +39,8 @@ public RoutineLoadProgress(LoadDataSourceType loadDataSourceType) {
 
     abstract void update(RoutineLoadProgress progress);
 
+    abstract String toJsonString();
+
     public static RoutineLoadProgress read(DataInput in) throws IOException {
         RoutineLoadProgress progress = null;
         LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index abdf911a34a33a..1ec68c352a987c 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -71,6 +71,8 @@ private void process() {
         LOG.info("there are {} job need schedule", routineLoadJobList.size());
         for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
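+            // Record the failure and the state to fall back to, then handle both
+            // error paths below: a MetaNotFoundException cancels the job, while
+            // any other error only pauses it so that it can be resumed later.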
+            RoutineLoadJob.JobState errorJobState = null;
+            Throwable throwable = null;
             try {
                 // create plan of routine load job
                 routineLoadJob.plan();
@@ -91,11 +93,28 @@ private void process() {
                 // check state and divide job into tasks
                 routineLoadJob.divideRoutineLoadJob(desiredConcurrentTaskNum);
             } catch (MetaNotFoundException e) {
-                routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED, e.getMessage(), false /* not replay */);
+                errorJobState = RoutineLoadJob.JobState.CANCELLED;
+                throwable = e;
             } catch (Throwable e) {
-                LOG.warn("failed to scheduler job, change job state to paused", e);
-                routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, e.getMessage(), false /* not replay */);
-                continue;
+                errorJobState = RoutineLoadJob.JobState.PAUSED;
+                throwable = e;
+            }
+
+            if (errorJobState != null) {
+                LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                        .add("current_state", routineLoadJob.getState())
+                        .add("desired_state", errorJobState)
+                        .add("warn_msg", "failed to schedule job, change job state to desired_state with error reason " + throwable.getMessage())
+                        .build(), throwable);
+                try {
+                    routineLoadJob.updateState(errorJobState, throwable.getMessage(), false);
+                } catch (Throwable e) {
+                    LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                            .add("current_state", routineLoadJob.getState())
+                            .add("desired_state", errorJobState)
+                            .add("warn_msg", "failed to change state to desired state")
+                            .build(), e);
+                }
             }
         }
     }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 35608d8280e6b6..c535da4947bbe9 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -18,16 +18,19 @@
 
 package org.apache.doris.load.routineload;
 
+import com.google.common.collect.Lists;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
 
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -41,12 +44,12 @@ public abstract class RoutineLoadTaskInfo {
     private RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();
 
     protected UUID id;
-    protected long txnId;
+    protected long txnId = -1L;
     protected long jobId;
     protected String clusterName;
 
     private long createTimeMs;
-    private long loadStartTimeMs;
+    private long loadStartTimeMs = -1L;
 
     // the be id of previous task
     protected long previousBeId = -1L;
     // the be id of this task
@@ -113,6 +116,20 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
                 routineLoadJob.getDbId(), DebugUtil.printId(id), -1, "streamLoad",
                 TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId());
     }
+
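+    // Builds one row for SHOW ROUTINE LOAD TASK: task id, txn id, job id, create
+    // time, load start time, backend id, plus the data source properties that
+    // the concrete subclass reports.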
+    public List<String> getTaskShowInfo() {
+        List<String> row = Lists.newArrayList();
+        row.add(DebugUtil.printId(id));
+        row.add(String.valueOf(txnId));
+        row.add(String.valueOf(jobId));
+        row.add(String.valueOf(TimeUtils.longToTimeString(createTimeMs)));
+        row.add(String.valueOf(TimeUtils.longToTimeString(loadStartTimeMs)));
+        row.add(String.valueOf(beId));
+        row.add(getTaskDataSourceProperties());
+        return row;
+    }
+
+    abstract String getTaskDataSourceProperties();
 
     @Override
     public boolean equals(Object obj) {
diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 69914661aa8360..f540403f8dafc1 100644
--- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.qe;
 
+import com.google.common.base.Strings;
 import org.apache.doris.analysis.AdminShowConfigStmt;
 import org.apache.doris.analysis.AdminShowReplicaDistributionStmt;
 import org.apache.doris.analysis.AdminShowReplicaStatusStmt;
@@ -50,6 +51,7 @@
 import org.apache.doris.analysis.ShowRolesStmt;
 import org.apache.doris.analysis.ShowRollupStmt;
 import org.apache.doris.analysis.ShowRoutineLoadStmt;
+import org.apache.doris.analysis.ShowRoutineLoadTaskStmt;
 import org.apache.doris.analysis.ShowSnapshotStmt;
 import org.apache.doris.analysis.ShowStmt;
 import org.apache.doris.analysis.ShowTableStatusStmt;
@@ -88,6 +90,8 @@
 import org.apache.doris.common.proc.PartitionsProcDir;
 import org.apache.doris.common.proc.ProcNodeInterface;
 import org.apache.doris.common.proc.TabletsProcDir;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.Load;
@@ -169,6 +173,8 @@ public ShowResultSet execute() throws AnalysisException {
             handleShowLoadWarnings();
         } else if (stmt instanceof ShowRoutineLoadStmt) {
             handleShowRoutineLoad();
+        } else if (stmt instanceof ShowRoutineLoadTaskStmt) {
+            handleShowRoutineLoadTask();
         } else if (stmt instanceof ShowDeleteStmt) {
             handleShowDelete();
         } else if (stmt instanceof ShowAlterStmt) {
@@ -794,21 +800,86 @@ private void handleShowLoadWarningsFromURL(ShowLoadWarningsStmt showWarningsStmt
 
     private void handleShowRoutineLoad() throws AnalysisException {
         ShowRoutineLoadStmt showRoutineLoadStmt = (ShowRoutineLoadStmt) stmt;
+        List<List<String>> rows = Lists.newArrayList();
+        // if job exists
+        List<RoutineLoadJob> routineLoadJobList;
+        try {
+            routineLoadJobList =
+                    Catalog.getCurrentCatalog().getRoutineLoadManager().getJobByName(showRoutineLoadStmt.getDbFullName(),
+                            showRoutineLoadStmt.getName(),
+                            showRoutineLoadStmt.isIncludeHistory());
+        } catch (MetaNotFoundException e) {
+            LOG.warn(e.getMessage(), e);
+            throw new AnalysisException(e.getMessage());
+        }
+
+        if (routineLoadJobList != null) {
+            // check auth
+            String dbFullName = showRoutineLoadStmt.getDbFullName();
+            String tableName;
+            for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
+                try {
+                    tableName = routineLoadJob.getTableName();
+                } catch (MetaNotFoundException e) {
+                    // TODO(ml): how to show the cancelled job caused by deleted table
+                    LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                            .add("error_msg", "The table metadata of job has been changed. "
+                                    + "The job will be cancelled automatically")
+                            .build(), e);
+                    continue;
+                }
+                if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
+                                                                        dbFullName,
+                                                                        tableName,
+                                                                        PrivPredicate.LOAD)) {
+                    LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                            .add("operator", "show routine load job")
+                            .add("user", ConnectContext.get().getQualifiedUser())
+                            .add("remote_ip", ConnectContext.get().getRemoteIP())
+                            .add("db_full_name", dbFullName)
+                            .add("table_name", tableName)
+                            .add("error_msg", "access denied to the table")
+                            .build());
+                    continue;
+                }
+
+                // get routine load info
+                rows.add(routineLoadJob.getShowInfo());
+            }
+        }
+
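+        // An empty result is only an error when a job name was given explicitly;
+        // a bare SHOW ROUTINE LOAD with no matching jobs returns an empty set.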
+        if (!Strings.isNullOrEmpty(showRoutineLoadStmt.getName()) && rows.size() == 0) {
+            // if the jobName has been specified
+            throw new AnalysisException("There is no job named " + showRoutineLoadStmt.getName()
+                    + " in db " + showRoutineLoadStmt.getDbFullName()
+                    + " (include history: " + showRoutineLoadStmt.isIncludeHistory() + ")");
+        }
+        resultSet = new ShowResultSet(showRoutineLoadStmt.getMetaData(), rows);
+    }
+
+    private void handleShowRoutineLoadTask() throws AnalysisException {
+        ShowRoutineLoadTaskStmt showRoutineLoadTaskStmt = (ShowRoutineLoadTaskStmt) stmt;
+        List<List<String>> rows = Lists.newArrayList();
         // if job exists
-        RoutineLoadJob routineLoadJob =
-                Catalog.getCurrentCatalog().getRoutineLoadManager().getJobByName(showRoutineLoadStmt.getName());
+        RoutineLoadJob routineLoadJob;
+        try {
+            routineLoadJob = Catalog.getCurrentCatalog().getRoutineLoadManager().getJobByName(showRoutineLoadTaskStmt.getDbFullName(),
+                    showRoutineLoadTaskStmt.getJobName());
+        } catch (MetaNotFoundException e) {
+            LOG.warn(e.getMessage(), e);
+            throw new AnalysisException(e.getMessage());
+        }
         if (routineLoadJob == null) {
-            throw new AnalysisException("There is no routine load job with name " + showRoutineLoadStmt.getName());
+            throw new AnalysisException("The job named " + showRoutineLoadTaskStmt.getJobName() + " does not exist "
+                    + "or job state is stopped or cancelled");
         }
 
         // check auth
-        String dbFullName;
+        String dbFullName = showRoutineLoadTaskStmt.getDbFullName();
         String tableName;
         try {
-            dbFullName = routineLoadJob.getDbFullName();
             tableName = routineLoadJob.getTableName();
         } catch (MetaNotFoundException e) {
-            throw new AnalysisException("The metadata of job has been changed. The job will be cancelled automatically", e);
+            throw new AnalysisException("The table metadata of job has been changed. The job will be cancelled automatically", e);
         }
         if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
                                                                 dbFullName,
@@ -820,20 +891,9 @@ private void handleShowRoutineLoad() throws AnalysisException {
                                                                 tableName);
         }
 
-        // get routine load info
-        List<List<String>> rows = Lists.newArrayList();
-        List<String> row = Lists.newArrayList();
-        row.add(String.valueOf(routineLoadJob.getId()));
-        row.add(routineLoadJob.getName());
-        row.add(String.valueOf(routineLoadJob.getDbId()));
-        row.add(String.valueOf(routineLoadJob.getTableId()));
-        row.add(routineLoadJob.getPartitions());
-        row.add(routineLoadJob.getState().name());
-        row.add(String.valueOf(routineLoadJob.getDesiredConcurrentNumber()));
-        row.add(routineLoadJob.getProgress().toString());
-        rows.add(row);
-
-        resultSet = new ShowResultSet(showRoutineLoadStmt.getMetaData(), rows);
+        // get routine load task info
+        rows.addAll(routineLoadJob.getTasksShowInfo());
+        resultSet = new ShowResultSet(showRoutineLoadTaskStmt.getMetaData(), rows);
     }
 
     // Show user property statement
diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 040d492ad92398..a477a2a920738c 100644
--- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -143,27 +143,10 @@ public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
     }
 
     private void setOptionalFromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
-        if (routineLoadJob.getRoutineLoadDesc() != null) {
-            RoutineLoadDesc routineLoadDesc = routineLoadJob.getRoutineLoadDesc();
-            if (routineLoadDesc.getColumnsInfo() != null) {
-                ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
-                if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
-                    columnToColumnExpr = Maps.newHashMap();
-                    for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
-                        columnToColumnExpr.put(columnDesc.getColumn(), columnDesc.getExpr());
-                    }
-                }
-            }
-            if (routineLoadDesc.getWherePredicate() != null) {
-                whereExpr = routineLoadDesc.getWherePredicate().getExpr();
-            }
-            if (routineLoadDesc.getColumnSeparator() != null) {
-                columnSeparator = routineLoadDesc.getColumnSeparator();
-            }
-            if (routineLoadDesc.getPartitionNames() != null && routineLoadDesc.getPartitionNames().size() != 0) {
-                partitions = Joiner.on(",").join(routineLoadDesc.getPartitionNames());
-            }
-        }
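+        // The routine load job now pre-parses its load properties, so the task
+        // reads the derived values directly instead of re-walking RoutineLoadDesc.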
+        columnToColumnExpr = routineLoadJob.getColumnToColumnExpr();
+        whereExpr = routineLoadJob.getWhereExpr();
+        columnSeparator = routineLoadJob.getColumnSeparator();
+        partitions = routineLoadJob.getPartitions() == null ? null : Joiner.on(",").join(routineLoadJob.getPartitions());
     }
 
     private void setColumnToColumnExpr(String columns) throws UserException {
diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex
index 40f8c7660e1687..c3d33347abe43f 100644
--- a/fe/src/main/jflex/sql_scanner.flex
+++ b/fe/src/main/jflex/sql_scanner.flex
@@ -198,6 +198,7 @@ import org.apache.doris.common.util.SqlUtils;
         keywordMap.put("pause", new Integer(SqlParserSymbols.KW_PAUSE));
         keywordMap.put("resume", new Integer(SqlParserSymbols.KW_RESUME));
         keywordMap.put("stop", new Integer(SqlParserSymbols.KW_STOP));
+        keywordMap.put("task", new Integer(SqlParserSymbols.KW_TASK));
         keywordMap.put("local", new Integer(SqlParserSymbols.KW_LOCAL));
         keywordMap.put("location", new Integer(SqlParserSymbols.KW_LOCATION));
         keywordMap.put("max", new Integer(SqlParserSymbols.KW_MAX));
diff --git a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
index 70eef69c0e4cb4..574f51512279fd 100644
--- a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
+++ b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
@@ -45,6 +45,7 @@ public class CreateRoutineLoadStmtTest {
     public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) throws UserException {
         String jobName = "job1";
         String dbName = "db1";
+        LabelName labelName = new LabelName(dbName, jobName);
         String tableNameString = "table1";
         String topicName = "topic1";
         String serverAddress = "http://127.0.0.1:8080";
@@ -55,7 +56,6 @@ public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) thro
         ColumnSeparator columnSeparator = new ColumnSeparator(",");
 
         // duplicate load property
-        TableName tableName = new TableName(dbName, tableNameString);
         List<ParseNode> loadPropertyList = new ArrayList<>();
         loadPropertyList.add(columnSeparator);
         loadPropertyList.add(columnSeparator);
@@ -68,7 +68,7 @@ public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) thro
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
 
@@ -91,6 +91,7 @@ public void analyze(Analyzer analyzer1) {
     public void testAnalyze(@Injectable Analyzer analyzer) throws UserException {
         String jobName = "job1";
         String dbName = "db1";
+        LabelName labelName = new LabelName(dbName, jobName);
         String tableNameString = "table1";
         String topicName = "topic1";
         String serverAddress = "127.0.0.1:8080";
@@ -114,7 +115,7 @@ public void testAnalyze(@Injectable Analyzer analyzer) throws UserException {
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
         new MockUp() {
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 06c6d5bbb7acc3..0ce34289531b9c 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
+import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.TableName;
@@ -75,6 +76,7 @@ public class KafkaRoutineLoadJobTest {
 
     private String jobName = "job1";
     private String dbName = "db1";
+    private LabelName labelName = new LabelName(dbName, jobName);
     private String tableNameString = "table1";
     private String topicName = "topic1";
     private String serverAddress = "http://127.0.0.1:8080";
@@ -219,8 +221,9 @@ public void testProcessTimeOutTasks(@Mocked KafkaConsumer kafkaConsumer,
         };
 
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>();
-        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster");
-        kafkaTaskInfo.addKafkaPartition(100);
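+        // KafkaTaskInfo now takes its partition -> offset map at construction
+        // time instead of collecting partitions one by one.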
+        Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
+        partitionIdsToOffset.put(100, 0L);
+        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", partitionIdsToOffset);
         kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
 
@@ -357,7 +360,7 @@ public void testFromCreateStmt(@Mocked Catalog catalog,
         Assert.assertEquals(topicName, Deencapsulation.getField(kafkaRoutineLoadJob, "topic"));
         List<Integer> kafkaPartitionResult = Deencapsulation.getField(kafkaRoutineLoadJob, "customKafkaPartitions");
         Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(kafkaPartitionResult));
-        Assert.assertEquals(routineLoadDesc, kafkaRoutineLoadJob.getRoutineLoadDesc());
+//        Assert.assertEquals(routineLoadDesc, kafkaRoutineLoadJob.getRoutineLoadDesc());
     }
 
     private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
@@ -374,7 +377,7 @@ private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
         return createRoutineLoadStmt;
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index d1685efd757a88..4d6d8be221e003 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
+import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Catalog;
@@ -68,6 +69,7 @@ public void testAddJobByStmt(@Injectable PaloAuth paloAuth,
                                  @Mocked Catalog catalog) throws UserException {
         String jobName = "job1";
         String dbName = "db1";
+        LabelName labelName = new LabelName(dbName, jobName);
         String tableNameString = "table1";
         TableName tableName = new TableName(dbName, tableNameString);
         List<ParseNode> loadPropertyList = new ArrayList<>();
@@ -81,7 +83,7 @@ public void testAddJobByStmt(@Injectable PaloAuth paloAuth,
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
         String serverAddress = "http://127.0.0.1:8080";
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
 
@@ -133,6 +135,7 @@ public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth,
                                       @Mocked Catalog catalog) {
         String jobName = "job1";
         String dbName = "db1";
+        LabelName labelName = new LabelName(dbName, jobName);
         String tableNameString = "table1";
         TableName tableName = new TableName(dbName, tableNameString);
         List<ParseNode> loadPropertyList = new ArrayList<>();
@@ -146,7 +149,7 @@ public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth,
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
         String serverAddress = "http://127.0.0.1:8080";
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
 
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 80cbea8f222030..c05dbad770981e 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -58,22 +58,19 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
             MetaNotFoundException, AnalysisException, LabelAlreadyUsedException, BeginTransactionException {
         long beId = 100L;
 
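+        // KafkaProgress no longer exposes setPartitionIdToOffset, so the test
+        // seeds the map through Deencapsulation.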
+        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+        partitionIdToOffset.put(1, 100L);
+        partitionIdToOffset.put(2, 200L);
+        KafkaProgress kafkaProgress = new KafkaProgress();
+        Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);
+
         Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
-        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1l, "default_cluster");
-        routineLoadTaskInfo1.addKafkaPartition(1);
-        routineLoadTaskInfo1.addKafkaPartition(2);
+        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1l, "default_cluster", partitionIdToOffset);
         routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
-
         Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
         idToRoutineLoadTask.put(1L, routineLoadTaskInfo1);
 
-        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
-        partitionIdToOffset.put(1, 100L);
-        partitionIdToOffset.put(2, 200L);
-        KafkaProgress kafkaProgress = new KafkaProgress();
-        kafkaProgress.setPartitionIdToOffset(partitionIdToOffset);
-
         Map<String, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap();
         idToRoutineLoadJob.put("1", routineLoadJob);
 
diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 010575a55cdf46..65f34a5513cf2b 100644
--- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -328,7 +328,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
         Map<Integer, Long> oldKafkaProgressMap = Maps.newHashMap();
         oldKafkaProgressMap.put(1, 0L);
         KafkaProgress oldkafkaProgress = new KafkaProgress();
-        oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
+        Deencapsulation.setField(oldkafkaProgress, "partitionIdToOffset", oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
         Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
 
@@ -401,7 +401,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
         Map<Integer, Long> oldKafkaProgressMap = Maps.newHashMap();
         oldKafkaProgressMap.put(1, 0L);
         KafkaProgress oldkafkaProgress = new KafkaProgress();
-        oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
+        Deencapsulation.setField(oldkafkaProgress, "partitionIdToOffset", oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
         Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);