From 41a23fc15ec5d2341d168400d081f80439432397 Mon Sep 17 00:00:00 2001
From: SteNicholas
Date: Wed, 9 Mar 2022 18:30:31 +0800
Subject: [PATCH] [FLINK-26293] Store log offsets in snapshot

Store the per-bucket log offsets in every snapshot so that the log
position a snapshot corresponds to can be recovered later. Snapshot
gains a new logOffsets field (a bucket -> offset map) that is
serialized into the snapshot JSON next to the existing fields.
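For illustration only (a hand-written sample, not output from this
patch's tests; the id, manifest list names, user, identifier and
offsets are invented, and Jackson renders the Integer bucket keys as
JSON strings), a snapshot written by Snapshot#toJson now carries the
offsets like this:

    {
      "id" : 2,
      "baseManifestList" : "manifest-list-0",
      "deltaManifestList" : "manifest-list-1",
      "commitUser" : "user",
      "commitIdentifier" : "identifier",
      "commitKind" : "APPEND",
      "timeMillis" : 1646821831000,
      "logOffsets" : { "0" : 100, "1" : 250 }
    }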
FileStoreCommitImpl now threads ManifestCommittable#logOffsets through
tryCommit and tryOverwrite into tryCommitOnce. Because a commit may
write only some buckets, tryCommitOnce completes the map from the
latest snapshot before writing the new one, so offsets of untouched
buckets are never lost; see the sketch after this paragraph.
TestFileStore assigns a fresh offset to every new bucket, and
FileStoreCommitTest#testSnapshotAddLogOffset asserts that the offsets
survive both plain commits and partial overwrites.
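The backfill rule, reduced to plain maps (an illustrative, runnable
sketch in a hypothetical standalone class, not code from this patch):

    import java.util.HashMap;
    import java.util.Map;

    public class LogOffsetBackfill {
        public static void main(String[] args) {
            // Offsets carried by the current commit: only bucket 1 was written.
            Map<Integer, Long> logOffsets = new HashMap<>();
            logOffsets.put(1, 250L);

            // Offsets recorded in the latest snapshot.
            Map<Integer, Long> previous = new HashMap<>();
            previous.put(0, 100L);
            previous.put(1, 180L);

            // The call this patch adds in tryCommitOnce: buckets absent from
            // this commit inherit their previous offsets, while bucket 1
            // keeps its newer offset.
            previous.forEach(logOffsets::putIfAbsent);

            System.out.println(logOffsets); // {0=100, 1=250}
        }
    }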
---
 .../flink/table/store/file/Snapshot.java      | 14 ++++-
 .../file/operation/FileStoreCommitImpl.java   | 41 ++++++++++---
 .../flink/table/store/file/TestFileStore.java |  6 ++
 .../file/operation/FileStoreCommitTest.java   | 61 +++++++++++++++++++
 4 files changed, 114 insertions(+), 8 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 6fc84cee25ba..6d5c5d5a3340 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -32,6 +32,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /** This file is the entrance to all data committed at some specific time point. */
 public class Snapshot {
@@ -45,6 +46,7 @@ public class Snapshot {
     private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
     private static final String FIELD_COMMIT_KIND = "commitKind";
     private static final String FIELD_TIME_MILLIS = "timeMillis";
+    private static final String FIELD_LOG_OFFSETS = "logOffsets";
 
     @JsonProperty(FIELD_ID)
     private final long id;
@@ -71,6 +73,9 @@ public class Snapshot {
     @JsonProperty(FIELD_TIME_MILLIS)
     private final long timeMillis;
 
+    @JsonProperty(FIELD_LOG_OFFSETS)
+    private final Map<Integer, Long> logOffsets;
+
     @JsonCreator
     public Snapshot(
             @JsonProperty(FIELD_ID) long id,
@@ -79,7 +84,8 @@ public Snapshot(
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
             @JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
-            @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
+            @JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
+            @JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets) {
         this.id = id;
         this.baseManifestList = baseManifestList;
         this.deltaManifestList = deltaManifestList;
@@ -87,6 +93,7 @@ public Snapshot(
         this.commitIdentifier = commitIdentifier;
         this.commitKind = commitKind;
         this.timeMillis = timeMillis;
+        this.logOffsets = logOffsets;
     }
 
     @JsonGetter(FIELD_ID)
@@ -124,6 +131,11 @@ public long timeMillis() {
         return timeMillis;
     }
 
+    @JsonGetter(FIELD_LOG_OFFSETS)
+    public Map<Integer, Long> getLogOffsets() {
+        return logOffsets;
+    }
+
     public String toJson() {
         try {
             return new ObjectMapper().writeValueAsString(this);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index d39f05a0e482..9673d0258ee3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -151,13 +151,23 @@ public void commit(ManifestCommittable committable, Map<String, String> propert
         }
 
         List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
-        tryCommit(appendChanges, committable.identifier(), Snapshot.CommitKind.APPEND, false);
+        tryCommit(
+                appendChanges,
+                committable.identifier(),
+                committable.logOffsets(),
+                Snapshot.CommitKind.APPEND,
+                false);
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
         compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
         if (!compactChanges.isEmpty()) {
-            tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
+            tryCommit(
+                    compactChanges,
+                    committable.identifier(),
+                    committable.logOffsets(),
+                    Snapshot.CommitKind.COMPACT,
+                    true);
         }
     }
 
@@ -190,31 +200,42 @@ public void overwrite(
             }
         }
         // overwrite new files
-        tryOverwrite(partitionFilter, appendChanges, committable.identifier());
+        tryOverwrite(
+                partitionFilter, appendChanges, committable.identifier(), committable.logOffsets());
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
         compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
         if (!compactChanges.isEmpty()) {
-            tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
+            tryCommit(
+                    compactChanges,
+                    committable.identifier(),
+                    committable.logOffsets(),
+                    Snapshot.CommitKind.COMPACT,
+                    true);
         }
     }
 
     private void tryCommit(
             List<ManifestEntry> changes,
             String hash,
+            Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             boolean checkDeletedFiles) {
         while (true) {
             Long latestSnapshotId = pathFactory.latestSnapshotId();
-            if (tryCommitOnce(changes, hash, commitKind, latestSnapshotId, checkDeletedFiles)) {
+            if (tryCommitOnce(
+                    changes, hash, logOffsets, commitKind, latestSnapshotId, checkDeletedFiles)) {
                 break;
             }
         }
     }
 
     private void tryOverwrite(
-            Predicate partitionFilter, List<ManifestEntry> changes, String identifier) {
+            Predicate partitionFilter,
+            List<ManifestEntry> changes,
+            String identifier,
+            Map<Integer, Long> logOffsets) {
         while (true) {
             Long latestSnapshotId = pathFactory.latestSnapshotId();
 
@@ -240,6 +261,7 @@ private void tryOverwrite(
             if (tryCommitOnce(
                     changesWithOverwrite,
                     identifier,
+                    logOffsets,
                     Snapshot.CommitKind.OVERWRITE,
                     latestSnapshotId,
                     false)) {
@@ -274,6 +296,7 @@ private List<ManifestEntry> collectChanges(
     private boolean tryCommitOnce(
             List<ManifestEntry> changes,
             String identifier,
+            Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             Long latestSnapshotId,
             boolean checkDeletedFiles) {
@@ -306,6 +329,9 @@ private boolean tryCommitOnce(
         if (latestSnapshot != null) {
             // read all previous manifest files
             oldMetas.addAll(latestSnapshot.readAllManifests(manifestList));
+            // read the last snapshot to complete the bucket's offsets when logOffsets does not
+            // contain all buckets
+            latestSnapshot.getLogOffsets().forEach(logOffsets::putIfAbsent);
         }
         // merge manifest files with changes
         newMetas.addAll(
@@ -330,7 +356,8 @@ private boolean tryCommitOnce(
                             commitUser,
                             identifier,
                             commitKind,
-                            System.currentTimeMillis());
+                            System.currentTimeMillis(),
+                            logOffsets);
             FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
         } catch (Throwable e) {
             // fails when preparing for commit, we should clean up
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 005743595e3d..a92756e1b1eb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -60,6 +60,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -75,6 +76,8 @@ public class TestFileStore extends FileStoreImpl {
     private final RowDataSerializer keySerializer;
     private final RowDataSerializer valueSerializer;
 
+    private static final AtomicInteger ID = new AtomicInteger();
+
     public static TestFileStore create(
             String format,
             String root,
@@ -188,6 +191,9 @@ private List<Snapshot> commitDataImpl(
                 Increment increment = entryWithBucket.getValue().prepareCommit();
                 committable.addFileCommittable(
                         entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
+                if (!committable.logOffsets().containsKey(entryWithBucket.getKey())) {
+                    committable.addLogOffset(entryWithBucket.getKey(), ID.getAndIncrement());
+                }
             }
         }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 21d0a6381cad..7224d1978ea0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -208,6 +208,67 @@ public void testOverwritePartialCommit() throws Exception {
         assertThat(actual).isEqualTo(expected);
     }
 
+    @Test
+    public void testSnapshotAddLogOffset() throws Exception {
+        Map<BinaryRowData, List<KeyValue>> data1 =
+                generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
+        logData(
+                () ->
+                        data1.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "data1");
+
+        TestFileStore store = createStore(false);
+        List<Snapshot> commitSnapshots =
+                store.commitData(
+                        data1.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                        gen::getPartition,
+                        kv -> Math.toIntExact(kv.sequenceNumber() % 10));
+
+        Map<Integer, Long> commitLogOffsets = commitSnapshots.get(0).getLogOffsets();
+        assertThat(commitLogOffsets.size()).isEqualTo(10);
+        commitLogOffsets.forEach((key, value) -> assertThat(key).isEqualTo(value.intValue()));
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        String dtToOverwrite =
+                new ArrayList<>(data1.keySet())
+                        .get(random.nextInt(data1.size()))
+                        .getString(0)
+                        .toString();
+        Map<String, String> partitionToOverwrite = new HashMap<>();
+        partitionToOverwrite.put("dt", dtToOverwrite);
+
+        // overwrite partial commit
+        int numRecords = ThreadLocalRandom.current().nextInt(5) + 1;
+        Map<BinaryRowData, List<KeyValue>> data2 = generateData(numRecords);
+        data2.entrySet().removeIf(e -> !dtToOverwrite.equals(e.getKey().getString(0).toString()));
+        logData(
+                () ->
+                        data2.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "data2");
+        List<Snapshot> overwriteSnapshots =
+                store.overwriteData(
+                        data2.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                        gen::getPartition,
+                        kv -> Math.toIntExact(kv.sequenceNumber() % 10),
+                        partitionToOverwrite);
+
+        Map<Integer, Long> overwriteLogOffsets = overwriteSnapshots.get(0).getLogOffsets();
+        assertThat(overwriteLogOffsets.size()).isEqualTo(commitLogOffsets.size());
+        assertThat(
+                        overwriteLogOffsets.entrySet().stream()
+                                .filter(o -> !o.getKey().equals(o.getValue().intValue()))
+                                .count())
+                .isLessThanOrEqualTo(numRecords);
+    }
+
     private TestFileStore createStore(boolean failing) {
         String root =
                 failing