From e9ca8098f255a0ea2417a5a93c169fa9186189b4 Mon Sep 17 00:00:00 2001
From: wudi
Date: Fri, 12 Sep 2025 21:09:58 +0800
Subject: [PATCH] add fetchmeta and fix rewrite tvf params

---
 .../insert/streaming/StreamingInsertJob.java  | 64 +++++++++----
 .../insert/streaming/StreamingInsertTask.java | 32 +++----
 .../job/offset/SourceOffsetProvider.java      | 25 +++--
 .../job/offset/s3/S3SourceOffsetProvider.java | 91 ++++++++++++-------
 .../insert/InsertIntoTableCommand.java        | 47 ++--------
 5 files changed, 146 insertions(+), 113 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 84e42c6e1f62bc..a4e05b650e35c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -56,6 +56,7 @@
 import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.log4j.Log4j2;
 import org.apache.commons.collections.CollectionUtils;
 
 import java.io.DataOutput;
@@ -65,6 +66,7 @@
 import java.util.List;
 import java.util.Map;
 
+@Log4j2
 public class StreamingInsertJob extends AbstractJob
         implements TxnStateChangeCallback, GsonPostProcessable {
     private final long dbId;
@@ -83,14 +85,16 @@ public class StreamingInsertJob extends AbstractJob
+    private Map<String, String> originTvfProps;
     @Getter
     StreamingInsertTask runningStreamTask;
     SourceOffsetProvider offsetProvider;
     @Setter
     @Getter
     private long lastScheduleTaskTimestamp = -1L;
+    private InsertIntoTableCommand baseCommand;
 
     public StreamingInsertJob(String jobName,
             JobStatus jobStatus,
@@ -105,17 +109,28 @@ public StreamingInsertJob(String jobName,
                 jobConfig, createTimeMs, executeSql);
         this.dbId = ConnectContext.get().getCurrentDbId();
         this.jobProperties = jobProperties;
-        this.tvfType = parseTvfType();
-        this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
-        this.offsetProvider.init(getExecuteSql(), jobProperties);
+        init();
     }
 
-    private String parseTvfType() {
-        NereidsParser parser = new NereidsParser();
-        InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(getExecuteSql());
-        UnboundTVFRelation firstTVF = command.getFirstTVF();
-        Preconditions.checkNotNull(firstTVF, "Only support insert sql with tvf");
-        return firstTVF.getFunctionName();
+    private void init() {
+        try {
+            UnboundTVFRelation currentTvf = getCurrentTvf();
+            this.originTvfProps = currentTvf.getProperties().getMap();
+            this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
+        } catch (Exception ex) {
+            log.warn("init streaming insert job failed, sql: {}", getExecuteSql(), ex);
+            throw new RuntimeException("init streaming insert job failed, sql: " + getExecuteSql(), ex);
+        }
+    }
+
+    private UnboundTVFRelation getCurrentTvf() throws Exception {
+        if (baseCommand == null) {
+            this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(getExecuteSql());
+        }
+        List<UnboundTVFRelation> allTVFRelation = baseCommand.getAllTVFRelation();
+        Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support one source in insert streaming job");
+        UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
+        return unboundTVFRelation;
     }
 
     @Override
@@ -155,7 +170,14 @@ protected StreamingInsertTask createStreamingInsertTask() {
     }
 
     protected void fetchMeta() {
-        offsetProvider.fetchRemoteMeta();
+        try {
+            if (originTvfProps == null) {
+                this.originTvfProps = getCurrentTvf().getProperties().getMap();
+            }
+            offsetProvider.fetchRemoteMeta(originTvfProps);
+        } catch (Exception ex) {
+            log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
+        }
     }
 
     public boolean needScheduleTask() {
@@ -228,8 +250,19 @@ public TRow getTvfInfo() {
         trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
         trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
         trow.addToColumnValue(new TCell().setStringVal(getComment()));
-        trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
-        trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+
+        if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
+            trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getSyncOffset()));
+        } else {
+            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+        }
+        if (offsetProvider != null && offsetProvider.getRemoteOffset() != null) {
+            trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
+        } else {
+            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+        }
+
+        trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
         trow.addToColumnValue(new TCell().setStringVal(
                 jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
         trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
@@ -324,9 +357,8 @@ public void replayOnVisible(TransactionState txnState) {
 
     @Override
     public void gsonPostProcess() throws IOException {
-        if (offsetProvider == null && jobProperties != null && tvfType != null) {
-            this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
-            // this.offsetProvider.init(getExecuteSql(), jobProperties);
+        if (offsetProvider == null) {
+            offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 1227624fc8f5cc..73ac267e678fe9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -57,15 +57,15 @@ public class StreamingInsertTask {
     private Long startTimeMs;
     private Long finishTimeMs;
     private String sql;
-    private SourceOffsetProvider offsetProvider;
     private StmtExecutor stmtExecutor;
-    private InsertIntoTableCommand command;
+    private InsertIntoTableCommand taskCommand;
     private String currentDb;
     private UserIdentity userIdentity;
     private ConnectContext ctx;
     private Offset runningOffset;
     private AtomicBoolean isCanceled = new AtomicBoolean(false);
     private StreamingJobProperties jobProperties;
+    SourceOffsetProvider offsetProvider;
 
     public StreamingInsertTask(long jobId,
             long taskId,
@@ -73,13 +73,13 @@ public StreamingInsertTask(long jobId,
             SourceOffsetProvider offsetProvider,
             String currentDb,
             StreamingJobProperties jobProperties,
-            UserIdentity userIdentity) {
+            UserIdentity userIdentity) {
         this.jobId = jobId;
         this.taskId = taskId;
         this.sql = sql;
-        this.offsetProvider = offsetProvider;
         this.userIdentity = userIdentity;
         this.currentDb = currentDb;
+        this.offsetProvider = offsetProvider;
         this.jobProperties = jobProperties;
         this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
         this.createTimeMs = System.currentTimeMillis();
@@ -106,7 +106,7 @@ public void execute() throws JobException {
         }
     }
 
-    private void before() throws JobException {
+    private void before() throws Exception {
         this.status = TaskStatus.RUNNING;
         this.startTimeMs = System.currentTimeMillis();
 
@@ -117,12 +117,12 @@ private void before() throws JobException {
         ctx.setSessionVariable(jobProperties.getSessionVariable());
         StatementContext statementContext = new StatementContext();
         ctx.setStatementContext(statementContext);
-        offsetProvider.init(sql, jobProperties);
-        this.runningOffset = offsetProvider.getNextOffset();
-        this.command = offsetProvider.rewriteTvfParams(runningOffset);
-        this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
-        this.command.setJobId(getTaskId());
-        stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
+
+        this.runningOffset = offsetProvider.getNextOffset(jobProperties, jobProperties.getProperties());
+        this.taskCommand = offsetProvider.rewriteTvfParams(sql, runningOffset);
+        this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
+        this.taskCommand.setJobId(getTaskId());
+        this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
     }
 
     private void run() throws JobException {
@@ -134,7 +134,7 @@ private void run() throws JobException {
                     log.info("task has been canceled, task id is {}", getTaskId());
                     return;
                 }
-                command.run(ctx, stmtExecutor);
+                taskCommand.run(ctx, stmtExecutor);
                 if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
                     return;
                 } else {
@@ -142,13 +142,13 @@ private void run() throws JobException {
                 }
                 log.error(
                         "streaming insert failed with {}, reason {}, to retry",
-                        command.getLabelName(),
+                        taskCommand.getLabelName(),
                         errMsg);
                 if (retry == MAX_RETRY) {
                     errMsg = "reached max retry times, failed with" + errMsg;
                 }
             } catch (Exception e) {
-                log.warn("execute insert task error, label is {},offset is {}", command.getLabelName(),
+                log.warn("execute insert task error, label is {},offset is {}", taskCommand.getLabelName(),
                         runningOffset.toJson(), e);
                 errMsg = Util.getRootCauseMessage(e);
             }
@@ -209,8 +209,8 @@ public void closeOrReleaseResources() {
         if (null != stmtExecutor) {
             stmtExecutor = null;
         }
-        if (null != command) {
-            command = null;
+        if (null != taskCommand) {
+            taskCommand = null;
         }
         if (null != ctx) {
             ctx = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 49a272c664f629..d9b2264d5b683d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -20,16 +20,13 @@
 import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 
+import java.util.Map;
+
 /**
  * Interface for managing offsets and metadata of a data source.
  */
 public interface SourceOffsetProvider {
-    /**
-     * init
-     */
-    void init(String executeSql, StreamingJobProperties jobProperties);
-
     /**
      * Get source type, e.g. s3, kafka
      * @return
      */
     String getSourceType();
 
@@ -40,7 +37,7 @@ public interface SourceOffsetProvider {
      * Get next offset to consume
      * @return
      */
-    Offset getNextOffset();
+    Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties);
 
     /**
      * Get current offset
      * @return
      */
     Offset getCurrentOffset();
 
+    /**
+     * Get sync offset to show
+     * @return
+     */
+    String getSyncOffset();
+
+    /**
+     * Get remote offset
+     * @return
+     */
+    String getRemoteOffset();
+
     /**
      * Rewrite the TVF parameters in the SQL based on the current offset.
      * @param nextOffset
      * @return rewritten InsertIntoTableCommand
      */
-    InsertIntoTableCommand rewriteTvfParams(Offset nextOffset);
+    InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset nextOffset) throws Exception;
 
     /**
      * Update the offset of the source.
@@ -64,7 +73,7 @@ public interface SourceOffsetProvider {
     /**
      * Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka.
      */
-    void fetchRemoteMeta();
+    void fetchRemoteMeta(Map<String, String> properties) throws Exception;
 
     /**
      * Whether there is more data to consume
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index b858a1e6431af3..a95f4af65c94b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -17,51 +17,35 @@
 package org.apache.doris.job.offset.s3;
 
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
-import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.functions.table.S3;
+import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
+import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.collect.ImmutableList;
 import lombok.extern.log4j.Log4j2;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 @Log4j2
 public class S3SourceOffsetProvider implements SourceOffsetProvider {
-    String executeSql;
     S3Offset currentOffset;
     String maxRemoteEndFile;
-    StreamingJobProperties jobProperties;
-    NereidsParser parser;
-    String filePath;
-    StorageProperties storageProperties;
-
-    @Override
-    public void init(String executeSql, StreamingJobProperties jobProperties) {
-        //todo: check is already init
-        this.executeSql = executeSql;
-        this.jobProperties = jobProperties;
-        this.parser = new NereidsParser();
-        InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(executeSql);
-        UnboundTVFRelation firstTVF = command.getFirstTVF();
-        Map<String, String> properties = firstTVF.getProperties().getMap();
-        try {
-            this.storageProperties = StorageProperties.createPrimary(properties);
-            String uri = storageProperties.validateAndGetUri(properties);
-            this.filePath = storageProperties.validateAndNormalizeUri(uri);
-        } catch (UserException e) {
-            throw new RuntimeException("Failed check storage props, " + e.getMessage(), e);
-        }
-    }
+    InsertIntoTableCommand baseCommand;
 
     @Override
     public String getSourceType() {
@@ -69,13 +53,17 @@ public String getSourceType() {
     }
 
     @Override
-    public S3Offset getNextOffset() {
+    public S3Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties) {
         S3Offset offset = new S3Offset();
         List<String> rfiles = new ArrayList<>();
         String startFile = currentOffset == null ? null : currentOffset.endFile;
+        String filePath = null;
+        StorageProperties storageProperties = StorageProperties.createPrimary(properties);
         try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
+            String uri = storageProperties.validateAndGetUri(properties);
+            filePath = storageProperties.validateAndNormalizeUri(uri);
             maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, startFile,
-                    jobProperties.getS3BatchFiles(), jobProperties.getS3BatchSize());
+                    jobProps.getS3BatchFiles(), jobProps.getS3BatchSize());
             offset.setStartFile(startFile);
             offset.setEndFile(rfiles.get(rfiles.size() - 1));
             offset.setFileLists(rfiles);
@@ -92,15 +80,41 @@ public Offset getCurrentOffset() {
     }
 
     @Override
-    public InsertIntoTableCommand rewriteTvfParams(Offset runningOffset) {
+    public String getSyncOffset() {
+        if (currentOffset != null) {
+            return currentOffset.getEndFile();
+        }
+        return null;
+    }
+
+    @Override
+    public String getRemoteOffset() {
+        return maxRemoteEndFile;
+    }
+
+    @Override
+    public InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset runningOffset) throws Exception {
         S3Offset offset = (S3Offset) runningOffset;
         Map<String, String> props = new HashMap<>();
         String finalUri = "{" + String.join(",", offset.getFileLists()) + "}";
         props.put("uri", finalUri);
-        InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(executeSql);
-        //todo: command query plan is immutable
-        //command.rewriteFirstTvfProperties(getSourceType(), props);
-        return command;
+        if (baseCommand == null) {
+            this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(executeSql);
+            this.baseCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
+        }
+
+        // rewrite plan
+        Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> {
+            if (plan instanceof LogicalTVFRelation) {
+                LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan;
+                LogicalTVFRelation newRvfRel = new LogicalTVFRelation(
+                        originTvfRel.getRelationId(), new S3(new Properties(props)), ImmutableList.of());
+                return newRvfRel;
+            }
+            return plan;
+        });
+        return new InsertIntoTableCommand((LogicalPlan) rewritePlan, Optional.empty(), Optional.empty(),
+                Optional.empty(), true, Optional.empty());
     }
 
     @Override
@@ -109,8 +123,17 @@ public void updateOffset(Offset offset) {
     }
 
     @Override
-    public void fetchRemoteMeta() {
-        // list object
+    public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
+        StorageProperties storageProperties = StorageProperties.createPrimary(properties);
+        String startFile = currentOffset == null ? null : currentOffset.endFile;
+        try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
+            String uri = storageProperties.validateAndGetUri(properties);
+            String filePath = storageProperties.validateAndNormalizeUri(uri);
+            maxRemoteEndFile = fileSystem.globListWithLimit(filePath, new ArrayList<>(), startFile,
+                    1, 1);
+        } catch (Exception e) {
+            throw e;
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 04a1db682c5e38..6736b16f8afeb0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -79,11 +79,10 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -540,51 +539,21 @@ public List getTargetColumns() {
     }
 
     // todo: add ut
-    public UnboundTVFRelation getFirstTVF() {
-        return getFirstTvfInPlan(getLogicalQuery());
+    public List<UnboundTVFRelation> getAllTVFRelation() {
+        List<UnboundTVFRelation> tvfs = new ArrayList<>();
+        findAllTVFInPlan(getLogicalQuery(), tvfs);
+        return tvfs;
     }
 
-    private UnboundTVFRelation getFirstTvfInPlan(LogicalPlan plan) {
+    private void findAllTVFInPlan(LogicalPlan plan, List<UnboundTVFRelation> tvfs) {
         if (plan instanceof UnboundTVFRelation) {
             UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
-            return tvfRelation;
+            tvfs.add(tvfRelation);
         }
         for (Plan child : plan.children()) {
             if (child instanceof LogicalPlan) {
-                UnboundTVFRelation result = getFirstTvfInPlan((LogicalPlan) child);
-                if (result != null) {
-                    return result;
-                }
-            }
-        }
-        return null;
-    }
-
-    // todo: add ut
-    public void rewriteFirstTvfProperties(String functionName, Map<String, String> props) {
-        AtomicBoolean found = new AtomicBoolean(false);
-        rewriteFirstTvfInPlan(originLogicalQuery, functionName, props, found);
-    }
-
-    private void rewriteFirstTvfInPlan(LogicalPlan plan,
-            String functionName, Map<String, String> props, AtomicBoolean found) {
-        if (found.get()) {
-            return;
-        }
-
-        if (plan instanceof UnboundTVFRelation) {
-            UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
-            if (functionName.equalsIgnoreCase(tvfRelation.getFunctionName())) {
-                tvfRelation.getProperties().getMap().putAll(props);
-                found.set(true);
-                return;
-            }
-        }
-
-        for (Plan child : plan.children()) {
-            if (child instanceof LogicalPlan) {
-                rewriteFirstTvfInPlan((LogicalPlan) child, functionName, props, found);
+                findAllTVFInPlan((LogicalPlan) child, tvfs);
             }
         }
     }