diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 650b9f412b2952..b872040c4c354a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -43,7 +43,6 @@ import org.apache.doris.job.offset.SourceOffsetProvider; import org.apache.doris.job.offset.SourceOffsetProviderFactory; import org.apache.doris.job.task.AbstractTask; -import org.apache.doris.load.FailMsg; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.nereids.analyzer.UnboundTVFRelation; @@ -238,11 +237,11 @@ public void onTaskSuccess(StreamingJobSchedulerTask task) throws JobException { public void onStreamTaskFail(StreamingInsertTask task) throws JobException { try { - failedTaskCount.incrementAndGet(); - Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task); - if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { - this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg()); - } + failedTaskCount.incrementAndGet(); + Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task); + if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { + this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg()); + } } finally { lock.writeLock().unlock(); } @@ -321,7 +320,8 @@ public TRow getTvfInfo() { trow.addToColumnValue(new TCell().setStringVal( jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson())); - trow.addToColumnValue(new TCell().setStringVal(pauseReason == null ? FeConstants.null_string : pauseReason.getMsg())); + trow.addToColumnValue( + new TCell().setStringVal(pauseReason == null ? FeConstants.null_string : pauseReason.getMsg())); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java index e71b169ef21c3d..a79bc1b230d6d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java @@ -24,7 +24,6 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.SessionVariable; -import com.google.gson.annotations.SerializedName; import lombok.Data; import java.util.HashMap; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java index cbd8645cc594b9..2ab2030fbbb3e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java @@ -23,8 +23,6 @@ import lombok.Getter; import lombok.Setter; -import java.util.List; - @Getter @Setter public class S3Offset implements Offset { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java index df4a344f06402b..f63333468fa426 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java @@ -72,7 +72,8 @@ public S3Offset getNextOffset(StreamingJobProperties jobProps, Map map = new HashMap<>(); - map.put("url" ,"s3:/xxxx/*."); - // rewrite plan - Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> { - if (plan instanceof LogicalTVFRelation) { - LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan; - LogicalTVFRelation newRvfRel = new LogicalTVFRelation( - originTvfRel.getRelationId(), new S3(new Properties(map)), ImmutableList.of()); - return newRvfRel; - } - return plan; - }); - InsertIntoTableCommand s = new InsertIntoTableCommand((LogicalPlan) rewritePlan, Optional.empty(), Optional.empty(), - Optional.empty(), true, Optional.empty()); + map.put("url", "s3:/xxxx/*."); StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); if (logicalPlan instanceof CreateDatabaseCommand) {