diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index d8f5f810eb046c..a0ae7c6916e636 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -99,16 +99,19 @@ materializedViewStatement | SHOW CREATE MATERIALIZED VIEW mvName=multipartIdentifier #showCreateMTMV ; supportedJobStatement - : CREATE JOB label=multipartIdentifier ON SCHEDULE - ( + : CREATE JOB label=multipartIdentifier propertyClause? + ON (STREAMING | SCHEDULE( (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))? (ENDS endsTime=STRING_LITERAL)?) | - (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))) - commentSpec? - DO supportedDmlStatement #createScheduledJob + (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)) + ) + ) + commentSpec? + DO supportedDmlStatement #createScheduledJob | PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #pauseJob + | ALTER JOB FOR (jobNameKey=identifier) (propertyClause | supportedDmlStatement | propertyClause supportedDmlStatement) #alterJob | DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #dropJob | RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #resumeJob | CANCEL TASK WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ (taskIdValue=INTEGER_VALUE) #cancelJobTask diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java new file mode 100644 index 00000000000000..3985b59bf16379 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.job.base; + +import org.apache.doris.common.AnalysisException; + +public interface JobProperties { + default void validate() throws AnalysisException { + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index dd0e0d3a228420..5b386886b19ce8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -51,6 +51,8 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.ErrorTabletInfo; import org.apache.doris.transaction.TabletCommitInfo; @@ -96,6 +98,11 @@ public class InsertJob extends AbstractJob> impl .add(new Column("CreateTime", ScalarType.createStringType())) .addAll(COMMON_SCHEMA) .add(new Column("Comment", ScalarType.createStringType())) + // only execute type = streaming need record + .add(new Column("Progress", ScalarType.createStringType())) + .add(new Column("RemoteOffset", ScalarType.createStringType())) + .add(new Column("LoadStatistic", ScalarType.createStringType())) + .add(new Column("ErrorMsg", ScalarType.createStringType())) .build(); private static final ShowResultSetMetaData TASK_META_DATA = @@ -112,6 +119,8 @@ public class InsertJob extends AbstractJob> impl .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(200))) .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(200))) .addColumn(new Column("User", ScalarType.createVarchar(50))) + // only execute type = streaming need record + .addColumn(new Column("Offset", ScalarType.createStringType())) .build(); public static final ImmutableMap COLUMN_TO_INDEX; @@ -523,6 +532,28 @@ public List getShowInfo() { } } + @Override + public TRow getTvfInfo() { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); + trow.addToColumnValue(new TCell().setStringVal(getJobName())); + trow.addToColumnValue(new TCell().setStringVal(getCreateUser().getQualifiedUser())); + trow.addToColumnValue(new TCell().setStringVal(getJobConfig().getExecuteType().name())); + trow.addToColumnValue(new TCell().setStringVal(getJobConfig().convertRecurringStrategyToString())); + trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name())); + trow.addToColumnValue(new TCell().setStringVal(getExecuteSql())); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getSucceedTaskCount().get()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getFailedTaskCount().get()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get()))); + trow.addToColumnValue(new TCell().setStringVal(getComment())); + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? 
FeConstants.null_string : failMsg.getMsg())); + return trow; + } + @Override public String formatMsgWhenExecuteQueueFull(Long taskId) { return commonFormatMsgWhenExecuteQueueFull(taskId, "insert_task_queue_size", diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index f6633d6c5e8e6e..34cfdf6edea874 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -67,7 +67,8 @@ public class InsertTask extends AbstractTask { new Column("FinishTime", ScalarType.createStringType()), new Column("TrackingUrl", ScalarType.createStringType()), new Column("LoadStatistic", ScalarType.createStringType()), - new Column("User", ScalarType.createStringType())); + new Column("User", ScalarType.createStringType()), + new Column("Offset", ScalarType.createStringType())); public static final ImmutableMap COLUMN_TO_INDEX; @@ -272,6 +273,7 @@ public TRow getTvfInfo(String jobName) { } else { trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); } + trow.addToColumnValue(new TCell().setStringVal("")); return trow; } @@ -292,6 +294,7 @@ private TRow getPendingTaskTVFInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); + trow.addToColumnValue(new TCell().setStringVal("")); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 9e5baccd13450c..a2016c52cb9371 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -17,13 +17,16 @@ package org.apache.doris.job.extensions.insert.streaming; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.common.io.Text; import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecutionConfiguration; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.PauseReason; import org.apache.doris.job.exception.JobException; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.ConnectContext; import com.google.gson.annotations.SerializedName; import lombok.Getter; @@ -37,6 +40,8 @@ public class StreamingInsertJob extends AbstractJob> { + @SerializedName("did") + private final long dbId; @Getter @SerializedName("st") protected JobStatus status; @@ -52,6 +57,26 @@ public class StreamingInsertJob extends AbstractJob properties; + private long maxIntervalSecond; + private long s3BatchFiles; + private long s3BatchSize; + + public StreamingJobProperties(Map jobProperties) { + this.properties = jobProperties; + } + + @Override + public void validate() throws AnalysisException { + this.maxIntervalSecond = Util.getLongPropertyOrDefault( + properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY), + StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND, (v) -> v >= 1, + StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY + " should > 1"); + + this.s3BatchFiles = Util.getLongPropertyOrDefault( + 
properties.get(StreamingJobProperties.S3_BATCH_FILES_PROPERTY), + StreamingJobProperties.DEFAULT_S3_BATCH_FILES, (v) -> v >= 1, + StreamingJobProperties.S3_BATCH_FILES_PROPERTY + " should be >= 1"); + + this.s3BatchSize = Util.getLongPropertyOrDefault(properties.get(StreamingJobProperties.S3_BATCH_SIZE_PROPERTY), + StreamingJobProperties.DEFAULT_S3_BATCH_SIZE, (v) -> v >= 100 * 1024 * 1024 + && v <= (long) (1024 * 1024 * 1024) * 10, + StreamingJobProperties.S3_BATCH_SIZE_PROPERTY + " should be between 100MB and 10GB"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 3d705b945e7dbd..e763f8de590153 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -32,6 +32,7 @@ import org.apache.doris.common.util.LogKey; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; @@ -219,6 +220,17 @@ public void alterJobStatus(Long jobId, JobStatus status) throws JobException { jobMap.get(jobId).logUpdateOperation(); } + public void alterJob(T job) { + writeLock(); + try { + jobMap.put(job.getJobId(), job); + job.logUpdateOperation(); + } finally { + writeUnlock(); + } + log.info("update job success, jobId: {}", job.getJobId()); + } + public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobException { for (T a : jobMap.values()) { if (a.getJobName().equals(jobName)) { @@ -349,6 +361,9 @@ public void replayDeleteJob(T replayJob) throws JobException { */ public void cancelTaskById(String jobName, Long taskId) throws JobException { for (T job : jobMap.values()) { + if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) { + throw new JobException("streaming job does not support canceling a task by id"); + } if (job.getJobName().equals(jobName)) { job.cancelTaskById(taskId); job.logUpdateOperation(); @@ -392,6 +407,14 @@ public T getJob(Long jobId) { return jobMap.get(jobId); } + public T getJobByName(String jobName) throws JobException { + for (T a : jobMap.values()) { + if (a.getJobName().equals(jobName)) { + return a; + } + } + throw new JobException("job does not exist, jobName: " + jobName); + } /** * get load info by db diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java new file mode 100644 index 00000000000000..095f0a5e6bf2bc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.offset; + +public interface Offset { + String toJson(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java new file mode 100644 index 00000000000000..f88079617de532 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.offset; + +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; + +/** + * Interface for managing offsets and metadata of a data source. + */ +public interface SourceOffsetProvider { + /** + * Get source type, e.g. s3, kafka + * @return + */ + String getSourceType(); + + /** + * Get next offset to consume + * @return + */ + Offset getNextOffset(); + + /** + * Rewrite the TVF parameters in the InsertIntoTableCommand based on the current offset. + * @param command + * @return + */ + InsertIntoTableCommand rewriteTvfParamsInCommand(InsertIntoTableCommand command); + + /** + * Update the progress of the source. + * @param offset + */ + void updateProgress(Offset offset); + + /** + * Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka. + */ + void fetchRemoteMeta(); + + /** + * Whether there is more data to consume + * @return + */ + boolean hasMoreData(); +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java new file mode 100644 index 00000000000000..5ba1d903d78135 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.job.offset; + +import org.apache.doris.job.offset.s3.S3SourceOffsetProvider; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SourceOffsetProviderFactory { + private static final Map<String, Class<? extends SourceOffsetProvider>> map = new ConcurrentHashMap<>(); + + static { + map.put("s3", S3SourceOffsetProvider.class); + } + + public static SourceOffsetProvider createSourceOffsetProvider(String sourceType) throws InstantiationException, + IllegalAccessException { + Class<? extends SourceOffsetProvider> cla = map.get(sourceType.toLowerCase()); + return cla.newInstance(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java new file mode 100644 index 00000000000000..86ff467796af8d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.offset.s3; + +import org.apache.doris.job.offset.Offset; +import org.apache.doris.persist.gson.GsonUtils; + +import java.util.List; + +public class S3Offset implements Offset { + String startFile; + + String endFile; + + List<String> fileLists; + + @Override + public String toJson() { + return GsonUtils.GSON.toJson(this); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java new file mode 100644 index 00000000000000..087d9c2beb7685 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.doris.job.offset.s3; + +import org.apache.doris.job.offset.Offset; +import org.apache.doris.job.offset.SourceOffsetProvider; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; + +public class S3SourceOffsetProvider implements SourceOffsetProvider { + + @Override + public String getSourceType() { + return null; + } + + @Override + public Offset getNextOffset() { + return null; + } + + @Override + public InsertIntoTableCommand rewriteTvfParamsInCommand(InsertIntoTableCommand command) { + return null; + } + + @Override + public void updateProgress(Offset offset) { + } + + @Override + public void fetchRemoteMeta() { + } + + @Override + public boolean hasMoreData() { + return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index abb468a19ccb30..80a0886017ea6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -608,6 +608,7 @@ import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand; import org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand; +import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.AlterResourceCommand; import org.apache.doris.nereids.trees.plans.commands.AlterRoleCommand; @@ -1144,11 +1145,13 @@ public LogicalPlan visitCreateScheduledJob(DorisParser.CreateScheduledJobContext Optional interval = ctx.timeInterval == null ? Optional.empty() : Optional.of(Long.valueOf(ctx.timeInterval.getText())); Optional intervalUnit = ctx.timeUnit == null ? Optional.empty() : Optional.of(ctx.timeUnit.getText()); + Map properties = ctx.propertyClause() != null + ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : Maps.newHashMap(); String comment = visitCommentSpec(ctx.commentSpec()); String executeSql = getOriginSql(ctx.supportedDmlStatement()); CreateJobInfo createJobInfo = new CreateJobInfo(label, atTime, interval, intervalUnit, startTime, - endsTime, immediateStartOptional, comment, executeSql); + endsTime, immediateStartOptional, comment, executeSql, ctx.STREAMING() != null, properties); return new CreateJobCommand(createJobInfo); } @@ -1158,6 +1161,15 @@ private void checkJobNameKey(String key, String keyFormat, DorisParser.Supported } } + @Override + public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) { + checkJobNameKey(stripQuotes(ctx.jobNameKey.getText()), JOB_NAME, ctx); + Map properties = ctx.propertyClause() != null + ? 
Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : Maps.newHashMap(); + String executeSql = getOriginSql(ctx.supportedDmlStatement()); + return new AlterJobCommand(stripQuotes(ctx.jobNameKey.getText()), properties, executeSql); + } + @Override public LogicalPlan visitPauseJob(DorisParser.PauseJobContext ctx) { checkJobNameKey(stripQuotes(ctx.jobNameKey.getText()), JOB_NAME, ctx); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index e150899f95cb79..367e6d641e04ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -162,6 +162,7 @@ public enum PlanType { CREATE_MTMV_COMMAND, CREATE_MATERIALIZED_VIEW_COMMAND, CREATE_JOB_COMMAND, + ALTER_JOB_COMMAND, PAUSE_JOB_COMMAND, CANCEL_JOB_COMMAND, DROP_CATALOG_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java new file mode 100644 index 00000000000000..ef8c9bb8b7a120 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.analysis.StmtType; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.base.JobExecutionConfiguration; +import org.apache.doris.job.base.TimerDefinition; +import org.apache.doris.job.common.IntervalUnit; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; +import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Map; + +/** + * alter job command. 
+ */ +public class AlterJobCommand extends AlterCommand implements ForwardWithSync { + // jobs whose name starts with this prefix are inner jobs and cannot be altered + private static final String excludeJobNamePrefix = "inner_"; + private String jobName; + private Map<String, String> properties; + private String sql; + + public AlterJobCommand(String jobName, Map<String, String> properties, String sql) { + super(PlanType.ALTER_JOB_COMMAND); + this.jobName = jobName; + this.properties = properties; + this.sql = sql; + } + + public String getJobName() { + return jobName; + } + + @Override + public StmtType stmtType() { + return StmtType.ALTER; + } + + @Override + public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { + + validate(); + AbstractJob job = analyzeAndBuildJobInfo(ctx); + ctx.getEnv().getJobManager().alterJob(job); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitAlterJobCommand(this, context); + } + + private AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws JobException { + AbstractJob job = Env.getCurrentEnv().getJobManager().getJobByName(jobName); + if (job instanceof StreamingInsertJob) { + StreamingInsertJob originJob = (StreamingInsertJob) job; + String updateSQL = StringUtils.isEmpty(sql) ? originJob.getExecuteSql() : sql; + Map<String, String> updateProps = properties == null || properties.isEmpty() ? originJob.getJobProperties() + .getProperties() : properties; + StreamingJobProperties streamJobProps = new StreamingJobProperties(updateProps); + // rebuild time definition + JobExecutionConfiguration execConfig = originJob.getJobConfig(); + TimerDefinition timerDefinition = new TimerDefinition(); + timerDefinition.setInterval(streamJobProps.getMaxIntervalSecond()); + timerDefinition.setIntervalUnit(IntervalUnit.SECOND); + timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs()); + execConfig.setTimerDefinition(timerDefinition); + return new StreamingInsertJob(jobName, + job.getJobStatus(), + job.getCurrentDbName(), + job.getComment(), + ConnectContext.get().getCurrentUserIdentity(), + execConfig, + System.currentTimeMillis(), + updateSQL, + streamJobProps); + } else { + throw new JobException("Unsupported job type for ALTER: " + job.getJobType()); + } + } + + private void validate() throws Exception { + if (jobName.startsWith(excludeJobNamePrefix)) { + throw new AnalysisException("Can't alter inner job"); + } + AbstractJob job = Env.getCurrentEnv().getJobManager().getJobByName(jobName); + if (!JobStatus.PAUSED.equals(job.getJobStatus())) { + throw new AnalysisException("Only PAUSED jobs can be altered"); + } + + if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING) + && job instanceof StreamingInsertJob) { + StreamingInsertJob streamingJob = (StreamingInsertJob) job; + boolean proCheck = checkProperties(streamingJob.getJobProperties().getProperties()); + boolean sqlCheck = checkSql(streamingJob.getExecuteSql()); + if (!proCheck && !sqlCheck) { + throw new AnalysisException("No properties or sql changed in ALTER JOB"); + } + } else { + throw new AnalysisException("Unsupported job type for ALTER: " + job.getJobType()); + } + } + + private boolean checkProperties(Map<String, String> originProps) { + if (properties == null || properties.isEmpty()) { + return false; + } + if (!originProps.equals(properties)) { + return true; + } + return false; + } + + private boolean checkSql(String originSql) { + if (sql == null || sql.isEmpty()) { + return false; + } + if (!sql.equals(originSql)) { + return true; + } + return false; + } + +} diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java index 6cef7ee89ec960..0d52e23ece56ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java @@ -27,10 +27,13 @@ import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.base.JobExecutionConfiguration; +import org.apache.doris.job.base.JobProperties; import org.apache.doris.job.base.TimerDefinition; import org.apache.doris.job.common.IntervalUnit; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; +import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; @@ -39,6 +42,7 @@ import com.google.common.base.Strings; +import java.util.Map; import java.util.Optional; /** @@ -66,6 +70,8 @@ public class CreateJobInfo { private final String comment; private final String executeSql; + private final boolean streamingJob; + private final Map jobProperties; /** * Constructor for CreateJobInfo. @@ -83,7 +89,8 @@ public class CreateJobInfo { public CreateJobInfo(Optional labelNameOptional, Optional onceJobStartTimestampOptional, Optional intervalOptional, Optional intervalTimeUnitOptional, Optional startsTimeStampOptional, Optional endsTimeStampOptional, - Optional immediateStartOptional, String comment, String executeSql) { + Optional immediateStartOptional, String comment, String executeSql, + boolean streamingJob, Map jobProperties) { this.labelNameOptional = labelNameOptional; this.onceJobStartTimestampOptional = onceJobStartTimestampOptional; this.intervalOptional = intervalOptional; @@ -93,7 +100,8 @@ public CreateJobInfo(Optional labelNameOptional, Optional onceJo this.immediateStartOptional = immediateStartOptional; this.comment = comment; this.executeSql = executeSql; - + this.streamingJob = streamingJob; + this.jobProperties = jobProperties; } /** @@ -117,16 +125,32 @@ public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws UserExcepti // check its insert stmt,currently only support insert stmt JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); JobExecuteType executeType = intervalOptional.isPresent() ? 
JobExecuteType.RECURRING : JobExecuteType.ONE_TIME; + JobProperties properties = null; + if (streamingJob) { + executeType = JobExecuteType.STREAMING; + properties = new StreamingJobProperties(jobProperties); + } jobExecutionConfiguration.setExecuteType(executeType); - TimerDefinition timerDefinition = new TimerDefinition(); + TimerDefinition timerDefinition = new TimerDefinition(); if (executeType.equals(JobExecuteType.ONE_TIME)) { buildOnceJob(timerDefinition, jobExecutionConfiguration); + } else if (executeType.equals(JobExecuteType.STREAMING)) { + buildStreamingJob(timerDefinition, properties); } else { buildRecurringJob(timerDefinition, jobExecutionConfiguration); } jobExecutionConfiguration.setTimerDefinition(timerDefinition); - return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration); + return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration, properties); + } + + private void buildStreamingJob(TimerDefinition timerDefinition, JobProperties props) + throws AnalysisException { + StreamingJobProperties properties = (StreamingJobProperties) props; + timerDefinition.setInterval(properties.getMaxIntervalSecond()); + timerDefinition.setIntervalUnit(IntervalUnit.SECOND); + timerDefinition.setStartTimeMs(System.currentTimeMillis()); + properties.validate(); } /** @@ -210,7 +234,17 @@ protected static void checkAuth() throws AnalysisException { * @throws UserException if there is an error during SQL analysis or job creation */ private AbstractJob analyzeAndCreateJob(String sql, String currentDbName, - JobExecutionConfiguration jobExecutionConfiguration) throws UserException { + JobExecutionConfiguration jobExecutionConfiguration, + JobProperties properties) throws UserException { + if (jobExecutionConfiguration.getExecuteType().equals(JobExecuteType.STREAMING)) { + return analyzeAndCreateStreamingInsertJob(sql, currentDbName, jobExecutionConfiguration, properties); + } else { + return analyzeAndCreateInsertJob(sql, currentDbName, jobExecutionConfiguration); + } + } + + private AbstractJob analyzeAndCreateInsertJob(String sql, String currentDbName, + JobExecutionConfiguration jobExecutionConfiguration) throws UserException { NereidsParser parser = new NereidsParser(); LogicalPlan logicalPlan = parser.parseSingle(sql); if (logicalPlan instanceof InsertIntoTableCommand) { @@ -234,6 +268,26 @@ private AbstractJob analyzeAndCreateJob(String sql, String currentDbName, } } + private AbstractJob analyzeAndCreateStreamingInsertJob(String sql, String currentDbName, + JobExecutionConfiguration jobExecutionConfiguration, JobProperties properties) throws UserException { + NereidsParser parser = new NereidsParser(); + LogicalPlan logicalPlan = parser.parseSingle(sql); + if (logicalPlan instanceof InsertIntoTableCommand) { + return new StreamingInsertJob(labelNameOptional.get(), + JobStatus.RUNNING, + currentDbName, + comment, + ConnectContext.get().getCurrentUserIdentity(), + jobExecutionConfiguration, + System.currentTimeMillis(), + sql, + (StreamingJobProperties) properties); + } else { + throw new AnalysisException("Not support this sql : " + sql + " Command class is " + + logicalPlan.getClass().getName() + "."); + } + } + private void checkJobName(String jobName) throws AnalysisException { if (Strings.isNullOrEmpty(jobName)) { throw new AnalysisException("job name can not be null"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index a5f9a00be1d934..45abfa0c25064c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand; import org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand; +import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand; import org.apache.doris.nereids.trees.plans.commands.AlterJobStatusCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.AlterResourceCommand; @@ -387,6 +388,10 @@ default R visitCreateJobCommand(CreateJobCommand createJobCommand, C context) { return visitCommand(createJobCommand, context); } + default R visitAlterJobCommand(AlterJobCommand alterJobCommand, C context) { + return visitCommand(alterJobCommand, context); + } + default R visitCreateFileCommand(CreateFileCommand createFileCommand, C context) { return visitCommand(createFileCommand, context); }
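Reviewer note: below is a rough usage sketch of the syntax this patch adds to DorisParser.g4, not an excerpt from the patch itself. The property keys, table name, and S3 TVF arguments are illustrative placeholders (the real keys are the string values of MAX_INTERVAL_SECOND_PROPERTY / S3_BATCH_FILES_PROPERTY / S3_BATCH_SIZE_PROPERTY in StreamingJobProperties, which are not visible in this diff); only the statement shapes (CREATE JOB ... ON STREAMING, ALTER JOB FOR ..., PAUSE/RESUME JOB WHERE ...) follow the grammar above.

    -- placeholder property keys and S3 arguments, per the assumptions stated above
    CREATE JOB streaming_s3_load
    PROPERTIES ("max_interval_second" = "10", "s3_batch_files" = "32", "s3_batch_size" = "134217728")
    ON STREAMING
    COMMENT 'continuously load new files from S3'
    DO INSERT INTO target_tbl SELECT * FROM S3("uri" = "s3://bucket/path/*.csv", "format" = "csv");

    -- AlterJobCommand.validate() only allows ALTER on a PAUSED, non-inner job
    PAUSE JOB WHERE jobName = 'streaming_s3_load';
    ALTER JOB FOR streaming_s3_load PROPERTIES ("s3_batch_files" = "64");
    RESUME JOB WHERE jobName = 'streaming_s3_load';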