@@ -1133,7 +1133,7 @@ private void persistOffsetProviderIfNeed() {
     }
 
     public void replayOffsetProviderIfNeed() throws JobException {
-        if (this.offsetProviderPersist != null && offsetProvider != null) {
+        if (offsetProvider != null) {
             offsetProvider.replayIfNeed(this);
         }
     }
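
Taken together with the provider rewrite in the next file, the effect of this guard change is that the job delegates unconditionally whenever a provider exists, and the provider itself now distinguishes three replay cases. A condensed, runnable sketch of that decision (all names here are hypothetical stand-ins, not the actual Doris types):

// Condensed sketch (all names hypothetical) of the replay decision that now
// lives inside the offset provider: the job only checks that a provider
// exists before delegating.
public class ReplaySketch {
    static String persistedState = null;        // stands in for job.getOffsetProviderPersist()
    static boolean firstScheduleRestart = true; // FE restarted before the first persist

    static void replayIfNeed() {
        if (persistedState != null) {
            // Case 1: normal restart, rebuild provider state from persisted JSON.
            System.out.println("restore provider state from persisted JSON");
        } else if (firstScheduleRestart) {
            // Case 2: FE restarted before anything was persisted; the first
            // split's chunks must be re-read from the meta table.
            System.out.println("re-read snapshot splits from the meta table");
        } else {
            // Case 3: fresh job, nothing to replay.
            System.out.println("fresh job, nothing to replay");
        }
    }

    public static void main(String[] args) {
        replayIfNeed(); // prints: re-read snapshot splits from the meta table
    }
}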
@@ -332,19 +332,19 @@ public Offset deserializeOffsetProperty(String offset) {
     @Override
     public void replayIfNeed(StreamingInsertJob job) throws JobException {
         String offsetProviderPersist = job.getOffsetProviderPersist();
-        if (job.getOffsetProviderPersist() == null) {
-            return;
-        }
-        JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist,
-                JdbcSourceOffsetProvider.class);
-        this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
-        this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap();
-
-        if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
-            currentOffset = new JdbcOffset();
-            currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist));
-        } else {
+        if (offsetProviderPersist != null) {
+            try {
+                JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist,
+                        JdbcSourceOffsetProvider.class);
+                this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
+                this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap();
+                log.info("Replaying offset provider for job {}, binlogOffset size {}, chunkHighWatermark size {}",
+                        getJobId(),
+                        binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(),
+                        chunkHighWatermarkMap == null ? 0 : chunkHighWatermarkMap.size());
+                if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
+                    currentOffset = new JdbcOffset();
+                    currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist));
+                } else {
                     Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
                     if (MapUtils.isNotEmpty(chunkHighWatermarkMap) && MapUtils.isNotEmpty(snapshotSplits)) {
                         SnapshotSplit lastSnapshotSplit = recalculateRemainingSplits(chunkHighWatermarkMap, snapshotSplits);
@@ -353,10 +353,20 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException {
                         currentOffset.setSplit(lastSnapshotSplit);
                     }
                 }
+            } catch (Exception ex) {
+                log.warn("Replay snapshot splits error with job {} ", job.getJobId(), ex);
+                throw new JobException(ex);
+            }
+        } else if (checkNeedSplitChunks(sourceProperties)
+                && CollectionUtils.isEmpty(remainingSplits)
+                && CollectionUtils.isEmpty(finishedSplits)
+                && MapUtils.isEmpty(chunkHighWatermarkMap)
+                && MapUtils.isEmpty(binlogOffsetPersist)) {
+            // After the job is created for the first time, it starts from the initial offset
+            // and the task for the first split is scheduled. If FE restarts while that task
+            // is still running or has failed, the splits need to be restored from the meta table again.
+            log.info("Replaying offset provider for job {}, offsetProviderPersist is empty", getJobId());
+            Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
+            recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
+        } else {
+            log.info("No need to replay offset provider for job {}", getJobId());
+        }
     }
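
For context, the replay path above round-trips the provider state through JSON with Gson (GsonUtils.GSON is a shared Gson instance). A minimal, self-contained sketch of that round trip, using an illustrative stand-in state class rather than the real JdbcSourceOffsetProvider:

import com.google.gson.Gson;

import java.util.HashMap;
import java.util.Map;

public class OffsetStateDemo {
    // Illustrative stand-in for the persisted provider state.
    static class OffsetState {
        Map<String, String> binlogOffset = new HashMap<>();       // e.g. binlog file/position
        Map<String, String> chunkHighWatermark = new HashMap<>(); // per-chunk high watermarks
    }

    public static void main(String[] args) {
        Gson gson = new Gson();

        OffsetState state = new OffsetState();
        state.binlogOffset.put("binlog.file", "mysql-bin.000042");
        state.binlogOffset.put("binlog.position", "1337");

        // Persist side: serialize the provider state to a JSON string.
        String persisted = gson.toJson(state);

        // Replay side: on FE restart, rebuild the state from the persisted JSON.
        OffsetState replayed = gson.fromJson(persisted, OffsetState.class);

        // A non-empty binlog offset means replay resumes from the binlog phase;
        // otherwise snapshot splits must be restored from the meta table.
        System.out.println("resume from binlog: " + !replayed.binlogOffset.isEmpty());
    }
}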

@@ -120,7 +120,7 @@ public static void createMetaTableIfNotExist() throws Exception {
         }
     }
 
-    public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long jobId) throws IOException {
+    public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long jobId) throws JobException {
         List<ResultRow> resultRows;
         String sql = String.format(SELECT_SPLITS_TABLE_TEMPLATE, jobId);
         try (AutoCloseConnectContext context
@@ -130,12 +130,17 @@ public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long jobId) th
         }
 
         Map<String, List<SnapshotSplit>> tableSplits = new LinkedHashMap<>();
-        for (ResultRow row : resultRows) {
-            String tableName = row.get(0);
-            String chunkListStr = row.get(1);
-            List<SnapshotSplit> splits =
-                    new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, SnapshotSplit[].class)));
-            tableSplits.put(tableName, splits);
+        try {
+            for (ResultRow row : resultRows) {
+                String tableName = row.get(0);
+                String chunkListStr = row.get(1);
+                List<SnapshotSplit> splits =
+                        new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, SnapshotSplit[].class)));
+                tableSplits.put(tableName, splits);
+            }
+        } catch (IOException ex) {
+            log.warn("Failed to deserialize snapshot splits from job {} meta table: {}", jobId, ex.getMessage());
+            throw new JobException(ex);
         }
         return tableSplits;
     }
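
The loop above parses each meta-table row's chunk-list column with Jackson and now wraps any parse failure in a JobException. A minimal sketch of the same deserialization pattern, with a hypothetical Chunk POJO standing in for SnapshotSplit:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkListDemo {
    // Hypothetical stand-in for SnapshotSplit; public fields let Jackson bind
    // the JSON without getters or annotations.
    public static class Chunk {
        public String splitId;
        public String start;
        public String end;
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();

        // One row's chunk-list column: a JSON array stored as a string.
        String chunkListStr = "[{\"splitId\":\"t1:0\",\"start\":\"1\",\"end\":\"1000\"}]";

        // Same pattern as restoreSplitsToJob: typed array -> mutable list.
        List<Chunk> splits =
                new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, Chunk[].class)));
        System.out.println(splits.get(0).splitId + " [" + splits.get(0).start + ", " + splits.get(0).end + "]");
    }
}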
@@ -433,7 +433,7 @@ private SplitRecords pollSplitRecordsWithSplit(MySqlSplit split, JobBaseConfig j
         SourceRecords sourceRecords = null;
         String currentSplitId = null;
         DebeziumReader<SourceRecords, MySqlSplit> currentReader = null;
-        LOG.info("Get a split: {}", split.splitId());
+        LOG.info("Get a split: {}", split.toString());
         if (split instanceof MySqlSnapshotSplit) {
             currentReader = getSnapshotSplitReader(jobConfig);
         } else if (split instanceof MySqlBinlogSplit) {