Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ public FileStorePathFactory pathFactory() {

private ManifestFile.Factory manifestFileFactory() {
return new ManifestFile.Factory(
partitionType, keyType, valueType, options.manifestFormat(), pathFactory());
partitionType,
keyType,
valueType,
options.manifestFormat(),
pathFactory(),
options.manifestTargetSize().getBytes());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@

package org.apache.flink.table.store.file.manifest;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.stats.FieldStatsCollector;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RollingFile;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand All @@ -41,23 +46,33 @@
*/
public class ManifestFile {

private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);

private final RowType partitionType;
private final ManifestEntrySerializer serializer;
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
private final BulkWriter.Factory<RowData> writerFactory;
private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;

private ManifestFile(
RowType partitionType,
ManifestEntrySerializer serializer,
BulkFormat<RowData, FileSourceSplit> readerFactory,
BulkWriter.Factory<RowData> writerFactory,
FileStorePathFactory pathFactory) {
FileStorePathFactory pathFactory,
long suggestedFileSize) {
this.partitionType = partitionType;
this.serializer = serializer;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
}

@VisibleForTesting
public long suggestedFileSize() {
return suggestedFileSize;
}

public List<ManifestEntry> read(String fileName) {
Expand All @@ -70,34 +85,56 @@ public List<ManifestEntry> read(String fileName) {
}

/**
* Write several {@link ManifestEntry}s into a manifest file.
* Write several {@link ManifestEntry}s into manifest files.
*
* <p>NOTE: This method is atomic.
*/
public ManifestFileMeta write(List<ManifestEntry> entries) {
public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
Preconditions.checkArgument(
entries.size() > 0, "Manifest entries to write must not be empty.");

Path path = pathFactory.newManifestFile();
ManifestRollingFile rollingFile = new ManifestRollingFile();
List<ManifestFileMeta> result = new ArrayList<>();
List<Path> filesToCleanUp = new ArrayList<>();
try {
return write(entries, path);
rollingFile.write(entries.iterator(), result, filesToCleanUp);
} catch (Throwable e) {
FileUtils.deleteOrWarn(path);
throw new RuntimeException(
"Exception occurs when writing manifest file " + path + ". Clean up.", e);
LOG.warn("Exception occurs when writing manifest files. Cleaning up.", e);
for (Path path : filesToCleanUp) {
FileUtils.deleteOrWarn(path);
}
throw new RuntimeException(e);
}
return result;
}

public void delete(String fileName) {
FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
}

private ManifestFileMeta write(List<ManifestEntry> entries, Path path) throws IOException {
FSDataOutputStream out =
path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
BulkWriter<RowData> writer = writerFactory.create(out);
long numAddedFiles = 0;
long numDeletedFiles = 0;
FieldStatsCollector statsCollector = new FieldStatsCollector(partitionType);
private class ManifestRollingFile extends RollingFile<ManifestEntry, ManifestFileMeta> {

private long numAddedFiles;
private long numDeletedFiles;
private FieldStatsCollector statsCollector;

private ManifestRollingFile() {
super(suggestedFileSize);
resetMeta();
}

for (ManifestEntry entry : entries) {
writer.addElement(serializer.toRow(entry));
@Override
protected Path newPath() {
return pathFactory.newManifestFile();
}

@Override
protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
return writerFactory.create(out);
}

@Override
protected RowData toRowData(ManifestEntry entry) {
switch (entry.kind()) {
case ADD:
numAddedFiles++;
Expand All @@ -107,20 +144,28 @@ private ManifestFileMeta write(List<ManifestEntry> entries, Path path) throws IO
break;
}
statsCollector.collect(entry.partition());

return serializer.toRow(entry);
}
writer.finish();
out.close();

return new ManifestFileMeta(
path.getName(),
path.getFileSystem().getFileStatus(path).getLen(),
numAddedFiles,
numDeletedFiles,
statsCollector.extract());
}

public void delete(String fileName) {
FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
@Override
protected ManifestFileMeta collectFile(Path path) throws IOException {
ManifestFileMeta result =
new ManifestFileMeta(
path.getName(),
path.getFileSystem().getFileStatus(path).getLen(),
numAddedFiles,
numDeletedFiles,
statsCollector.extract());
resetMeta();
return result;
}

private void resetMeta() {
numAddedFiles = 0;
numDeletedFiles = 0;
statsCollector = new FieldStatsCollector(partitionType);
}
}

/**
Expand All @@ -135,20 +180,23 @@ public static class Factory {
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
private final BulkWriter.Factory<RowData> writerFactory;
private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;

public Factory(
RowType partitionType,
RowType keyType,
RowType rowType,
FileFormat fileFormat,
FileStorePathFactory pathFactory) {
FileStorePathFactory pathFactory,
long suggestedFileSize) {
this.partitionType = partitionType;
this.keyType = keyType;
this.rowType = rowType;
RowType entryType = ManifestEntry.schema(partitionType, keyType, rowType);
this.readerFactory = fileFormat.createReaderFactory(entryType);
this.writerFactory = fileFormat.createWriterFactory(entryType);
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
}

public ManifestFile create() {
Expand All @@ -157,7 +205,8 @@ public ManifestFile create() {
new ManifestEntrySerializer(partitionType, keyType, rowType),
readerFactory,
writerFactory,
pathFactory);
pathFactory,
suggestedFileSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Metadata of a manifest file. */
public class ManifestFileMeta {
Expand Down Expand Up @@ -125,56 +123,40 @@ public String toString() {
}

/**
* Merge several {@link ManifestFileMeta}s with several {@link ManifestEntry}s. {@link
* ManifestEntry}s representing first adding and then deleting the same sst file will cancel
* each other.
* Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
* then deleting the same sst file will cancel each other.
*
* <p>NOTE: This method is atomic.
*/
public static List<ManifestFileMeta> merge(
List<ManifestFileMeta> metas,
List<ManifestEntry> entries,
ManifestFile manifestFile,
long suggestedMetaSize,
int suggestedMinMetaCount) {
List<ManifestFileMeta> result = new ArrayList<>();
// these are the newly created manifest files, clean them up if exception occurs
List<ManifestFileMeta> newMetas = new ArrayList<>();
List<ManifestFileMeta> candidate = new ArrayList<>();
List<ManifestFileMeta> candidates = new ArrayList<>();
long totalSize = 0;
int metaCount = 0;

try {
// merge existing manifests first
for (ManifestFileMeta manifest : metas) {
totalSize += manifest.fileSize;
metaCount += 1;
candidate.add(manifest);
if (totalSize >= suggestedMetaSize || metaCount >= suggestedMinMetaCount) {
candidates.add(manifest);
if (totalSize >= suggestedMetaSize) {
// reach suggested file size, perform merging and produce new file
if (candidate.size() == 1) {
result.add(candidate.get(0));
} else {
mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile)
.ifPresent(
merged -> {
newMetas.add(merged);
result.add(merged);
});
}

candidate.clear();
mergeCandidates(candidates, manifestFile, result, newMetas);
candidates.clear();
totalSize = 0;
metaCount = 0;
}
}

// both size and count conditions not satisfied, create new file from entries
result.addAll(candidate);
if (entries.size() > 0) {
ManifestFileMeta newManifestFileMeta = manifestFile.write(entries);
newMetas.add(newManifestFileMeta);
result.add(newManifestFileMeta);
// merge the last bit of manifests if there are too many
if (candidates.size() >= suggestedMinMetaCount) {
mergeCandidates(candidates, manifestFile, result, newMetas);
} else {
result.addAll(candidates);
}
} catch (Throwable e) {
// exception occurs, clean up and rethrow
Expand All @@ -187,16 +169,25 @@ public static List<ManifestFileMeta> merge(
return result;
}

private static Optional<ManifestFileMeta> mergeIntoOneFile(
List<ManifestFileMeta> metas, List<ManifestEntry> entries, ManifestFile manifestFile) {
private static void mergeCandidates(
List<ManifestFileMeta> candidates,
ManifestFile manifestFile,
List<ManifestFileMeta> result,
List<ManifestFileMeta> newMetas) {
if (candidates.size() == 1) {
result.add(candidates.get(0));
return;
}

Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
for (ManifestFileMeta manifest : metas) {
for (ManifestFileMeta manifest : candidates) {
mergeEntries(manifestFile.read(manifest.fileName), map);
}
mergeEntries(entries, map);
return map.isEmpty()
? Optional.empty()
: Optional.of(manifestFile.write(new ArrayList<>(map.values())));
if (!map.isEmpty()) {
List<ManifestFileMeta> merged = manifestFile.write(new ArrayList<>(map.values()));
result.addAll(merged);
newMetas.addAll(merged);
}
}

private static void mergeEntries(
Expand Down
Loading