Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.manifest;

import org.apache.paimon.data.BinaryRow;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** A {@link SimpleFileEntry} with {@link #fileSource}. */
public class ExpireFileEntry extends SimpleFileEntry {

@Nullable private final FileSource fileSource;

public ExpireFileEntry(
FileKind kind,
BinaryRow partition,
int bucket,
int level,
String fileName,
List<String> extraFiles,
@Nullable byte[] embeddedIndex,
BinaryRow minKey,
BinaryRow maxKey,
@Nullable FileSource fileSource) {
super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey);
this.fileSource = fileSource;
}

public Optional<FileSource> fileSource() {
return Optional.ofNullable(fileSource);
}

public static ExpireFileEntry from(ManifestEntry entry) {
return new ExpireFileEntry(
entry.kind(),
entry.partition(),
entry.bucket(),
entry.level(),
entry.fileName(),
entry.file().extraFiles(),
entry.file().embeddedIndex(),
entry.minKey(),
entry.maxKey(),
entry.file().fileSource().orElse(null));
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
ExpireFileEntry that = (ExpireFileEntry) o;
return fileSource == that.fileSource;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), fileSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public interface FileEntry {

BinaryRow maxKey();

List<String> extraFiles();

/**
* The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
* file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public BinaryRow maxKey() {
return file.maxKey();
}

@Override
public List<String> extraFiles() {
return file.extraFiles();
}

public int totalBuckets() {
return totalBuckets;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -84,6 +85,15 @@ public long suggestedFileSize() {
return suggestedFileSize;
}

public List<ExpireFileEntry> readExpireFileEntries(String fileName, @Nullable Long fileSize) {
List<ManifestEntry> entries = read(fileName, fileSize);
List<ExpireFileEntry> result = new ArrayList<>(entries.size());
for (ManifestEntry entry : entries) {
result.add(ExpireFileEntry.from(entry));
}
return result;
}

/**
* Write several {@link ManifestEntry}s into manifest files.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public BinaryRow maxKey() {
return maxKey;
}

@Override
public List<String> extraFiles() {
return extraFiles;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
Expand Down Expand Up @@ -60,7 +60,7 @@ public ChangelogDeletion(
}

@Override
public void cleanUnusedDataFiles(Changelog changelog, Predicate<ManifestEntry> skipper) {
public void cleanUnusedDataFiles(Changelog changelog, Predicate<ExpireFileEntry> skipper) {
if (changelog.changelogManifestList() != null) {
deleteAddedDataFiles(changelog.changelogManifestList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileEntry.Identifier;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
Expand All @@ -46,15 +47,13 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Base class for file deletion including methods for clean data files, manifest files and empty
Expand Down Expand Up @@ -110,7 +109,7 @@ public FileDeletionBase(
* @param skipper if the test result of a data file is true, it will be skipped when deleting;
* else it will be deleted
*/
public abstract void cleanUnusedDataFiles(T snapshot, Predicate<ManifestEntry> skipper);
public abstract void cleanUnusedDataFiles(T snapshot, Predicate<ExpireFileEntry> skipper);

/**
* Clean metadata files that will not be used anymore of a snapshot, including data manifests,
Expand Down Expand Up @@ -164,21 +163,23 @@ public void cleanEmptyDirectories() {
deletionBuckets.clear();
}

protected void recordDeletionBuckets(ManifestEntry entry) {
protected void recordDeletionBuckets(ExpireFileEntry entry) {
deletionBuckets
.computeIfAbsent(entry.partition(), p -> new HashSet<>())
.add(entry.bucket());
}

public void cleanUnusedDataFiles(String manifestList, Predicate<ManifestEntry> skipper) {
public void cleanUnusedDataFiles(String manifestList, Predicate<ExpireFileEntry> skipper) {
// try read manifests
List<String> manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList));
List<ManifestEntry> manifestEntries;
List<ManifestFileMeta> manifests = tryReadManifestList(manifestList);
List<ExpireFileEntry> manifestEntries;
// data file path -> (original manifest entry, extra file paths)
Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new HashMap<>();
for (String manifest : manifestFileNames) {
Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete = new HashMap<>();
for (ManifestFileMeta manifest : manifests) {
try {
manifestEntries = manifestFile.read(manifest);
manifestEntries =
manifestFile.readExpireFileEntries(
manifest.fileName(), manifest.fileSize());
} catch (Exception e) {
// cancel deletion if any exception occurs
LOG.warn("Failed to read some manifest files. Cancel deletion.", e);
Expand All @@ -192,12 +193,12 @@ public void cleanUnusedDataFiles(String manifestList, Predicate<ManifestEntry> s
}

protected void doCleanUnusedDataFile(
Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
Predicate<ManifestEntry> skipper) {
Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete,
Predicate<ExpireFileEntry> skipper) {
List<Path> actualDataFileToDelete = new ArrayList<>();
dataFileToDelete.forEach(
(path, pair) -> {
ManifestEntry entry = pair.getLeft();
ExpireFileEntry entry = pair.getLeft();
// check whether we should skip the data file
if (!skipper.test(entry)) {
// delete data files
Expand All @@ -211,20 +212,20 @@ protected void doCleanUnusedDataFile(
}

protected void getDataFileToDelete(
Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
List<ManifestEntry> dataFileEntries) {
Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete,
List<ExpireFileEntry> dataFileEntries) {
// we cannot delete a data file directly when we meet a DELETE entry, because that
// file might be upgraded
for (ManifestEntry entry : dataFileEntries) {
for (ExpireFileEntry entry : dataFileEntries) {
Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket());
Path dataFilePath = new Path(bucketPath, entry.file().fileName());
Path dataFilePath = new Path(bucketPath, entry.fileName());
switch (entry.kind()) {
case ADD:
dataFileToDelete.remove(dataFilePath);
break;
case DELETE:
List<Path> extraFiles = new ArrayList<>(entry.file().extraFiles().size());
for (String file : entry.file().extraFiles()) {
List<Path> extraFiles = new ArrayList<>(entry.extraFiles().size());
for (String file : entry.extraFiles()) {
extraFiles.add(new Path(bucketPath, file));
}
dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles));
Expand All @@ -242,27 +243,28 @@ protected void getDataFileToDelete(
* @param manifestListName name of manifest list
*/
public void deleteAddedDataFiles(String manifestListName) {
List<String> manifestFileNames =
readManifestFileNames(tryReadManifestList(manifestListName));
for (String file : manifestFileNames) {
List<ManifestFileMeta> manifests = tryReadManifestList(manifestListName);
for (ManifestFileMeta manifest : manifests) {
try {
List<ManifestEntry> manifestEntries = manifestFile.read(file);
List<ExpireFileEntry> manifestEntries =
manifestFile.readExpireFileEntries(
manifest.fileName(), manifest.fileSize());
deleteAddedDataFiles(manifestEntries);
} catch (Exception e) {
// We want to delete the data file, so just ignore the unavailable files
LOG.info("Failed to read manifest " + file + ". Ignore it.", e);
LOG.info("Failed to read manifest " + manifest.fileName() + ". Ignore it.", e);
}
}
}

private void deleteAddedDataFiles(List<ManifestEntry> manifestEntries) {
private void deleteAddedDataFiles(List<ExpireFileEntry> manifestEntries) {
List<Path> dataFileToDelete = new ArrayList<>();
for (ManifestEntry entry : manifestEntries) {
for (ExpireFileEntry entry : manifestEntries) {
if (entry.kind() == FileKind.ADD) {
dataFileToDelete.add(
new Path(
pathFactory.bucketPath(entry.partition(), entry.bucket()),
entry.file().fileName()));
entry.fileName()));
recordDeletionBuckets(entry);
}
}
Expand Down Expand Up @@ -327,7 +329,7 @@ protected void cleanUnusedManifests(
cleanUnusedStatisticsManifests(snapshot, skippingSet);
}

public Predicate<ManifestEntry> createDataFileSkipperForTags(
public Predicate<ExpireFileEntry> createDataFileSkipperForTags(
List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws Exception {
int index = SnapshotManager.findPreviousSnapshot(taggedSnapshots, expiringSnapshotId);
// refresh tag data files
Expand Down Expand Up @@ -358,55 +360,46 @@ protected List<ManifestFileMeta> tryReadManifestList(String manifestListName) {
}
}

protected List<String> tryReadDataManifests(Snapshot snapshot) {
List<ManifestFileMeta> manifestFileMetas = tryReadManifestList(snapshot.baseManifestList());
manifestFileMetas.addAll(tryReadManifestList(snapshot.deltaManifestList()));
return readManifestFileNames(manifestFileMetas);
}

protected List<String> readManifestFileNames(List<ManifestFileMeta> manifestFileMetas) {
return manifestFileMetas.stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toCollection(LinkedList::new));
}

/**
* NOTE: This method is used for building data file skipping set. If failed to read some
* manifests, it will throw exception which callers must handle.
*/
protected void addMergedDataFiles(
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot snapshot)
throws IOException {
for (ManifestEntry entry : readMergedDataFiles(snapshot)) {
for (ExpireFileEntry entry : readMergedDataFiles(snapshot)) {
dataFiles
.computeIfAbsent(entry.partition(), p -> new HashMap<>())
.computeIfAbsent(entry.bucket(), b -> new HashSet<>())
.add(entry.file().fileName());
.add(entry.fileName());
}
}

protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot) throws IOException {
protected Collection<ExpireFileEntry> readMergedDataFiles(Snapshot snapshot)
throws IOException {
// read data manifests
List<String> files = tryReadDataManifests(snapshot);

List<ManifestFileMeta> manifests = tryReadManifestList(snapshot.baseManifestList());
manifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));

// read and merge manifest entries
Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
for (String manifest : files) {
List<ManifestEntry> entries;
entries = manifestFile.readWithIOException(manifest);
Map<Identifier, ExpireFileEntry> map = new HashMap<>();
for (ManifestFileMeta manifest : manifests) {
List<ExpireFileEntry> entries =
manifestFile.readExpireFileEntries(manifest.fileName(), manifest.fileSize());
FileEntry.mergeEntries(entries, map);
}

return map.values();
}

protected boolean containsDataFile(
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ManifestEntry testee) {
Map<Integer, Set<String>> buckets = dataFiles.get(testee.partition());
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ExpireFileEntry entry) {
Map<Integer, Set<String>> buckets = dataFiles.get(entry.partition());
if (buckets != null) {
Set<String> fileNames = buckets.get(testee.bucket());
Set<String> fileNames = buckets.get(entry.bucket());
if (fileNames != null) {
return fileNames.contains(testee.file().fileName());
return fileNames.contains(entry.fileName());
}
}
return false;
Expand Down
Loading
Loading