From efe6bb42fb3d39ba914410cacc21fce972665d84 Mon Sep 17 00:00:00 2001
From: wudi
Date: Thu, 15 Jan 2026 10:55:57 +0800
Subject: [PATCH] [Fix](StreamingJob) fix remaining-split replay when FE
 restarts after the first split task is scheduled (#59883)

### What problem does this PR solve?

Related PR: https://github.com/apache/doris/pull/58898

After the Job is created for the first time, it starts from the initial offset and the task for the first split is scheduled. If FE restarts while that task is still in the RUNNING or FAILED state, the offset provider state has not been persisted yet, so the splits need to be restored from the meta table again.
---
 .../insert/streaming/StreamingInsertJob.java  |  2 +-
 .../offset/jdbc/JdbcSourceOffsetProvider.java | 42 ++++++++++++-------
 .../doris/job/util/StreamingJobUtils.java     | 19 +++++----
 .../reader/mysql/MySqlSourceReader.java       |  2 +-
 4 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 30f115b3ba5e25..4dade2b4ec8b11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1133,7 +1133,7 @@ private void persistOffsetProviderIfNeed() {
     }
 
     public void replayOffsetProviderIfNeed() throws JobException {
-        if (this.offsetProviderPersist != null && offsetProvider != null) {
+        if (offsetProvider != null) {
             offsetProvider.replayIfNeed(this);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 0c114ae8e64d3f..51f3c07aa41bf0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -332,19 +332,19 @@ public Offset deserializeOffsetProperty(String offset) {
     @Override
     public void replayIfNeed(StreamingInsertJob job) throws JobException {
         String offsetProviderPersist = job.getOffsetProviderPersist();
-        if (job.getOffsetProviderPersist() == null) {
-            return;
-        }
-        JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist,
-                JdbcSourceOffsetProvider.class);
-        this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
-        this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap();
-
-        if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
-            currentOffset = new JdbcOffset();
-            currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist));
-        } else {
-            try {
+        if (offsetProviderPersist != null) {
+            JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist,
+                    JdbcSourceOffsetProvider.class);
+            this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
+            this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap();
+            log.info("Replaying offset provider for job {}, binlogOffset size {}, chunkHighWatermark size {}",
+                    getJobId(),
+                    binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(),
+                    chunkHighWatermarkMap == null ? 0 : chunkHighWatermarkMap.size());
+            if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
+                currentOffset = new JdbcOffset();
+                currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist));
+            } else {
                 Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
                 if (MapUtils.isNotEmpty(chunkHighWatermarkMap) && MapUtils.isNotEmpty(snapshotSplits)) {
                     SnapshotSplit lastSnapshotSplit = recalculateRemainingSplits(chunkHighWatermarkMap, snapshotSplits);
@@ -353,10 +353,20 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException {
                     currentOffset.setSplit(lastSnapshotSplit);
                 }
             }
-            } catch (Exception ex) {
-                log.warn("Replay snapshot splits error with job {} ", job.getJobId(), ex);
-                throw new JobException(ex);
             }
+        } else if (checkNeedSplitChunks(sourceProperties)
+                && CollectionUtils.isEmpty(remainingSplits)
+                && CollectionUtils.isEmpty(finishedSplits)
+                && MapUtils.isEmpty(chunkHighWatermarkMap)
+                && MapUtils.isEmpty(binlogOffsetPersist)) {
+            // After the Job is created for the first time, starting from the initial offset,
+            // the task for the first split is scheduled. If FE restarts while the task status
+            // is running or failed, the splits need to be restored from the meta table again.
+            log.info("Replaying offset provider for job {}, offsetProviderPersist is empty", getJobId());
+            Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
+            recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
+        } else {
+            log.info("No need to replay offset provider for job {}", getJobId());
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index bac12ae3eba6b7..05c7570712484f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -120,7 +120,7 @@ public static void createMetaTableIfNotExist() throws Exception {
         }
     }
 
-    public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long jobId) throws IOException {
+    public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long jobId) throws JobException {
         List<ResultRow> resultRows;
         String sql = String.format(SELECT_SPLITS_TABLE_TEMPLATE, jobId);
         try (AutoCloseConnectContext context
@@ -130,12 +130,17 @@ public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long jobId) th
         }
 
         Map<String, List<SnapshotSplit>> tableSplits = new LinkedHashMap<>();
-        for (ResultRow row : resultRows) {
-            String tableName = row.get(0);
-            String chunkListStr = row.get(1);
-            List<SnapshotSplit> splits =
-                    new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, SnapshotSplit[].class)));
-            tableSplits.put(tableName, splits);
+        try {
+            for (ResultRow row : resultRows) {
+                String tableName = row.get(0);
+                String chunkListStr = row.get(1);
+                List<SnapshotSplit> splits =
+                        new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, SnapshotSplit[].class)));
+                tableSplits.put(tableName, splits);
+            }
+        } catch (IOException ex) {
+            log.warn("Failed to deserialize snapshot splits from job {} meta table: {}", jobId, ex.getMessage());
+            throw new JobException(ex);
         }
         return tableSplits;
     }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index a3f14a953b6231..795deb55c261d4 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -433,7 +433,7 @@ private SplitRecords pollSplitRecordsWithSplit(MySqlSplit split, JobBaseConfig j
         SourceRecords sourceRecords = null;
         String currentSplitId = null;
         DebeziumReader currentReader = null;
-        LOG.info("Get a split: {}", split.splitId());
+        LOG.info("Get a split: {}", split.toString());
         if (split instanceof MySqlSnapshotSplit) {
             currentReader = getSnapshotSplitReader(jobConfig);
         } else if (split instanceof MySqlBinlogSplit) {
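Note for reviewers: a minimal, compilable sketch of the three-way decision that `replayIfNeed` now makes. Only the branch structure follows the diff; the stand-in `SnapshotSplit`, the plain maps and lists, and `restoreSplitsFromMeta` are illustrative, and the real code additionally gates the middle branch on `checkNeedSplitChunks(sourceProperties)`, omitted here:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplaySketch {
    // Stand-in for the real snapshot split type; illustrative only.
    static final class SnapshotSplit {
        final String chunkId;

        SnapshotSplit(String chunkId) {
            this.chunkId = chunkId;
        }
    }

    Map<String, String> binlogOffsetPersist = new HashMap<>();   // persisted binlog offset (stub)
    Map<String, String> chunkHighWatermarkMap = new HashMap<>(); // chunk -> high watermark (stub)
    List<SnapshotSplit> remainingSplits = new ArrayList<>();
    List<SnapshotSplit> finishedSplits = new ArrayList<>();

    void replayIfNeed(String offsetProviderPersist, long jobId) {
        if (offsetProviderPersist != null) {
            // Normal restart: provider state was persisted, so replay from it
            // (binlog offset if present, otherwise recompute remaining splits).
            System.out.println("replay from persisted state for job " + jobId);
        } else if (remainingSplits.isEmpty() && finishedSplits.isEmpty()
                && chunkHighWatermarkMap.isEmpty() && binlogOffsetPersist.isEmpty()) {
            // First-schedule case fixed by this PR: the first split task was
            // scheduled but nothing was persisted yet, so restore from meta.
            Map<String, List<SnapshotSplit>> snapshotSplits = restoreSplitsFromMeta(jobId);
            System.out.println("restored splits for " + snapshotSplits.size() + " tables");
        } else {
            System.out.println("no replay needed for job " + jobId);
        }
    }

    // Stand-in for StreamingJobUtils.restoreSplitsToJob(jobId).
    Map<String, List<SnapshotSplit>> restoreSplitsFromMeta(long jobId) {
        return new HashMap<>();
    }

    public static void main(String[] args) {
        new ReplaySketch().replayIfNeed(null, 42L); // takes the restore-from-meta branch
    }
}
```

The key point is that a null `offsetProviderPersist` no longer short-circuits the replay: the fresh-job case now falls through to the meta-table restore instead of returning early.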
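The same applies to the meta-table read path: `restoreSplitsToJob` now declares `JobException` instead of `IOException`, wrapping Jackson failures at the deserialization loop. Below is a self-contained sketch under the assumption that each result row carries a table name and a JSON array of splits; the `JobException` and `SnapshotSplit` classes here are simplified stand-ins for the Doris ones:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RestoreSketch {
    // Simplified stand-in for Doris's JobException.
    public static class JobException extends Exception {
        public JobException(Throwable cause) {
            super(cause);
        }
    }

    // Public fields keep the Jackson binding trivial for this sketch.
    public static class SnapshotSplit {
        public String splitId;
        public String table;
    }

    private static final ObjectMapper objectMapper = new ObjectMapper();

    // Each row mimics a meta-table result: row[0] = table name, row[1] = JSON split array.
    static Map<String, List<SnapshotSplit>> restoreSplits(List<String[]> rows) throws JobException {
        Map<String, List<SnapshotSplit>> tableSplits = new LinkedHashMap<>();
        try {
            for (String[] row : rows) {
                List<SnapshotSplit> splits = new ArrayList<>(
                        Arrays.asList(objectMapper.readValue(row[1], SnapshotSplit[].class)));
                tableSplits.put(row[0], splits);
            }
        } catch (IOException ex) {
            // Surface bad persisted JSON as a job-level failure instead of an I/O error.
            throw new JobException(ex);
        }
        return tableSplits;
    }

    public static void main(String[] args) throws JobException {
        List<String[]> rows = List.of(
                new String[] {"db.t1", "[{\"splitId\":\"db.t1:0\",\"table\":\"db.t1\"}]"});
        System.out.println(restoreSplits(rows).keySet()); // prints [db.t1]
    }
}
```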