diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java index 2fa5c48c5d15..3249c35e06d2 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java @@ -80,7 +80,12 @@ public FileStorePathFactory pathFactory() { private ManifestFile.Factory manifestFileFactory() { return new ManifestFile.Factory( - partitionType, keyType, valueType, options.manifestFormat(), pathFactory()); + partitionType, + keyType, + valueType, + options.manifestFormat(), + pathFactory(), + options.manifestTargetSize().getBytes()); } @VisibleForTesting diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java index 77b3ba79452d..6382817923ad 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java @@ -18,21 +18,26 @@ package org.apache.flink.table.store.file.manifest; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.connector.file.src.FileSourceSplit; import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; import org.apache.flink.table.store.file.FileFormat; import org.apache.flink.table.store.file.stats.FieldStatsCollector; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.table.store.file.utils.RollingFile; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; +import java.util.ArrayList; import java.util.List; /** @@ -41,23 +46,33 @@ */ public class ManifestFile { + private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class); + private final RowType partitionType; private final ManifestEntrySerializer serializer; private final BulkFormat readerFactory; private final BulkWriter.Factory writerFactory; private final FileStorePathFactory pathFactory; + private final long suggestedFileSize; private ManifestFile( RowType partitionType, ManifestEntrySerializer serializer, BulkFormat readerFactory, BulkWriter.Factory writerFactory, - FileStorePathFactory pathFactory) { + FileStorePathFactory pathFactory, + long suggestedFileSize) { this.partitionType = partitionType; this.serializer = serializer; this.readerFactory = readerFactory; this.writerFactory = writerFactory; this.pathFactory = pathFactory; + this.suggestedFileSize = suggestedFileSize; + } + + @VisibleForTesting + public long suggestedFileSize() { + return suggestedFileSize; } public List read(String fileName) { @@ -70,34 +85,56 @@ public List read(String fileName) { } /** - * Write several {@link ManifestEntry}s into a manifest file. + * Write several {@link ManifestEntry}s into manifest files. * *

NOTE: This method is atomic. */ - public ManifestFileMeta write(List entries) { + public List write(List entries) { Preconditions.checkArgument( entries.size() > 0, "Manifest entries to write must not be empty."); - Path path = pathFactory.newManifestFile(); + ManifestRollingFile rollingFile = new ManifestRollingFile(); + List result = new ArrayList<>(); + List filesToCleanUp = new ArrayList<>(); try { - return write(entries, path); + rollingFile.write(entries.iterator(), result, filesToCleanUp); } catch (Throwable e) { - FileUtils.deleteOrWarn(path); - throw new RuntimeException( - "Exception occurs when writing manifest file " + path + ". Clean up.", e); + LOG.warn("Exception occurs when writing manifest files. Cleaning up.", e); + for (Path path : filesToCleanUp) { + FileUtils.deleteOrWarn(path); + } + throw new RuntimeException(e); } + return result; + } + + public void delete(String fileName) { + FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName)); } - private ManifestFileMeta write(List entries, Path path) throws IOException { - FSDataOutputStream out = - path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE); - BulkWriter writer = writerFactory.create(out); - long numAddedFiles = 0; - long numDeletedFiles = 0; - FieldStatsCollector statsCollector = new FieldStatsCollector(partitionType); + private class ManifestRollingFile extends RollingFile { + + private long numAddedFiles; + private long numDeletedFiles; + private FieldStatsCollector statsCollector; + + private ManifestRollingFile() { + super(suggestedFileSize); + resetMeta(); + } - for (ManifestEntry entry : entries) { - writer.addElement(serializer.toRow(entry)); + @Override + protected Path newPath() { + return pathFactory.newManifestFile(); + } + + @Override + protected BulkWriter newWriter(FSDataOutputStream out) throws IOException { + return writerFactory.create(out); + } + + @Override + protected RowData toRowData(ManifestEntry entry) { switch (entry.kind()) { case ADD: numAddedFiles++; @@ -107,20 +144,28 @@ private ManifestFileMeta write(List entries, Path path) throws IO break; } statsCollector.collect(entry.partition()); + + return serializer.toRow(entry); } - writer.finish(); - out.close(); - - return new ManifestFileMeta( - path.getName(), - path.getFileSystem().getFileStatus(path).getLen(), - numAddedFiles, - numDeletedFiles, - statsCollector.extract()); - } - public void delete(String fileName) { - FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName)); + @Override + protected ManifestFileMeta collectFile(Path path) throws IOException { + ManifestFileMeta result = + new ManifestFileMeta( + path.getName(), + path.getFileSystem().getFileStatus(path).getLen(), + numAddedFiles, + numDeletedFiles, + statsCollector.extract()); + resetMeta(); + return result; + } + + private void resetMeta() { + numAddedFiles = 0; + numDeletedFiles = 0; + statsCollector = new FieldStatsCollector(partitionType); + } } /** @@ -135,13 +180,15 @@ public static class Factory { private final BulkFormat readerFactory; private final BulkWriter.Factory writerFactory; private final FileStorePathFactory pathFactory; + private final long suggestedFileSize; public Factory( RowType partitionType, RowType keyType, RowType rowType, FileFormat fileFormat, - FileStorePathFactory pathFactory) { + FileStorePathFactory pathFactory, + long suggestedFileSize) { this.partitionType = partitionType; this.keyType = keyType; this.rowType = rowType; @@ -149,6 +196,7 @@ public Factory( this.readerFactory = fileFormat.createReaderFactory(entryType); this.writerFactory = fileFormat.createWriterFactory(entryType); this.pathFactory = pathFactory; + this.suggestedFileSize = suggestedFileSize; } public ManifestFile create() { @@ -157,7 +205,8 @@ public ManifestFile create() { new ManifestEntrySerializer(partitionType, keyType, rowType), readerFactory, writerFactory, - pathFactory); + pathFactory, + suggestedFileSize); } } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java index be1d62932840..4ee1c6ded42d 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java @@ -27,12 +27,10 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; /** Metadata of a manifest file. */ public class ManifestFileMeta { @@ -125,56 +123,40 @@ public String toString() { } /** - * Merge several {@link ManifestFileMeta}s with several {@link ManifestEntry}s. {@link - * ManifestEntry}s representing first adding and then deleting the same sst file will cancel - * each other. + * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and + * then deleting the same sst file will cancel each other. * *

NOTE: This method is atomic. */ public static List merge( List metas, - List entries, ManifestFile manifestFile, long suggestedMetaSize, int suggestedMinMetaCount) { List result = new ArrayList<>(); // these are the newly created manifest files, clean them up if exception occurs List newMetas = new ArrayList<>(); - List candidate = new ArrayList<>(); + List candidates = new ArrayList<>(); long totalSize = 0; - int metaCount = 0; try { // merge existing manifests first for (ManifestFileMeta manifest : metas) { totalSize += manifest.fileSize; - metaCount += 1; - candidate.add(manifest); - if (totalSize >= suggestedMetaSize || metaCount >= suggestedMinMetaCount) { + candidates.add(manifest); + if (totalSize >= suggestedMetaSize) { // reach suggested file size, perform merging and produce new file - if (candidate.size() == 1) { - result.add(candidate.get(0)); - } else { - mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile) - .ifPresent( - merged -> { - newMetas.add(merged); - result.add(merged); - }); - } - - candidate.clear(); + mergeCandidates(candidates, manifestFile, result, newMetas); + candidates.clear(); totalSize = 0; - metaCount = 0; } } - // both size and count conditions not satisfied, create new file from entries - result.addAll(candidate); - if (entries.size() > 0) { - ManifestFileMeta newManifestFileMeta = manifestFile.write(entries); - newMetas.add(newManifestFileMeta); - result.add(newManifestFileMeta); + // merge the last bit of manifests if there are too many + if (candidates.size() >= suggestedMinMetaCount) { + mergeCandidates(candidates, manifestFile, result, newMetas); + } else { + result.addAll(candidates); } } catch (Throwable e) { // exception occurs, clean up and rethrow @@ -187,16 +169,25 @@ public static List merge( return result; } - private static Optional mergeIntoOneFile( - List metas, List entries, ManifestFile manifestFile) { + private static void mergeCandidates( + List candidates, + ManifestFile manifestFile, + List result, + List newMetas) { + if (candidates.size() == 1) { + result.add(candidates.get(0)); + return; + } + Map map = new LinkedHashMap<>(); - for (ManifestFileMeta manifest : metas) { + for (ManifestFileMeta manifest : candidates) { mergeEntries(manifestFile.read(manifest.fileName), map); } - mergeEntries(entries, map); - return map.isEmpty() - ? Optional.empty() - : Optional.of(manifestFile.write(new ArrayList<>(map.values()))); + if (!map.isEmpty()) { + List merged = manifestFile.write(new ArrayList<>(map.values())); + result.addAll(merged); + newMetas.addAll(merged); + } } private static void mergeEntries( diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java index 91e7ae3610a7..5c18aa6a7a61 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; @@ -32,6 +31,7 @@ import org.apache.flink.table.store.file.stats.FieldStats; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.table.store.file.utils.RollingFile; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.CloseableIterator; @@ -91,40 +91,20 @@ public SstPathFactory pathFactory() { */ public List write(CloseableIterator iterator, int level) throws Exception { + SstRollingFile rollingFile = new StatsCollectingRollingFile(level); List result = new ArrayList<>(); - - RollingFile rollingFile = null; - Path currentPath = null; + List filesToCleanUp = new ArrayList<>(); try { - while (iterator.hasNext()) { - if (rollingFile == null) { - currentPath = pathFactory.newPath(); - rollingFile = new RollingFile(currentPath, suggestedFileSize); - } - rollingFile.write(iterator.next()); - if (rollingFile.exceedsSuggestedFileSize()) { - result.add(rollingFile.finish(level)); - rollingFile = null; - } - } - // finish last file - if (rollingFile != null) { - result.add(rollingFile.finish(level)); - } - iterator.close(); + rollingFile.write(iterator, result, filesToCleanUp); } catch (Throwable e) { LOG.warn("Exception occurs when writing sst files. Cleaning up.", e); - // clean up finished files - for (SstFileMeta meta : result) { - FileUtils.deleteOrWarn(pathFactory.toPath(meta.fileName())); - } - // clean up in-progress file - if (currentPath != null) { - FileUtils.deleteOrWarn(currentPath); + for (Path path : filesToCleanUp) { + FileUtils.deleteOrWarn(path); } throw e; + } finally { + iterator.close(); } - return result; } @@ -132,12 +112,9 @@ public void delete(SstFileMeta file) { FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName())); } - private class RollingFile { - private final Path path; - private final long suggestedFileSize; + private abstract class SstRollingFile extends RollingFile { - private final FSDataOutputStream out; - private final BulkWriter writer; + private final int level; private final KeyValueSerializer serializer; private final RowDataSerializer keySerializer; @@ -147,37 +124,30 @@ private class RollingFile { private long minSequenceNumber; private long maxSequenceNumber; - private RollingFile(Path path, long suggestedFileSize) throws IOException { - this.path = path; - this.suggestedFileSize = suggestedFileSize; - - this.out = - this.path.getFileSystem().create(this.path, FileSystem.WriteMode.NO_OVERWRITE); - this.writer = writerFactory.create(out); + private SstRollingFile(int level) { + super(suggestedFileSize); + this.level = level; this.serializer = new KeyValueSerializer(keyType, valueType); this.keySerializer = new RowDataSerializer(keyType); + resetMeta(); + } - this.rowCount = 0; - this.minKey = null; - this.maxKey = null; - this.minSequenceNumber = Long.MAX_VALUE; - this.maxSequenceNumber = Long.MIN_VALUE; + @Override + protected Path newPath() { + return pathFactory.newPath(); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new sst file " + path); - } + @Override + protected BulkWriter newWriter(FSDataOutputStream out) throws IOException { + return writerFactory.create(out); } - private void write(KeyValue kv) throws IOException { + @Override + protected RowData toRowData(KeyValue kv) { if (LOG.isDebugEnabled()) { - LOG.debug( - "Writing key-value to sst file " - + path - + ", kv: " - + kv.toString(keyType, valueType)); + LOG.debug("Writing key-value to sst file, kv: " + kv.toString(keyType, valueType)); } - writer.addElement(serializer.toRow(kv)); rowCount++; if (minKey == null) { minKey = keySerializer.toBinaryRow(kv.key()).copy(); @@ -185,17 +155,46 @@ private void write(KeyValue kv) throws IOException { maxKey = kv.key(); minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber()); maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber()); + + return serializer.toRow(kv); } - private boolean exceedsSuggestedFileSize() throws IOException { - // NOTE: this method is inaccurate for formats buffering changes in memory - return out.getPos() >= suggestedFileSize; + @Override + protected SstFileMeta collectFile(Path path) throws IOException { + SstFileMeta result = + new SstFileMeta( + path.getName(), + FileUtils.getFileSize(path), + rowCount, + minKey, + keySerializer.toBinaryRow(maxKey).copy(), + collectStats(path), + minSequenceNumber, + maxSequenceNumber, + level); + resetMeta(); + return result; } - private SstFileMeta finish(int level) throws IOException { - writer.finish(); - out.close(); + private void resetMeta() { + rowCount = 0; + minKey = null; + maxKey = null; + minSequenceNumber = Long.MAX_VALUE; + maxSequenceNumber = Long.MIN_VALUE; + } + protected abstract FieldStats[] collectStats(Path path); + } + + private class StatsCollectingRollingFile extends SstRollingFile { + + private StatsCollectingRollingFile(int level) { + super(level); + } + + @Override + protected FieldStats[] collectStats(Path path) { // TODO // 1. Read statistics directly from the written orc/parquet files. // 2. For other file formats use StatsCollector. Make sure fields are not reused @@ -204,17 +203,7 @@ private SstFileMeta finish(int level) throws IOException { for (int i = 0; i < stats.length; i++) { stats[i] = new FieldStats(null, null, 0); } - - return new SstFileMeta( - path.getName(), - FileUtils.getFileSize(path), - rowCount, - minKey, - keySerializer.toBinaryRow(maxKey).copy(), - stats, - minSequenceNumber, - maxSequenceNumber, - level); + return stats; } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java index d8f82118afa4..1bfdf1bae0f5 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java @@ -313,10 +313,13 @@ private boolean tryCommitOnce( newMetas.addAll( ManifestFileMeta.merge( oldMetas, - changes, manifestFile, manifestTargetSize.getBytes(), manifestMergeMinCount)); + // write new changes into manifest files + if (!changes.isEmpty()) { + newMetas.addAll(manifestFile.write(changes)); + } // prepare snapshot file manifestListName = manifestList.write(newMetas); newSnapshot = diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java new file mode 100644 index 000000000000..aeb0d90e52f3 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.utils; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +/** + * A utility class to write a list of objects into several files, each with a size limit. + * + * @param record type + * @param file meta type + */ +public abstract class RollingFile { + + private static final Logger LOG = LoggerFactory.getLogger(RollingFile.class); + + private final long suggestedFileSize; + + public RollingFile(long suggestedFileSize) { + this.suggestedFileSize = suggestedFileSize; + } + + /** Create the path for a new file. */ + protected abstract Path newPath(); + + /** Create a new object writer. Called per file. */ + protected abstract BulkWriter newWriter(FSDataOutputStream out) throws IOException; + + /** + * Called before writing a record into file. Per-record calculation can be performed here. + * + * @param record record to write + * @return serialized record + */ + protected abstract RowData toRowData(R record); + + /** Called before closing the current file. Per-file calculation can be performed here. */ + protected abstract F collectFile(Path path) throws IOException; + + public void write(Iterator iterator, List result, List filesToCleanUp) + throws IOException { + Writer writer = null; + Path currentPath = null; + + while (iterator.hasNext()) { + if (writer == null) { + // create new rolling file + currentPath = newPath(); + filesToCleanUp.add(currentPath); + writer = new Writer(currentPath); + } + + RowData serialized = toRowData(iterator.next()); + writer.write(serialized); + + if (writer.exceedsSuggestedFileSize()) { + // exceeds suggested size, close current file + writer.finish(); + result.add(collectFile(currentPath)); + writer = null; + } + } + + // finish last file + if (writer != null) { + writer.finish(); + result.add(collectFile(currentPath)); + } + } + + private class Writer { + private final FSDataOutputStream out; + private final BulkWriter writer; + + private Writer(Path path) throws IOException { + this.out = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE); + this.writer = newWriter(out); + + if (LOG.isDebugEnabled()) { + LOG.debug("Create new rolling file " + path.toString()); + } + } + + private void write(RowData record) throws IOException { + writer.addElement(record); + } + + private boolean exceedsSuggestedFileSize() throws IOException { + // NOTE: this method is inaccurate for formats buffering changes in memory + return out.getPos() >= suggestedFileSize; + } + + private void finish() throws IOException { + writer.finish(); + out.close(); + } + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java index 8bf995f95e13..9296a3186b4e 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java @@ -35,15 +35,18 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; @@ -70,30 +73,28 @@ public void beforeEach() { manifestFile = createManifestFile(tempDir.toString()); } - @Test - public void testMerge() { + @ParameterizedTest + @ValueSource(ints = {2, 3, 4}) + public void testMerge(int numLastBits) { List input = new ArrayList<>(); - List entries = new ArrayList<>(); List expected = new ArrayList<>(); - createData(input, entries, expected); + createData(numLastBits, input, expected); - List actual = - ManifestFileMeta.merge(input, entries, manifestFile, 500, 3); + List actual = ManifestFileMeta.merge(input, manifestFile, 500, 3); assertThat(actual).hasSameSizeAs(expected); - // these three manifest files are merged from the input + // these two manifest files are merged from the input assertSameContent(expected.get(0), actual.get(0), manifestFile); assertSameContent(expected.get(1), actual.get(1), manifestFile); - assertSameContent(expected.get(4), actual.get(4), manifestFile); - // these four manifest files should be kept without modification + // these two manifest files should be kept without modification assertThat(actual.get(2)).isEqualTo(input.get(5)); assertThat(actual.get(3)).isEqualTo(input.get(6)); - assertThat(actual.get(5)).isEqualTo(input.get(10)); - assertThat(actual.get(6)).isEqualTo(input.get(11)); - // this manifest file should be created from entries - assertSameContent(expected.get(7), actual.get(7), manifestFile); + // check last bits + for (int i = 4; i < actual.size(); i++) { + assertSameContent(expected.get(i), actual.get(i), manifestFile); + } } private void assertSameContent( @@ -112,14 +113,13 @@ private void assertSameContent( public void testCleanUpForException() throws IOException { FailingAtomicRenameFileSystem.get().reset(1, 10); List input = new ArrayList<>(); - List entries = new ArrayList<>(); - createData(input, entries, null); + createData(ThreadLocalRandom.current().nextInt(5), input, null); ManifestFile failingManifestFile = createManifestFile( FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString())); try { - ManifestFileMeta.merge(input, entries, failingManifestFile, 500, 30); + ManifestFileMeta.merge(input, failingManifestFile, 500, 3); } catch (Throwable e) { assertThat(e) .hasRootCauseExactlyInstanceOf( @@ -146,24 +146,20 @@ private ManifestFile createManifestFile(String path) { KEY_TYPE, ROW_TYPE, avro, - new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default")) + new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"), + Long.MAX_VALUE) .create(); } private void createData( - List input, - List entries, - List expected) { + int numLastBits, List input, List expected) { // suggested size 500 and suggested count 3 // file sizes: // 200, 300, -- multiple files exactly the suggested size // 100, 200, 300, -- multiple files exceeding the suggested size // 500, -- single file exactly the suggested size // 600, -- single file exceeding the suggested size - // 100, 100, 100, -- multiple files exceeding the suggested count - // 100, -- the last bit, not enough size or count, won't merge - // 300, -- the last bit, not enough size or count, won't merge - // 200, -- file created from entries + // 100 * numLastBits -- the last bit input.add(makeManifest(makeEntry(true, "A"), makeEntry(true, "B"))); input.add(makeManifest(makeEntry(true, "C"), makeEntry(false, "B"), makeEntry(true, "D"))); @@ -189,15 +185,9 @@ private void createData( makeEntry(false, "K"), makeEntry(true, "L"))); - input.add(makeManifest(makeEntry(true, "M"))); - input.add(makeManifest(makeEntry(true, "N"))); - input.add(makeManifest(makeEntry(true, "O"))); - - input.add(makeManifest(makeEntry(true, "P"))); - input.add(makeManifest(makeEntry(false, "Q"), makeEntry(true, "R"), makeEntry(true, "S"))); - - entries.add(makeEntry(false, "S")); - entries.add(makeEntry(true, "T")); + for (int i = 0; i < numLastBits; i++) { + input.add(makeManifest(makeEntry(true, String.valueOf(i)))); + } if (expected == null) { return; @@ -208,15 +198,22 @@ private void createData( expected.add(makeManifest(makeEntry(false, "A"), makeEntry(true, "F"))); expected.add(input.get(5)); expected.add(input.get(6)); - expected.add( - makeManifest(makeEntry(true, "M"), makeEntry(true, "N"), makeEntry(true, "O"))); - expected.add(input.get(10)); - expected.add(input.get(11)); - expected.add(makeManifest(makeEntry(false, "S"), makeEntry(true, "T"))); + + if (numLastBits < 3) { + for (int i = 0; i < numLastBits; i++) { + expected.add(input.get(7 + i)); + } + } else { + expected.add( + makeManifest( + IntStream.range(0, numLastBits) + .mapToObj(i -> makeEntry(true, String.valueOf(i))) + .toArray(ManifestEntry[]::new))); + } } private ManifestFileMeta makeManifest(ManifestEntry... entries) { - ManifestFileMeta writtenMeta = manifestFile.write(Arrays.asList(entries)); + ManifestFileMeta writtenMeta = manifestFile.write(Arrays.asList(entries)).get(0); return new ManifestFileMeta( writtenMeta.fileName(), entries.length * 100, // for testing purpose diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java index aa65636a8601..f1c8c78dd101 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java @@ -23,6 +23,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.table.store.file.FileFormat; import org.apache.flink.table.store.file.TestKeyValueGenerator; +import org.apache.flink.table.store.file.stats.FieldStats; +import org.apache.flink.table.store.file.stats.StatsTestUtils; import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem; import org.apache.flink.table.store.file.utils.FileStorePathFactory; @@ -32,6 +34,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -51,10 +55,12 @@ public void testWriteAndReadManifestFile() { ManifestFileMeta meta = gen.createManifestFileMeta(entries); ManifestFile manifestFile = createManifestFile(tempDir.toString()); - ManifestFileMeta actualMeta = manifestFile.write(entries); - // we do not check file name and size as we can't know in advance - checkMetaIgnoringFileNameAndSize(meta, actualMeta); - List actualEntries = manifestFile.read(actualMeta.fileName()); + List actualMetas = manifestFile.write(entries); + checkRollingFiles(meta, actualMetas, manifestFile.suggestedFileSize()); + List actualEntries = + actualMetas.stream() + .flatMap(m -> manifestFile.read(m.fileName()).stream()) + .collect(Collectors.toList()); assertThat(actualEntries).isEqualTo(entries); } @@ -90,19 +96,39 @@ private ManifestFile createManifestFile(String path) { FileStorePathFactory pathFactory = new FileStorePathFactory( new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default"); + int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024; return new ManifestFile.Factory( TestKeyValueGenerator.PARTITION_TYPE, TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE, avro, - pathFactory) + pathFactory, + suggestedFileSize) .create(); } - private void checkMetaIgnoringFileNameAndSize( - ManifestFileMeta expected, ManifestFileMeta actual) { - assertThat(actual.numAddedFiles()).isEqualTo(expected.numAddedFiles()); - assertThat(actual.numDeletedFiles()).isEqualTo(expected.numDeletedFiles()); - assertThat(actual.partitionStats()).isEqualTo(expected.partitionStats()); + private void checkRollingFiles( + ManifestFileMeta expected, List actual, long suggestedFileSize) { + // all but last file should be no smaller than suggestedFileSize + for (int i = 0; i + 1 < actual.size(); i++) { + assertThat(actual.get(i).fileSize() >= suggestedFileSize).isTrue(); + } + + // expected.numAddedFiles == sum(numAddedFiles) + assertThat(actual.stream().mapToLong(ManifestFileMeta::numAddedFiles).sum()) + .isEqualTo(expected.numAddedFiles()); + + // expected.numDeletedFiles == sum(numDeletedFiles) + assertThat(actual.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum()) + .isEqualTo(expected.numDeletedFiles()); + + // check stats + for (int i = 0; i < expected.partitionStats().length; i++) { + List actualStats = new ArrayList<>(); + for (ManifestFileMeta meta : actual) { + actualStats.add(meta.partitionStats()[i]); + } + StatsTestUtils.checkRollingFileStats(expected.partitionStats()[i], actualStats); + } } } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java index e5bde46ddcc6..00f44e7e3961 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java @@ -28,7 +28,6 @@ import org.apache.flink.table.store.file.KeyValue; import org.apache.flink.table.store.file.KeyValueSerializerTest; import org.apache.flink.table.store.file.TestKeyValueGenerator; -import org.apache.flink.table.store.file.stats.FieldStats; import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.FlushingAvroFormat; @@ -297,35 +296,4 @@ private void checkRollingFiles( assertThat(meta.level()).isEqualTo(expected.level()); } } - - @SuppressWarnings("unchecked") - private void checkRollingFileStats(FieldStats expected, List actual) { - if (expected.minValue() instanceof Comparable) { - Object actualMin = null; - Object actualMax = null; - for (FieldStats stats : actual) { - if (stats.minValue() != null - && (actualMin == null - || ((Comparable) stats.minValue()).compareTo(actualMin) - < 0)) { - actualMin = stats.minValue(); - } - if (stats.maxValue() != null - && (actualMax == null - || ((Comparable) stats.maxValue()).compareTo(actualMax) - > 0)) { - actualMax = stats.maxValue(); - } - } - assertThat(actualMin).isEqualTo(expected.minValue()); - assertThat(actualMax).isEqualTo(expected.maxValue()); - } else { - for (FieldStats stats : actual) { - assertThat(stats.minValue()).isNull(); - assertThat(stats.maxValue()).isNull(); - } - } - assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum()) - .isEqualTo(expected.nullCount()); - } } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java new file mode 100644 index 000000000000..379b60067795 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.stats; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Utils for stats related tests. */ +public class StatsTestUtils { + + @SuppressWarnings("unchecked") + public static void checkRollingFileStats(FieldStats expected, List actual) { + if (expected.minValue() instanceof Comparable) { + Object actualMin = null; + Object actualMax = null; + for (FieldStats stats : actual) { + if (stats.minValue() != null + && (actualMin == null + || ((Comparable) stats.minValue()).compareTo(actualMin) + < 0)) { + actualMin = stats.minValue(); + } + if (stats.maxValue() != null + && (actualMax == null + || ((Comparable) stats.maxValue()).compareTo(actualMax) + > 0)) { + actualMax = stats.maxValue(); + } + } + assertThat(actualMin).isEqualTo(expected.minValue()); + assertThat(actualMax).isEqualTo(expected.maxValue()); + } else { + for (FieldStats stats : actual) { + assertThat(stats.minValue()).isNull(); + assertThat(stats.maxValue()).isNull(); + } + } + assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum()) + .isEqualTo(expected.nullCount()); + } +}