Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.commons.lang3.StringUtils;

import java.util.HashSet;
import java.util.Set;

/**
* syntax:
Expand Down Expand Up @@ -86,15 +85,11 @@ public class CreateJobStmt extends DdlStmt {
private JobExecuteType executeType;

// exclude job name prefix, which is used by inner job
private final Set<String> excludeJobNamePrefix = new HashSet<>();

{
excludeJobNamePrefix.add("inner_mtmv_");
}
private final String excludeJobNamePrefix = "inner_";

private static final ImmutableSet<Class<? extends DdlStmt>> supportStmtSuperClass
= new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class)
.add(UpdateStmt.class).build();
.build();

private static final HashSet<String> supportStmtClassNamesCache = new HashSet<>(16);

Expand Down Expand Up @@ -164,16 +159,15 @@ public void analyze(Analyzer analyzer) throws UserException {
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
job.setExecuteSql(executeSql);

//job.checkJobParams();
jobInstance = job;
}

private void checkJobName(String jobName) throws AnalysisException {
for (String prefix : excludeJobNamePrefix) {
if (jobName.startsWith(prefix)) {
throw new AnalysisException("job name can not start with " + prefix);
}
if (StringUtils.isBlank(jobName)) {
throw new AnalysisException("job name can not be null");
}
if (jobName.startsWith(excludeJobNamePrefix)) {
throw new AnalysisException("job name can not start with " + excludeJobNamePrefix);
}
}

Expand All @@ -193,7 +187,7 @@ private void checkStmtSupport() throws AnalysisException {
return;
}
}
throw new AnalysisException("Not support this stmt type");
throw new AnalysisException("Not support " + doStmt.getClass().getSimpleName() + " type in job");
}

private void analyzerSqlStmt() throws UserException {
Expand Down
108 changes: 96 additions & 12 deletions regression-test/suites/job_p0/test_base_insert_job.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,32 @@ import java.time.ZoneId;
suite("test_base_insert_job") {
def tableName = "t_test_BASE_inSert_job"
def jobName = "insert_recovery_test_base_insert_job"
def jobMixedName = "Insert_recovery_Test_base_insert_job"
sql """drop table if exists `${tableName}` force"""
sql """
DROP JOB where jobname = '${jobName}'
"""
sql """
DROP JOB where jobname = 'JOB'
"""
sql """
DROP JOB where jobname = 'DO'
"""
sql """
DROP JOB where jobname = 'AT'
"""
sql """
DROP JOB where jobname = 'SCHEDULE'
"""
sql """
DROP JOB where jobname = 'STARTS'
"""
sql """
DROP JOB where jobname = 'ENDS'
"""
sql """
DROP JOB where jobname = '${jobMixedName}'
"""

sql """
CREATE TABLE IF NOT EXISTS `${tableName}`
Expand All @@ -48,9 +70,20 @@ suite("test_base_insert_job") {
def jobs = sql """select * from ${tableName}"""
println jobs
assert 3>=jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 records
sql """
CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
def mixedNameJobs = sql """select name,comment from jobs("type"="insert") where Name='${jobMixedName}'"""
println mixedNameJobs
assert mixedNameJobs.size() == 1 && mixedNameJobs.get(0).get(0) == jobMixedName
assert mixedNameJobs.get(0).get(1) == ''
sql """
DROP JOB where jobname = '${jobName}'
"""
sql """
DROP JOB where jobname = '${jobMixedName}'
"""

sql """drop table if exists `${tableName}` force """
sql """
CREATE TABLE IF NOT EXISTS `${tableName}`
Expand Down Expand Up @@ -84,7 +117,6 @@ suite("test_base_insert_job") {
def onceJobSql= onceJob.get(0).get(1);
println onceJobSql
def assertSql = "insert into ${tableName} values (\'2023-07-19\', sleep(10000), 1001);"
println 'hhh'
println assertSql
assert onceJobSql == assertSql
// test cancel task
Expand All @@ -108,44 +140,96 @@ suite("test_base_insert_job") {
assert oncejob.get(0).get(0) == "FINISHED"
//assert comment
assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa"


// assert same job name
try {
sql """
CREATE JOB ${jobName} ON SCHEDULE EVERY 10 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(10000), 1001);
"""
}catch (Exception e) {
assert e.getMessage().contains("job name exist, jobName:insert_recovery_test_base_insert_job")
}
def errorTblName="${tableName}qwertyuioppoiuyte"
sql """drop table if exists `${errorTblName}` force"""
// assert error table name
try {
sql """
CREATE JOB ${jobName} ON SCHEDULE EVERY 10 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${errorTblName} values ('2023-07-19', sleep(10000), 1001);
"""
}catch (Exception e) {
assert e.getMessage().contains("Unknown table 't_test_BASE_inSert_jobqwertyuioppoiuyte'")
}
// assert not support stmt
try{
sql """
CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test' DO update ${tableName} set type=2 where type=1;
"""
} catch (Exception e) {
assert e.getMessage().contains("Not support UpdateStmt type in job")
}
// assert start time greater than current time
try{
sql """
CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("startTimeMs must be greater than current time")
}
sql """
DROP JOB where jobname = 'test_one_time_error_starts'
"""
// assert end time less than start time
try{
sql """
CREATE JOB test_one_time_error_starts ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("startTimeMs must be greater than current time")
}
sql """
DROP JOB where jobname = 'test_error_starts'
"""
try{
sql """
CREATE JOB inner_test ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("job name can not start with inner_")
}
// assert end time less than start time
try{
sql """
CREATE JOB test_error_starts ON SCHEDULE every 1 second ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("end time cannot be less than start time")
}

sql """
DROP JOB where jobname = 'test_error_starts'
"""
// assert interval time unit can not be years
try{
sql """
CREATE JOB test_error_starts ON SCHEDULE every 1 years ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("interval time unit can not be years")
}

// test keyword as job name
sql """
CREATE JOB JOB ON SCHEDULE every 20 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
sql """
CREATE JOB SCHEDULE ON SCHEDULE every 20 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
sql """
CREATE JOB DO ON SCHEDULE every 20 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
sql """
CREATE JOB AT ON SCHEDULE every 20 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""

sql """
CREATE JOB STARTS ON SCHEDULE every 20 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""

sql """
CREATE JOB ENDS ON SCHEDULE every 20 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""

def jobCountRsp = sql"""select count(1) from jobs("type"="insert") where name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')"""
assert jobCountRsp.get(0).get(0) == 6

}