Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -747,9 +747,7 @@ private int tryCommit(
retryResult = (RetryResult) result;

if (retryCount >= commitMaxRetries) {
if (retryResult != null) {
retryResult.cleanAll();
}
retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
Expand All @@ -767,75 +765,52 @@ private int tryOverwrite(
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets) {
int retryCount = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();

List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
if (latestSnapshot != null) {
List<ManifestEntry> currentEntries =
scan.withSnapshot(latestSnapshot)
.withPartitionFilter(partitionFilter)
.withKind(ScanMode.ALL)
.plan()
.files();
for (ManifestEntry entry : currentEntries) {
changesWithOverwrite.add(
new ManifestEntry(
FileKind.DELETE,
entry.partition(),
entry.bucket(),
entry.totalBuckets(),
entry.file()));
}
// collect all files with overwrite
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
if (latestSnapshot != null) {
List<ManifestEntry> currentEntries =
scan.withSnapshot(latestSnapshot)
.withPartitionFilter(partitionFilter)
.withKind(ScanMode.ALL)
.plan()
.files();
for (ManifestEntry entry : currentEntries) {
changesWithOverwrite.add(
new ManifestEntry(
FileKind.DELETE,
entry.partition(),
entry.bucket(),
entry.totalBuckets(),
entry.file()));
}

// collect index files
if (latestSnapshot.indexManifest() != null) {
List<IndexManifestEntry> entries =
indexManifestFile.read(latestSnapshot.indexManifest());
for (IndexManifestEntry entry : entries) {
if (partitionFilter == null || partitionFilter.test(entry.partition())) {
indexChangesWithOverwrite.add(entry.toDeleteEntry());
}
// collect index files
if (latestSnapshot.indexManifest() != null) {
List<IndexManifestEntry> entries =
indexManifestFile.read(latestSnapshot.indexManifest());
for (IndexManifestEntry entry : entries) {
if (partitionFilter == null || partitionFilter.test(entry.partition())) {
indexChangesWithOverwrite.add(entry.toDeleteEntry());
}
}
}
changesWithOverwrite.addAll(changes);
indexChangesWithOverwrite.addAll(indexFiles);

CommitResult result =
tryCommitOnce(
null,
changesWithOverwrite,
Collections.emptyList(),
indexChangesWithOverwrite,
identifier,
watermark,
logOffsets,
Snapshot.CommitKind.OVERWRITE,
latestSnapshot,
mustConflictCheck(),
branchName,
null);

if (result.isSuccess()) {
break;
}

// TODO optimize OVERWRITE too
RetryResult retryResult = (RetryResult) result;
retryResult.cleanAll();

if (retryCount >= commitMaxRetries) {
throw new RuntimeException(
String.format(
"Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
retryCount++;
}
return retryCount + 1;
changesWithOverwrite.addAll(changes);
indexChangesWithOverwrite.addAll(indexFiles);

return tryCommit(
changesWithOverwrite,
Collections.emptyList(),
indexChangesWithOverwrite,
identifier,
watermark,
logOffsets,
Snapshot.CommitKind.OVERWRITE,
mustConflictCheck(),
branchName,
null);
}

@VisibleForTesting
Expand Down
Loading