Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** This file is the entrance to all data committed at some specific time point. */
public class Snapshot {
Expand All @@ -45,6 +46,7 @@ public class Snapshot {
private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
private static final String FIELD_COMMIT_KIND = "commitKind";
private static final String FIELD_TIME_MILLIS = "timeMillis";
private static final String FIELD_LOG_OFFSETS = "logOffsets";

@JsonProperty(FIELD_ID)
private final long id;
Expand All @@ -71,6 +73,9 @@ public class Snapshot {
@JsonProperty(FIELD_TIME_MILLIS)
private final long timeMillis;

@JsonProperty(FIELD_LOG_OFFSETS)
private final Map<Integer, Long> logOffsets;

@JsonCreator
public Snapshot(
@JsonProperty(FIELD_ID) long id,
Expand All @@ -79,14 +84,16 @@ public Snapshot(
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
@JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets) {
this.id = id;
this.baseManifestList = baseManifestList;
this.deltaManifestList = deltaManifestList;
this.commitUser = commitUser;
this.commitIdentifier = commitIdentifier;
this.commitKind = commitKind;
this.timeMillis = timeMillis;
this.logOffsets = logOffsets;
}

@JsonGetter(FIELD_ID)
Expand Down Expand Up @@ -124,6 +131,11 @@ public long timeMillis() {
return timeMillis;
}

/**
 * Returns the per-bucket log offsets recorded with this snapshot.
 *
 * <p>Keys are bucket ids, values are log offsets. NOTE(review): for snapshots serialized
 * before this field existed, Jackson will deserialize this as {@code null} — callers
 * should confirm how absent offsets are handled.
 */
@JsonGetter(FIELD_LOG_OFFSETS)
public Map<Integer, Long> getLogOffsets() {
return logOffsets;
}

public String toJson() {
try {
return new ObjectMapper().writeValueAsString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,23 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
}

List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
tryCommit(appendChanges, committable.identifier(), Snapshot.CommitKind.APPEND, false);
tryCommit(
appendChanges,
committable.identifier(),
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
false);

List<ManifestEntry> compactChanges = new ArrayList<>();
compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
if (!compactChanges.isEmpty()) {
tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
tryCommit(
compactChanges,
committable.identifier(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
true);
}
}

Expand Down Expand Up @@ -190,31 +200,42 @@ public void overwrite(
}
}
// overwrite new files
tryOverwrite(partitionFilter, appendChanges, committable.identifier());
tryOverwrite(
partitionFilter, appendChanges, committable.identifier(), committable.logOffsets());

List<ManifestEntry> compactChanges = new ArrayList<>();
compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
if (!compactChanges.isEmpty()) {
tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
tryCommit(
compactChanges,
committable.identifier(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
true);
}
}

private void tryCommit(
List<ManifestEntry> changes,
String hash,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
boolean checkDeletedFiles) {
while (true) {
Long latestSnapshotId = pathFactory.latestSnapshotId();
if (tryCommitOnce(changes, hash, commitKind, latestSnapshotId, checkDeletedFiles)) {
if (tryCommitOnce(
changes, hash, logOffsets, commitKind, latestSnapshotId, checkDeletedFiles)) {
break;
}
}
}

private void tryOverwrite(
Predicate partitionFilter, List<ManifestEntry> changes, String identifier) {
Predicate partitionFilter,
List<ManifestEntry> changes,
String identifier,
Map<Integer, Long> logOffsets) {
while (true) {
Long latestSnapshotId = pathFactory.latestSnapshotId();

Expand All @@ -240,6 +261,7 @@ private void tryOverwrite(
if (tryCommitOnce(
changesWithOverwrite,
identifier,
logOffsets,
Snapshot.CommitKind.OVERWRITE,
latestSnapshotId,
false)) {
Expand Down Expand Up @@ -274,6 +296,7 @@ private List<ManifestEntry> collectChanges(
private boolean tryCommitOnce(
List<ManifestEntry> changes,
String identifier,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Long latestSnapshotId,
boolean checkDeletedFiles) {
Expand Down Expand Up @@ -306,6 +329,9 @@ private boolean tryCommitOnce(
if (latestSnapshot != null) {
// read all previous manifest files
oldMetas.addAll(latestSnapshot.readAllManifests(manifestList));
// read the last snapshot to complete the bucket's offsets when logOffsets does not
// contain all buckets
latestSnapshot.getLogOffsets().forEach(logOffsets::putIfAbsent);
}
// merge manifest files with changes
newMetas.addAll(
Expand All @@ -330,7 +356,8 @@ private boolean tryCommitOnce(
commitUser,
identifier,
commitKind,
System.currentTimeMillis());
System.currentTimeMillis(),
logOffsets);
FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -75,6 +76,8 @@ public class TestFileStore extends FileStoreImpl {
private final RowDataSerializer keySerializer;
private final RowDataSerializer valueSerializer;

private static final AtomicInteger ID = new AtomicInteger();

public static TestFileStore create(
String format,
String root,
Expand Down Expand Up @@ -188,6 +191,9 @@ private List<Snapshot> commitDataImpl(
Increment increment = entryWithBucket.getValue().prepareCommit();
committable.addFileCommittable(
entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
if (!committable.logOffsets().containsKey(entryWithBucket.getKey())) {
committable.addLogOffset(entryWithBucket.getKey(), ID.getAndIncrement());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,67 @@ public void testOverwritePartialCommit() throws Exception {
assertThat(actual).isEqualTo(expected);
}

/**
 * Verifies that per-bucket log offsets are persisted in snapshots: a commit records an
 * offset for each of the 10 buckets, and a partial overwrite keeps the offsets of
 * untouched buckets while only the rewritten buckets may change.
 */
@Test
public void testSnapshotAddLogOffset() throws Exception {
    Map<BinaryRowData, List<KeyValue>> data1 =
            generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
    // flatten once and reuse for both logging and committing
    List<KeyValue> firstRecords =
            data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
    logData(() -> firstRecords, "data1");

    TestFileStore store = createStore(false);
    List<Snapshot> commitSnapshots =
            store.commitData(
                    firstRecords,
                    gen::getPartition,
                    kv -> Math.toIntExact(kv.sequenceNumber() % 10));

    // TestFileStore assigns offsets from a counter, so bucket id == offset after commit
    Map<Integer, Long> commitLogOffsets = commitSnapshots.get(0).getLogOffsets();
    assertThat(commitLogOffsets.size()).isEqualTo(10);
    for (Map.Entry<Integer, Long> offset : commitLogOffsets.entrySet()) {
        assertThat(offset.getKey()).isEqualTo(offset.getValue().intValue());
    }

    // pick one existing partition to overwrite
    List<BinaryRowData> partitions = new ArrayList<>(data1.keySet());
    String dtToOverwrite =
            partitions
                    .get(ThreadLocalRandom.current().nextInt(partitions.size()))
                    .getString(0)
                    .toString();
    Map<String, String> partitionToOverwrite = new HashMap<>();
    partitionToOverwrite.put("dt", dtToOverwrite);

    // overwrite partial commit
    int numRecords = ThreadLocalRandom.current().nextInt(5) + 1;
    Map<BinaryRowData, List<KeyValue>> data2 = generateData(numRecords);
    data2.entrySet()
            .removeIf(entry -> !dtToOverwrite.equals(entry.getKey().getString(0).toString()));
    List<KeyValue> secondRecords =
            data2.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
    logData(() -> secondRecords, "data2");

    List<Snapshot> overwriteSnapshots =
            store.overwriteData(
                    secondRecords,
                    gen::getPartition,
                    kv -> Math.toIntExact(kv.sequenceNumber() % 10),
                    partitionToOverwrite);

    // all buckets still present; at most numRecords buckets can have a new offset
    Map<Integer, Long> overwriteLogOffsets = overwriteSnapshots.get(0).getLogOffsets();
    assertThat(overwriteLogOffsets.size()).isEqualTo(commitLogOffsets.size());
    long changedBuckets =
            overwriteLogOffsets.entrySet().stream()
                    .filter(offset -> !offset.getKey().equals(offset.getValue().intValue()))
                    .count();
    assertThat(changedBuckets).isLessThanOrEqualTo(numRecords);
}

private TestFileStore createStore(boolean failing) {
String root =
failing
Expand Down