From f907c69fa4c3de5ae689336cf501a22babc0aede Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Fri, 11 Oct 2024 18:41:27 +0800 Subject: [PATCH] [branch-2.1][feat](job)Implementing Job in Nereids (#41391) ## Proposed changes The CREATE JOB statement, including its execution SQL, is currently parsed by the legacy CUP grammar. As a result, SQL that is valid for the new optimizer (Nereids) can still be rejected when the job is created, because it fails lexical analysis in the old parser. Since a JOB's underlying execution already uses the new optimizer, this change moves CREATE JOB to the ANTLR4 grammar as part of fully migrating JOB statements for consistency. --- .../org/apache/doris/nereids/DorisParser.g4 | 28 +- .../apache/doris/analysis/CreateJobStmt.java | 1 + .../job/extensions/insert/InsertJob.java | 4 +- .../nereids/parser/LogicalPlanBuilder.java | 28 ++ .../doris/nereids/trees/plans/PlanType.java | 1 + .../plans/commands/CreateJobCommand.java | 67 +++++ .../plans/commands/info/CreateJobInfo.java | 263 ++++++++++++++++++ .../insert/InsertIntoTableCommand.java | 14 +- .../trees/plans/visitor/CommandVisitor.java | 5 + .../data/job_p0/job_meta/job_query_test.out | 4 +- .../suites/job_p0/test_base_insert_job.groovy | 59 ++-- 11 files changed, 445 insertions(+), 29 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 5b75deca1982da..ab2ef6c8ba520c 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -68,12 +68,7 @@ statementBase | CREATE (EXTERNAL)? TABLE (IF NOT EXISTS)? name=multipartIdentifier LIKE existedTable=multipartIdentifier (WITH ROLLUP (rollupNames=identifierList)?)? #createTableLike - | explain? cte? 
INSERT (INTO | OVERWRITE TABLE) - (tableName=multipartIdentifier | DORIS_INTERNAL_TABLE_ID LEFT_PAREN tableId=INTEGER_VALUE RIGHT_PAREN) - partitionSpec? // partition define - (WITH LABEL labelName=identifier)? cols=identifierList? // label and columns define - (LEFT_BRACKET hints=identifierSeq RIGHT_BRACKET)? // hint define - query #insertTable + | insertIntoStatement #insertTableAlines | explain? cte? UPDATE tableName=multipartIdentifier tableAlias SET updateAssignmentSeq fromClause? @@ -121,9 +116,28 @@ statementBase | ALTER TABLE table=multipartIdentifier DROP CONSTRAINT constraintName=errorCapturingIdentifier #dropConstraint | SHOW CONSTRAINTS FROM table=multipartIdentifier #showConstraint + | supportedJobStatement #supportedJobStatementAlias | unsupportedStatement #unsupported ; - +insertIntoStatement + : explain? cte? INSERT (INTO | OVERWRITE TABLE) + (tableName=multipartIdentifier | DORIS_INTERNAL_TABLE_ID LEFT_PAREN tableId=INTEGER_VALUE RIGHT_PAREN) + partitionSpec? // partition define + (WITH LABEL labelName=identifier)? cols=identifierList? // label and columns define + (LEFT_BRACKET hints=identifierSeq RIGHT_BRACKET)? // hint define + query #insertTable + ; +supportedJobStatement + : CREATE JOB label=multipartIdentifier ON SCHEDULE + ( + (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier + (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))? + (ENDS endsTime=STRING_LITERAL)?) + | + (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))) + commentSpec? + DO insertIntoStatement #createScheduledJob + ; unsupportedStatement : SET identifier AS DEFAULT STORAGE VAULT #setDefaultStorageVault | SET PROPERTY (FOR user=identifierOrText)? 
propertyItemList #setUserProperties diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 088e9eb3e870d9..a8b0b30dce2b7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -60,6 +60,7 @@ * quantity { DAY | HOUR | MINUTE | * WEEK | SECOND } */ +@Deprecated @Slf4j public class CreateJobStmt extends DdlStmt { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 43f43ba86997cf..487591efc04745 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -299,7 +299,9 @@ public void cancelTaskById(long taskId) throws JobException { @Override public void cancelAllTasks() throws JobException { try { - checkAuth("CANCEL LOAD"); + if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { + checkAuth("CANCEL LOAD"); + } super.cancelAllTasks(); this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"); } catch (DdlException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 61521bd7681692..62080329350bb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -363,6 +363,7 @@ import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.Constraint; +import 
org.apache.doris.nereids.trees.plans.commands.CreateJobCommand; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand; @@ -397,6 +398,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; +import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableLikeInfo; @@ -540,6 +542,32 @@ public LogicalPlan visitStatementDefault(StatementDefaultContext ctx) { return withExplain(plan, ctx.explain()); } + @Override + public LogicalPlan visitCreateScheduledJob(DorisParser.CreateScheduledJobContext ctx) { + Optional label = ctx.label == null ? Optional.empty() : Optional.of(ctx.label.getText()); + Optional atTime = ctx.atTime == null ? Optional.empty() : Optional.of(ctx.atTime.getText()); + Optional immediateStartOptional = ctx.CURRENT_TIMESTAMP() == null ? Optional.of(false) : + Optional.of(true); + Optional startTime = ctx.startTime == null ? Optional.empty() : Optional.of(ctx.startTime.getText()); + Optional endsTime = ctx.endsTime == null ? Optional.empty() : Optional.of(ctx.endsTime.getText()); + Optional interval = ctx.timeInterval == null ? Optional.empty() : + Optional.of(Long.valueOf(ctx.timeInterval.getText())); + Optional intervalUnit = ctx.timeUnit == null ? 
Optional.empty() : Optional.of(ctx.timeUnit.getText()); + String comment = + visitCommentSpec(ctx.commentSpec()); + String executeSql = getOriginSql(ctx.insertIntoStatement()); + CreateJobInfo createJobInfo = new CreateJobInfo(label, atTime, interval, intervalUnit, startTime, + endsTime, immediateStartOptional, comment, executeSql); + return new CreateJobCommand(createJobInfo); + } + + @Override + public String visitCommentSpec(DorisParser.CommentSpecContext ctx) { + String commentSpec = ctx == null ? "''" : ctx.STRING_LITERAL().getText(); + return + LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1)); + } + @Override public LogicalPlan visitInsertTable(InsertTableContext ctx) { boolean isOverwrite = ctx.INTO() == null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 3ff217f39ef025..4fce2f11356e86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -143,6 +143,7 @@ public enum PlanType { SELECT_INTO_OUTFILE_COMMAND, UPDATE_COMMAND, CREATE_MTMV_COMMAND, + CREATE_JOB_COMMAND, ALTER_MTMV_COMMAND, ADD_CONSTRAINT_COMMAND, DROP_CONSTRAINT_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java new file mode 100644 index 00000000000000..4bcf594e0f08ea --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +/** + * syntax: + * CREATE + * [DEFINER = user] + * JOB + * event_name + * ON SCHEDULE schedule + * [COMMENT 'string'] + * DO event_body; + * schedule: { + * [STREAMING] AT timestamp + * | EVERY interval + * [STARTS timestamp ] + * [ENDS timestamp ] + * } + * interval: + * quantity { DAY | HOUR | MINUTE | + * WEEK | SECOND } + */ +public class CreateJobCommand extends Command implements ForwardWithSync { + + private CreateJobInfo createJobInfo; + + public CreateJobCommand(CreateJobInfo jobInfo) { + super(PlanType.CREATE_JOB_COMMAND); + this.createJobInfo = jobInfo; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + AbstractJob job = createJobInfo.analyzeAndBuildJobInfo(ctx); + Env.getCurrentEnv().getJobManager().registerJob(job); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitCreateJobCommand(this, context); + } + +} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java new file mode 100644 index 00000000000000..6cef7ee89ec960 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.base.JobExecutionConfiguration; +import org.apache.doris.job.base.TimerDefinition; +import org.apache.doris.job.common.IntervalUnit; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; + +import java.util.Optional; + +/** + * Build job info and analyze the SQL statement to create a job. + */ +public class CreateJobInfo { + + // exclude job name prefix, which is used by inner job + private static final String excludeJobNamePrefix = "inner_"; + + private final Optional labelNameOptional; + + private final Optional onceJobStartTimestampOptional; + + private final Optional intervalOptional; + + private final Optional intervalTimeUnitOptional; + + private final Optional startsTimeStampOptional; + + private final Optional endsTimeStampOptional; + + private final Optional immediateStartOptional; + + private final String comment; + + private final String executeSql; + + /** + * Constructor for CreateJobInfo. + * + * @param labelNameOptional Job name. + * @param onceJobStartTimestampOptional Start time for a one-time job. + * @param intervalOptional Interval for a recurring job. 
+ * @param intervalTimeUnitOptional Interval time unit for a recurring job. + * @param startsTimeStampOptional Start time for a recurring job. + * @param endsTimeStampOptional End time for a recurring job. + * @param immediateStartOptional Immediate start for a job. + * @param comment Comment for the job. + * @param executeSql Original SQL statement. + */ + public CreateJobInfo(Optional labelNameOptional, Optional onceJobStartTimestampOptional, + Optional intervalOptional, Optional intervalTimeUnitOptional, + Optional startsTimeStampOptional, Optional endsTimeStampOptional, + Optional immediateStartOptional, String comment, String executeSql) { + this.labelNameOptional = labelNameOptional; + this.onceJobStartTimestampOptional = onceJobStartTimestampOptional; + this.intervalOptional = intervalOptional; + this.intervalTimeUnitOptional = intervalTimeUnitOptional; + this.startsTimeStampOptional = startsTimeStampOptional; + this.endsTimeStampOptional = endsTimeStampOptional; + this.immediateStartOptional = immediateStartOptional; + this.comment = comment; + this.executeSql = executeSql; + + } + + /** + * Analyzes the provided SQL statement and builds the job information. + * + * @param ctx Connect context. + * @return AbstractJob instance. + * @throws UserException If there is an error during SQL analysis or job creation. + */ + public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws UserException { + checkAuth(); + if (labelNameOptional.orElseThrow(() -> new AnalysisException("labelName is null")).isEmpty()) { + throw new AnalysisException("Job name can not be empty"); + } + + String jobName = labelNameOptional.get(); + checkJobName(jobName); + String dbName = ctx.getDatabase(); + + Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); + // check its insert stmt,currently only support insert stmt + JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); + JobExecuteType executeType = intervalOptional.isPresent() ? 
JobExecuteType.RECURRING : JobExecuteType.ONE_TIME; + jobExecutionConfiguration.setExecuteType(executeType); + TimerDefinition timerDefinition = new TimerDefinition(); + + if (executeType.equals(JobExecuteType.ONE_TIME)) { + buildOnceJob(timerDefinition, jobExecutionConfiguration); + } else { + buildRecurringJob(timerDefinition, jobExecutionConfiguration); + } + jobExecutionConfiguration.setTimerDefinition(timerDefinition); + return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration); + } + + /** + * Builds a TimerDefinition for a once-job. + * + * @param timerDefinition Timer definition to be built. + * @param jobExecutionConfiguration Job execution configuration. + * @throws AnalysisException If the job is not configured correctly. + */ + private void buildOnceJob(TimerDefinition timerDefinition, + JobExecutionConfiguration jobExecutionConfiguration) throws AnalysisException { + if (immediateStartOptional.isPresent() && Boolean.TRUE.equals(immediateStartOptional.get())) { + jobExecutionConfiguration.setImmediate(true); + timerDefinition.setStartTimeMs(System.currentTimeMillis()); + return; + } + + // Ensure start time is provided for once jobs. + String startTime = onceJobStartTimestampOptional.orElseThrow(() + -> new AnalysisException("Once time job must set start time")); + timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(startTime)); + } + + /** + * Builds a TimerDefinition for a recurring job. + * + * @param timerDefinition Timer definition to be built. + * @param jobExecutionConfiguration Job execution configuration. + * @throws AnalysisException If the job is not configured correctly. + */ + private void buildRecurringJob(TimerDefinition timerDefinition, + JobExecutionConfiguration jobExecutionConfiguration) throws AnalysisException { + // Ensure interval is provided for recurring jobs. 
+ long interval = intervalOptional.orElseThrow(() + -> new AnalysisException("Interval must be set for recurring job")); + timerDefinition.setInterval(interval); + + // Ensure interval time unit is provided for recurring jobs. + String intervalTimeUnit = intervalTimeUnitOptional.orElseThrow(() + -> new AnalysisException("Interval time unit must be set for recurring job")); + IntervalUnit intervalUnit = IntervalUnit.fromString(intervalTimeUnit.toUpperCase()); + if (intervalUnit == null) { + throw new AnalysisException("Invalid interval time unit: " + intervalTimeUnit); + } + + // Check if interval unit is second and disable if not in test mode. + if (intervalUnit.equals(IntervalUnit.SECOND) && !Config.enable_job_schedule_second_for_test) { + throw new AnalysisException("Interval time unit can not be second in production mode"); + } + + timerDefinition.setIntervalUnit(intervalUnit); + + // Set end time if provided. + endsTimeStampOptional.ifPresent(s -> timerDefinition.setEndTimeMs(stripQuotesAndParseTimestamp(s))); + + // Set immediate start if configured. + if (immediateStartOptional.isPresent() && Boolean.TRUE.equals(immediateStartOptional.get())) { + jobExecutionConfiguration.setImmediate(true); + // Avoid immediate re-scheduling by setting start time slightly in the past. + timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100); + return; + } + // Set start time if provided. + startsTimeStampOptional.ifPresent(s -> timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(s))); + } + + protected static void checkAuth() throws AnalysisException { + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + + /** + * Analyzes the provided SQL statement and creates an appropriate job based on the parsed logical plan. + * Currently, only "InsertIntoTableCommand" is supported for job creation. 
+ * + * @param sql the SQL statement to be analyzed + * @param currentDbName the current database name where the SQL statement will be executed + * @param jobExecutionConfiguration the configuration for job execution + * @return an instance of AbstractJob corresponding to the SQL statement + * @throws UserException if there is an error during SQL analysis or job creation + */ + private AbstractJob analyzeAndCreateJob(String sql, String currentDbName, + JobExecutionConfiguration jobExecutionConfiguration) throws UserException { + NereidsParser parser = new NereidsParser(); + LogicalPlan logicalPlan = parser.parseSingle(sql); + if (logicalPlan instanceof InsertIntoTableCommand) { + InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan; + try { + insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false); + return new InsertJob(labelNameOptional.get(), + JobStatus.RUNNING, + currentDbName, + comment, + ConnectContext.get().getCurrentUserIdentity(), + jobExecutionConfiguration, + System.currentTimeMillis(), + sql); + } catch (Exception e) { + throw new AnalysisException(e.getMessage()); + } + } else { + throw new AnalysisException("Not support this sql : " + sql + " Command class is " + + logicalPlan.getClass().getName() + "."); + } + } + + private void checkJobName(String jobName) throws AnalysisException { + if (Strings.isNullOrEmpty(jobName)) { + throw new AnalysisException("job name can not be null"); + } + if (jobName.startsWith(excludeJobNamePrefix)) { + throw new AnalysisException("job name can not start with " + excludeJobNamePrefix); + } + } + + /** + * Strips quotes from the input string and parses it to a timestamp. + * + * @param str The input string potentially enclosed in single or double quotes. + * @return The parsed timestamp as a long value, or -1L if the input is null or empty. 
+ */ + public static Long stripQuotesAndParseTimestamp(String str) { + if (str == null || str.isEmpty()) { + return -1L; + } + if (str.startsWith("'") && str.endsWith("'")) { + str = str.substring(1, str.length() - 1); + } else if (str.startsWith("\"") && str.endsWith("\"")) { + str = str.substring(1, str.length() - 1); + } + return TimeUtils.timeStringToLong(str.trim()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 1f5afc836e5a28..7a46948f1b57a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -118,12 +118,20 @@ public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor, runInternal(ctx, executor); } + public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception { + return initPlan(ctx, executor, true); + } + /** * This function is used to generate the plan for Nereids. * There are some load functions that only need to the plan, such as stream_load. * Therefore, this section will be presented separately. + * @param needBeginTransaction whether to start a transaction. + * For external uses such as creating a job, only basic analysis is needed without starting a transaction, + * in which case this can be set to false. 
*/ - public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception { + public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor, + boolean needBeginTransaction) throws Exception { if (!ctx.getSessionVariable().isEnableNereidsDML()) { try { ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce(); @@ -221,6 +229,10 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor // TODO: support other table types throw new AnalysisException("insert into command only support [olap, hive, iceberg, jdbc] table"); } + if (!needBeginTransaction) { + targetTableIf.readUnlock(); + return insertExecutor; + } if (!insertExecutor.isEmptyInsert()) { insertExecutor.beginTransaction(); insertExecutor.finalizeSink(planner.getFragments().get(0), sink, physicalSink); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index 3af299022ba50c..dc725147d9916a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -23,6 +23,7 @@ import org.apache.doris.nereids.trees.plans.commands.CallCommand; import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand; @@ -106,6 +107,10 @@ default R visitCreateMTMVCommand(CreateMTMVCommand createMTMVCommand, C context) return visitCommand(createMTMVCommand, context); } + default R visitCreateJobCommand(CreateJobCommand 
createJobCommand, C context) { + return visitCommand(createJobCommand, context); + } + default R visitAlterMTMVCommand(AlterMTMVCommand alterMTMVCommand, C context) { return visitCommand(alterMTMVCommand, context); } diff --git a/regression-test/data/job_p0/job_meta/job_query_test.out b/regression-test/data/job_p0/job_meta/job_query_test.out index 1a2bfe0f9cd995..2bfbb890aed767 100644 --- a/regression-test/data/job_p0/job_meta/job_query_test.out +++ b/regression-test/data/job_p0/job_meta/job_query_test.out @@ -1,7 +1,7 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !select1 -- -JOB_ONETIME ONE_TIME AT 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213'); +JOB_ONETIME ONE_TIME AT 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213') -- !select2 -- -JOB_RECURRING RECURRING EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213'); +JOB_RECURRING RECURRING EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213') diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index c3b77336587c43..76264d8ae94436 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -26,6 +26,9 @@ suite("test_base_insert_job") { def tableName = "t_test_BASE_inSert_job" def jobName = "insert_recovery_test_base_insert_job" def jobMixedName = "Insert_recovery_Test_base_insert_job" + sql """ + SET enable_fallback_to_original_planner=false; + """ sql """drop table if exists `${tableName}` force""" sql """ DROP JOB IF EXISTS where jobname = '${jobName}' @@ -70,27 +73,47 @@ suite("test_base_insert_job") { ); """ sql 
""" - CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + insert into ${tableName} values + ('2023-03-18', 1, 1) + """ + sql """ + CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO INSERT INTO ${tableName} (`timestamp`, `type`, `user_id`) + WITH + tbl_timestamp AS ( + SELECT `timestamp` FROM ${tableName} WHERE user_id = 1 + ), + tbl_type AS ( + SELECT `type` FROM ${tableName} WHERE user_id = 1 + ), + tbl_user_id AS ( + SELECT `user_id` FROM ${tableName} WHERE user_id = 1 + ) + SELECT + tbl_timestamp.`timestamp`, + tbl_type.`type`, + tbl_user_id.`user_id` + FROM + tbl_timestamp, tbl_type, tbl_user_id; """ Awaitility.await().atMost(30, SECONDS).until( { def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='RECURRING' """ println(onceJob) - onceJob .size() == 1 && '1' <= onceJob.get(0).get(0) - + onceJob.size() == 1 && '1' <= onceJob.get(0).get(0) + } - ) + ) sql """ PAUSE JOB where jobname = '${jobName}' """ def tblDatas = sql """select * from ${tableName}""" println tblDatas - assert 3 >= tblDatas.size() >= (2 as Boolean) //at least 2 records, some times 3 records + assert tblDatas.size() >= 2 //at least 2 records def pauseJobId = sql """select id from jobs("type"="insert") where Name='${jobName}'""" def taskStatus = sql """select status from tasks("type"="insert") where jobid= '${pauseJobId.get(0).get(0)}'""" println taskStatus for (int i = 0; i < taskStatus.size(); i++) { - assert taskStatus.get(i).get(0) != "FAILED"||taskStatus.get(i).get(0) != "STOPPED"||taskStatus.get(i).get(0) != "STOPPED" + assert taskStatus.get(i).get(0) != "FAILED" || taskStatus.get(i).get(0) != "STOPPED" || taskStatus.get(i).get(0) != "STOPPED" } sql """ CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); @@ 
-126,11 +149,11 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', 2, 1001); """ - Awaitility.await("create-one-time-job-test").atMost(30,SECONDS).until( - { - def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ - onceJob.size() == 1 && '1' == onceJob.get(0).get(0) - } + Awaitility.await("create-one-time-job-test").atMost(30, SECONDS).until( + { + def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ + onceJob.size() == 1 && '1' == onceJob.get(0).get(0) + } ) def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ assert onceJob.size() == 1 @@ -141,7 +164,7 @@ suite("test_base_insert_job") { assert datas.size() == 1 assert datas.get(0).get(0) == "FINISHED" // check table data - def dataCount1 = sql """select count(1) from ${tableName}""" + def dataCount1 = sql """select count(1) from ${tableName} where user_id=1001""" assert dataCount1.get(0).get(0) == 1 // check job status def oncejob = sql """select status,comment from jobs("type"="insert") where Name='${jobName}' """ @@ -200,10 +223,10 @@ suite("test_base_insert_job") { println(tasks.size()) Awaitility.await("resume-job-test").atMost(60, SECONDS).until({ def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ - println "resume tasks :"+afterResumeTasks - afterResumeTasks.size() >tasks.size() + println "resume tasks :" + afterResumeTasks + afterResumeTasks.size() > tasks.size() }) - + // assert same job name try { sql """ @@ -218,7 +241,7 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test' DO update ${tableName} set type=2 where type=1; """ } 
catch (Exception e) { - assert e.getMessage().contains("Not support this sql") + assert e.getMessage().contains("Not support this sql :") } // assert start time greater than current time try { @@ -247,7 +270,7 @@ suite("test_base_insert_job") { // assert end time less than start time try { sql """ - CREATE JOB test_error_starts ON SCHEDULE every 1 second ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + CREATE JOB test_error_starts ON SCHEDULE every 1 second starts current_timestamp ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { assert e.getMessage().contains("endTimeMs must be greater than the start time") @@ -258,7 +281,7 @@ suite("test_base_insert_job") { CREATE JOB test_error_starts ON SCHEDULE every 1 years ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - assert e.getMessage().contains("interval time unit can not be years") + assert e.getMessage().contains("Invalid interval time unit: years") } // test keyword as job name