diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index de204ee4529f..23bb827aef33 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -26,12 +26,6 @@ - -
changelog.precommit-compact
- false - Boolean - If true, it will add a changelog compact coordinator and worker operator after the writer operator,in order to compact several changelog files from the same partition into large ones, which can decrease the number of small files. -
end-input.watermark
(none) @@ -110,6 +104,12 @@ Duration You can specify time interval for partition, for example, daily partition is '1 d', hourly partition is '1 h'. + +
precommit-compact
+ false + Boolean + If true, it will add a compact coordinator and worker operator after the writer operator, in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files. +
scan.infer-parallelism
true diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareBucketNewFilesCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareBucketNewFilesCompactionCoordinator.java new file mode 100644 index 000000000000..4425d12a2512 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareBucketNewFilesCompactionCoordinator.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.utils.Pair; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Buffer files from the same partition, until their total size reaches {@code targetFileSize}. */ +public class UnawareBucketNewFilesCompactionCoordinator { + + private final long targetFileSize; + private final Map partitions; + + public UnawareBucketNewFilesCompactionCoordinator(long targetFileSize) { + this.targetFileSize = targetFileSize; + this.partitions = new LinkedHashMap<>(); + } + + public Optional>> addFile( + BinaryRow partition, DataFileMeta file) { + PartitionFiles files = + partitions.computeIfAbsent(partition, ignore -> new PartitionFiles()); + files.addFile(file); + if (files.totalSize >= targetFileSize) { + partitions.remove(partition); + return Optional.of(Pair.of(partition, files.files)); + } else { + return Optional.empty(); + } + } + + public List>> emitAll() { + List>> result = + partitions.entrySet().stream() + .map(e -> Pair.of(e.getKey(), e.getValue().files)) + .collect(Collectors.toList()); + partitions.clear(); + return result; + } + + private static class PartitionFiles { + private final List files = new ArrayList<>(); + private long totalSize = 0; + + private void addFile(DataFileMeta file) { + files.add(file); + totalSize += file.fileSize(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java index e8c4ddfa1c7c..9dab5734718a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java @@ -164,6 +164,10 @@ private Plan tryFirstPlan() { } isFullPhaseEnd = boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId)); + LOG.debug( + "Starting snapshot is {}, next snapshot will be {}.", + scannedResult.plan().snapshotId(), + nextSnapshotId); return scannedResult.plan(); } else if (result instanceof StartingScanner.NextSnapshot) { nextSnapshotId = 
((StartingScanner.NextSnapshot) result).nextSnapshotId(); @@ -171,6 +175,9 @@ private Plan tryFirstPlan() { snapshotManager.snapshotExists(nextSnapshotId - 1) && boundedChecker.shouldEndInput( snapshotManager.snapshot(nextSnapshotId - 1)); + LOG.debug("There is no starting snapshot. Next snapshot will be {}.", nextSnapshotId); + } else if (result instanceof StartingScanner.NoSnapshot) { + LOG.debug("There is no starting snapshot and currently there is no next snapshot."); } return SnapshotNotExistPlan.INSTANCE; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 8f7757453c3d..0105202b9d76 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -414,13 +414,16 @@ public class FlinkConnectorOptions { .withDescription( "Optional endInput watermark used in case of batch mode or bounded stream."); - public static final ConfigOption CHANGELOG_PRECOMMIT_COMPACT = - key("changelog.precommit-compact") + public static final ConfigOption PRECOMMIT_COMPACT = + key("precommit-compact") .booleanType() .defaultValue(false) + .withFallbackKeys("changelog.precommit-compact") .withDescription( - "If true, it will add a changelog compact coordinator and worker operator after the writer operator," - + "in order to compact several changelog files from the same partition into large ones, " + "If true, it will add a compact coordinator and worker operator after the writer operator," + + "in order to compact several changelog files (for primary key tables) " + + "or newly created data files (for unaware bucket tables) " + + "from the same partition into large ones, " + "which can decrease the number of small files. "); public static final ConfigOption SOURCE_OPERATOR_UID_SUFFIX = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java new file mode 100644 index 000000000000..c52c0b2d853c --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.UnawareAppendCompactionTask; +import org.apache.paimon.append.UnawareBucketNewFilesCompactionCoordinator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * Coordinator operator for compacting newly created files for unaware bucket tables. + * + *

{@link UnawareBucketNewFilesCompactionCoordinatorOperator} calculates the file size of newly + * created files contained in all buckets within each partition from {@link Committable} message + * emitted from writer operator. And emit {@link UnawareAppendCompactionTask} to {@link + * UnawareBucketNewFilesCompactionWorkerOperator}. + */ +public class UnawareBucketNewFilesCompactionCoordinatorOperator + extends AbstractStreamOperator< + Either>> + implements OneInputStreamOperator< + Committable, + Either>>, + BoundedOneInput { + + private final long targetFileSize; + private final long compactionFileSize; + + private transient UnawareBucketNewFilesCompactionCoordinator coordinator; + private transient long checkpointId; + + public UnawareBucketNewFilesCompactionCoordinatorOperator(CoreOptions options) { + this.targetFileSize = options.targetFileSize(false); + this.compactionFileSize = options.compactionFileSize(false); + } + + @Override + public void open() throws Exception { + super.open(); + + coordinator = new UnawareBucketNewFilesCompactionCoordinator(targetFileSize); + checkpointId = Long.MIN_VALUE; + } + + @Override + public void processElement(StreamRecord record) throws Exception { + Committable committable = record.getValue(); + checkpointId = Math.max(checkpointId, committable.checkpointId()); + if (committable.kind() != Committable.Kind.FILE) { + output.collect(new StreamRecord<>(Either.Left(committable))); + return; + } + + CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); + if (message.newFilesIncrement().newFiles().isEmpty()) { + output.collect(new StreamRecord<>(Either.Left(committable))); + return; + } + + BinaryRow partition = message.partition(); + List skippedFiles = new ArrayList<>(); + for (DataFileMeta meta : message.newFilesIncrement().newFiles()) { + if (meta.fileSize() >= compactionFileSize) { + skippedFiles.add(meta); + continue; + } + + Optional>> optionalPair = + coordinator.addFile(partition, meta); + if (optionalPair.isPresent()) { + Pair> p = optionalPair.get(); + Preconditions.checkArgument(!p.getValue().isEmpty()); + if (p.getValue().size() > 1) { + output.collect( + new StreamRecord<>( + Either.Right( + Tuple2.of( + checkpointId, + new UnawareAppendCompactionTask( + p.getKey(), p.getValue()))))); + } else { + skippedFiles.add(p.getValue().get(0)); + } + } + } + + CommitMessageImpl newMessage = + new CommitMessageImpl( + message.partition(), + message.bucket(), + new DataIncrement( + skippedFiles, + message.newFilesIncrement().deletedFiles(), + message.newFilesIncrement().changelogFiles()), + message.compactIncrement(), + message.indexIncrement()); + if (!newMessage.isEmpty()) { + Committable newCommittable = + new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); + output.collect(new StreamRecord<>(Either.Left(newCommittable))); + } + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + emitAll(); + } + + @Override + public void endInput() throws Exception { + emitAll(); + } + + private void emitAll() { + for (Pair> p : coordinator.emitAll()) { + Preconditions.checkArgument(!p.getValue().isEmpty()); + if (p.getValue().size() > 1) { + output.collect( + new StreamRecord<>( + Either.Right( + Tuple2.of( + checkpointId, + new UnawareAppendCompactionTask( + p.getKey(), p.getValue()))))); + } else { + CommitMessageImpl message = + new CommitMessageImpl( + p.getKey(), + BucketMode.UNAWARE_BUCKET, + new DataIncrement( + Collections.singletonList(p.getValue().get(0)), 
+ Collections.emptyList(), + Collections.emptyList()), + CompactIncrement.emptyIncrement()); + output.collect( + new StreamRecord<>( + Either.Left( + new Committable( + checkpointId, Committable.Kind.FILE, message)))); + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java new file mode 100644 index 000000000000..d634d22ec6e8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.append.UnawareAppendCompactionTask; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.operation.AppendOnlyUnawareBucketFileStoreWrite; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.FileStorePathFactory; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Receive and process the {@link UnawareAppendCompactionTask}s emitted by {@link + * UnawareBucketNewFilesCompactionCoordinatorOperator}. 
+ */ +public class UnawareBucketNewFilesCompactionWorkerOperator + extends AbstractStreamOperator + implements OneInputStreamOperator< + Either>, Committable> { + + private final FileStoreTable table; + + private transient AppendOnlyUnawareBucketFileStoreWrite write; + private transient FileStorePathFactory pathFactory; + private transient FileIO fileIO; + + public UnawareBucketNewFilesCompactionWorkerOperator(FileStoreTable table) { + this.table = table; + } + + @Override + public void open() throws Exception { + super.open(); + this.write = (AppendOnlyUnawareBucketFileStoreWrite) table.store().newWrite(null); + this.pathFactory = table.store().pathFactory(); + this.fileIO = table.fileIO(); + } + + @Override + public void processElement( + StreamRecord>> record) + throws Exception { + if (record.getValue().isLeft()) { + output.collect(new StreamRecord<>(record.getValue().left())); + } else { + long checkpointId = record.getValue().right().f0; + CommitMessage message = doCompact(record.getValue().right().f1); + output.collect( + new StreamRecord<>( + new Committable(checkpointId, Committable.Kind.FILE, message))); + } + } + + private CommitMessage doCompact(UnawareAppendCompactionTask task) throws Exception { + CommitMessageImpl message = (CommitMessageImpl) task.doCompact(table, write); + + Map toDelete = new HashMap<>(); + for (DataFileMeta meta : message.compactIncrement().compactBefore()) { + toDelete.put(meta.fileName(), meta); + } + for (DataFileMeta meta : message.compactIncrement().compactAfter()) { + toDelete.remove(meta.fileName()); + } + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(task.partition(), message.bucket()); + for (DataFileMeta meta : toDelete.values()) { + fileIO.deleteQuietly(dataFilePathFactory.toPath(meta)); + } + + return new CommitMessageImpl( + message.partition(), + message.bucket(), + new DataIncrement( + message.compactIncrement().compactAfter(), + Collections.emptyList(), + Collections.emptyList()), + CompactIncrement.emptyIncrement()); + } + + @Override + public void close() throws Exception { + if (write != null) { + write.close(); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java index cd0b8716a7d1..d1f6a5abd2c1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java @@ -90,7 +90,7 @@ public void processElement(StreamRecord record) { .addNewChangelogFile(bucket, meta); PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); if (partitionChangelog.totalFileSize >= targetFileSize) { - emitPartitionChanglogCompactTask(partition); + emitPartitionChangelogCompactTask(partition); } } for (DataFileMeta meta : message.compactIncrement().changelogFiles()) { @@ -99,7 +99,7 @@ public void processElement(StreamRecord record) { .addCompactChangelogFile(bucket, meta); PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); if (partitionChangelog.totalFileSize >= targetFileSize) { - emitPartitionChanglogCompactTask(partition); + emitPartitionChangelogCompactTask(partition); } } @@ -122,14 +122,14 @@ public void processElement(StreamRecord record) { 
} public void prepareSnapshotPreBarrier(long checkpointId) { - emitAllPartitionsChanglogCompactTask(); + emitAllPartitionsChangelogCompactTask(); } public void endInput() { - emitAllPartitionsChanglogCompactTask(); + emitAllPartitionsChangelogCompactTask(); } - private void emitPartitionChanglogCompactTask(BinaryRow partition) { + private void emitPartitionChangelogCompactTask(BinaryRow partition) { PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); output.collect( new StreamRecord<>( @@ -142,10 +142,10 @@ private void emitPartitionChanglogCompactTask(BinaryRow partition) { partitionChangelogs.remove(partition); } - private void emitAllPartitionsChanglogCompactTask() { + private void emitAllPartitionsChangelogCompactTask() { List partitions = new ArrayList<>(partitionChangelogs.keySet()); for (BinaryRow partition : partitions) { - emitPartitionChanglogCompactTask(partition); + emitPartitionChangelogCompactTask(partition); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 002f5887b5f0..4cd085883d33 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -57,9 +57,9 @@ import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; import static org.apache.paimon.CoreOptions.createCommitUser; -import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT; import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK; +import static org.apache.paimon.flink.FlinkConnectorOptions.PRECOMMIT_COMPACT; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY; @@ -239,7 +239,7 @@ public DataStream doWrite( declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); } - if (options.get(CHANGELOG_PRECOMMIT_COMPACT)) { + if (options.get(PRECOMMIT_COMPACT)) { written = written.transform( "Changelog Compact Coordinator", diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java index 7bc40d4c2080..487d0f268986 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java @@ -18,11 +18,17 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionCoordinatorOperator; +import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionWorkerOperator; import org.apache.paimon.flink.source.AppendBypassCoordinateOperatorFactory; +import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import 
org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.datastream.DataStream; @@ -59,6 +65,28 @@ public DataStream doWrite( DataStream input, String initialCommitUser, @Nullable Integer parallelism) { DataStream written = super.doWrite(input, initialCommitUser, this.parallelism); + Options options = new Options(table.options()); + if (options.get(FlinkConnectorOptions.PRECOMMIT_COMPACT)) { + written = + written.transform( + "New Files Compact Coordinator: " + table.name(), + new EitherTypeInfo<>( + new CommittableTypeInfo(), + new TupleTypeInfo<>( + BasicTypeInfo.LONG_TYPE_INFO, + new CompactionTaskTypeInfo())), + new UnawareBucketNewFilesCompactionCoordinatorOperator( + table.coreOptions())) + .startNewChain() + .forceNonParallel() + .transform( + "New Files Compact Worker: " + table.name(), + new CommittableTypeInfo(), + new UnawareBucketNewFilesCompactionWorkerOperator(table)) + .startNewChain() + .setParallelism(written.getParallelism()); + } + boolean enableCompaction = !table.coreOptions().writeOnly(); boolean isStreamingMode = input.getExecutionEnvironment() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index b551d63d7b6e..839a45a6e47a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -42,6 +42,8 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Files; @@ -63,6 +65,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase { private static final int TIMEOUT = 180; + private static final Logger LOG = LoggerFactory.getLogger(PrimaryKeyFileStoreTableITCase.class); // ------------------------------------------------------------------------ // Test Utilities @@ -1031,6 +1034,9 @@ private void checkChangelogTestResult(int numProducers) throws Exception { try (CloseableIterator it = collect(sEnv.executeSql("SELECT * FROM T"))) { while (it.hasNext()) { Row row = it.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Changelog get {}", row); + } checker.addChangelog(row); if (((long) row.getField(2)) >= LIMIT) { endCnt++; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java new file mode 100644 index 000000000000..4f734ff6da8c --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.UnawareAppendCompactionTask; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.CommittableTypeInfo; +import org.apache.paimon.flink.sink.CompactionTaskTypeInfo; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.types.Either; +import org.apache.flink.util.Preconditions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link UnawareBucketNewFilesCompactionCoordinatorOperator}. 
*/ +public class UnawareBucketNewFilesCompactionCoordinatorOperatorTest { + + @Test + public void testPrepareSnapshotWithMultipleFiles() throws Exception { + Options options = new Options(); + options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8)); + UnawareBucketNewFilesCompactionCoordinatorOperator operator = + new UnawareBucketNewFilesCompactionCoordinatorOperator(new CoreOptions(options)); + OneInputStreamOperatorTestHarness< + Committable, Either>> + testHarness = createTestHarness(operator); + + testHarness.open(); + testHarness.processElement( + new StreamRecord<>(createCommittable(1, BinaryRow.EMPTY_ROW, 3, 5, 1, 2, 3))); + testHarness.prepareSnapshotPreBarrier(1); + testHarness.processElement( + new StreamRecord<>(createCommittable(2, BinaryRow.EMPTY_ROW, 3, 2))); + testHarness.prepareSnapshotPreBarrier(2); + + List output = new ArrayList<>(testHarness.getOutput()); + assertThat(output).hasSize(3); + assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, 3, 5); + assertCompactionTask(output.get(1), 1, BinaryRow.EMPTY_ROW, 1, 2, 3); + assertCompactionTask(output.get(2), 2, BinaryRow.EMPTY_ROW, 3, 2); + + testHarness.close(); + } + + @Test + public void testPrepareSnapshotWithSingleFile() throws Exception { + Options options = new Options(); + options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8)); + UnawareBucketNewFilesCompactionCoordinatorOperator operator = + new UnawareBucketNewFilesCompactionCoordinatorOperator(new CoreOptions(options)); + OneInputStreamOperatorTestHarness< + Committable, Either>> + testHarness = createTestHarness(operator); + + testHarness.open(); + testHarness.processElement( + new StreamRecord<>(createCommittable(1, BinaryRow.EMPTY_ROW, 3, 5, 1))); + testHarness.prepareSnapshotPreBarrier(1); + testHarness.processElement( + new StreamRecord<>(createCommittable(2, BinaryRow.EMPTY_ROW, 4))); + testHarness.prepareSnapshotPreBarrier(2); + + List output = new ArrayList<>(testHarness.getOutput()); + assertThat(output).hasSize(3); + assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, 3, 5); + assertCommittable(output.get(1), 1, BinaryRow.EMPTY_ROW, 1); + assertCommittable(output.get(2), 2, BinaryRow.EMPTY_ROW, 4); + + testHarness.close(); + } + + @Test + public void testPrepareSnapshotWithMultiplePartitions() throws Exception { + Options options = new Options(); + options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8)); + UnawareBucketNewFilesCompactionCoordinatorOperator operator = + new UnawareBucketNewFilesCompactionCoordinatorOperator(new CoreOptions(options)); + OneInputStreamOperatorTestHarness< + Committable, Either>> + testHarness = createTestHarness(operator); + + Function binaryRow = + i -> { + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.writeInt(0, i); + writer.complete(); + return row; + }; + + testHarness.open(); + testHarness.processElement( + new StreamRecord<>(createCommittable(1, binaryRow.apply(1), 3, 5, 1, 2, 3))); + testHarness.processElement( + new StreamRecord<>(createCommittable(1, binaryRow.apply(2), 3, 2, 4, 3))); + testHarness.prepareSnapshotPreBarrier(1); + testHarness.processElement( + new StreamRecord<>(createCommittable(2, binaryRow.apply(2), 3, 2))); + testHarness.prepareSnapshotPreBarrier(2); + + List output = new ArrayList<>(testHarness.getOutput()); + assertThat(output).hasSize(5); + + assertCompactionTask(output.get(0), 1, binaryRow.apply(1), 3, 5); + assertCompactionTask(output.get(1), 1, binaryRow.apply(2), 3, 2, 4); + 
assertCompactionTask(output.get(2), 1, binaryRow.apply(1), 1, 2, 3); + assertCommittable(output.get(3), 1, binaryRow.apply(2), 3); + assertCompactionTask(output.get(4), 2, binaryRow.apply(2), 3, 2); + + testHarness.close(); + } + + @Test + public void testSkipLargeFiles() throws Exception { + Options options = new Options(); + options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8)); + UnawareBucketNewFilesCompactionCoordinatorOperator operator = + new UnawareBucketNewFilesCompactionCoordinatorOperator(new CoreOptions(options)); + OneInputStreamOperatorTestHarness< + Committable, Either>> + testHarness = createTestHarness(operator); + + testHarness.open(); + testHarness.processElement( + new StreamRecord<>(createCommittable(1, BinaryRow.EMPTY_ROW, 8, 3, 5, 9))); + testHarness.prepareSnapshotPreBarrier(1); + + List output = new ArrayList<>(testHarness.getOutput()); + assertThat(output).hasSize(2); + assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, 3, 5); + assertCommittable(output.get(1), 1, BinaryRow.EMPTY_ROW, 8, 9); + + testHarness.close(); + } + + @SuppressWarnings("unchecked") + private void assertCommittable(Object o, long checkpointId, BinaryRow partition, int... mbs) { + StreamRecord>> record = + (StreamRecord>>) o; + assertThat(record.getValue().isLeft()).isTrue(); + Committable committable = record.getValue().left(); + assertThat(committable.checkpointId()).isEqualTo(checkpointId); + CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); + assertThat(message.partition()).isEqualTo(partition); + assertThat(message.newFilesIncrement().deletedFiles()).isEmpty(); + assertThat(message.newFilesIncrement().changelogFiles()).isEmpty(); + assertThat(message.compactIncrement().isEmpty()).isTrue(); + assertThat(message.indexIncrement().isEmpty()).isTrue(); + assertThat(message.newFilesIncrement().newFiles().stream().map(DataFileMeta::fileSize)) + .hasSameElementsAs( + Arrays.stream(mbs) + .mapToObj(i -> MemorySize.ofMebiBytes(i).getBytes()) + .collect(Collectors.toList())); + } + + @SuppressWarnings("unchecked") + private void assertCompactionTask( + Object o, long checkpointId, BinaryRow partition, int... mbs) { + StreamRecord>> record = + (StreamRecord>>) o; + assertThat(record.getValue().isRight()).isTrue(); + assertThat(record.getValue().right().f0).isEqualTo(checkpointId); + UnawareAppendCompactionTask task = record.getValue().right().f1; + assertThat(task.partition()).isEqualTo(partition); + assertThat(task.compactBefore().stream().map(DataFileMeta::fileSize)) + .hasSameElementsAs( + Arrays.stream(mbs) + .mapToObj(i -> MemorySize.ofMebiBytes(i).getBytes()) + .collect(Collectors.toList())); + } + + private Committable createCommittable(long checkpointId, BinaryRow partition, int... 
mbs) { + CommitMessageImpl message = + new CommitMessageImpl( + partition, + BucketMode.UNAWARE_BUCKET, + new DataIncrement( + Arrays.stream(mbs) + .mapToObj(this::createDataFileMetaOfSize) + .collect(Collectors.toList()), + Collections.emptyList(), + Collections.emptyList()), + CompactIncrement.emptyIncrement()); + return new Committable(checkpointId, Committable.Kind.FILE, message); + } + + private DataFileMeta createDataFileMetaOfSize(int mb) { + return DataFileMeta.forAppend( + UUID.randomUUID().toString(), + MemorySize.ofMebiBytes(mb).getBytes(), + 0, + SimpleStats.EMPTY_STATS, + 0, + 0, + 1, + Collections.emptyList(), + null, + null, + null, + null); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private OneInputStreamOperatorTestHarness< + Committable, Either>> + createTestHarness(UnawareBucketNewFilesCompactionCoordinatorOperator operator) + throws Exception { + TypeSerializer serializer = + new EitherSerializer<>( + new CommittableTypeInfo().createSerializer(new ExecutionConfig()), + new TupleTypeInfo<>( + BasicTypeInfo.LONG_TYPE_INFO, new CompactionTaskTypeInfo()) + .createSerializer(new ExecutionConfig())); + OneInputStreamOperatorTestHarness harness = + new OneInputStreamOperatorTestHarness(operator, 1, 1, 0); + harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer)); + harness.getStreamConfig().serializeAllConfigs(); + harness.setup(serializer); + return harness; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java new file mode 100644 index 000000000000..90444287d4f8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.flink.util.AbstractTestBase; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * IT cases for {@link UnawareBucketNewFilesCompactionCoordinatorOperator} and {@link + * UnawareBucketNewFilesCompactionWorkerOperator}. 
+ */ +public class UnawareBucketNewFilesCompactionITCase extends AbstractTestBase { + + @Test + public void testCompactNewFiles() throws Exception { + String warehouse = getTempDirPath(); + TableEnvironment tEnv = + tableEnvironmentBuilder() + .batchMode() + .parallelism(2) + .setConf(TableConfigOptions.TABLE_DML_SYNC, true) + .build(); + tEnv.executeSql( + "CREATE CATALOG mycat WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '" + + warehouse + + "'\n" + + ")"); + tEnv.executeSql("USE CATALOG mycat"); + tEnv.executeSql( + "CREATE TABLE T (\n" + + " pt INT,\n" + + " a INT,\n" + + " b STRING\n" + + ") PARTITIONED BY (pt) WITH (\n" + + " 'write-only' = 'true',\n" + + " 'compaction.min.file-num' = '3',\n" + + " 'compaction.max.file-num' = '3',\n" + + " 'precommit-compact' = 'true',\n" + + " 'sink.parallelism' = '2'\n" + + ")"); + + List values = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + for (int a = 0; a < 50; a++) { + values.add(String.format("(%d, %d, '%d')", pt, a, a * 1000)); + } + } + + Supplier> getActual = + () -> { + Map result = new HashMap<>(); + try (CloseableIterator it = tEnv.executeSql("SELECT * FROM T").collect()) { + while (it.hasNext()) { + Row row = it.next(); + assertThat(row.getArity()).isEqualTo(3); + result.compute( + String.format( + "(%s, %s, '%s')", + row.getField(0), row.getField(1), row.getField(2)), + (k, v) -> v == null ? 1 : v + 1); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return result; + }; + + LocalFileIO fileIO = LocalFileIO.create(); + for (int r = 1; r <= 3; r++) { + tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await(); + assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=0/bucket-0"))) + .hasSize(r); + assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=1/bucket-0"))) + .hasSize(r); + Map actual = getActual.get(); + assertThat(actual.keySet()).hasSameElementsAs(values); + final int e = r; + assertThat(actual.values()).allMatch(i -> i == e); + } + + tEnv.executeSql("CALL sys.compact('default.T')").await(); + assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=0/bucket-0"))).hasSize(4); + assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=1/bucket-0"))).hasSize(4); + Map actual = getActual.get(); + assertThat(actual.keySet()).hasSameElementsAs(values); + assertThat(actual.values()).allMatch(i -> i == 3); + + tEnv.executeSql("CALL sys.expire_snapshots(`table` => 'default.T', retain_max => 1)") + .await(); + assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=0/bucket-0"))).hasSize(1); + assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=1/bucket-0"))).hasSize(1); + actual = getActual.get(); + assertThat(actual.keySet()).hasSameElementsAs(values); + assertThat(actual.values()).allMatch(i -> i == 3); + } +}
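
Note on the new code path (illustrative, not part of the patch): the small-file merging is driven by the buffering contract of UnawareBucketNewFilesCompactionCoordinator introduced above. Files are collected per partition, a compaction batch is emitted as soon as the buffered size reaches the target file size, and emitAll() flushes whatever is still buffered at prepareSnapshotPreBarrier / endInput. In SQL the whole path is switched on per table with 'precommit-compact' = 'true' (the previous key changelog.precommit-compact is still accepted through the fallback key), exactly as the IT case above does. The following sketch exercises the coordinator directly; it reuses the DataFileMeta.forAppend arguments from the new unit test, and the 8 MB target size is just an example value.

import org.apache.paimon.append.UnawareBucketNewFilesCompactionCoordinator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.utils.Pair;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

public class CoordinatorContractSketch {

    public static void main(String[] args) {
        long targetFileSize = MemorySize.ofMebiBytes(8).getBytes();
        UnawareBucketNewFilesCompactionCoordinator coordinator =
                new UnawareBucketNewFilesCompactionCoordinator(targetFileSize);

        // Small files are buffered per partition; nothing is emitted yet.
        Optional<Pair<BinaryRow, List<DataFileMeta>>> r1 =
                coordinator.addFile(BinaryRow.EMPTY_ROW, newFileOfMb(3));
        Optional<Pair<BinaryRow, List<DataFileMeta>>> r2 =
                coordinator.addFile(BinaryRow.EMPTY_ROW, newFileOfMb(2));
        System.out.println(r1.isPresent() + " " + r2.isPresent()); // false false

        // The file that pushes the buffered size to >= 8 MB triggers a batch of 3 files.
        Optional<Pair<BinaryRow, List<DataFileMeta>>> r3 =
                coordinator.addFile(BinaryRow.EMPTY_ROW, newFileOfMb(5));
        System.out.println(r3.map(p -> p.getValue().size()).orElse(0)); // 3

        // At checkpoint or end of input the operator flushes leftovers with emitAll().
        coordinator.addFile(BinaryRow.EMPTY_ROW, newFileOfMb(1));
        List<Pair<BinaryRow, List<DataFileMeta>>> leftovers = coordinator.emitAll();
        System.out.println(leftovers.size()); // 1 partition with 1 buffered file
    }

    // Mirrors createDataFileMetaOfSize(...) from the new coordinator operator test.
    private static DataFileMeta newFileOfMb(int mb) {
        return DataFileMeta.forAppend(
                UUID.randomUUID().toString(),
                MemorySize.ofMebiBytes(mb).getBytes(),
                0,
                SimpleStats.EMPTY_STATS,
                0,
                0,
                1,
                Collections.emptyList(),
                null,
                null,
                null,
                null);
    }
}

A single leftover file is deliberately not turned into a compaction task by the coordinator operator (the size() > 1 check in processElement and emitAll); it is passed through as an ordinary committable, so a lone small file is never rewritten by itself.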