diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index feb8715701d4..f348c35e8e95 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -26,6 +26,12 @@ + +
changelog.precommit-compact.thread-num
+ (none) + Integer + Maximum number of threads to copy bytes from small changelog files. By default is the number of processors available to the Java virtual machine. +
end-input.watermark
(none) @@ -108,7 +114,7 @@
precommit-compact
false Boolean - If true, it will add a compact coordinator and worker operator after the writer operator,in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files. + If true, it will add a compact coordinator and worker operator after the writer operator,in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files.
scan.bounded
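The option documented above, `changelog.precommit-compact.thread-num`, and its companion `changelog.precommit-compact.buffer-size` (excluded from the generated docs) are plain table options: `ChangelogCompactWorkerOperator#open` reads both from the table's option map, defaulting to the number of available processors and 128 MB respectively. A minimal sketch of how a table could supply them — the values below are arbitrary examples, not defaults from this patch:

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogCompactOptionsExample {
    public static void main(String[] args) {
        // Options as they would appear in a table's WITH (...) clause / options map.
        Map<String, String> tableOptions = new HashMap<>();
        // Enable the pre-commit compact coordinator/worker operators.
        tableOptions.put("precommit-compact", "true");
        // New in this patch: cap the threads that copy bytes from small changelog files.
        tableOptions.put("changelog.precommit-compact.thread-num", "4");
        // New in this patch (hidden from docs): buffer for bytes awaiting the single writer.
        tableOptions.put("changelog.precommit-compact.buffer-size", "64 mb");
        tableOptions.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```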
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java index e4b3da8ca8c3..a4790583c5c3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java @@ -138,17 +138,7 @@ private void advanceIfNeeded() { public static void randomlyOnlyExecute( ExecutorService executor, Consumer processor, Collection input) { - List> futures = new ArrayList<>(input.size()); - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - for (U u : input) { - futures.add( - executor.submit( - () -> { - Thread.currentThread().setContextClassLoader(cl); - processor.accept(u); - })); - } - awaitAllFutures(futures); + awaitAllFutures(submitAllTasks(executor, processor, input)); } public static Iterator randomlyExecuteSequentialReturn( @@ -189,7 +179,22 @@ public Iterator next() { }); } - private static void awaitAllFutures(List> futures) { + public static List> submitAllTasks( + ExecutorService executor, Consumer processor, Collection input) { + List> futures = new ArrayList<>(input.size()); + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + for (U u : input) { + futures.add( + executor.submit( + () -> { + Thread.currentThread().setContextClassLoader(cl); + processor.accept(u); + })); + } + return futures; + } + + public static void awaitAllFutures(List> futures) { for (Future future : futures) { try { future.get(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index adf7c9624c2d..a9e7f0f7d1fd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -430,7 +430,24 @@ public class FlinkConnectorOptions { + "in order to compact several changelog files (for primary key tables) " + "or newly created data files (for unaware bucket tables) " + "from the same partition into large ones, " - + "which can decrease the number of small files. "); + + "which can decrease the number of small files."); + + public static final ConfigOption CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM = + key("changelog.precommit-compact.thread-num") + .intType() + .noDefaultValue() + .withDescription( + "Maximum number of threads to copy bytes from small changelog files. " + + "By default is the number of processors available to the Java virtual machine."); + + @ExcludeFromDocumentation("Most users won't need to adjust this config") + public static final ConfigOption CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE = + key("changelog.precommit-compact.buffer-size") + .memoryType() + .defaultValue(MemorySize.ofMebiBytes(128)) + .withDescription( + "The buffer size for copying bytes from small changelog files. 
" + + "The default value is 128 MB."); public static final ConfigOption SOURCE_OPERATOR_UID_SUFFIX = key("source.operator-uid.suffix") diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java index 9f6bd4431dc4..34553ebe5f90 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java @@ -18,12 +18,13 @@ package org.apache.paimon.flink.compact.changelog; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -35,6 +36,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -49,13 +51,14 @@ public class ChangelogCompactCoordinateOperator extends AbstractStreamOperator> implements OneInputStreamOperator>, BoundedOneInput { - private final FileStoreTable table; + + private final CoreOptions options; private transient long checkpointId; private transient Map partitionChangelogs; - public ChangelogCompactCoordinateOperator(FileStoreTable table) { - this.table = table; + public ChangelogCompactCoordinateOperator(CoreOptions options) { + this.options = options; } @Override @@ -63,7 +66,7 @@ public void open() throws Exception { super.open(); checkpointId = Long.MIN_VALUE; - partitionChangelogs = new HashMap<>(); + partitionChangelogs = new LinkedHashMap<>(); } public void processElement(StreamRecord record) { @@ -81,10 +84,26 @@ public void processElement(StreamRecord record) { return; } + // Changelog files are not stored in an LSM tree, + // so we can regard them as files without primary keys. 
+ long targetFileSize = options.targetFileSize(false); + long compactionFileSize = + Math.min( + options.compactionFileSize(false), + options.toConfiguration() + .get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE) + .getBytes()); + BinaryRow partition = message.partition(); Integer bucket = message.bucket(); - long targetFileSize = table.coreOptions().targetFileSize(false); + List skippedNewChangelogs = new ArrayList<>(); + List skippedCompactChangelogs = new ArrayList<>(); + for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) { + if (meta.fileSize() >= compactionFileSize) { + skippedNewChangelogs.add(meta); + continue; + } partitionChangelogs .computeIfAbsent(partition, k -> new PartitionChangelog()) .addNewChangelogFile(bucket, meta); @@ -94,6 +113,10 @@ public void processElement(StreamRecord record) { } } for (DataFileMeta meta : message.compactIncrement().changelogFiles()) { + if (meta.fileSize() >= compactionFileSize) { + skippedCompactChangelogs.add(meta); + continue; + } partitionChangelogs .computeIfAbsent(partition, k -> new PartitionChangelog()) .addCompactChangelogFile(bucket, meta); @@ -111,11 +134,11 @@ public void processElement(StreamRecord record) { new DataIncrement( message.newFilesIncrement().newFiles(), message.newFilesIncrement().deletedFiles(), - Collections.emptyList()), + skippedNewChangelogs), new CompactIncrement( message.compactIncrement().compactBefore(), message.compactIncrement().compactAfter(), - Collections.emptyList()), + skippedCompactChangelogs), message.indexIncrement()); Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); @@ -132,15 +155,59 @@ public void endInput() { private void emitPartitionChangelogCompactTask(BinaryRow partition) { PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); - output.collect( - new StreamRecord<>( - Either.Right( - new ChangelogCompactTask( - checkpointId, - partition, - table.coreOptions().bucket(), - partitionChangelog.newFileChangelogFiles, - partitionChangelog.compactChangelogFiles)))); + int numNewChangelogFiles = + partitionChangelog.newFileChangelogFiles.values().stream() + .mapToInt(List::size) + .sum(); + int numCompactChangelogFiles = + partitionChangelog.compactChangelogFiles.values().stream() + .mapToInt(List::size) + .sum(); + if (numNewChangelogFiles + numCompactChangelogFiles == 1) { + // there is only one changelog file in this partition, so we don't wrap it as a + // compaction task + CommitMessageImpl message; + if (numNewChangelogFiles == 1) { + Map.Entry> entry = + partitionChangelog.newFileChangelogFiles.entrySet().iterator().next(); + message = + new CommitMessageImpl( + partition, + entry.getKey(), + options.bucket(), + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + entry.getValue()), + CompactIncrement.emptyIncrement()); + } else { + Map.Entry> entry = + partitionChangelog.compactChangelogFiles.entrySet().iterator().next(); + message = + new CommitMessageImpl( + partition, + entry.getKey(), + options.bucket(), + DataIncrement.emptyIncrement(), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + entry.getValue())); + } + Committable newCommittable = + new Committable(checkpointId, Committable.Kind.FILE, message); + output.collect(new StreamRecord<>(Either.Left(newCommittable))); + } else { + output.collect( + new StreamRecord<>( + Either.Right( + new ChangelogCompactTask( + checkpointId, + partition, + options.bucket(), + 
partitionChangelog.newFileChangelogFiles, + partitionChangelog.compactChangelogFiles)))); + } partitionChangelogs.remove(partition); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index 96fe15c344c2..7605af61c4e0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -23,19 +23,24 @@ import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.PositionOutputStream; -import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.ThreadPoolUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -43,6 +48,12 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.function.BiConsumer; /** * {@link ChangelogCompactTask} to compact several changelog files from the same partition into one @@ -50,6 +61,8 @@ */ public class ChangelogCompactTask implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(ChangelogCompactTask.class); + private final long checkpointId; private final BinaryRow partition; private final int totalBuckets; @@ -67,6 +80,11 @@ public ChangelogCompactTask( this.totalBuckets = totalBuckets; this.newFileChangelogFiles = newFileChangelogFiles; this.compactChangelogFiles = compactChangelogFiles; + Preconditions.checkArgument( + newFileChangelogFiles.isEmpty() || compactChangelogFiles.isEmpty(), + "Both newFileChangelogFiles and compactChangelogFiles are not empty. " + + "There is no such table where changelog is produced both from new files and from compaction. " + + "This is unexpected."); } public long checkpointId() { @@ -89,69 +107,102 @@ public Map> compactChangelogFiles() { return compactChangelogFiles; } - public List doCompact(FileStoreTable table) throws Exception { + public List doCompact( + FileStoreTable table, ExecutorService executor, MemorySize bufferSize) + throws Exception { + Preconditions.checkArgument( + bufferSize.getBytes() <= Integer.MAX_VALUE, + "Changelog pre-commit compaction buffer size ({} bytes) too large! 
" + + "The maximum possible value is {} bytes.", + bufferSize.getBytes(), + Integer.MAX_VALUE); + FileStorePathFactory pathFactory = table.store().pathFactory(); + List tasks = new ArrayList<>(); + BiConsumer>, Boolean> addTasks = + (files, isCompactResult) -> { + for (Map.Entry> entry : files.entrySet()) { + int bucket = entry.getKey(); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, bucket); + for (DataFileMeta meta : entry.getValue()) { + ReadTask task = + new ReadTask( + table, + dataFilePathFactory.toPath(meta), + bucket, + isCompactResult, + meta); + Preconditions.checkArgument( + meta.fileSize() <= bufferSize.getBytes(), + "Trying to compact changelog file with size {} bytes, " + + "while the buffer size is only {} bytes. This is unexpected.", + meta.fileSize(), + bufferSize.getBytes()); + tasks.add(task); + } + } + }; + addTasks.accept(newFileChangelogFiles, false); + addTasks.accept(compactChangelogFiles, true); + + Semaphore semaphore = new Semaphore((int) bufferSize.getBytes()); + BlockingQueue finishedTasks = new LinkedBlockingQueue<>(); + List> futures = + ThreadPoolUtils.submitAllTasks( + executor, + t -> { + // Why not create `finishedTasks` as a blocking queue and use it to + // limit the total size of bytes awaiting to be copied? Because finished + // tasks are added after their contents are read, so even if + // `finishedTasks` is full, each thread can still read one more file, + // and the limit will become `bytesInThreads + bufferSize`, not just + // `bufferSize`. + try { + semaphore.acquire((int) t.meta.fileSize()); + t.readFully(); + finishedTasks.put(t); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }, + tasks); + OutputStream outputStream = new OutputStream(); List results = new ArrayList<>(); - - // copy all changelog files to a new big file - for (Map.Entry> entry : newFileChangelogFiles.entrySet()) { - int bucket = entry.getKey(); - DataFilePathFactory dataFilePathFactory = - pathFactory.createDataFilePathFactory(partition, bucket); - for (DataFileMeta meta : entry.getValue()) { - copyFile( - outputStream, - results, - table, - dataFilePathFactory.toPath(meta), - bucket, - false, - meta); - } - } - for (Map.Entry> entry : compactChangelogFiles.entrySet()) { - Integer bucket = entry.getKey(); - DataFilePathFactory dataFilePathFactory = - pathFactory.createDataFilePathFactory(partition, bucket); - for (DataFileMeta meta : entry.getValue()) { - copyFile( - outputStream, - results, - table, - dataFilePathFactory.toPath(meta), - bucket, - true, - meta); - } + for (int i = 0; i < tasks.size(); i++) { + // copy all files into a new big file + ReadTask task = finishedTasks.take(); + write(task, outputStream, results); + semaphore.release((int) task.meta.fileSize()); } outputStream.out.close(); + ThreadPoolUtils.awaitAllFutures(futures); return produceNewCommittables(results, table, pathFactory, outputStream.path); } - private void copyFile( - OutputStream outputStream, - List results, - FileStoreTable table, - Path path, - int bucket, - boolean isCompactResult, - DataFileMeta meta) + private void write(ReadTask task, OutputStream outputStream, List results) throws Exception { if (!outputStream.isInitialized) { Path outputPath = - new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID()); - outputStream.init(outputPath, table.fileIO().newOutputStream(outputPath, false)); + new Path(task.path.getParent(), "tmp-compacted-changelog-" + 
UUID.randomUUID()); + outputStream.init(outputPath, task.table.fileIO().newOutputStream(outputPath, false)); } - long offset = outputStream.out.getPos(); - try (SeekableInputStream in = table.fileIO().newInputStream(path)) { - IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false); + + if (LOG.isDebugEnabled()) { + LOG.debug("Copying bytes from {} to {}", task.path, outputStream.path); } - table.fileIO().deleteQuietly(path); + long offset = outputStream.out.getPos(); + outputStream.out.write(task.result); results.add( new Result( - bucket, isCompactResult, meta, offset, outputStream.out.getPos() - offset)); + task.bucket, + task.isCompactResult, + task.meta, + offset, + outputStream.out.getPos() - offset)); } private List produceNewCommittables( @@ -172,23 +223,23 @@ private List produceNewCommittables( + baseResult.bucket + "-" + baseResult.length; - table.fileIO() - .rename( - changelogTempPath, - dataFilePathFactory.toAlignedPath( - realName - + "." - + CompactedChangelogReadOnlyFormat.getIdentifier( - baseResult.meta.fileFormat()), - baseResult.meta)); - - List newCommittables = new ArrayList<>(); + Path realPath = + dataFilePathFactory.toAlignedPath( + realName + + "." + + CompactedChangelogReadOnlyFormat.getIdentifier( + baseResult.meta.fileFormat()), + baseResult.meta); + if (LOG.isDebugEnabled()) { + LOG.debug("Rename {} to {}", changelogTempPath, realPath); + } + table.fileIO().rename(changelogTempPath, realPath); Map> bucketedResults = new HashMap<>(); for (Result result : results) { bucketedResults.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result); } - + List newCommittables = new ArrayList<>(); for (Map.Entry> entry : bucketedResults.entrySet()) { List newFilesChangelog = new ArrayList<>(); List compactChangelog = new ArrayList<>(); @@ -256,6 +307,39 @@ public String toString() { partition, newFileChangelogFiles, compactChangelogFiles); } + private static class ReadTask { + + private final FileStoreTable table; + private final Path path; + private final int bucket; + private final boolean isCompactResult; + private final DataFileMeta meta; + + private byte[] result = null; + + private ReadTask( + FileStoreTable table, + Path path, + int bucket, + boolean isCompactResult, + DataFileMeta meta) { + this.table = table; + this.path = path; + this.bucket = bucket; + this.isCompactResult = isCompactResult; + this.meta = meta; + } + + private void readFully() { + try { + result = IOUtils.readFully(table.fileIO().newInputStream(path), true); + table.fileIO().deleteQuietly(path); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + private static class OutputStream { private Path path; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java index 260c25a31561..9d5fb46e3e15 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java @@ -18,8 +18,12 @@ package org.apache.paimon.flink.compact.changelog; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; import 
org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.ThreadPoolUtils; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -27,6 +31,7 @@ import org.apache.flink.types.Either; import java.util.List; +import java.util.concurrent.ExecutorService; /** * Receive and process the {@link ChangelogCompactTask}s emitted by {@link @@ -34,20 +39,40 @@ */ public class ChangelogCompactWorkerOperator extends AbstractStreamOperator implements OneInputStreamOperator, Committable> { + private final FileStoreTable table; + private transient ExecutorService executor; + private transient MemorySize bufferSize; + public ChangelogCompactWorkerOperator(FileStoreTable table) { this.table = table; } + @Override + public void open() throws Exception { + Options options = new Options(table.options()); + int numThreads = + options.getOptional(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM) + .orElse(Runtime.getRuntime().availableProcessors()); + executor = + ThreadPoolUtils.createCachedThreadPool( + numThreads, "changelog-compact-async-read-bytes"); + bufferSize = options.get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE); + LOG.info( + "Creating {} threads and a buffer of {} bytes for changelog compaction.", + numThreads, + bufferSize.getBytes()); + } + + @Override public void processElement(StreamRecord> record) throws Exception { - if (record.getValue().isLeft()) { output.collect(new StreamRecord<>(record.getValue().left())); } else { ChangelogCompactTask task = record.getValue().right(); - List committables = task.doCompact(table); + List committables = task.doCompact(table, executor, bufferSize); committables.forEach(committable -> output.collect(new StreamRecord<>(committable))); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 9c5bf112216f..d99dd255e496 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -248,7 +248,7 @@ public DataStream doWrite( "Changelog Compact Coordinator", new EitherTypeInfo<>( new CommittableTypeInfo(), new ChangelogTaskTypeInfo()), - new ChangelogCompactCoordinateOperator(table)) + new ChangelogCompactCoordinateOperator(table.coreOptions())) .forceNonParallel() .transform( "Changelog Compact Worker", diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index c701cefb3076..d1ed5dbc84f5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -679,7 +679,7 @@ public void testChangelogCompactInBatchWrite() throws Exception { + "WITH (" + " 'bucket' = '10',\n" + " 'changelog-producer' = 'lookup',\n" - + " 'changelog.precommit-compact' = 'true',\n" + + " 'precommit-compact' = 'true',\n" + " 'snapshot.num-retained.min' = '3',\n" + " 'snapshot.num-retained.max' = '3'\n" + ")"); @@ -773,7 +773,7 @@ public void testChangelogCompactInStreamWrite() throws Exception { + "WITH (" + " 'bucket' = '10',\n" + 
" 'changelog-producer' = 'lookup',\n" - + " 'changelog.precommit-compact' = 'true'\n" + + " 'precommit-compact' = 'true'\n" + ")"); Path inputPath = new Path(path, "input"); @@ -1014,7 +1014,7 @@ private void testLookupChangelogProducerRandom( + "'changelog-producer' = 'lookup', " + "'lookup-wait' = '%s', " + "'deletion-vectors.enabled' = '%s', " - + "'changelog.precommit-compact' = '%s'", + + "'precommit-compact' = '%s'", random.nextBoolean() ? "4mb" : "8mb", random.nextBoolean(), enableDeletionVectors, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java new file mode 100644 index 000000000000..4d5e9ac1b9ee --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.CommittableTypeInfo; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.types.Either; +import org.apache.flink.util.Preconditions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ChangelogCompactCoordinateOperator}. 
*/ +public class ChangelogCompactCoordinateOperatorTest { + + @Test + public void testPrepareSnapshotWithMultipleFiles() throws Exception { + Options options = new Options(); + options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8)); + ChangelogCompactCoordinateOperator operator = + new ChangelogCompactCoordinateOperator(new CoreOptions(options)); + OneInputStreamOperatorTestHarness> + testHarness = createTestHarness(operator); + + testHarness.open(); + testHarness.processElement( + new StreamRecord<>( + createCommittable( + 1, + BinaryRow.EMPTY_ROW, + 0, + Collections.emptyList(), + Arrays.asList(3, 2, 5, 4)))); + testHarness.processElement( + new StreamRecord<>( + createCommittable( + 1, + BinaryRow.EMPTY_ROW, + 1, + Collections.emptyList(), + Arrays.asList(3, 3, 2, 2)))); + testHarness.prepareSnapshotPreBarrier(1); + testHarness.processElement( + new StreamRecord<>( + createCommittable( + 2, + BinaryRow.EMPTY_ROW, + 0, + Collections.emptyList(), + Arrays.asList(2, 3)))); + testHarness.prepareSnapshotPreBarrier(2); + + List output = new ArrayList<>(testHarness.getOutput()); + assertThat(output).hasSize(7); + + Map> expected = new HashMap<>(); + expected.put(0, Arrays.asList(3, 2, 5)); + assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, new HashMap<>(), expected); + + expected.clear(); + expected.put(0, Collections.singletonList(4)); + expected.put(1, Arrays.asList(3, 3)); + assertCompactionTask(output.get(2), 1, BinaryRow.EMPTY_ROW, new HashMap<>(), expected); + + expected.clear(); + expected.put(1, Arrays.asList(2, 2)); + assertCompactionTask(output.get(4), 1, BinaryRow.EMPTY_ROW, new HashMap<>(), expected); + + expected.clear(); + expected.put(0, Arrays.asList(2, 3)); + assertCompactionTask(output.get(6), 2, BinaryRow.EMPTY_ROW, new HashMap<>(), expected); + + testHarness.close(); + } + + @Test + public void testPrepareSnapshotWithSingleFile() throws Exception { + Options options = new Options(); + options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8)); + ChangelogCompactCoordinateOperator operator = + new ChangelogCompactCoordinateOperator(new CoreOptions(options)); + OneInputStreamOperatorTestHarness> + testHarness = createTestHarness(operator); + + testHarness.open(); + testHarness.processElement( + new StreamRecord<>( + createCommittable( + 1, + BinaryRow.EMPTY_ROW, + 0, + Arrays.asList(3, 5, 2), + Collections.emptyList()))); + testHarness.prepareSnapshotPreBarrier(1); + + List output = new ArrayList<>(testHarness.getOutput()); + assertThat(output).hasSize(3); + + Map> expected = new HashMap<>(); + expected.put(0, Arrays.asList(3, 5)); + assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, expected, new HashMap<>()); + assertCommittable( + output.get(2), + BinaryRow.EMPTY_ROW, + Collections.singletonList(2), + Collections.emptyList()); + + testHarness.close(); + } + + @Test + public void testPrepareSnapshotWithMultiplePartitions() throws Exception { + Options options = new Options(); + options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8)); + ChangelogCompactCoordinateOperator operator = + new ChangelogCompactCoordinateOperator(new CoreOptions(options)); + OneInputStreamOperatorTestHarness> + testHarness = createTestHarness(operator); + + Function binaryRow = + i -> { + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.writeInt(0, i); + writer.complete(); + return row; + }; + + testHarness.open(); + testHarness.processElement( + new StreamRecord<>( + createCommittable( + 1, + 
binaryRow.apply(1), + 0, + Collections.emptyList(), + Arrays.asList(3, 2, 5, 4)))); + testHarness.processElement( + new StreamRecord<>( + createCommittable( + 1, + binaryRow.apply(2), + 1, + Collections.emptyList(), + Arrays.asList(3, 3, 2, 2, 3)))); + testHarness.prepareSnapshotPreBarrier(1); + testHarness.processElement( + new StreamRecord<>( + createCommittable( + 2, + binaryRow.apply(1), + 0, + Collections.emptyList(), + Arrays.asList(2, 3)))); + testHarness.prepareSnapshotPreBarrier(2); + + List output = new ArrayList<>(testHarness.getOutput()); + assertThat(output).hasSize(8); + + Map> expected = new HashMap<>(); + expected.put(0, Arrays.asList(3, 2, 5)); + assertCompactionTask(output.get(0), 1, binaryRow.apply(1), new HashMap<>(), expected); + + expected.clear(); + expected.put(1, Arrays.asList(3, 3, 2)); + assertCompactionTask(output.get(2), 1, binaryRow.apply(2), new HashMap<>(), expected); + + assertCommittable( + output.get(4), + binaryRow.apply(1), + Collections.emptyList(), + Collections.singletonList(4)); + + expected.clear(); + expected.put(1, Arrays.asList(2, 3)); + assertCompactionTask(output.get(5), 1, binaryRow.apply(2), new HashMap<>(), expected); + + expected.clear(); + expected.put(0, Arrays.asList(2, 3)); + assertCompactionTask(output.get(7), 2, binaryRow.apply(1), new HashMap<>(), expected); + + testHarness.close(); + } + + @Test + public void testSkipLargeFiles() throws Exception { + Options options = new Options(); + options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8)); + ChangelogCompactCoordinateOperator operator = + new ChangelogCompactCoordinateOperator(new CoreOptions(options)); + OneInputStreamOperatorTestHarness> + testHarness = createTestHarness(operator); + + testHarness.open(); + testHarness.processElement( + new StreamRecord<>( + createCommittable( + 1, + BinaryRow.EMPTY_ROW, + 0, + Collections.emptyList(), + Arrays.asList(3, 10, 5, 9)))); + testHarness.prepareSnapshotPreBarrier(1); + + List output = new ArrayList<>(testHarness.getOutput()); + assertThat(output).hasSize(2); + + Map> expected = new HashMap<>(); + expected.put(0, Arrays.asList(3, 5)); + assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, new HashMap<>(), expected); + assertCommittable( + output.get(1), BinaryRow.EMPTY_ROW, Collections.emptyList(), Arrays.asList(10, 9)); + + testHarness.close(); + } + + @SuppressWarnings("unchecked") + private void assertCommittable( + Object o, + BinaryRow partition, + List newFilesChangelogMbs, + List compactChangelogMbs) { + StreamRecord> record = + (StreamRecord>) o; + assertThat(record.getValue().isLeft()).isTrue(); + Committable committable = record.getValue().left(); + + assertThat(committable.checkpointId()).isEqualTo(1); + CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); + assertThat(message.partition()).isEqualTo(partition); + assertThat(message.bucket()).isEqualTo(0); + + assertSameSizes(message.newFilesIncrement().changelogFiles(), newFilesChangelogMbs); + assertSameSizes(message.compactIncrement().changelogFiles(), compactChangelogMbs); + } + + @SuppressWarnings("unchecked") + private void assertCompactionTask( + Object o, + long checkpointId, + BinaryRow partition, + Map> newFilesChangelogMbs, + Map> compactChangelogMbs) { + StreamRecord> record = + (StreamRecord>) o; + assertThat(record.getValue().isRight()).isTrue(); + ChangelogCompactTask task = record.getValue().right(); + + assertThat(task.checkpointId()).isEqualTo(checkpointId); + assertThat(task.partition()).isEqualTo(partition); 
+ + assertThat(task.newFileChangelogFiles().keySet()).isEqualTo(newFilesChangelogMbs.keySet()); + for (int bucket : task.newFileChangelogFiles().keySet()) { + assertSameSizes( + task.newFileChangelogFiles().get(bucket), newFilesChangelogMbs.get(bucket)); + } + assertThat(task.compactChangelogFiles().keySet()).isEqualTo(compactChangelogMbs.keySet()); + for (int bucket : task.compactChangelogFiles().keySet()) { + assertSameSizes( + task.compactChangelogFiles().get(bucket), compactChangelogMbs.get(bucket)); + } + } + + private void assertSameSizes(List metas, List mbs) { + assertThat(metas.stream().mapToLong(DataFileMeta::fileSize).toArray()) + .containsExactlyInAnyOrder( + mbs.stream() + .mapToLong(mb -> MemorySize.ofMebiBytes(mb).getBytes()) + .toArray()); + } + + private Committable createCommittable( + long checkpointId, + BinaryRow partition, + int bucket, + List newFilesChangelogMbs, + List compactChangelogMbs) { + CommitMessageImpl message = + new CommitMessageImpl( + partition, + bucket, + 2, + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + newFilesChangelogMbs.stream() + .map(this::createDataFileMetaOfSize) + .collect(Collectors.toList())), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + compactChangelogMbs.stream() + .map(this::createDataFileMetaOfSize) + .collect(Collectors.toList()))); + return new Committable(checkpointId, Committable.Kind.FILE, message); + } + + private DataFileMeta createDataFileMetaOfSize(int mb) { + return DataFileMeta.forAppend( + UUID.randomUUID().toString(), + MemorySize.ofMebiBytes(mb).getBytes(), + 0, + SimpleStats.EMPTY_STATS, + 0, + 0, + 1, + Collections.emptyList(), + null, + null, + null, + null); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private OneInputStreamOperatorTestHarness< + Committable, Either> + createTestHarness(ChangelogCompactCoordinateOperator operator) throws Exception { + TypeSerializer serializer = + new EitherSerializer<>( + new CommittableTypeInfo().createSerializer(new ExecutionConfig()), + new ChangelogTaskTypeInfo().createSerializer(new ExecutionConfig())); + OneInputStreamOperatorTestHarness harness = + new OneInputStreamOperatorTestHarness(operator, 1, 1, 0); + harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer)); + harness.getStreamConfig().serializeAllConfigs(); + harness.setup(serializer); + return harness; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java index 7fcde6214f5f..580220e77d13 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java @@ -56,13 +56,24 @@ public void testSerializer() throws Exception { put(1, newFiles(20)); } }, + new HashMap<>()); + ChangelogCompactTask serializeTask = + serializer.deserialize(serializer.getVersion(), serializer.serialize(task)); + assertThat(task).isEqualTo(serializeTask); + + task = + new ChangelogCompactTask( + 2L, + partition, + 2, + new HashMap<>(), new HashMap>() { { put(0, newFiles(10)); put(1, newFiles(10)); } }); - ChangelogCompactTask serializeTask = serializer.deserialize(2, serializer.serialize(task)); + serializeTask = 
serializer.deserialize(2, serializer.serialize(task)); assertThat(task).isEqualTo(serializeTask); }
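Taken together, the copy path in `ChangelogCompactTask.doCompact` boils down to a bounded producer/consumer pattern: reader threads pull whole small changelog files into memory, a semaphore sized to `changelog.precommit-compact.buffer-size` caps the bytes held at once, and the single calling thread concatenates finished reads into one large file and returns the permits. The sketch below restates that pattern outside Paimon's classes — plain `java.nio` file I/O and hypothetical names instead of `FileIO`, `ReadTask` and `ThreadPoolUtils` — and, like the patch, it assumes every input file fits inside the buffer (enforced there by a precondition) and that reads succeed.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class BoundedChangelogCopySketch {

    private static final class ReadTask {
        final Path path;
        final long size;
        byte[] bytes;

        ReadTask(Path path, long size) {
            this.path = path;
            this.size = size;
        }
    }

    /** Concatenates {@code smallFiles} into {@code bigFile}, holding at most {@code bufferBytes} in memory. */
    public static void compact(List<Path> smallFiles, Path bigFile, int bufferBytes, int numThreads)
            throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        Semaphore permits = new Semaphore(bufferBytes);
        BlockingQueue<ReadTask> finished = new LinkedBlockingQueue<>();
        List<Future<?>> futures = new ArrayList<>();

        for (Path path : smallFiles) {
            futures.add(
                    executor.submit(
                            () -> {
                                try {
                                    long size = Files.size(path);
                                    // Wait until the buffer has room, then read the whole file.
                                    permits.acquire((int) size);
                                    ReadTask task = new ReadTask(path, size);
                                    task.bytes = Files.readAllBytes(path);
                                    finished.put(task);
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    throw new RuntimeException(e);
                                }
                            }));
        }

        // Single writer: append finished reads in completion order and free their permits.
        try (OutputStream out = Files.newOutputStream(bigFile)) {
            for (int i = 0; i < smallFiles.size(); i++) {
                ReadTask task = finished.take();
                out.write(task.bytes);
                permits.release((int) task.size);
            }
        }

        executor.shutdown();
        for (Future<?> future : futures) {
            future.get(); // surface any read failure
        }
    }
}
```

In the patch, `ThreadPoolUtils.submitAllTasks` plays the producer role, `ChangelogCompactTask.write` the consumer role, and the temporary output file is later renamed to the compacted-changelog file in `produceNewCommittables`.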