From 11c55d16923beb4e62536903a4e3f559ccca1a95 Mon Sep 17 00:00:00 2001 From: wudi Date: Wed, 14 Jan 2026 19:12:19 +0800 Subject: [PATCH 1/2] fix split task restart --- .../insert/streaming/StreamingInsertJob.java | 2 +- .../offset/jdbc/JdbcSourceOffsetProvider.java | 44 +++++++++++-------- .../doris/job/util/StreamingJobUtils.java | 19 +++++--- .../reader/mysql/MySqlSourceReader.java | 2 +- 4 files changed, 40 insertions(+), 27 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 30f115b3ba5e25..4dade2b4ec8b11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -1133,7 +1133,7 @@ private void persistOffsetProviderIfNeed() { } public void replayOffsetProviderIfNeed() throws JobException { - if (this.offsetProviderPersist != null && offsetProvider != null) { + if (offsetProvider != null) { offsetProvider.replayIfNeed(this); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 0c114ae8e64d3f..c74938e28865fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -41,7 +41,6 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -52,7 +51,6 @@ import lombok.extern.log4j.Log4j2; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; - import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -332,19 +330,19 @@ public Offset deserializeOffsetProperty(String offset) { @Override public void replayIfNeed(StreamingInsertJob job) throws JobException { String offsetProviderPersist = job.getOffsetProviderPersist(); - if (job.getOffsetProviderPersist() == null) { - return; - } - JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist, - JdbcSourceOffsetProvider.class); - this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist(); - this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap(); - - if (MapUtils.isNotEmpty(binlogOffsetPersist)) { - currentOffset = new JdbcOffset(); - currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist)); - } else { - try { + if (offsetProviderPersist != null) { + JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist, + JdbcSourceOffsetProvider.class); + this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist(); + this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap(); + log.info("Replaying offset provider for job {}, binlogOffset size {}, chunkHighWatermark size {}", + getJobId(), + binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(), + chunkHighWatermarkMap == null ? 0 : chunkHighWatermarkMap.size()); + if (MapUtils.isNotEmpty(binlogOffsetPersist)) { + currentOffset = new JdbcOffset(); + currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist)); + } else { Map> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId()); if (MapUtils.isNotEmpty(chunkHighWatermarkMap) && MapUtils.isNotEmpty(snapshotSplits)) { SnapshotSplit lastSnapshotSplit = recalculateRemainingSplits(chunkHighWatermarkMap, snapshotSplits); @@ -353,10 +351,20 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException { currentOffset.setSplit(lastSnapshotSplit); } } - } catch (Exception ex) { - log.warn("Replay snapshot splits error with job {} ", job.getJobId(), ex); - throw new JobException(ex); } + } else if (checkNeedSplitChunks(sourceProperties) + && CollectionUtils.isEmpty(remainingSplits) + && CollectionUtils.isEmpty(finishedSplits) + && MapUtils.isEmpty(chunkHighWatermarkMap) + && MapUtils.isEmpty(binlogOffsetPersist)) { + // After the Job is created for the first time, starting from the initial offset, + // the task for the first split is scheduled, When the task status is running or failed, + // If FE restarts, the split needs to be restore from the meta again. + log.info("Replaying offset provider for job {}, offsetProviderPersist is empty", getJobId()); + Map> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId()); + recalculateRemainingSplits(new HashMap<>(), snapshotSplits); + } else { + log.info("No need to replay offset provider for job {}", getJobId()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index bac12ae3eba6b7..05c7570712484f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -120,7 +120,7 @@ public static void createMetaTableIfNotExist() throws Exception { } } - public static Map> restoreSplitsToJob(Long jobId) throws IOException { + public static Map> restoreSplitsToJob(Long jobId) throws JobException { List resultRows; String sql = String.format(SELECT_SPLITS_TABLE_TEMPLATE, jobId); try (AutoCloseConnectContext context @@ -130,12 +130,17 @@ public static Map> restoreSplitsToJob(Long jobId) th } Map> tableSplits = new LinkedHashMap<>(); - for (ResultRow row : resultRows) { - String tableName = row.get(0); - String chunkListStr = row.get(1); - List splits = - new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, SnapshotSplit[].class))); - tableSplits.put(tableName, splits); + try { + for (ResultRow row : resultRows) { + String tableName = row.get(0); + String chunkListStr = row.get(1); + List splits = + new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, SnapshotSplit[].class))); + tableSplits.put(tableName, splits); + } + } catch (IOException ex) { + log.warn("Failed to deserialize snapshot splits from job {} meta table: {}", jobId, ex.getMessage()); + throw new JobException(ex); } return tableSplits; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index a3f14a953b6231..795deb55c261d4 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -433,7 +433,7 @@ private SplitRecords pollSplitRecordsWithSplit(MySqlSplit split, JobBaseConfig j SourceRecords sourceRecords = null; String currentSplitId = null; DebeziumReader currentReader = null; - LOG.info("Get a split: {}", split.splitId()); + LOG.info("Get a split: {}", split.toString()); if (split instanceof MySqlSnapshotSplit) { currentReader = getSnapshotSplitReader(jobConfig); } else if (split instanceof MySqlBinlogSplit) { From e88c2e51608a8f70c2b412016b3ba2445a638dc7 Mon Sep 17 00:00:00 2001 From: wudi Date: Wed, 14 Jan 2026 19:20:08 +0800 Subject: [PATCH 2/2] fix --- .../apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index c74938e28865fd..51f3c07aa41bf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -41,6 +41,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -51,6 +52,7 @@ import lombok.extern.log4j.Log4j2; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; + import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap;