From e2ab2fb0335674070cddfef27d8e8967fa2630c5 Mon Sep 17 00:00:00 2001
From: emmymiao87 <522274284@qq.com>
Date: Wed, 20 Mar 2019 17:47:01 +0800
Subject: [PATCH] Implement ShowRoutineLoadStmt and ShowRoutineLoadTaskStmt

1. ShowRoutineLoadStmt works as described in its class comment. It does not
   support showing the routine load jobs of all databases at once.
2. ShowRoutineLoadTaskStmt works as described in its class comment. It does
   not support showing the routine load tasks of all jobs at once.
3. Init partitionIdToOffset in the constructor of KafkaProgress.
4. Change Create/Pause/Resume/Stop routine load job to take a LabelName such
   as [db.]name.
5. Exclude jobs in a final state when updating jobs.
6. Catch all exceptions when scheduling a job, so that one failing job does
   not block the other jobs.
---
 fe/src/main/cup/sql_parser.cup                |  50 +++-
 .../doris/analysis/CreateRoutineLoadStmt.java |  35 ++-
 .../doris/analysis/PauseRoutineLoadStmt.java  |  19 +-
 .../doris/analysis/ResumeRoutineLoadStmt.java |  18 +-
 .../doris/analysis/ShowRoutineLoadStmt.java   |  86 +++++-
 .../analysis/ShowRoutineLoadTaskStmt.java     | 144 +++++++++
 .../doris/analysis/StopRoutineLoadStmt.java   |  18 +-
 .../doris/load/routineload/KafkaProgress.java |  30 +-
 .../load/routineload/KafkaRoutineLoadJob.java |  48 ++-
 .../doris/load/routineload/KafkaTaskInfo.java |  33 +--
 .../load/routineload/RoutineLoadJob.java      | 277 +++++++++++-------
 .../load/routineload/RoutineLoadManager.java  | 130 +++++---
 .../load/routineload/RoutineLoadProgress.java |   2 +
 .../routineload/RoutineLoadScheduler.java     |  27 +-
 .../load/routineload/RoutineLoadTaskInfo.java |  21 +-
 .../org/apache/doris/qe/ShowExecutor.java     | 100 +++++--
 .../org/apache/doris/task/StreamLoadTask.java |  25 +-
 fe/src/main/jflex/sql_scanner.flex            |   1 +
 .../analysis/CreateRoutineLoadStmtTest.java   |   7 +-
 .../routineload/KafkaRoutineLoadJobTest.java  |  11 +-
 .../routineload/RoutineLoadManagerTest.java   |   7 +-
 .../RoutineLoadTaskSchedulerTest.java         |  17 +-
 .../transaction/GlobalTransactionMgrTest.java |   4 +-
 23 files changed, 791 insertions(+), 319 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java

diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index 6febb436eafc76..ceab4cee7e1eda 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
@@ -206,7 +206,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A
     KW_JOIN, KW_KEY, KW_KILL,
     KW_LABEL, KW_LARGEINT, KW_LAST, KW_LEFT, KW_LESS, KW_LEVEL, KW_LIKE, KW_LIMIT, KW_LINK, KW_LOAD,
-    KW_ROUTINE, KW_PAUSE, KW_RESUME, KW_STOP,
+    KW_ROUTINE, KW_PAUSE, KW_RESUME, KW_STOP, KW_TASK,
     KW_LOCAL, KW_LOCATION,
     KW_MAX, KW_MAX_VALUE, KW_MERGE, KW_MIN, KW_MIGRATE, KW_MIGRATIONS, KW_MODIFY,
     KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS,
@@ -243,7 +243,8 @@ terminal String COMMENTED_PLAN_HINTS;

 // Statement that the result of this parser.
nonterminal StatementBase query, stmt, show_stmt, show_param, help_stmt, load_stmt, - create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt, show_routine_load_stmt, + create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt, + show_routine_load_stmt, show_routine_load_task_stmt, describe_stmt, alter_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, import_columns_stmt, import_where_stmt; @@ -374,6 +375,7 @@ nonterminal AccessPrivilege privilege_type; nonterminal DataDescription data_desc; nonterminal List data_desc_list; nonterminal LabelName job_label; +nonterminal LabelName opt_job_label; nonterminal String opt_system; nonterminal String opt_cluster; nonterminal BrokerDesc opt_broker; @@ -529,6 +531,8 @@ stmt ::= {: RESULT = stmt; :} | show_routine_load_stmt : stmt {: RESULT = stmt; :} + | show_routine_load_task_stmt : stmt + {: RESULT = stmt; :} | cancel_stmt : stmt {: RESULT = stmt; :} | delete_stmt : stmt @@ -1001,6 +1005,17 @@ load_stmt ::= :} ; +opt_job_label ::= + /* Empty */ + {: + RESULT = null; + :} + | job_label:jobLabel + {: + RESULT = jobLabel; + :} + ; + job_label ::= ident:label {: @@ -1147,12 +1162,12 @@ opt_cluster ::= // Routine load statement create_routine_load_stmt ::= - KW_CREATE KW_ROUTINE KW_LOAD ident:jobName KW_ON table_name:dbTableName + KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel KW_ON ident:tableName opt_load_property_list:loadPropertyList opt_properties:properties KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN {: - RESULT = new CreateRoutineLoadStmt(jobName, dbTableName, loadPropertyList, properties, type, customProperties); + RESULT = new CreateRoutineLoadStmt(jobLabel, tableName, loadPropertyList, properties, type, customProperties); :} ; @@ -1191,30 +1206,41 @@ load_property ::= ; pause_routine_load_stmt ::= - KW_PAUSE KW_ROUTINE KW_LOAD ident:name + KW_PAUSE KW_ROUTINE KW_LOAD job_label:jobLabel {: - RESULT = new PauseRoutineLoadStmt(name); + RESULT = new PauseRoutineLoadStmt(jobLabel); :} ; resume_routine_load_stmt ::= - KW_RESUME KW_ROUTINE KW_LOAD ident:name + KW_RESUME KW_ROUTINE KW_LOAD job_label:jobLabel {: - RESULT = new ResumeRoutineLoadStmt(name); + RESULT = new ResumeRoutineLoadStmt(jobLabel); :} ; stop_routine_load_stmt ::= - KW_STOP KW_ROUTINE KW_LOAD ident:name + KW_STOP KW_ROUTINE KW_LOAD job_label:jobLabel {: - RESULT = new StopRoutineLoadStmt(name); + RESULT = new StopRoutineLoadStmt(jobLabel); :} ; show_routine_load_stmt ::= - KW_SHOW KW_ROUTINE KW_LOAD ident:name + KW_SHOW KW_ROUTINE KW_LOAD opt_job_label:jobLabel + {: + RESULT = new ShowRoutineLoadStmt(jobLabel, false); + :} + | KW_SHOW KW_ALL KW_ROUTINE KW_LOAD opt_job_label:jobLabel + {: + RESULT = new ShowRoutineLoadStmt(jobLabel, true); + :} + ; + +show_routine_load_task_stmt ::= + KW_SHOW KW_ROUTINE KW_LOAD KW_TASK opt_db:dbName opt_wild_where {: - RESULT = new ShowRoutineLoadStmt(name); + RESULT = new ShowRoutineLoadTaskStmt(dbName, parser.where); :} ; diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index d8b5bc9ebc1334..ae81f0a44792d4 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ 
b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -38,7 +38,7 @@ Create routine Load statement, continually load data from a streaming app syntax: - CREATE ROUTINE LOAD name ON database.table + CREATE ROUTINE LOAD [database.]name on table [load properties] [PROPERTIES ( @@ -108,8 +108,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(KAFKA_OFFSETS_PROPERTY) .build(); - private final String name; - private final TableName dbTableName; + private final LabelName labelName; + private final String tableName; private final List loadPropertyList; private final Map jobProperties; private final String typeName; @@ -117,6 +117,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { // the following variables will be initialized after analyze // -1 as unset, the default value will set in RoutineLoadJob + private String name; + private String dbName; private RoutineLoadDesc routineLoadDesc; private int desiredConcurrentNum = 1; private int maxErrorNum = -1; @@ -130,11 +132,11 @@ public class CreateRoutineLoadStmt extends DdlStmt { // pair private List> kafkaPartitionOffsets = Lists.newArrayList(); - public CreateRoutineLoadStmt(String name, TableName dbTableName, List loadPropertyList, + public CreateRoutineLoadStmt(LabelName labelName, String tableName, List loadPropertyList, Map jobProperties, String typeName, Map dataSourceProperties) { - this.name = name; - this.dbTableName = dbTableName; + this.labelName = labelName; + this.tableName = tableName; this.loadPropertyList = loadPropertyList; this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties; this.typeName = typeName.toUpperCase(); @@ -145,8 +147,12 @@ public String getName() { return name; } - public TableName getDBTableName() { - return dbTableName; + public String getDBName() { + return dbName; + } + + public String getTableName() { + return tableName; } public String getTypeName() { @@ -192,10 +198,10 @@ public List> getKafkaPartitionOffsets() { @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); + // check dbName and tableName + checkDBTable(analyzer); // check name FeNameFormat.checkCommonName(NAME_TYPE, name); - // check dbName and tableName - dbTableName.analyze(analyzer); // check load properties include column separator etc. checkLoadProperties(analyzer); // check routine load job properties include desired concurrent number etc. 
@@ -204,6 +210,15 @@ public void analyze(Analyzer analyzer) throws UserException { checkDataSourceProperties(); } + public void checkDBTable(Analyzer analyzer) throws AnalysisException { + labelName.analyze(analyzer); + dbName = labelName.getDbName(); + name = labelName.getLabelName(); + if (Strings.isNullOrEmpty(tableName)) { + throw new AnalysisException("Table name should not be null"); + } + } + public void checkLoadProperties(Analyzer analyzer) throws UserException { if (loadPropertyList == null) { return; diff --git a/fe/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java index a7227211ad428d..18f7633806c0c7 100644 --- a/fe/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java @@ -25,26 +25,27 @@ Pause routine load by name syntax: - PAUSE ROUTINE LOAD name + PAUSE ROUTINE LOAD [database.]name */ public class PauseRoutineLoadStmt extends DdlStmt { - private final String name; + private final LabelName labelName; - public PauseRoutineLoadStmt(String name) { - this.name = name; + public PauseRoutineLoadStmt(LabelName labelName) { + this.labelName = labelName; } public String getName() { - return name; + return labelName.getLabelName(); + } + + public String getDbFullName(){ + return labelName.getDbName(); } @Override public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); - - if (Strings.isNullOrEmpty(name)) { - throw new AnalysisException("routine load name could not be empty or null"); - } + labelName.analyze(analyzer); } } diff --git a/fe/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java index 28bd4ae6833840..9bd8b6e142ec71 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java @@ -25,25 +25,27 @@ Resume routine load job by name syntax: - RESUME ROUTINE LOAD name + RESUME ROUTINE LOAD [database.]name */ public class ResumeRoutineLoadStmt extends DdlStmt{ - private final String name; + private final LabelName labelName; - public ResumeRoutineLoadStmt(String name) { - this.name = name; + public ResumeRoutineLoadStmt(LabelName labelName) { + this.labelName = labelName; } public String getName() { - return name; + return labelName.getLabelName(); + } + + public String getDBFullName(){ + return labelName.getDbName(); } @Override public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); - if (Strings.isNullOrEmpty(name)) { - throw new AnalysisException("routine load name could not be empty or null"); - } + labelName.analyze(analyzer); } } diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java index 3ff0d283ae2f50..71a9f871a8f050 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.qe.ShowResultSetMetaData; @@ -30,40 +31,95 @@ 
 Show routine load progress by routine load name
 syntax:
-      SHOW ROUTINE LOAD name
+      SHOW [ALL] ROUTINE LOAD [database.][name]
+
+      without ALL: show only the jobs that are not in a final state
+      with ALL: show all jobs, including history jobs
+
+      without name: show all routine load jobs, whatever their names
+      with name: show the jobs named ${name}
+
+      without db: show the jobs of the connection's database
+          (if the user has not chosen a database yet, an error is returned)
+      with db: show the jobs of ${db}
+
+      example:
+        show the routine load named test in database1
+        SHOW ROUTINE LOAD database1.test;
+
+        show the routine loads in database1
+        SHOW ROUTINE LOAD database1;
+
+        show the routine loads in database1, including history jobs
+        use database1;
+        SHOW ALL ROUTINE LOAD;
+
+        to show the routine loads of all databases, please use SHOW PROC
 */
 public class ShowRoutineLoadStmt extends ShowStmt {
     private static final ImmutableList<String> TITLE_NAMES =
             new ImmutableList.Builder<String>()
-                    .add("id")
-                    .add("name")
-                    .add("db_id")
-                    .add("table_id")
-                    .add("partitions")
-                    .add("state")
-                    .add(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)
-                    .add("progress")
+                    .add("Id")
+                    .add("Name")
+                    .add("DBId")
+                    .add("TableId")
+                    .add("State")
+                    .add("DataSourceType")
+                    .add("JobProperties")
+                    .add("DataSourceProperties")
+                    .add("CurrentTaskConcurrentNumber")
+                    .add("TotalRows")
+                    .add("TotalErrorRows")
+                    .add("Progress")
+                    .add("ReasonOfStateChanged")
                     .build();

-    private final String name;
+    private final LabelName labelName;
+    private String dbFullName; // optional
+    private String name; // optional
+    private boolean includeHistory = false;
+
-    public ShowRoutineLoadStmt(String name) {
-        this.name = name;
+    public ShowRoutineLoadStmt(LabelName labelName, boolean includeHistory) {
+        this.labelName = labelName;
+        this.includeHistory = includeHistory;
+    }
+
+    public String getDbFullName() {
+        return dbFullName;
     }

     public String getName() {
         return name;
     }

+    public boolean isIncludeHistory() {
+        return includeHistory;
+    }
+
     @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+    public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
-        if (Strings.isNullOrEmpty(name)) {
-            throw new AnalysisException("routine load name could not be empty or null");
+        checkLabelName(analyzer);
+    }
+
+    private void checkLabelName(Analyzer analyzer) throws AnalysisException {
+        String dbName = labelName == null ? null : labelName.getDbName();
+        if (Strings.isNullOrEmpty(dbName)) {
+            dbFullName = analyzer.getContext().getDatabase();
+            if (Strings.isNullOrEmpty(dbFullName)) {
+                throw new AnalysisException("please choose a database first, "
+                        + "such as: USE db, or SHOW ROUTINE LOAD db.name");
+            }
+        } else {
+            dbFullName = ClusterNamespace.getFullName(getClusterName(), dbName);
         }
+        name = labelName == null ? null : labelName.getLabelName();
     }

+
     @Override
     public ShowResultSetMetaData getMetaData() {
         ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java
new file mode 100644
index 00000000000000..0649ef2feb18fe
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.doris.analysis;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+import java.util.Arrays;
+import java.util.List;
+
+/*
+  show all of the tasks belonging to a job
+  SHOW ROUTINE LOAD TASK FROM db WHERE expr;
+
+  where expr: JobName=xxx
+ */
+public class ShowRoutineLoadTaskStmt extends ShowStmt {
+    private static final List<String> supportColumn = Arrays.asList("JobName");
+    private static final ImmutableList<String> TITLE_NAMES =
+            new ImmutableList.Builder<String>()
+                    .add("TaskId")
+                    .add("TxnId")
+                    .add("JobId")
+                    .add("CreateTimeMs")
+                    .add("LoadStartTimeMs")
+                    .add("BeId")
+                    .add("DataSourceProperties")
+                    .build();
+
+    private final String dbName;
+    private final Expr jobNameExpr;
+
+    private String jobName;
+    private String dbFullName;
+
+    public ShowRoutineLoadTaskStmt(String dbName, Expr jobNameExpr) {
+        this.dbName = dbName;
+        this.jobNameExpr = jobNameExpr;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public String getDbFullName() {
+        return dbFullName;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        checkDB(analyzer);
+        checkJobNameExpr(analyzer);
+    }
+
+    private void checkDB(Analyzer analyzer) throws AnalysisException {
+        if (Strings.isNullOrEmpty(dbName)) {
+            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+                throw new AnalysisException("please specify a database in the show statement");
+            }
+            dbFullName = analyzer.getDefaultDb();
+        } else {
+            dbFullName = ClusterNamespace.getFullName(analyzer.getClusterName(), dbName);
+        }
+    }
+
+    private void checkJobNameExpr(Analyzer analyzer) throws AnalysisException {
+        if (jobNameExpr == null) {
+            throw new AnalysisException("please specify a job name in the where expr, such as JobName=xxx");
+        }
+
+        boolean valid = true;
+        CHECK:
+        {
+            // check predicate
+            if (!(jobNameExpr instanceof BinaryPredicate)) {
+                valid = false;
+                break CHECK;
+            }
+            BinaryPredicate binaryPredicate = (BinaryPredicate) jobNameExpr;
+            if (binaryPredicate.getOp() != BinaryPredicate.Operator.EQ) {
+                valid = false;
+                break CHECK;
+            }
+
+            // check child(0)
+            if (!(binaryPredicate.getChild(0) instanceof SlotRef)) {
+                valid = false;
+                break CHECK;
+            }
+            SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
+            if (!supportColumn.stream().anyMatch(entity -> entity.equals(slotRef.getColumnName()))) {
+                valid = false;
+                break CHECK;
+            }
+
+            // check child(1)
+            if (!(binaryPredicate.getChild(1) instanceof StringLiteral)) {
+                valid = false;
+                break CHECK;
+            }
+            StringLiteral stringLiteral = (StringLiteral) binaryPredicate.getChild(1);
+            jobName = stringLiteral.getValue();
+        }
+
+        if (!valid) {
+            throw new AnalysisException("show routine load task only supports a single equality expression, such as JobName=xxx");
+        }
+    }
+
+    @Override
+    public ShowResultSetMetaData getMetaData() {
+        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
+
+        for (String title : TITLE_NAMES) {
+            builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
+        }
+        return builder.build();
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/analysis/StopRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/StopRoutineLoadStmt.java
index a8f15e4150082a..170d484f7f3ce5 100644
--- a/fe/src/main/java/org/apache/doris/analysis/StopRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/StopRoutineLoadStmt.java
@@ -26,25 +26,27 @@
  Stop routine load job by name
  syntax:
-      STOP ROUTINE LOAD name
+      STOP ROUTINE LOAD [database.]name
 */
 public class StopRoutineLoadStmt extends DdlStmt {

-    private final String name;
+    private final LabelName labelName;

-    public StopRoutineLoadStmt(String name) {
-        this.name = name;
+    public StopRoutineLoadStmt(LabelName labelName) {
+        this.labelName = labelName;
     }

     public String getName() {
-        return name;
+        return labelName.getLabelName();
+    }
+
+    public String getDBFullName() {
+        return labelName.getDbName();
     }

     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
         super.analyze(analyzer);
-        if (Strings.isNullOrEmpty(name)) {
-            throw new AnalysisException("routine load name could not be empty or null");
-        }
+        labelName.analyze(analyzer);
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 3eb722ca04ee83..c43a263bbb436d 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -17,6 +17,7 @@
 package org.apache.doris.load.routineload;

+import com.google.gson.Gson;
 import org.apache.doris.common.Pair;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;

@@ -27,6 +28,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;

 /**
@@ -37,7 +39,7 @@ public class KafkaProgress extends RoutineLoadProgress {

     // (partition id, begin offset)
-    private Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+    private Map<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();

     public KafkaProgress() {
         super(LoadDataSourceType.KAFKA);
@@ -52,12 +54,20 @@ public Map<Integer, Long> getPartitionIdToOffset() {
         return partitionIdToOffset;
     }

-    public void addPartitionOffset(Pair<Integer, Long> partitionOffset) {
-        partitionIdToOffset.put(partitionOffset.first, partitionOffset.second);
+    public Map<Integer, Long> getPartitionIdToOffset(List<Integer> partitionIds) {
+        Map<Integer, Long> result = Maps.newHashMap();
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            for (Integer partitionId : partitionIds) {
+                if (entry.getKey().equals(partitionId)) {
+                    result.put(partitionId, entry.getValue());
+                }
+            }
+        }
+        return result;
     }

-    public void setPartitionIdToOffset(Map<Integer, Long> partitionIdToOffset) {
-        this.partitionIdToOffset = partitionIdToOffset;
+    public void addPartitionOffset(Pair<Integer, Long> partitionOffset) {
+        partitionIdToOffset.put(partitionOffset.first, partitionOffset.second);
     }

     // (partition id, end offset)
@@ -79,6 +89,16 @@ public void update(RoutineLoadProgress progress) {
                 .forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
     }

+    @Override
+    public String toJsonString() {
+        // partitionIdToOffset records the next offset to consume,
+        // so subtract one to show the offset that has actually been loaded
+        Map<Integer, Long> showPartitionIdToOffset = new HashMap<>();
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1);
+        }
+        Gson gson = new Gson();
+        return gson.toJson(showPartitionIdToOffset);
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         super.write(out);
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 9b0710ca1c340e..47716b31cebdc1 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -17,6 +17,8 @@
 package org.apache.doris.load.routineload;

+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
@@ -51,6 +53,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;

@@ -120,18 +123,21 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) {
         if (state == JobState.NEED_SCHEDULE) {
             // divide kafkaPartitions into tasks
             for (int i = 0; i < currentConcurrentTaskNum; i++) {
+                Map<Integer, Long> taskKafkaProgress = Maps.newHashMap();
+                for (int j = 0; j < currentKafkaPartitions.size(); j++) {
+                    // round robin: partition j belongs to task (j % currentConcurrentTaskNum)
+                    if (j % currentConcurrentTaskNum == i) {
+                        int kafkaPartition = currentKafkaPartitions.get(j);
+                        taskKafkaProgress.put(kafkaPartition,
+                                ((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition));
+                    }
+                }
-                KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName);
+                KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName, taskKafkaProgress);
                 routineLoadTaskInfoList.add(kafkaTaskInfo);
                 result.add(kafkaTaskInfo);
             }
+            // change job state to running
             if (result.size() != 0) {
-                for (int i = 0; i < currentKafkaPartitions.size(); i++) {
-                    ((KafkaTaskInfo) routineLoadTaskInfoList.get(i % currentConcurrentTaskNum))
-                            .addKafkaPartition(currentKafkaPartitions.get(i));
-                }
-                // change job state to running
-                // TODO(ml): edit log
-                state = JobState.RUNNING;
+                unprotectUpdateState(JobState.RUNNING, null, false);
             }
         } else {
             LOG.debug("Ignore to divide routine load job while job state {}", state);
@@ -155,7 +161,9 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
         LOG.info("current concurrent task number is min "
                 + "(current size of partition {}, desire task concurrent num {}, alive be num {})",
                 partitionNum, desireTaskConcurrentNum, aliveBeNum);
-        return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM);
+        currentTaskConcurrentNum =
+                Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM);
+        return currentTaskConcurrentNum;
     }

     // partitionIdToOffset must be not empty when loaded rows > 0
@@ -192,8 +200,10 @@ protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
     @Override
     protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException,
             LabelAlreadyUsedException, BeginTransactionException {
+        KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
         // add new task
-        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo((KafkaTaskInfo) routineLoadTaskInfo);
+        KafkaTaskInfo kafkaTaskInfo = new
KafkaTaskInfo(oldKafkaTaskInfo, + ((KafkaProgress)progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions())); // remove old task routineLoadTaskInfoList.remove(routineLoadTaskInfo); // add new task @@ -275,16 +285,16 @@ private List getAllKafkaPartitions() { public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) throws UserException { // check db and table - Database db = Catalog.getCurrentCatalog().getDb(stmt.getDBTableName().getDb()); + Database db = Catalog.getCurrentCatalog().getDb(stmt.getDBName()); if (db == null) { - ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, stmt.getDBTableName().getDb()); + ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, stmt.getDBName()); } long tableId = -1L; db.readLock(); try { - unprotectedCheckMeta(db, stmt.getDBTableName().getTbl(), stmt.getRoutineLoadDesc()); - tableId = db.getTable(stmt.getDBTableName().getTbl()).getId(); + unprotectedCheckMeta(db, stmt.getTableName(), stmt.getRoutineLoadDesc()); + tableId = db.getTable(stmt.getTableName()).getId(); } finally { db.readUnlock(); } @@ -343,6 +353,16 @@ private void setCustomKafkaPartitions(List> kafkaPartitionOf } } + @Override + protected String dataSourcePropertiesJsonToString() { + Map dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("brokerList", brokerList); + dataSourceProperties.put("topic", topic); + dataSourceProperties.put("currentKafkaPartitions", Joiner.on(",").join(currentKafkaPartitions)); + Gson gson = new Gson(); + return gson.toJson(dataSourceProperties); + } + @Override public void write(DataOutput out) throws IOException { super.write(out); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index fda5b29cef21fc..9f1dcd11bd8b30 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -17,6 +17,8 @@ package org.apache.doris.load.routineload; +import com.google.common.collect.Lists; +import com.google.gson.Gson; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.AnalysisException; @@ -43,39 +45,27 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { private RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager(); - private List partitions; + // + private Map partitionIdToOffset; - public KafkaTaskInfo(UUID id, long jobId, String clusterName) { + public KafkaTaskInfo(UUID id, long jobId, String clusterName, Map partitionIdToOffset) { super(id, jobId, clusterName); - this.partitions = new ArrayList<>(); + this.partitionIdToOffset = partitionIdToOffset; } - public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) throws LabelAlreadyUsedException, + public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map partitionIdToOffset) throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getClusterName(), kafkaTaskInfo.getBeId()); - this.partitions = kafkaTaskInfo.getPartitions(); - } - - public void addKafkaPartition(int partition) { - partitions.add(partition); + this.partitionIdToOffset = partitionIdToOffset; } public List getPartitions() { - return partitions; + return new ArrayList<>(partitionIdToOffset.keySet()); } - // TODO: reuse plan fragment of stream load @Override public TRoutineLoadTask createRoutineLoadTask() throws 
LoadException, UserException { KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId); - Map partitionIdToOffset = Maps.newHashMap(); - for (Integer partitionId : partitions) { - KafkaProgress kafkaProgress = (KafkaProgress) routineLoadJob.getProgress(); - if (!kafkaProgress.getPartitionIdToOffset().containsKey(partitionId)) { - kafkaProgress.getPartitionIdToOffset().put(partitionId, 0L); - } - partitionIdToOffset.put(partitionId, kafkaProgress.getPartitionIdToOffset().get(partitionId)); - } // init tRoutineLoadTask and create plan fragment TRoutineLoadTask tRoutineLoadTask = new TRoutineLoadTask(); @@ -107,6 +97,11 @@ public TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserExcept return tRoutineLoadTask; } + @Override + protected String getTaskDataSourceProperties() { + Gson gson = new Gson(); + return gson.toJson(partitionIdToOffset); + } private TExecPlanFragmentParams updateTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException { TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.gettExecPlanFragmentParams().deepCopy(); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 0607045ecfd475..ef167cfc05da0b 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -17,10 +17,15 @@ package org.apache.doris.load.routineload; +import com.google.common.base.Joiner; +import com.google.gson.Gson; +import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.ImportColumnsStmt; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; -import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; @@ -31,7 +36,6 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.LabelAlreadyUsedException; -import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; @@ -86,7 +90,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable private static final int DEFAULT_MAX_INTERVAL_SECOND = 5; private static final int DEFAULT_MAX_BATCH_ROWS = 100000; private static final int DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB - private static final String STAR_STRING = "*"; + protected static final String STAR_STRING = "*"; protected static final int DEFAULT_TASK_MAX_CONCURRENT_NUM = 3; /** @@ -130,7 +134,11 @@ public boolean isFinalState() { protected long tableId; // this code is used to verify be task request protected long authCode; - protected RoutineLoadDesc routineLoadDesc; // optional + // protected RoutineLoadDesc routineLoadDesc; // optional + protected List partitions; // optional + protected Map columnToColumnExpr; // optional + protected Expr whereExpr; // optional + protected ColumnSeparator columnSeparator; // optional protected int desireTaskConcurrentNum; // optional protected JobState state = JobState.NEED_SCHEDULE; protected LoadDataSourceType dataSourceType; @@ -142,6 +150,7 
@@ public boolean isFinalState() { protected int maxBatchRows = DEFAULT_MAX_BATCH_ROWS; protected int maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE; + protected int currentTaskConcurrentNum; protected RoutineLoadProgress progress; protected String pausedReason; protected String cancelReason; @@ -202,14 +211,13 @@ public RoutineLoadJob(long id, String name, long dbId, long tableId, this.name = name; this.dbId = dbId; this.tableId = tableId; - this.routineLoadDesc = routineLoadDesc; this.desireTaskConcurrentNum = desireTaskConcurrentNum; this.dataSourceType = dataSourceType; this.maxErrorNum = maxErrorNum; } protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { - this.routineLoadDesc = stmt.getRoutineLoadDesc(); + setRoutineLoadDesc(stmt.getRoutineLoadDesc()); if (stmt.getDesiredConcurrentNum() != -1) { this.desireTaskConcurrentNum = stmt.getDesiredConcurrentNum(); } @@ -227,6 +235,29 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { } } + private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { + if (routineLoadDesc != null) { + if (routineLoadDesc.getColumnsInfo() != null) { + ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo(); + if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) { + columnToColumnExpr = Maps.newHashMap(); + for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) { + columnToColumnExpr.put(columnDesc.getColumn(), columnDesc.getExpr()); + } + } + } + if (routineLoadDesc.getWherePredicate() != null) { + whereExpr = routineLoadDesc.getWherePredicate().getExpr(); + } + if (routineLoadDesc.getColumnSeparator() != null) { + columnSeparator = routineLoadDesc.getColumnSeparator(); + } + if (routineLoadDesc.getPartitionNames() != null && routineLoadDesc.getPartitionNames().size() != 0) { + partitions = routineLoadDesc.getPartitionNames(); + } + } + } + @Override public long getId() { return id; @@ -281,6 +312,9 @@ public String getTableName() throws MetaNotFoundException { database.readLock(); try { Table table = database.getTable(tableId); + if (table == null) { + throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId); + } return table.getName(); } finally { database.readUnlock(); @@ -299,38 +333,24 @@ public long getEndTimestamp() { return endTimestamp; } - // this is a unprotected method which is called in the initialization function - protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException { - if (this.routineLoadDesc != null) { - throw new LoadException("Routine load desc has been initialized"); - } - this.routineLoadDesc = routineLoadDesc; + public List getPartitions() { + return partitions; } - public RoutineLoadDesc getRoutineLoadDesc() { - return routineLoadDesc; + public Map getColumnToColumnExpr() { + return columnToColumnExpr; } - public RoutineLoadProgress getProgress() { - return progress; - } - - public String getPartitions() { - if (routineLoadDesc == null - || routineLoadDesc.getPartitionNames() == null - || routineLoadDesc.getPartitionNames().size() == 0) { - return STAR_STRING; - } else { - return String.join(",", routineLoadDesc.getPartitionNames()); - } + public Expr getWhereExpr() { + return whereExpr; } - public int getDesiredConcurrentNumber() { - return desireTaskConcurrentNum; + public ColumnSeparator getColumnSeparator() { + return columnSeparator; } - public int getMaxErrorNum() { - return maxErrorNum; + public RoutineLoadProgress getProgress() { + return progress; } public int 
getMaxBatchIntervalS() {
@@ -463,41 +483,41 @@ private void updateNumOfData(long numOfErrorRows, long numOfTotalRows, boolean i
         if (currentTotalRows > ERROR_SAMPLE_NUM) {
             if (currentErrorRows > maxErrorNum) {
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                        .add("current_total_num", currentTotalRows)
-                        .add("current_error_num", currentErrorRows)
+                        .add("current_total_rows", currentTotalRows)
+                        .add("current_error_rows", currentErrorRows)
                         .add("max_error_num", maxErrorNum)
-                        .add("msg", "current error num is more then max error num, begin to pause job")
+                        .add("msg", "current error rows is more than max error num, begin to pause job")
                         .build());
                 // remove all of task in jobs and change job state to paused
-                updateState(JobState.PAUSED, "current error num of job is more then max error num", isReplay);
+                updateState(JobState.PAUSED, "current error rows of job is more than max error num", isReplay);
             }
             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("current_total_num", currentTotalRows)
-                    .add("current_error_num", currentErrorRows)
+                    .add("current_total_rows", currentTotalRows)
+                    .add("current_error_rows", currentErrorRows)
                     .add("max_error_num", maxErrorNum)
-                    .add("msg", "reset current total num and current error num when current total num is more then base")
+                    .add("msg", "reset current total rows and current error rows when current total rows is more than base")
                     .build());
             // reset currentTotalNum and currentErrorNum
             currentErrorRows = 0;
             currentTotalRows = 0;
         } else if (currentErrorRows > maxErrorNum) {
             LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("current_total_num", currentTotalRows)
-                    .add("current_error_num", currentErrorRows)
+                    .add("current_total_rows", currentTotalRows)
+                    .add("current_error_rows", currentErrorRows)
                     .add("max_error_num", maxErrorNum)
-                    .add("msg", "current error num is more then max error num, begin to pause job")
+                    .add("msg", "current error rows is more than max error num, begin to pause job")
                     .build());
             // remove all of task in jobs and change job state to paused
-            updateState(JobState.PAUSED, "current error num is more then max error num", isReplay);
+            updateState(JobState.PAUSED, "current error rows is more than max error num", isReplay);
             // reset currentTotalNum and currentErrorNum
             currentErrorRows = 0;
             currentTotalRows = 0;
             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("current_total_num", currentTotalRows)
-                    .add("current_error_num", currentErrorRows)
+                    .add("current_total_rows", currentTotalRows)
+                    .add("current_error_rows", currentErrorRows)
                     .add("max_error_num", maxErrorNum)
-                    .add("msg", "reset current total num and current error num when current total num is more then max error num")
+                    .add("msg", "reset current total rows and current error rows when current total rows is more than max error num")
                     .build());
         }
     }
@@ -570,13 +590,16 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
     @Override
     public ListenResult onCommitted(TransactionState txnState) throws TransactionException {
         ListenResult result = ListenResult.UNCHANGED;
+        long taskBeId = -1L;
         writeLock();
         try {
             // find task in job
             Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter(
                     entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
             if (routineLoadTaskInfoOptional.isPresent()) {
-                executeCommitTask(routineLoadTaskInfoOptional.get(), txnState);
+                RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
+                taskBeId = routineLoadTaskInfo.getBeId();
+                executeCommitTask(routineLoadTaskInfo, txnState);
                 result = ListenResult.CHANGED;
             } else {
                 LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id",
@@ -591,8 +614,8 @@
             throw e;
         } catch (Throwable e) {
             LOG.warn(e.getMessage(), e);
-            updateState(JobState.PAUSED, "failed to update offset when transaction "
-                    + txnState.getTransactionId() + " has been committed", false /* not replay */);
+            updateState(JobState.PAUSED, "be " + taskBeId + " failed to commit task " + txnState.getLabel()
+                    + " with error " + e.getMessage()
+                    + " while transaction " + txnState.getTransactionId() + " has been committed", false /* not replay */);
         } finally {
             writeUnlock();
         }
@@ -612,12 +635,15 @@ public void replayOnCommitted(TransactionState txnState) {
     @Override
     public ListenResult onAborted(TransactionState txnState, String txnStatusChangeReasonString) {
         ListenResult result = ListenResult.UNCHANGED;
+        long taskBeId = -1L;
         writeLock();
         try {
             // step0: find task in job
-            Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter(
+            Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
                     entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
             if (routineLoadTaskInfoOptional.isPresent()) {
+                RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
+                taskBeId = routineLoadTaskInfo.getBeId();
                 // step1: job state will be changed depending on txnStatusChangeReasonString
                 if (txnStatusChangeReasonString != null) {
                     LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id",
@@ -628,8 +654,8 @@
                     if (txnStatusChangeReason != null) {
                         switch (txnStatusChangeReason) {
                             case OFFSET_OUT_OF_RANGE:
-                                updateState(JobState.CANCELLED, txnStatusChangeReason.toString(),
-                                        false /* not replay */);
+                                updateState(JobState.CANCELLED, "be " + taskBeId + " aborted task"
+                                        + " with reason " + txnStatusChangeReason.toString(), false /* not replay */);
                                 return result;
                             default:
                                 break;
@@ -641,15 +667,16 @@
                             txnState.getTransactionId()).add("msg", "txn abort").build());
                 }
                 // step2: commit task , update progress, maybe create a new task
-                executeCommitTask(routineLoadTaskInfoOptional.get(), txnState);
+                executeCommitTask(routineLoadTaskInfo, txnState);
                 result = ListenResult.CHANGED;
             }
         } catch (Exception e) {
-            updateState(JobState.PAUSED,
-                    "failed to renew task when txn has been aborted with error " + e.getMessage(),
-                    false /* not replay */);
-            // TODO(ml): edit log
-            LOG.warn("failed to renew a routine load task in job {} with error message {}", id, e.getMessage(), e);
+            updateState(JobState.PAUSED, "failed to handle aborted task " + txnState.getLabel() + " on be " + taskBeId
+                    + " with error " + e.getMessage(), false /* not replay */);
+            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                    .add("task_id", txnState.getLabel())
+                    .add("error_msg", "change job state to paused when task has been aborted with error " + e.getMessage())
+                    .build());
         } finally {
             writeUnlock();
         }
@@ -721,43 +748,6 @@ protected static void unprotectedCheckMeta(Database db, String tblName, RoutineL
         // columns will be checked when planing
     }

-    protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException {
-        // check table belong to db, partitions belong
to table - if (stmt.getRoutineLoadDesc() == null) { - checkDBSemantics(stmt.getDBTableName(), null); - } else { - checkDBSemantics(stmt.getDBTableName(), stmt.getRoutineLoadDesc().getPartitionNames()); - } - } - - private static void checkDBSemantics(TableName dbTableName, List partitionNames) - throws AnalysisException { - String tableName = dbTableName.getTbl(); - String dbName = dbTableName.getDb(); - - // check table belong to database - Database database = Catalog.getCurrentCatalog().getDb(dbName); - Table table = database.getTable(tableName); - if (table == null) { - throw new AnalysisException("There is no table named " + tableName + " in " + dbName); - } - // check table type - if (table.getType() != Table.TableType.OLAP) { - throw new AnalysisException("Only doris table support routine load"); - } - - if (partitionNames == null || partitionNames.size() == 0) { - return; - } - // check partitions belong to table - Optional partitionNotInTable = partitionNames.parallelStream() - .filter(entity -> ((OlapTable) table).getPartition(entity) == null).findFirst(); - if (partitionNotInTable != null && partitionNotInTable.isPresent()) { - throw new AnalysisException("Partition " + partitionNotInTable.get() - + " does not belong to table " + tableName); - } - } - public void updateState(JobState jobState, String reason, boolean isReplay) { writeLock(); try { @@ -775,6 +765,9 @@ protected void unprotectUpdateState(JobState jobState, String reason, boolean is .build()); checkStateTransform(jobState); switch (jobState) { + case RUNNING: + executeRunning(); + break; case PAUSED: executePause(reason); break; @@ -795,7 +788,7 @@ protected void unprotectUpdateState(JobState jobState, String reason, boolean is Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().unregister(id); } - if (!isReplay) { + if (!isReplay && jobState != JobState.RUNNING) { Catalog.getInstance().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState)); } LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) @@ -805,6 +798,10 @@ protected void unprotectUpdateState(JobState jobState, String reason, boolean is .build()); } + private void executeRunning() { + state = JobState.RUNNING; + } + private void executePause(String reason) { // remove all of task in jobs and change job state to paused pausedReason = reason; @@ -836,22 +833,38 @@ public void update() { if (database == null) { LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("db_id", dbId) - .add("msg", "The database has been deleted. Change job state to stopped").build()); - updateState(JobState.STOPPED, "db not exist", false /* not replay */); + .add("msg", "The database has been deleted. 
Change job state to cancelled").build()); + writeLock(); + try { + if (!state.isFinalState()) { + unprotectUpdateState(JobState.CANCELLED, "db not exist", false /* not replay */); + } + } finally { + writeUnlock(); + } } + + // check table belong to database database.readLock(); + Table table; try { - Table table = database.getTable(tableId); - // check table belong to database - if (table == null) { - LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id).add("db_id", dbId) - .add("table_id", tableId) - .add("msg", "The table has been deleted Change job state to stopped").build()); - updateState(JobState.STOPPED, "table not exist", false /* not replay */); - } + table = database.getTable(tableId); } finally { database.readUnlock(); } + if (table == null) { + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id).add("db_id", dbId) + .add("table_id", tableId) + .add("msg", "The table has been deleted change job state to cancelled").build()); + writeLock(); + try { + if (!state.isFinalState()) { + unprotectUpdateState(JobState.CANCELLED, "table not exist", false /* not replay */); + } + } finally { + writeUnlock(); + } + } // check if partition has been changed writeLock(); @@ -879,13 +892,59 @@ public void setOrigStmt(String origStmt) { this.origStmt = origStmt; } - public String getOrigStmt() { - return origStmt; - } - // check the correctness of commit info abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment); + public List getShowInfo() { + List row = Lists.newArrayList(); + row.add(String.valueOf(id)); + row.add(name); + row.add(String.valueOf(dbId)); + row.add(String.valueOf(tableId)); + row.add(getState().name()); + row.add(dataSourceType.name()); + row.add(jobPropertiesToJsonString()); + row.add(dataSourcePropertiesJsonToString()); + row.add(String.valueOf(currentTaskConcurrentNum)); + row.add(String.valueOf(totalRows)); + row.add(String.valueOf(errorRows)); + row.add(getProgress().toJsonString()); + switch (state) { + case PAUSED: + row.add(pausedReason); + break; + case CANCELLED: + row.add(cancelReason); + break; + default: + row.add(""); + } + return row; + } + + public List> getTasksShowInfo() { + List> rows = Lists.newArrayList(); + routineLoadTaskInfoList.stream().forEach(entity -> rows.add(entity.getTaskShowInfo())); + return rows; + } + + private String jobPropertiesToJsonString() { + Map jobProperties = Maps.newHashMap(); + jobProperties.put("partitions", partitions == null ? STAR_STRING : Joiner.on(",").join(partitions)); + jobProperties.put("columnToColumnExpr", columnToColumnExpr == null ? + STAR_STRING : Joiner.on(",").withKeyValueSeparator(":").join(columnToColumnExpr)); + jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : whereExpr.toString()); + jobProperties.put("columnSeparator", columnSeparator == null ? 
"\t" : columnSeparator.toString()); + jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum)); + jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS)); + jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows)); + jobProperties.put("maxBatchSizeBytes", String.valueOf(maxBatchSizeBytes)); + Gson gson = new Gson(); + return gson.toJson(jobProperties); + } + + abstract String dataSourcePropertiesJsonToString(); + public static RoutineLoadJob read(DataInput in) throws IOException { RoutineLoadJob job = null; LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in)); @@ -970,7 +1029,7 @@ public void readFields(DataInput in) throws IOException { try { stmt = (CreateRoutineLoadStmt) parser.parse().value; stmt.checkLoadProperties(null); - routineLoadDesc = stmt.getRoutineLoadDesc(); + setRoutineLoadDesc(stmt.getRoutineLoadDesc()); } catch (Throwable e) { throw new IOException("error happens when parsing create routine load stmt: " + origStmt, e); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 3f160c7caf1606..a3153ed115865f 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -47,6 +47,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -118,13 +119,14 @@ public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt, St throws UserException { // check load auth if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), - createRoutineLoadStmt.getDBTableName().getDb(), - createRoutineLoadStmt.getDBTableName().getTbl(), + createRoutineLoadStmt.getDBName(), + createRoutineLoadStmt.getTableName(), PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - createRoutineLoadStmt.getDBTableName()); + createRoutineLoadStmt.getDBName(), + createRoutineLoadStmt.getTableName()); } RoutineLoadJob routineLoadJob = null; @@ -194,10 +196,11 @@ private boolean isNameUsed(Long dbId, String name) { return false; } - public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throws DdlException, AnalysisException { - RoutineLoadJob routineLoadJob = getJobByName(pauseRoutineLoadStmt.getName()); + public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) + throws DdlException, AnalysisException, MetaNotFoundException { + RoutineLoadJob routineLoadJob = getJobByName(pauseRoutineLoadStmt.getDbFullName(), pauseRoutineLoadStmt.getName()); if (routineLoadJob == null) { - throw new DdlException("There is not routine load job with name " + pauseRoutineLoadStmt.getName()); + throw new DdlException("There is not operable routine load job with name " + pauseRoutineLoadStmt.getName()); } // check auth String dbFullName; @@ -219,16 +222,20 @@ public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throw } routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, - "User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job", - false /* not replay */); - LOG.info("pause routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName()); + "User " + 
ConnectContext.get().getQualifiedUser() + " pauses routine load job", + false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("current_state", routineLoadJob.getState()) + .add("user", ConnectContext.get().getQualifiedUser()) + .add("msg", "routine load job has been paused by user") + .build()); } public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws DdlException, - AnalysisException { - RoutineLoadJob routineLoadJob = getJobByName(resumeRoutineLoadStmt.getName()); + AnalysisException, MetaNotFoundException { + RoutineLoadJob routineLoadJob = getJobByName(resumeRoutineLoadStmt.getDBFullName(), resumeRoutineLoadStmt.getName()); if (routineLoadJob == null) { - throw new DdlException("There is not routine load job with name " + resumeRoutineLoadStmt.getName()); + throw new DdlException("There is not operable routine load job with name " + resumeRoutineLoadStmt.getName() + "."); } // check auth String dbFullName; @@ -248,14 +255,19 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th ConnectContext.get().getRemoteIP(), tableName); } - routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, "user operation", false /* not replay */); - LOG.info("resume routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName()); + routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("current_state", routineLoadJob.getState()) + .add("user", ConnectContext.get().getQualifiedUser()) + .add("msg", "routine load job has been resumed by user") + .build()); } - public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws DdlException, AnalysisException { - RoutineLoadJob routineLoadJob = getJobByName(stopRoutineLoadStmt.getName()); + public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) + throws DdlException, AnalysisException, MetaNotFoundException { + RoutineLoadJob routineLoadJob = getJobByName(stopRoutineLoadStmt.getDBFullName(), stopRoutineLoadStmt.getName()); if (routineLoadJob == null) { - throw new DdlException("There is not routine load job with name " + stopRoutineLoadStmt.getName()); + throw new DdlException("There is not operable routine load job with name " + stopRoutineLoadStmt.getName()); } // check auth String dbFullName; @@ -275,8 +287,14 @@ public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws D ConnectContext.get().getRemoteIP(), tableName); } - routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED, "user operation", false /* not replay */); - LOG.info("stop routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName()); + routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED, + "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job", + false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("current_state", routineLoadJob.getState()) + .add("user", ConnectContext.get().getQualifiedUser()) + .add("msg", "routine load job has been stopped by user") + .build()); } public int getSizeOfIdToRoutineLoadTask() { @@ -373,31 +391,52 @@ public RoutineLoadJob getJob(long jobId) { return idToRoutineLoadJob.get(jobId); } - public RoutineLoadJob getJobByName(String jobName) { - String dbfullName = ConnectContext.get().getDatabase(); - Database database = 
Catalog.getCurrentCatalog().getDb(dbfullName); - if (database == null) { + public RoutineLoadJob getJobByName(String dbFullName, String jobName) throws MetaNotFoundException { + List routineLoadJobList = getJobByName(dbFullName, jobName, false); + if (routineLoadJobList == null || routineLoadJobList.size() == 0) { return null; + } else { + return routineLoadJobList.get(0); } - readLock(); - try { - Map> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(database.getId()); - if (nameToRoutineLoadJob == null) { - return null; + } + + public List getJobByName(String dbFullName, String jobName, boolean includeHistory) + throws MetaNotFoundException { + // return all of routine load job + List result; + RESULT: + { + if (dbFullName == null) { + result = new ArrayList<>(idToRoutineLoadJob.values()); + break RESULT; } - List routineLoadJobList = nameToRoutineLoadJob.get(jobName); - if (routineLoadJobList == null) { - return null; + + long dbId = 0L; + Database database = Catalog.getCurrentCatalog().getDb(dbFullName); + if (database == null) { + throw new MetaNotFoundException("failed to find database by dbFullName " + dbFullName); } - Optional optional = routineLoadJobList.stream() - .filter(entity -> !entity.getState().isFinalState()).findFirst(); - if (!optional.isPresent()) { - return null; + dbId = database.getId(); + if (!dbToNameToRoutineLoadJob.containsKey(dbId)) { + result = new ArrayList<>(); + break RESULT; } - return optional.get(); - } finally { - readUnlock(); + if (jobName == null) { + result = dbToNameToRoutineLoadJob.get(dbId).values().stream().flatMap(x -> x.stream()) + .collect(Collectors.toList()); + break RESULT; + } + if (dbToNameToRoutineLoadJob.get(dbId).containsKey(jobName)) { + result = new ArrayList<>(dbToNameToRoutineLoadJob.get(dbId).get(jobName)); + break RESULT; + } + return null; } + + if (!includeHistory) { + result = result.stream().filter(entity -> !entity.getState().isFinalState()).collect(Collectors.toList()); + } + return result; } public RoutineLoadJob getJobByTaskId(UUID taskId) throws MetaNotFoundException { @@ -468,19 +507,26 @@ public void replayRemoveOldRoutineLoad(RoutineLoadOperation operation) { public void updateRoutineLoadJob() { for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { - routineLoadJob.update(); + if (!routineLoadJob.state.isFinalState()) { + routineLoadJob.update(); + } } } public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) { unprotectedAddJob(routineLoadJob); - LOG.info("replay add routine load job: {}", routineLoadJob.getId()); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("msg", "replay create routine load job") + .build()); } public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) { RoutineLoadJob job = getJob(operation.getId()); - job.updateState(operation.getJobState(), "replay", true /* is replay */); - LOG.info("replay change routine load job: {}, state: {}", operation.getId(), operation.getJobState()); + job.updateState(operation.getJobState(), null, true /* is replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId()) + .add("current_state", operation.getJobState()) + .add("msg", "replay change routine load job") + .build()); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java index 0b0eb90fe9c87e..344fdc1569be28 100644 --- 
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -39,6 +39,8 @@ public RoutineLoadProgress(LoadDataSourceType loadDataSourceType) {
 
     abstract void update(RoutineLoadProgress progress);
 
+    abstract String toJsonString();
+
     public static RoutineLoadProgress read(DataInput in) throws IOException {
         RoutineLoadProgress progress = null;
         LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index abdf911a34a33a..1ec68c352a987c 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -71,6 +71,8 @@ private void process() {
         LOG.info("there are {} job need schedule", routineLoadJobList.size());
         for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
+            RoutineLoadJob.JobState errorJobState = null;
+            Throwable throwable = null;
             try {
                 // create plan of routine load job
                 routineLoadJob.plan();
@@ -91,11 +93,28 @@ private void process() {
                 // check state and divide job into tasks
                 routineLoadJob.divideRoutineLoadJob(desiredConcurrentTaskNum);
             } catch (MetaNotFoundException e) {
-                routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED, e.getMessage(), false /* not replay */);
+                errorJobState = RoutineLoadJob.JobState.CANCELLED;
+                throwable = e;
             } catch (Throwable e) {
-                LOG.warn("failed to scheduler job, change job state to paused", e);
-                routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, e.getMessage(), false /* not replay */);
-                continue;
+                errorJobState = RoutineLoadJob.JobState.PAUSED;
+                throwable = e;
+            }
+
+            if (errorJobState != null) {
+                LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                        .add("current_state", routineLoadJob.getState())
+                        .add("desired_state", errorJobState)
+                        .add("warn_msg", "failed to schedule job, change job state to desired_state with error reason " + throwable.getMessage())
+                        .build(), throwable);
+                try {
+                    routineLoadJob.updateState(errorJobState, throwable.getMessage(), false);
+                } catch (Throwable e) {
+                    LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                            .add("current_state", routineLoadJob.getState())
+                            .add("desired_state", errorJobState)
+                            .add("warn_msg", "failed to change state to desired state")
+                            .build(), e);
+                }
+            }
         }
     }
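[Editor's note] The scheduler hunk above records the failure (desired state plus cause) inside the loop and applies the state change in a single guarded block, so an exception raised while handling one job can no longer abort scheduling of the remaining jobs. A hedged, standalone sketch of the same pattern — the interface and state names are illustrative:

    // Illustrative only: capture the failure inside the loop, then apply the
    // state change in one guarded block so no job can break the scheduling loop.
    import java.util.List;

    class SchedulerSketch {
        interface Job {
            long id();
            void plan() throws Exception;
            void updateState(String desiredState, String reason);
        }

        static void scheduleAll(List<Job> jobs) {
            for (Job job : jobs) {
                String errorState = null;
                Throwable cause = null;
                try {
                    job.plan();
                } catch (Exception e) {
                    errorState = "PAUSED"; // assumption: pause on generic failure
                    cause = e;
                }
                if (errorState != null) {
                    try {
                        job.updateState(errorState, cause.getMessage());
                    } catch (Throwable t) {
                        // log and continue; the remaining jobs still get scheduled
                        System.err.println("job " + job.id() + ": state change failed: " + t);
                    }
                }
            }
        }
    }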
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 35608d8280e6b6..c535da4947bbe9 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -18,16 +18,19 @@
 package org.apache.doris.load.routineload;
 
+import com.google.common.collect.Lists;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
 
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -41,12 +44,12 @@ public abstract class RoutineLoadTaskInfo {
 
     private RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();
 
     protected UUID id;
-    protected long txnId;
+    protected long txnId = -1L;
     protected long jobId;
     protected String clusterName;
 
     private long createTimeMs;
-    private long loadStartTimeMs;
+    private long loadStartTimeMs = -1L;
     // the be id of previous task
     protected long previousBeId = -1L;
     // the be id of this task
@@ -113,6 +116,20 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
                 routineLoadJob.getDbId(), DebugUtil.printId(id), -1, "streamLoad",
                 TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId());
     }
+
+    public List<String> getTaskShowInfo() {
+        List<String> row = Lists.newArrayList();
+        row.add(DebugUtil.printId(id));
+        row.add(String.valueOf(txnId));
+        row.add(String.valueOf(jobId));
+        row.add(String.valueOf(TimeUtils.longToTimeString(createTimeMs)));
+        row.add(String.valueOf(TimeUtils.longToTimeString(loadStartTimeMs)));
+        row.add(String.valueOf(beId));
+        row.add(getTaskDataSourceProperties());
+        return row;
+    }
+
+    abstract String getTaskDataSourceProperties();
 
     @Override
     public boolean equals(Object obj) {
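[Editor's note] getTaskShowInfo() above renders one task as a row of strings for SHOW output; because txnId and loadStartTimeMs now default to -1, a task that has not yet begun still produces a well-formed row. A minimal sketch of that convention — the class and values are illustrative, and the patch itself formats timestamps with TimeUtils.longToTimeString:

    // Illustrative only: a task serializes itself to one row of strings; the
    // -1 sentinels stand in for "transaction not begun" / "load not started".
    import java.util.ArrayList;
    import java.util.List;

    class TaskRowSketch {
        long txnId = -1L;           // set once the transaction begins
        long loadStartTimeMs = -1L; // set once loading starts
        long jobId;

        TaskRowSketch(long jobId) { this.jobId = jobId; }

        List<String> toRow() {
            List<String> row = new ArrayList<>();
            row.add(String.valueOf(txnId));
            row.add(String.valueOf(jobId));
            row.add(String.valueOf(loadStartTimeMs)); // real code renders a time string
            return row;
        }
    }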
diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 69914661aa8360..f540403f8dafc1 100644
--- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.qe;
 
+import com.google.common.base.Strings;
 import org.apache.doris.analysis.AdminShowConfigStmt;
 import org.apache.doris.analysis.AdminShowReplicaDistributionStmt;
 import org.apache.doris.analysis.AdminShowReplicaStatusStmt;
@@ -50,6 +51,7 @@
 import org.apache.doris.analysis.ShowRolesStmt;
 import org.apache.doris.analysis.ShowRollupStmt;
 import org.apache.doris.analysis.ShowRoutineLoadStmt;
+import org.apache.doris.analysis.ShowRoutineLoadTaskStmt;
 import org.apache.doris.analysis.ShowSnapshotStmt;
 import org.apache.doris.analysis.ShowStmt;
 import org.apache.doris.analysis.ShowTableStatusStmt;
@@ -88,6 +90,8 @@
 import org.apache.doris.common.proc.PartitionsProcDir;
 import org.apache.doris.common.proc.ProcNodeInterface;
 import org.apache.doris.common.proc.TabletsProcDir;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.Load;
@@ -169,6 +173,8 @@
             handleShowLoadWarnings();
         } else if (stmt instanceof ShowRoutineLoadStmt) {
             handleShowRoutineLoad();
+        } else if (stmt instanceof ShowRoutineLoadTaskStmt) {
+            handleShowRoutineLoadTask();
         } else if (stmt instanceof ShowDeleteStmt) {
             handleShowDelete();
         } else if (stmt instanceof ShowAlterStmt) {
@@ -794,21 +800,86 @@ private void handleShowLoadWarningsFromURL(ShowLoadWarningsStmt showWarningsStmt
 
     private void handleShowRoutineLoad() throws AnalysisException {
         ShowRoutineLoadStmt showRoutineLoadStmt = (ShowRoutineLoadStmt) stmt;
+        List<List<String>> rows = Lists.newArrayList();
+        // if job exists
+        List<RoutineLoadJob> routineLoadJobList;
+        try {
+            routineLoadJobList =
+                    Catalog.getCurrentCatalog().getRoutineLoadManager().getJobByName(showRoutineLoadStmt.getDbFullName(),
+                                                                                     showRoutineLoadStmt.getName(),
+                                                                                     showRoutineLoadStmt.isIncludeHistory());
+        } catch (MetaNotFoundException e) {
+            LOG.warn(e.getMessage(), e);
+            throw new AnalysisException(e.getMessage());
+        }
+
+        if (routineLoadJobList != null) {
+            // check auth
+            String dbFullName = showRoutineLoadStmt.getDbFullName();
+            String tableName;
+            for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
+                try {
+                    tableName = routineLoadJob.getTableName();
+                } catch (MetaNotFoundException e) {
+                    // TODO(ml): how to show the cancelled job caused by deleted table
+                    LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                            .add("error_msg", "The table metadata of job has been changed. "
+                                    + "The job will be cancelled automatically")
+                            .build(), e);
+                    continue;
+                }
+                if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
+                                                                        dbFullName,
+                                                                        tableName,
+                                                                        PrivPredicate.LOAD)) {
+                    LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
+                            .add("operator", "show routine load job")
+                            .add("user", ConnectContext.get().getQualifiedUser())
+                            .add("remote_ip", ConnectContext.get().getRemoteIP())
+                            .add("db_full_name", dbFullName)
+                            .add("table_name", tableName)
+                            .add("error_msg", "access to table denied")
+                            .build());
+                    continue;
+                }
+
+                // get routine load info
+                rows.add(routineLoadJob.getShowInfo());
+            }
+        }
+
+        if (!Strings.isNullOrEmpty(showRoutineLoadStmt.getName()) && rows.size() == 0) {
+            // if the jobName has been specified
+            throw new AnalysisException("There is no job named " + showRoutineLoadStmt.getName()
+                    + " in db " + showRoutineLoadStmt.getDbFullName()
+                    + ", includeHistory: " + showRoutineLoadStmt.isIncludeHistory());
+        }
+        resultSet = new ShowResultSet(showRoutineLoadStmt.getMetaData(), rows);
+    }
+
+    private void handleShowRoutineLoadTask() throws AnalysisException {
+        ShowRoutineLoadTaskStmt showRoutineLoadTaskStmt = (ShowRoutineLoadTaskStmt) stmt;
+        List<List<String>> rows = Lists.newArrayList();
 
         // if job exists
-        RoutineLoadJob routineLoadJob =
-                Catalog.getCurrentCatalog().getRoutineLoadManager().getJobByName(showRoutineLoadStmt.getName());
+        RoutineLoadJob routineLoadJob;
+        try {
+            routineLoadJob = Catalog.getCurrentCatalog().getRoutineLoadManager().getJobByName(showRoutineLoadTaskStmt.getDbFullName(),
+                                                                                              showRoutineLoadTaskStmt.getJobName());
+        } catch (MetaNotFoundException e) {
+            LOG.warn(e.getMessage(), e);
+            throw new AnalysisException(e.getMessage());
+        }
         if (routineLoadJob == null) {
-            throw new AnalysisException("There is no routine load job with name " + showRoutineLoadStmt.getName());
+            throw new AnalysisException("The job named " + showRoutineLoadTaskStmt.getJobName() + " does not exist, "
+                    + "or the job state is stopped or cancelled");
        }
 
         // check auth
-        String dbFullName;
+        String dbFullName = showRoutineLoadTaskStmt.getDbFullName();
         String tableName;
         try {
-            dbFullName = routineLoadJob.getDbFullName();
             tableName = routineLoadJob.getTableName();
         } catch (MetaNotFoundException e) {
-            throw new AnalysisException("The metadata of job has been changed. The job will be cancelled automatically", e);
+            throw new AnalysisException("The table metadata of job has been changed. The job will be cancelled automatically", e);
         }
         if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
                                                                 dbFullName,
@@ -820,20 +891,9 @@ private void handleShowRoutineLoad() throws AnalysisException {
                                                                 tableName);
         }
 
-        // get routine load info
-        List<List<String>> rows = Lists.newArrayList();
-        List<String> row = Lists.newArrayList();
-        row.add(String.valueOf(routineLoadJob.getId()));
-        row.add(routineLoadJob.getName());
-        row.add(String.valueOf(routineLoadJob.getDbId()));
-        row.add(String.valueOf(routineLoadJob.getTableId()));
-        row.add(routineLoadJob.getPartitions());
-        row.add(routineLoadJob.getState().name());
-        row.add(String.valueOf(routineLoadJob.getDesiredConcurrentNumber()));
-        row.add(routineLoadJob.getProgress().toString());
-        rows.add(row);
-
-        resultSet = new ShowResultSet(showRoutineLoadStmt.getMetaData(), rows);
+        // get routine load task info
+        rows.addAll(routineLoadJob.getTasksShowInfo());
+        resultSet = new ShowResultSet(showRoutineLoadTaskStmt.getMetaData(), rows);
     }
 
     // Show user property statement
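[Editor's note] handleShowRoutineLoad above checks the LOAD privilege per job and silently skips rows the user may not see, instead of failing the whole SHOW statement. A small sketch of that per-row filtering — the AuthChecker interface and JobInfo holder are illustrative, not the patch's types:

    // Illustrative only: per-row privilege filtering for SHOW output — rows
    // the user cannot read are skipped rather than failing the statement.
    import java.util.ArrayList;
    import java.util.List;

    class ShowRowsSketch {
        interface AuthChecker {
            boolean canLoad(String db, String table);
        }

        static class JobInfo {
            final String db, table, summary;
            JobInfo(String db, String table, String summary) {
                this.db = db; this.table = table; this.summary = summary;
            }
        }

        static List<List<String>> visibleRows(List<JobInfo> jobs, AuthChecker auth) {
            List<List<String>> rows = new ArrayList<>();
            for (JobInfo job : jobs) {
                if (!auth.canLoad(job.db, job.table)) {
                    continue; // access denied: omit this job, keep going
                }
                List<String> row = new ArrayList<>();
                row.add(job.summary);
                rows.add(row);
            }
            return rows;
        }
    }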
diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 040d492ad92398..a477a2a920738c 100644
--- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -143,27 +143,10 @@ public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
     }
 
     private void setOptionalFromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
-        if (routineLoadJob.getRoutineLoadDesc() != null) {
-            RoutineLoadDesc routineLoadDesc = routineLoadJob.getRoutineLoadDesc();
-            if (routineLoadDesc.getColumnsInfo() != null) {
-                ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
-                if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
-                    columnToColumnExpr = Maps.newHashMap();
-                    for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
-                        columnToColumnExpr.put(columnDesc.getColumn(), columnDesc.getExpr());
-                    }
-                }
-            }
-            if (routineLoadDesc.getWherePredicate() != null) {
-                whereExpr = routineLoadDesc.getWherePredicate().getExpr();
-            }
-            if (routineLoadDesc.getColumnSeparator() != null) {
-                columnSeparator = routineLoadDesc.getColumnSeparator();
-            }
-            if (routineLoadDesc.getPartitionNames() != null && routineLoadDesc.getPartitionNames().size() != 0) {
-                partitions = Joiner.on(",").join(routineLoadDesc.getPartitionNames());
-            }
-        }
+        columnToColumnExpr = routineLoadJob.getColumnToColumnExpr();
+        whereExpr = routineLoadJob.getWhereExpr();
+        columnSeparator = routineLoadJob.getColumnSeparator();
+        partitions = routineLoadJob.getPartitions() == null ? null : Joiner.on(",").join(routineLoadJob.getPartitions());
     }
 
     private void setColumnToColumnExpr(String columns) throws UserException {
diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex
index 40f8c7660e1687..c3d33347abe43f 100644
--- a/fe/src/main/jflex/sql_scanner.flex
+++ b/fe/src/main/jflex/sql_scanner.flex
@@ -198,6 +198,7 @@ import org.apache.doris.common.util.SqlUtils;
         keywordMap.put("pause", new Integer(SqlParserSymbols.KW_PAUSE));
         keywordMap.put("resume", new Integer(SqlParserSymbols.KW_RESUME));
         keywordMap.put("stop", new Integer(SqlParserSymbols.KW_STOP));
+        keywordMap.put("task", new Integer(SqlParserSymbols.KW_TASK));
         keywordMap.put("local", new Integer(SqlParserSymbols.KW_LOCAL));
         keywordMap.put("location", new Integer(SqlParserSymbols.KW_LOCATION));
         keywordMap.put("max", new Integer(SqlParserSymbols.KW_MAX));
diff --git a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
index 70eef69c0e4cb4..574f51512279fd 100644
--- a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
+++ b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
@@ -45,6 +45,7 @@ public class CreateRoutineLoadStmtTest {
     public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) throws UserException {
         String jobName = "job1";
         String dbName = "db1";
+        LabelName labelName = new LabelName(dbName, jobName);
         String tableNameString = "table1";
         String topicName = "topic1";
         String serverAddress = "http://127.0.0.1:8080";
@@ -55,7 +56,6 @@ public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) thro
         ColumnSeparator columnSeparator = new ColumnSeparator(",");
 
         // duplicate load property
-        TableName tableName = new TableName(dbName, tableNameString);
         List<ParseNode> loadPropertyList = new ArrayList<>();
         loadPropertyList.add(columnSeparator);
         loadPropertyList.add(columnSeparator);
@@ -68,7 +68,7 @@ public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) thro
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
@@ -91,6 +91,7 @@ public void analyze(Analyzer analyzer1) {
     public void testAnalyze(@Injectable Analyzer analyzer) throws UserException {
         String jobName = "job1";
         String dbName = "db1";
+        LabelName labelName = new LabelName(dbName, jobName);
         String tableNameString = "table1";
         String topicName = "topic1";
         String serverAddress = "127.0.0.1:8080";
@@ -114,7 +115,7 @@ public void testAnalyze(@Injectable Analyzer analyzer) throws UserException {
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
 
         new MockUp() {
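[Editor's note] The test hunks above and below switch CreateRoutineLoadStmt from a bare job name to a LabelName of the form [db.]name, matching the pause/resume/stop statements. A sketch of what such a label carries — an illustrative class, not the patch's LabelName:

    // Illustrative only: a label of the form [db.]name; the db part is
    // optional and falls back to the session's current database when omitted.
    class LabelNameSketch {
        final String db;   // null when the statement says just "name"
        final String name;

        LabelNameSketch(String db, String name) {
            this.db = db;
            this.name = name;
        }

        String qualifiedName(String currentDb) {
            return (db != null ? db : currentDb) + "." + name;
        }
    }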
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 06c6d5bbb7acc3..0ce34289531b9c 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
+import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.TableName;
@@ -75,6 +76,7 @@ public class KafkaRoutineLoadJobTest {
 
     private String jobName = "job1";
     private String dbName = "db1";
+    private LabelName labelName = new LabelName(dbName, jobName);
     private String tableNameString = "table1";
     private String topicName = "topic1";
     private String serverAddress = "http://127.0.0.1:8080";
@@ -219,8 +221,9 @@ public void testProcessTimeOutTasks(@Mocked KafkaConsumer kafkaConsumer,
         };
 
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>();
-        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster");
-        kafkaTaskInfo.addKafkaPartition(100);
+        Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
+        partitionIdsToOffset.put(100, 0L);
+        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", partitionIdsToOffset);
         kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
 
@@ -357,7 +360,7 @@ public void testFromCreateStmt(@Mocked Catalog catalog,
         Assert.assertEquals(topicName, Deencapsulation.getField(kafkaRoutineLoadJob, "topic"));
         List<Integer> kafkaPartitionResult = Deencapsulation.getField(kafkaRoutineLoadJob, "customKafkaPartitions");
         Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(kafkaPartitionResult));
-        Assert.assertEquals(routineLoadDesc, kafkaRoutineLoadJob.getRoutineLoadDesc());
+//        Assert.assertEquals(routineLoadDesc, kafkaRoutineLoadJob.getRoutineLoadDesc());
     }
 
     private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
@@ -374,7 +377,7 @@ private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
         customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
 
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
         return createRoutineLoadStmt;
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index d1685efd757a88..4d6d8be221e003 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
+import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Catalog;
@@ -68,6 +69,7 @@ public void testAddJobByStmt(@Injectable PaloAuth paloAuth,
                               @Mocked Catalog catalog) throws UserException {
         String jobName = "job1";
         String dbName = "db1";
+        LabelName labelName = new LabelName(dbName, jobName);
         String tableNameString = "table1";
         TableName tableName = new TableName(dbName, tableNameString);
         List<ParseNode> loadPropertyList = new ArrayList<>();
@@ -81,7 +83,7 @@ public void testAddJobByStmt(@Injectable PaloAuth paloAuth,
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
         String serverAddress = "http://127.0.0.1:8080";
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
 
@@ -133,6 +135,7 @@ public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth,
                                       @Mocked Catalog catalog) {
         String jobName = "job1";
         String dbName = "db1";
+        LabelName labelName = new LabelName(dbName, jobName);
         String tableNameString = "table1";
         TableName tableName = new TableName(dbName, tableNameString);
         List<ParseNode> loadPropertyList = new ArrayList<>();
@@ -146,7 +149,7 @@ public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth,
         customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
         String serverAddress = "http://127.0.0.1:8080";
         customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
 
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 80cbea8f222030..c05dbad770981e 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -58,22 +58,19 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
             MetaNotFoundException, AnalysisException, LabelAlreadyUsedException, BeginTransactionException {
         long beId = 100L;
 
+        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+        partitionIdToOffset.put(1, 100L);
+        partitionIdToOffset.put(2, 200L);
+        KafkaProgress kafkaProgress = new KafkaProgress();
+        Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);
+
         Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
-        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1l, "default_cluster");
-        routineLoadTaskInfo1.addKafkaPartition(1);
-        routineLoadTaskInfo1.addKafkaPartition(2);
+        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", partitionIdToOffset);
         routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
-
         Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
         idToRoutineLoadTask.put(1L, routineLoadTaskInfo1);
 
-        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
-        partitionIdToOffset.put(1, 100L);
-        partitionIdToOffset.put(2, 200L);
-        KafkaProgress kafkaProgress = new KafkaProgress();
-        kafkaProgress.setPartitionIdToOffset(partitionIdToOffset);
-
         Map<String, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap();
         idToRoutineLoadJob.put("1", routineLoadJob);
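[Editor's note] With partitionIdsToOffset now handed to the KafkaTaskInfo constructor (replacing the removed addKafkaPartition calls), a task's partition assignment is fixed at construction time. An illustrative sketch of that design choice — the class is hypothetical, not the patch's KafkaTaskInfo:

    // Illustrative only: the partition -> starting-offset map is supplied at
    // construction, so a task is never visible in a half-initialized state.
    import java.util.HashMap;
    import java.util.Map;

    class KafkaTaskSketch {
        private final Map<Integer, Long> partitionIdToOffset;

        KafkaTaskSketch(Map<Integer, Long> partitionIdToOffset) {
            // defensive copy: callers keep no handle on the internal map
            this.partitionIdToOffset = new HashMap<>(partitionIdToOffset);
        }

        Map<Integer, Long> offsetSnapshot() {
            return new HashMap<>(partitionIdToOffset);
        }
    }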
diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 010575a55cdf46..65f34a5513cf2b 100644
--- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -328,7 +328,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
         Map<Integer, Long> oldKafkaProgressMap = Maps.newHashMap();
         oldKafkaProgressMap.put(1, 0L);
         KafkaProgress oldkafkaProgress = new KafkaProgress();
-        oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
+        Deencapsulation.setField(oldkafkaProgress, "partitionIdToOffset", oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
         Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
 
@@ -401,7 +401,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
         Map<Integer, Long> oldKafkaProgressMap = Maps.newHashMap();
         oldKafkaProgressMap.put(1, 0L);
         KafkaProgress oldkafkaProgress = new KafkaProgress();
-        oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
+        Deencapsulation.setField(oldkafkaProgress, "partitionIdToOffset", oldKafkaProgressMap);
         Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
         Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
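[Editor's note] Since KafkaProgress no longer exposes setPartitionIdToOffset and initializes the map in its constructor, the test hunks above seed the private field through JMockit's Deencapsulation instead. A minimal usage sketch, assuming an older JMockit release in which mockit.Deencapsulation is still available; the ProgressSketch class is illustrative:

    // Illustrative only: seeding a private field from a test via JMockit's
    // Deencapsulation.setField, as the hunks above do for KafkaProgress.
    import mockit.Deencapsulation;

    import java.util.HashMap;
    import java.util.Map;

    class ProgressSketch {
        private Map<Integer, Long> partitionIdToOffset = new HashMap<>();
    }

    class ProgressSketchTest {
        void seedProgress() {
            ProgressSketch progress = new ProgressSketch();
            Map<Integer, Long> offsets = new HashMap<>();
            offsets.put(1, 0L);
            // bypass the (intentionally) missing setter
            Deencapsulation.setField(progress, "partitionIdToOffset", offsets);
        }
    }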