From eb22a9b063f791fd45e2ca1b0a439f6933b5224d Mon Sep 17 00:00:00 2001 From: laihui Date: Thu, 11 Sep 2025 21:04:05 +0800 Subject: [PATCH] implement offset persistence and replay --- .../insert/streaming/StreamingInsertJob.java | 40 ++++++++++-- .../insert/streaming/StreamingInsertTask.java | 6 +- .../streaming/StreamingJobStatistic.java | 42 ++++++++++++ .../StreamingTaskTxnCommitAttachment.java | 65 +++++++++++++++++++ .../job/offset/SourceOffsetProvider.java | 4 +- .../apache/doris/job/offset/s3/S3Offset.java | 9 +++ .../job/offset/s3/S3SourceOffsetProvider.java | 2 +- .../doris/load/loadv2/LoadStatistic.java | 8 +++ 8 files changed, 162 insertions(+), 14 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index c88fd019c46b98..1b4cac14bc8b6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -36,6 +36,7 @@ import org.apache.doris.job.offset.SourceOffsetProviderFactory; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.load.FailMsg; +import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; @@ -48,6 +49,7 @@ import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TxnStateChangeCallback; +import com.google.common.base.Preconditions; import com.google.gson.annotations.SerializedName; import lombok.Getter; import lombok.Setter; @@ -65,7 +67,7 @@ public class StreamingInsertJob extends AbstractJob createTasks(TaskType taskType, Map taskIds = new ArrayList<>(); + taskIds.add(runningStreamTask.getTaskId()); + List loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds); + if (loadJobs.size() != 1) { + throw new TransactionException("load job not found, insert job id is " + runningStreamTask.getTaskId()); + } + LoadJob loadJob = loadJobs.get(0); + LoadStatistic loadStatistic = loadJob.getLoadStatistic(); + txnState.setTxnCommitAttachment(new StreamingTaskTxnCommitAttachment( + loadStatistic.getScannedRows(), + loadStatistic.getLoadBytes(), + loadStatistic.getFileNumber(), + loadStatistic.getTotalFileSizeB(), + runningStreamTask.getOffset())); } @Override @@ -254,12 +276,18 @@ public void beforeAborted(TransactionState txnState) throws TransactionException @Override public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException { - + Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState); + StreamingTaskTxnCommitAttachment attachment = + (StreamingTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); + updateJobStatisticAndOffset(attachment); } @Override public void replayOnCommitted(TransactionState txnState) { - + Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState); + StreamingTaskTxnCommitAttachment attachment = + (StreamingTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); + updateJobStatisticAndOffset(attachment); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java index 56a59d3a7f6a7b..cb880c8ac40b9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java @@ -26,7 +26,6 @@ import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.insert.InsertTask; import org.apache.doris.job.offset.Offset; -import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; @@ -59,7 +58,6 @@ public class StreamingInsertTask { private String currentDb; private UserIdentity userIdentity; private ConnectContext ctx; - private LoadStatistic loadStatistic; private Offset offset; private AtomicBoolean isCanceled = new AtomicBoolean(false); private StreamingJobProperties jobProperties; @@ -67,14 +65,12 @@ public class StreamingInsertTask { public StreamingInsertTask(long jobId, long taskId, InsertIntoTableCommand command, - LoadStatistic loadStatistic, String currentDb, Offset offset, StreamingJobProperties jobProperties) { this.jobId = jobId; this.taskId = taskId; this.command = command; - this.loadStatistic = loadStatistic; this.userIdentity = ctx.getCurrentUserIdentity(); this.currentDb = currentDb; this.offset = offset; @@ -127,7 +123,7 @@ private void run() throws JobException { log.info("task has been canceled, task id is {}", getTaskId()); return; } - command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic); + command.runWithUpdateInfo(ctx, stmtExecutor, null); if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) { return; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java new file mode 100644 index 00000000000000..70e5c9f6d0f3cf --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.extensions.insert.streaming; + +import org.apache.doris.persist.gson.GsonUtils; + +import lombok.Getter; +import lombok.Setter; + +public class StreamingJobStatistic { + @Getter + @Setter + private long scannedRows; + @Getter + @Setter + private long loadBytes; + @Getter + @Setter + private long fileNumber; + @Getter + @Setter + private long fileSize; + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java new file mode 100644 index 00000000000000..8a8768dee34b22 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.extensions.insert.streaming; + +import org.apache.doris.job.offset.Offset; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TxnCommitAttachment; + +import com.google.gson.annotations.SerializedName; +import lombok.Getter; + +public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment { + + public StreamingTaskTxnCommitAttachment(long scannedRows, long loadBytes, + long fileNumber, long fileSize, Offset offset) { + super(TransactionState.LoadJobSourceType.BACKEND_STREAMING); + this.scannedRows = scannedRows; + this.loadBytes = loadBytes; + this.fileNumber = fileNumber; + this.fileSize = fileSize; + this.offset = offset; + } + + @SerializedName(value = "sr") + @Getter + private long scannedRows; + @SerializedName(value = "lb") + @Getter + private long loadBytes; + @SerializedName(value = "fn") + @Getter + private long fileNumber; + @SerializedName(value = "fs") + @Getter + private long fileSize; + @SerializedName(value = "of") + @Getter + private Offset offset; + + @Override + public String toString() { + return "StreamingTaskTxnCommitAttachment: [" + + "scannedRows=" + scannedRows + + ", loadBytes=" + loadBytes + + ", fileNumber=" + fileNumber + + ", fileSize=" + fileSize + + ", offset=" + offset.toString() + + "]"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java index 3d62073929068b..564327ed0cc1cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -49,10 +49,10 @@ public interface SourceOffsetProvider { InsertIntoTableCommand rewriteTvfParams(String sql); /** - * Update the progress of the source. + * Update the offset of the source. * @param offset */ - void updateProgress(Offset offset); + void updateOffset(Offset offset); /** * Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka. diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java index a175575757f080..57f89b9950590d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java @@ -34,4 +34,13 @@ public class S3Offset implements Offset { public String toJson() { return GsonUtils.GSON.toJson(this); } + + @Override + public String toString() { + return "S3Offset: [" + + "startFile=" + startFile + + ", endFile=" + endFile + + ", fileLists=" + fileLists + + "]"; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java index 771736a9559f93..e52d9995051e11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java @@ -59,7 +59,7 @@ public InsertIntoTableCommand rewriteTvfParams(String sql) { } @Override - public void updateProgress(Offset offset) { + public void updateOffset(Offset offset) { this.currentOffset = (S3Offset) offset; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java index 43c67098bfd1f0..58137d34886ff1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java @@ -123,6 +123,14 @@ public Map getCounters() { return counters; } + public int getFileNumber() { + return fileNum; + } + + public long getTotalFileSizeB() { + return totalFileSizeB; + } + public synchronized String toJson() { long total = 0; for (long rows : counterTbl.values()) {