From 9fd4ac911e6621773eb59f57f51d3fb7a7db486e Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 16 Sep 2025 22:39:38 +0800 Subject: [PATCH] add case and fix job bug --- .../property/storage/StorageProperties.java | 6 +- .../java/org/apache/doris/fs/FileSystem.java | 2 +- .../org/apache/doris/fs/obj/S3ObjStorage.java | 16 ++-- .../apache/doris/fs/remote/RemoteFile.java | 18 ++++ .../apache/doris/fs/remote/S3FileSystem.java | 2 +- .../apache/doris/job/base/AbstractJob.java | 6 +- .../apache/doris/job/base/JobProperties.java | 3 - .../insert/streaming/StreamingInsertJob.java | 55 ++++++++---- .../insert/streaming/StreamingInsertTask.java | 16 +++- .../streaming/StreamingJobProperties.java | 8 +- .../job/offset/SourceOffsetProvider.java | 2 +- .../apache/doris/job/offset/s3/S3Offset.java | 2 +- .../job/offset/s3/S3SourceOffsetProvider.java | 60 +++++++------ .../trees/plans/commands/AlterJobCommand.java | 23 ++--- .../plans/commands/info/CreateJobInfo.java | 24 ++---- .../insert/InsertIntoTableCommand.java | 7 ++ .../doris/catalog/DropFunctionTest.java | 29 ++++++- .../test_streaming_insert_job.out | 22 +++++ .../test_streaming_insert_job.groovy | 86 +++++++++++++++++++ 19 files changed, 288 insertions(+), 99 deletions(-) create mode 100644 regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out create mode 100644 regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java index 2ce87f9ffb9772..422d20a7560293 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java @@ -159,8 +159,6 @@ public static StorageProperties createPrimary(Map origProps) { private static final List, StorageProperties>> 
PROVIDERS = Arrays.asList( - props -> (isFsSupport(props, FS_HDFS_SUPPORT) - || HdfsProperties.guessIsMe(props)) ? new HdfsProperties(props) : null, props -> ((isFsSupport(props, FS_OSS_HDFS_SUPPORT) || isFsSupport(props, DEPRECATED_OSS_HDFS_SUPPORT)) || OSSHdfsProperties.guessIsMe(props)) ? new OSSHdfsProperties(props) : null, @@ -181,7 +179,9 @@ public static StorageProperties createPrimary(Map origProps) { props -> (isFsSupport(props, FS_BROKER_SUPPORT) || BrokerProperties.guessIsMe(props)) ? new BrokerProperties(props) : null, props -> (isFsSupport(props, FS_LOCAL_SUPPORT) - || LocalProperties.guessIsMe(props)) ? new LocalProperties(props) : null + || LocalProperties.guessIsMe(props)) ? new LocalProperties(props) : null, + props -> (isFsSupport(props, FS_HDFS_SUPPORT) + || HdfsProperties.guessIsMe(props)) ? new HdfsProperties(props) : null ); protected StorageProperties(Type type, Map origProps) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java index cfbb3e560f39db..b2c6957ce382ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -124,7 +124,7 @@ default Status globList(String remotePath, List result) { * @param fileNumLimit limit the total number of files to be listed. 
* @return */ - default String globListWithLimit(String remotePath, List result, + default String globListWithLimit(String remotePath, List result, String startFile, long fileSizeLimit, long fileNumLimit) { throw new UnsupportedOperationException("Unsupported operation glob list with limit on current file system."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java index 0a4c9159881d70..4c79df0acf705a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java @@ -642,7 +642,7 @@ public Status globList(String remotePath, List result, boolean fileN * * @return The largest file name after listObject this time */ - public String globListWithLimit(String remotePath, List result, String startFile, + public String globListWithLimit(String remotePath, List result, String startFile, long fileSizeLimit, long fileNumLimit) { long roundCnt = 0; long elementCnt = 0; @@ -704,9 +704,16 @@ public String globListWithLimit(String remotePath, List result, String s } matchCnt++; + RemoteFile remoteFile = new RemoteFile(objPath.getFileName().toString(), + !isPrefix, + isPrefix ? -1 : obj.size(), + isPrefix ? -1 : obj.size(), + isPrefix ? 
0 : obj.lastModified().toEpochMilli() + ); + remoteFile.setBucket(bucket); + remoteFile.setParentPath(objPath.getParent().toString()); matchFileSize += obj.size(); - String remoteFileName = "s3://" + bucket + "/" + objPath; - result.add(remoteFileName); + result.add(remoteFile); if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) { break; @@ -718,8 +725,7 @@ public String globListWithLimit(String remotePath, List result, String s } //record current last object file name S3Object lastS3Object = response.contents().get(response.contents().size() - 1); - java.nio.file.Path lastObjPath = Paths.get(lastS3Object.key()); - currentMaxFile = "s3://" + bucket + "/" + lastObjPath; + currentMaxFile = lastS3Object.key(); isTruncated = response.isTruncated(); if (isTruncated) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java index 1f6f0225278b07..ac1f8add9475f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java @@ -36,6 +36,8 @@ public class RemoteFile { private long modificationTime; private Path path; BlockLocation[] blockLocations; + private String parentPath; + private String bucket; public RemoteFile(String name, boolean isFile, long size, long blockSize) { this(name, null, isFile, !isFile, size, blockSize, 0, null); @@ -75,6 +77,22 @@ public void setPath(Path path) { this.path = path; } + public String getBucket() { + return bucket; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + public String getParentPath() { + return parentPath; + } + + public void setParentPath(String parentPath) { + this.parentPath = parentPath; + } + public boolean isFile() { return isFile; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java 
index 9c409a66a2942c..d4e5a23c20e4cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -69,7 +69,7 @@ public Status globList(String remotePath, List result, boolean fileN } @Override - public String globListWithLimit(String remotePath, List result, String startFile, + public String globListWithLimit(String remotePath, List result, String startFile, long fileSizeLimit, long fileNumLimit) { S3ObjStorage objStorage = (S3ObjStorage) this.objStorage; return objStorage.globListWithLimit(remotePath, result, startFile, fileSizeLimit, fileNumLimit); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 83b84348ff77eb..d08942460f541f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -99,13 +99,13 @@ public abstract class AbstractJob implements Job properties; private StreamingJobProperties jobProperties; @Getter @SerializedName("tvf") @@ -108,26 +111,40 @@ public StreamingInsertJob(String jobName, JobExecutionConfiguration jobConfig, Long createTimeMs, String executeSql, - StreamingJobProperties jobProperties) { + Map properties) { super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser, jobConfig, createTimeMs, executeSql); this.dbId = ConnectContext.get().getCurrentDbId(); - this.jobProperties = jobProperties; + this.properties = properties; init(); } private void init() { try { + this.jobProperties = new StreamingJobProperties(properties); + jobProperties.validate(); + // build time definition + JobExecutionConfiguration execConfig = getJobConfig(); + TimerDefinition timerDefinition = new TimerDefinition(); + timerDefinition.setInterval(jobProperties.getMaxIntervalSecond()); + timerDefinition.setIntervalUnit(IntervalUnit.SECOND); + 
timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs()); + execConfig.setTimerDefinition(timerDefinition); + UnboundTVFRelation currentTvf = getCurrentTvf(); + this.tvfType = currentTvf.getFunctionName(); this.originTvfProps = currentTvf.getProperties().getMap(); this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName()); + } catch (AnalysisException ae) { + log.warn("parse streaming insert job failed, props: {}", properties, ae); + throw new RuntimeException("parse streaming insert job failed, " + ae.getMessage()); } catch (Exception ex) { log.warn("init streaming insert job failed, sql: {}", getExecuteSql(), ex); - throw new RuntimeException("init streaming insert job failed, sql: " + getExecuteSql(), ex); + throw new RuntimeException("init streaming insert job failed, " + ex.getMessage()); } } - private UnboundTVFRelation getCurrentTvf() throws Exception { + private UnboundTVFRelation getCurrentTvf() { if (baseCommand == null) { this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(getExecuteSql()); } @@ -170,7 +187,7 @@ public List createTasks(TaskType taskType, Map originTvfProps; SourceOffsetProvider offsetProvider; public StreamingInsertTask(long jobId, @@ -73,6 +76,7 @@ public StreamingInsertTask(long jobId, SourceOffsetProvider offsetProvider, String currentDb, StreamingJobProperties jobProperties, + Map originTvfProps, UserIdentity userIdentity) { this.jobId = jobId; this.taskId = taskId; @@ -81,6 +85,7 @@ public StreamingInsertTask(long jobId, this.currentDb = currentDb; this.offsetProvider = offsetProvider; this.jobProperties = jobProperties; + this.originTvfProps = originTvfProps; this.labelName = getJobId() + LABEL_SPLITTER + getTaskId(); this.createTimeMs = System.currentTimeMillis(); } @@ -118,8 +123,15 @@ private void before() throws Exception { StatementContext statementContext = new StatementContext(); ctx.setStatementContext(statementContext); - 
this.runningOffset = offsetProvider.getNextOffset(jobProperties, jobProperties.getProperties()); - this.taskCommand = offsetProvider.rewriteTvfParams(sql, runningOffset); + this.runningOffset = offsetProvider.getNextOffset(jobProperties, originTvfProps); + InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql); + StmtExecutor baseStmtExecutor = + new StmtExecutor(ctx, new LogicalPlanAdapter(baseCommand, ctx.getStatementContext())); + baseCommand.initPlan(ctx, baseStmtExecutor, false); + if (!baseCommand.getParsedPlan().isPresent()) { + throw new JobException("Can not get Parsed plan"); + } + this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset); this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId())); this.taskCommand.setJobId(getTaskId()); this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java index 4f463a090d5225..e71b169ef21c3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java @@ -24,6 +24,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.SessionVariable; +import com.google.gson.annotations.SerializedName; import lombok.Data; import java.util.HashMap; @@ -43,15 +44,18 @@ public class StreamingJobProperties implements JobProperties { private final Map properties; private long maxIntervalSecond; - private int maxRetry; private long s3BatchFiles; private long s3BatchSize; public StreamingJobProperties(Map jobProperties) { this.properties = jobProperties; + if (properties.isEmpty()) { + 
this.maxIntervalSecond = DEFAULT_MAX_INTERVAL_SECOND; + this.s3BatchFiles = DEFAULT_S3_BATCH_FILES; + this.s3BatchSize = DEFAULT_S3_BATCH_SIZE; + } } - @Override public void validate() throws AnalysisException { this.maxIntervalSecond = Util.getLongPropertyOrDefault( properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY), diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java index d9b2264d5b683d..e670dac553cf4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -62,7 +62,7 @@ public interface SourceOffsetProvider { * @param nextOffset * @return rewritten InsertIntoTableCommand */ - InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset nextOffset) throws Exception; + InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset nextOffset); /** * Update the offset of the source. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java index dd6927935bb6a9..cbd8645cc594b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java @@ -30,7 +30,7 @@ public class S3Offset implements Offset { String startFile; String endFile; - List fileLists; + String fileLists; @Override public void setEndOffset(String endOffset) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java index a95f4af65c94b3..df4a344f06402b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java @@ -19,20 +19,18 @@ import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.fs.FileSystemFactory; +import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties; import org.apache.doris.job.offset.Offset; import org.apache.doris.job.offset.SourceOffsetProvider; -import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.analyzer.UnboundTVFRelation; import org.apache.doris.nereids.trees.expressions.Properties; -import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; -import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation; -import org.apache.doris.qe.ConnectContext; -import com.google.common.collect.ImmutableList; +import 
com.google.common.collect.Maps; import lombok.extern.log4j.Log4j2; import java.util.ArrayList; @@ -40,12 +38,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; @Log4j2 public class S3SourceOffsetProvider implements SourceOffsetProvider { S3Offset currentOffset; String maxRemoteEndFile; - InsertIntoTableCommand baseCommand; @Override public String getSourceType() { @@ -54,19 +52,29 @@ public String getSourceType() { @Override public S3Offset getNextOffset(StreamingJobProperties jobProps, Map properties) { + Map copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + copiedProps.putAll(properties); S3Offset offset = new S3Offset(); - List rfiles = new ArrayList<>(); + List rfiles = new ArrayList<>(); String startFile = currentOffset == null ? null : currentOffset.endFile; String filePath = null; - StorageProperties storageProperties = StorageProperties.createPrimary(properties); + StorageProperties storageProperties = StorageProperties.createPrimary(copiedProps); try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) { - String uri = storageProperties.validateAndGetUri(properties); + String uri = storageProperties.validateAndGetUri(copiedProps); filePath = storageProperties.validateAndNormalizeUri(uri); maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, startFile, jobProps.getS3BatchFiles(), jobProps.getS3BatchSize()); offset.setStartFile(startFile); - offset.setEndFile(rfiles.get(rfiles.size() - 1)); - offset.setFileLists(rfiles); + //todo: The path may be in the form of bucket/dir/*/*, + // but currently only the case where the last segment is * is handled. 
+ if (!rfiles.isEmpty()) { + String bucket = rfiles.get(0).getBucket(); + String parentPath = rfiles.get(0).getParentPath(); + String filePaths = rfiles.stream().map(RemoteFile::getName).collect(Collectors.joining(",", "{", "}")); + String finalFiles = String.format("s3://%s/%s/%s", bucket, parentPath, filePaths); + offset.setEndFile(String.format("s3://%s/%s/%s", bucket, parentPath, rfiles.get(rfiles.size() - 1).getName())); + offset.setFileLists(finalFiles); + } } catch (Exception e) { log.warn("list path exception, path={}", filePath, e); throw new RuntimeException(e); @@ -93,23 +101,18 @@ public String getRemoteOffset() { } @Override - public InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset runningOffset) throws Exception { + public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset runningOffset) { S3Offset offset = (S3Offset) runningOffset; Map props = new HashMap<>(); - String finalUri = "{" + String.join(",", offset.getFileLists()) + "}"; - props.put("uri", finalUri); - if (baseCommand == null) { - this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(executeSql); - this.baseCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false); - } - // rewrite plan - Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> { - if (plan instanceof LogicalTVFRelation) { - LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan; - LogicalTVFRelation newRvfRel = new LogicalTVFRelation( - originTvfRel.getRelationId(), new S3(new Properties(props)), ImmutableList.of()); - return newRvfRel; + Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan -> { + if (plan instanceof UnboundTVFRelation) { + UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan; + Map oriMap = originTvfRel.getProperties().getMap(); + props.putAll(oriMap); + props.put("uri", offset.getFileLists()); + return new UnboundTVFRelation( + originTvfRel.getRelationId(), 
originTvfRel.getFunctionName(), new Properties(props)); } return plan; }); @@ -120,14 +123,17 @@ public InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset running @Override public void updateOffset(Offset offset) { this.currentOffset = (S3Offset) offset; + this.currentOffset.setFileLists(null); } @Override public void fetchRemoteMeta(Map properties) throws Exception { - StorageProperties storageProperties = StorageProperties.createPrimary(properties); + Map copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + copiedProps.putAll(properties); + StorageProperties storageProperties = StorageProperties.createPrimary(copiedProps); String startFile = currentOffset == null ? null : currentOffset.endFile; try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) { - String uri = storageProperties.validateAndGetUri(properties); + String uri = storageProperties.validateAndGetUri(copiedProps); String filePath = storageProperties.validateAndNormalizeUri(uri); maxRemoteEndFile = fileSystem.globListWithLimit(filePath, new ArrayList<>(), startFile, 1, 1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java index 53a90893b81444..b1823de4a1d50f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java @@ -22,13 +22,9 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; -import org.apache.doris.job.base.JobExecutionConfiguration; -import org.apache.doris.job.base.TimerDefinition; -import org.apache.doris.job.common.IntervalUnit; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.exception.JobException; import 
org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; -import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -81,25 +77,18 @@ private AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws JobExcepti if (job instanceof StreamingInsertJob) { StreamingInsertJob originJob = (StreamingInsertJob) job; String updateSQL = StringUtils.isEmpty(sql) ? originJob.getExecuteSql() : sql; - Map updateProps = properties == null || properties.isEmpty() ? originJob.getJobProperties() - .getProperties() : properties; - StreamingJobProperties streamJobProps = new StreamingJobProperties(updateProps); - // rebuild time definition - JobExecutionConfiguration execConfig = originJob.getJobConfig(); - TimerDefinition timerDefinition = new TimerDefinition(); - timerDefinition.setInterval(streamJobProps.getMaxIntervalSecond()); - timerDefinition.setIntervalUnit(IntervalUnit.SECOND); - timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs()); - execConfig.setTimerDefinition(timerDefinition); + Map updateProps = + properties == null || properties.isEmpty() ? 
originJob.getProperties() : properties; + return new StreamingInsertJob(jobName, job.getJobStatus(), job.getCurrentDbName(), job.getComment(), ConnectContext.get().getCurrentUserIdentity(), - execConfig, + originJob.getJobConfig(), System.currentTimeMillis(), updateSQL, - streamJobProps); + updateProps); } else { throw new JobException("Unsupported job type for ALTER:" + job.getJobType()); } @@ -117,7 +106,7 @@ private void validate() throws Exception { if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING) && job instanceof StreamingInsertJob) { StreamingInsertJob streamingJob = (StreamingInsertJob) job; - boolean proCheck = checkProperties(streamingJob.getJobProperties().getProperties()); + boolean proCheck = checkProperties(streamingJob.getProperties()); boolean sqlCheck = checkSql(streamingJob.getExecuteSql()); if (!proCheck && !sqlCheck) { throw new AnalysisException("No properties or sql changed in ALTER JOB"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java index 51d0dddc7cb664..40a4d5830590f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java @@ -125,11 +125,8 @@ public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws UserExcepti // check its insert stmt,currently only support insert stmt JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); JobExecuteType executeType = intervalOptional.isPresent() ? 
JobExecuteType.RECURRING : JobExecuteType.ONE_TIME; - JobProperties properties = null; if (streamingJob) { executeType = JobExecuteType.STREAMING; - properties = new StreamingJobProperties(jobProperties); - properties.validate(); jobExecutionConfiguration.setImmediate(true); } jobExecutionConfiguration.setExecuteType(executeType); @@ -138,21 +135,18 @@ public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws UserExcepti if (executeType.equals(JobExecuteType.ONE_TIME)) { buildOnceJob(timerDefinition, jobExecutionConfiguration); } else if (executeType.equals(JobExecuteType.STREAMING)) { - buildStreamingJob(timerDefinition, properties); + buildStreamingJob(timerDefinition); } else { buildRecurringJob(timerDefinition, jobExecutionConfiguration); } jobExecutionConfiguration.setTimerDefinition(timerDefinition); - return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration, properties); + return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration, jobProperties); } - private void buildStreamingJob(TimerDefinition timerDefinition, JobProperties props) - throws AnalysisException { - StreamingJobProperties properties = (StreamingJobProperties) props; - timerDefinition.setInterval(properties.getMaxIntervalSecond()); + private void buildStreamingJob(TimerDefinition timerDefinition) { + // timerDefinition.setInterval(properties.getMaxIntervalSecond()); timerDefinition.setIntervalUnit(IntervalUnit.SECOND); timerDefinition.setStartTimeMs(System.currentTimeMillis()); - properties.validate(); } /** @@ -237,7 +231,7 @@ protected static void checkAuth() throws AnalysisException { */ private AbstractJob analyzeAndCreateJob(String sql, String currentDbName, JobExecutionConfiguration jobExecutionConfiguration, - JobProperties properties) throws UserException { + Map properties) throws UserException { if (jobExecutionConfiguration.getExecuteType().equals(JobExecuteType.STREAMING)) { return analyzeAndCreateStreamingInsertJob(sql, currentDbName, 
jobExecutionConfiguration, properties); } else { @@ -271,13 +265,13 @@ private AbstractJob analyzeAndCreateInsertJob(String sql, String currentDbName, } private AbstractJob analyzeAndCreateStreamingInsertJob(String sql, String currentDbName, - JobExecutionConfiguration jobExecutionConfiguration, JobProperties properties) throws UserException { + JobExecutionConfiguration jobExecutionConfiguration, Map properties) throws UserException { NereidsParser parser = new NereidsParser(); LogicalPlan logicalPlan = parser.parseSingle(sql); if (logicalPlan instanceof InsertIntoTableCommand) { - // InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan; + InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan; try { - // insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false); + insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false); return new StreamingInsertJob(labelNameOptional.get(), JobStatus.PENDING, currentDbName, @@ -286,7 +280,7 @@ private AbstractJob analyzeAndCreateStreamingInsertJob(String sql, String curren jobExecutionConfiguration, System.currentTimeMillis(), sql, - (StreamingJobProperties) properties); + properties); } catch (Exception e) { throw new AnalysisException(e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 6736b16f8afeb0..e774499f3617cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -61,6 +61,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import 
org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -103,6 +104,7 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti private Optional logicalQuery; private Optional labelName; private Optional branchName; + private Optional parsedPlan; /** * When source it's from job scheduler,it will be set. */ @@ -153,6 +155,10 @@ public LogicalPlan getLogicalQuery() { return logicalQuery.orElse(originLogicalQuery); } + public Optional getParsedPlan() { + return parsedPlan; + } + protected void setLogicalQuery(LogicalPlan logicalQuery) { this.logicalQuery = Optional.of(logicalQuery); } @@ -233,6 +239,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec throw new IllegalStateException(e.getMessage(), e); } insertExecutor = buildResult.executor; + parsedPlan = Optional.ofNullable(buildResult.planner.getParsedPlan()); if (!needBeginTransaction) { return insertExecutor; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java index bef0bb92d20593..f05f1791f2fe91 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java @@ -20,22 +20,31 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.functions.table.S3; +import 
org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand; import org.apache.doris.nereids.trees.plans.commands.CreateFunctionCommand; import org.apache.doris.nereids.trees.plans.commands.DropFunctionCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.utframe.DorisAssert; import org.apache.doris.utframe.UtFrameUtils; +import com.google.common.collect.ImmutableList; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.UUID; @@ -63,9 +72,27 @@ public static void teardown() { public void testDropGlobalFunction() throws Exception { ConnectContext ctx = UtFrameUtils.createDefaultCtx(); // 1. 
create database db1 - String sql = "create database db1;"; + //String sql = "create database db1;"; + String sql = "insert into db1.tb select * from s3('url'='s3://a/*.csv')"; NereidsParser nereidsParser = new NereidsParser(); LogicalPlan logicalPlan = nereidsParser.parseSingle(sql); + InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql); + baseCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false); + Map map = new HashMap<>(); + map.put("url" ,"s3:/xxxx/*."); + // rewrite plan + Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> { + if (plan instanceof LogicalTVFRelation) { + LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan; + LogicalTVFRelation newRvfRel = new LogicalTVFRelation( + originTvfRel.getRelationId(), new S3(new Properties(map)), ImmutableList.of()); + return newRvfRel; + } + return plan; + }); + InsertIntoTableCommand s = new InsertIntoTableCommand((LogicalPlan) rewritePlan, Optional.empty(), Optional.empty(), + Optional.empty(), true, Optional.empty()); + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); if (logicalPlan instanceof CreateDatabaseCommand) { ((CreateDatabaseCommand) logicalPlan).run(connectContext, stmtExecutor); diff --git a/regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out new file mode 100644 index 00000000000000..867b1f9432ba17 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out @@ -0,0 +1,22 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select -- +1 Emily 25 +2 Benjamin 35 +3 Olivia 28 +4 Alexander 60 +5 Ava 17 +6 William 69 +7 Sophia 32 +8 James 64 +9 Emma 37 +10 Liam 64 +11 Alexander 34 +12 Isabella 43 +13 Benjamin 56 +14 Sophia 12 +15 Christopher 33 +16 Emma 23 +17 Michael 11 +18 Olivia 38 +19 Daniel 19 +20 Ava 28 \ No newline at end of file diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy new file mode 100644 index 00000000000000..bd8ee2759fd138 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Streaming insert job over S3 CSV files: create it, wait for a succeeded task, pause it, check the loaded rows, drop it.
+suite("test_streaming_insert_job") {
+    def tableName = "test_streaming_insert_job_tbl"
+    def jobName = "test_streaming_insert_job_name"
+
+    // remove leftovers from earlier runs so the test starts from a clean state
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname = '${jobName}'
+    """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // create streaming job
+    sql """
+        CREATE JOB ${jobName} ON STREAMING DO INSERT INTO ${tableName}
+        SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+    Awaitility.await().atMost(30, SECONDS).until(
+        {
+            print("check success task count")
+            def succeedCounts = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='STREAMING' """
+            // exactly one matching job with at least one succeeded task; parse the count so it is compared as a number, not a string
+            def taskCount = succeedCounts.size() == 1 ? succeedCounts.get(0).get(0)?.toString() : null
+            taskCount != null && taskCount.isInteger() && taskCount.toInteger() >= 1
+        }
+    )
+
+    sql """
+        PAUSE JOB where jobname = '${jobName}'
+    """
+    def pausedJobStatus = sql """
+        select status from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert pausedJobStatus.get(0).get(0) == "PAUSED"
+
+    qt_select """ SELECT * FROM ${tableName} order by c1 """
+
+    sql """
+        DROP JOB IF EXISTS where jobname = '${jobName}'
+    """
+
+    def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+    assert jobCountRsp.get(0).get(0) == 0
+
+}