Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions fe/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A
KW_JOIN,
KW_KEY, KW_KILL,
KW_LABEL, KW_LARGEINT, KW_LAST, KW_LEFT, KW_LESS, KW_LEVEL, KW_LIKE, KW_LIMIT, KW_LINK, KW_LOAD,
KW_ROUTINE, KW_PAUSE, KW_RESUME, KW_STOP,
KW_ROUTINE, KW_PAUSE, KW_RESUME, KW_STOP, KW_TASK,
KW_LOCAL, KW_LOCATION,
KW_MAX, KW_MAX_VALUE, KW_MERGE, KW_MIN, KW_MIGRATE, KW_MIGRATIONS, KW_MODIFY,
KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS,
Expand Down Expand Up @@ -243,7 +243,8 @@ terminal String COMMENTED_PLAN_HINTS;

// Statement that the result of this parser.
nonterminal StatementBase query, stmt, show_stmt, show_param, help_stmt, load_stmt,
create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt, show_routine_load_stmt,
create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt,
show_routine_load_stmt, show_routine_load_task_stmt,
describe_stmt, alter_stmt,
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, import_columns_stmt, import_where_stmt;
Expand Down Expand Up @@ -374,6 +375,7 @@ nonterminal AccessPrivilege privilege_type;
nonterminal DataDescription data_desc;
nonterminal List<DataDescription> data_desc_list;
nonterminal LabelName job_label;
nonterminal LabelName opt_job_label;
nonterminal String opt_system;
nonterminal String opt_cluster;
nonterminal BrokerDesc opt_broker;
Expand Down Expand Up @@ -529,6 +531,8 @@ stmt ::=
{: RESULT = stmt; :}
| show_routine_load_stmt : stmt
{: RESULT = stmt; :}
| show_routine_load_task_stmt : stmt
{: RESULT = stmt; :}
| cancel_stmt : stmt
{: RESULT = stmt; :}
| delete_stmt : stmt
Expand Down Expand Up @@ -1001,6 +1005,17 @@ load_stmt ::=
:}
;

opt_job_label ::=
/* Empty */
{:
RESULT = null;
:}
| job_label:jobLabel
{:
RESULT = jobLabel;
:}
;

job_label ::=
ident:label
{:
Expand Down Expand Up @@ -1147,12 +1162,12 @@ opt_cluster ::=

// Routine load statement
create_routine_load_stmt ::=
KW_CREATE KW_ROUTINE KW_LOAD ident:jobName KW_ON table_name:dbTableName
KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel KW_ON ident:tableName
opt_load_property_list:loadPropertyList
opt_properties:properties
KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN
{:
RESULT = new CreateRoutineLoadStmt(jobName, dbTableName, loadPropertyList, properties, type, customProperties);
RESULT = new CreateRoutineLoadStmt(jobLabel, tableName, loadPropertyList, properties, type, customProperties);
:}
;

Expand Down Expand Up @@ -1191,30 +1206,41 @@ load_property ::=
;

pause_routine_load_stmt ::=
KW_PAUSE KW_ROUTINE KW_LOAD ident:name
KW_PAUSE KW_ROUTINE KW_LOAD job_label:jobLabel
{:
RESULT = new PauseRoutineLoadStmt(name);
RESULT = new PauseRoutineLoadStmt(jobLabel);
:}
;

resume_routine_load_stmt ::=
KW_RESUME KW_ROUTINE KW_LOAD ident:name
KW_RESUME KW_ROUTINE KW_LOAD job_label:jobLabel
{:
RESULT = new ResumeRoutineLoadStmt(name);
RESULT = new ResumeRoutineLoadStmt(jobLabel);
:}
;

stop_routine_load_stmt ::=
KW_STOP KW_ROUTINE KW_LOAD ident:name
KW_STOP KW_ROUTINE KW_LOAD job_label:jobLabel
{:
RESULT = new StopRoutineLoadStmt(name);
RESULT = new StopRoutineLoadStmt(jobLabel);
:}
;

show_routine_load_stmt ::=
KW_SHOW KW_ROUTINE KW_LOAD ident:name
KW_SHOW KW_ROUTINE KW_LOAD opt_job_label:jobLabel
{:
RESULT = new ShowRoutineLoadStmt(jobLabel, false);
:}
| KW_SHOW KW_ALL KW_ROUTINE KW_LOAD opt_job_label:jobLabel
{:
RESULT = new ShowRoutineLoadStmt(jobLabel, true);
:}
;

show_routine_load_task_stmt ::=
KW_SHOW KW_ROUTINE KW_LOAD KW_TASK opt_db:dbName opt_wild_where
{:
RESULT = new ShowRoutineLoadStmt(name);
RESULT = new ShowRoutineLoadTaskStmt(dbName, parser.where);
:}
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
Create routine Load statement, continually load data from a streaming app

syntax:
CREATE ROUTINE LOAD name ON database.table
CREATE ROUTINE LOAD [database.]name on table
[load properties]
[PROPERTIES
(
Expand Down Expand Up @@ -108,15 +108,17 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(KAFKA_OFFSETS_PROPERTY)
.build();

private final String name;
private final TableName dbTableName;
private final LabelName labelName;
private final String tableName;
private final List<ParseNode> loadPropertyList;
private final Map<String, String> jobProperties;
private final String typeName;
private final Map<String, String> dataSourceProperties;

// the following variables will be initialized after analyze
// -1 as unset, the default value will set in RoutineLoadJob
private String name;
private String dbName;
private RoutineLoadDesc routineLoadDesc;
private int desiredConcurrentNum = 1;
private int maxErrorNum = -1;
Expand All @@ -130,11 +132,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
// pair<partition id, offset>
private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();

public CreateRoutineLoadStmt(String name, TableName dbTableName, List<ParseNode> loadPropertyList,
public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNode> loadPropertyList,
Map<String, String> jobProperties,
String typeName, Map<String, String> dataSourceProperties) {
this.name = name;
this.dbTableName = dbTableName;
this.labelName = labelName;
this.tableName = tableName;
this.loadPropertyList = loadPropertyList;
this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties;
this.typeName = typeName.toUpperCase();
Expand All @@ -145,8 +147,12 @@ public String getName() {
return name;
}

public TableName getDBTableName() {
return dbTableName;
public String getDBName() {
return dbName;
}

public String getTableName() {
return tableName;
}

public String getTypeName() {
Expand Down Expand Up @@ -192,10 +198,10 @@ public List<Pair<Integer, Long>> getKafkaPartitionOffsets() {
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
// check dbName and tableName
checkDBTable(analyzer);
// check name
FeNameFormat.checkCommonName(NAME_TYPE, name);
// check dbName and tableName
dbTableName.analyze(analyzer);
// check load properties include column separator etc.
checkLoadProperties(analyzer);
// check routine load job properties include desired concurrent number etc.
Expand All @@ -204,6 +210,15 @@ public void analyze(Analyzer analyzer) throws UserException {
checkDataSourceProperties();
}

public void checkDBTable(Analyzer analyzer) throws AnalysisException {
labelName.analyze(analyzer);
dbName = labelName.getDbName();
name = labelName.getLabelName();
if (Strings.isNullOrEmpty(tableName)) {
throw new AnalysisException("Table name should not be null");
}
}

public void checkLoadProperties(Analyzer analyzer) throws UserException {
if (loadPropertyList == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,27 @@
Pause routine load by name

syntax:
PAUSE ROUTINE LOAD name
PAUSE ROUTINE LOAD [database.]name
*/
public class PauseRoutineLoadStmt extends DdlStmt {

private final String name;
private final LabelName labelName;

public PauseRoutineLoadStmt(String name) {
this.name = name;
public PauseRoutineLoadStmt(LabelName labelName) {
this.labelName = labelName;
}

public String getName() {
return name;
return labelName.getLabelName();
}

public String getDbFullName(){
return labelName.getDbName();
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);

if (Strings.isNullOrEmpty(name)) {
throw new AnalysisException("routine load name could not be empty or null");
}
labelName.analyze(analyzer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,27 @@
Resume routine load job by name

syntax:
RESUME ROUTINE LOAD name
RESUME ROUTINE LOAD [database.]name
*/
public class ResumeRoutineLoadStmt extends DdlStmt{

private final String name;
private final LabelName labelName;

public ResumeRoutineLoadStmt(String name) {
this.name = name;
public ResumeRoutineLoadStmt(LabelName labelName) {
this.labelName = labelName;
}

public String getName() {
return name;
return labelName.getLabelName();
}

public String getDBFullName(){
return labelName.getDbName();
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
if (Strings.isNullOrEmpty(name)) {
throw new AnalysisException("routine load name could not be empty or null");
}
labelName.analyze(analyzer);
}
}
86 changes: 71 additions & 15 deletions fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ShowResultSetMetaData;
Expand All @@ -30,40 +31,95 @@
Show routine load progress by routine load name

syntax:
SHOW ROUTINE LOAD name
SHOW [ALL] ROUTINE LOAD [database.][name]

without ALL: only show job which is not final
with ALL: show all of job include history job

without name: show all of routine load job with different name
with name: show all of job named ${name}

without on db: show all of job in connection db
if user does not choose db before, return error
with on db: show all of job in ${db}

example:
show routine load named test in database1
SHOW ROUTINE LOAD database1.test;

show routine load in database1
SHOW ROUTINE LOAD database1;

show routine load in database1 include history
use database1;
SHOW ALL ROUTINE LOAD;

show routine load in all of database
please use show proc
*/
public class ShowRoutineLoadStmt extends ShowStmt {

private static final ImmutableList<String> TITLE_NAMES =
new ImmutableList.Builder<String>()
.add("id")
.add("name")
.add("db_id")
.add("table_id")
.add("partitions")
.add("state")
.add(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)
.add("progress")
.add("Id")
.add("Name")
.add("DBId")
.add("TableId")
.add("State")
.add("DataSourceType")
.add("JobProperties")
.add("DataSourceProperties")
.add("CurrentTaskConcurrentNumber")
.add("TotalRows")
.add("TotalErrorRows")
.add("Progress")
.add("ReasonOfStateChanged")
.build();

private final String name;
private final LabelName labelName;
private String dbFullName; // optional
private String name; // optional
private boolean includeHistory = false;


public ShowRoutineLoadStmt(String name) {
this.name = name;
public ShowRoutineLoadStmt(LabelName labelName, boolean includeHistory) {
this.labelName = labelName;
this.includeHistory = includeHistory;
}

public String getDbFullName() {
return dbFullName;
}

public String getName() {
return name;
}

public boolean isIncludeHistory() {
return includeHistory;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
if (Strings.isNullOrEmpty(name)) {
throw new AnalysisException("routine load name could not be empty or null");
checkLabelName(analyzer);
}

private void checkLabelName(Analyzer analyzer) throws AnalysisException {
String dbName = labelName == null ? null : labelName.getDbName();
if (Strings.isNullOrEmpty(dbName)) {
dbFullName = analyzer.getContext().getDatabase();
if (Strings.isNullOrEmpty(dbFullName)) {
throw new AnalysisException("please choose a database firstly "
+ "such as use db, show routine load db.name etc.");
}
} else {
dbFullName = ClusterNamespace.getFullName(getClusterName(), dbName);
}
name = labelName == null ? null : labelName.getLabelName();
}


@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
Expand Down
Loading