From 815fd1cf6688caec1210f0c41d01a02089a340c8 Mon Sep 17 00:00:00 2001
From: tsreaper
Date: Wed, 25 Sep 2024 14:49:21 +0800
Subject: [PATCH 1/7] [flink] Small changelog files can now be compacted into big files

# Conflicts:
#	paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
---
 docs/content/maintenance/write-performance.md |  13 +
 .../flink_connector_configuration.html        |   6 +
 .../java/org/apache/paimon/utils/IOUtils.java |   2 +-
 .../paimon/flink/FlinkConnectorOptions.java   |  11 +
 .../changelog/ChangelogCompactOperator.java   | 299 ++++++++++++++++++
 ...CompactedChangelogFormatReaderFactory.java | 252 +++++++++++++++
 .../CompactedChangelogReadOnlyFormat.java     | 106 +++++++
 .../apache/paimon/flink/sink/FlinkSink.java   |  12 +
 ...org.apache.paimon.format.FileFormatFactory |  18 ++
 .../flink/PrimaryKeyFileStoreTableITCase.java | 130 +++++++-
 10 files changed, 839 insertions(+), 10 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory

diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md
index 03e734874c05..4b3b5788b787 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -160,3 +160,16 @@ You can use fine-grained-resource-management of Flink to increase committer heap
 1. Configure Flink Configuration `cluster.fine-grained-resource-management.enabled: true`. (This is default after Flink 1.18)
 2. Configure Paimon Table Options: `sink.committer-memory`, for example 300 MB, depends on your `TaskManager`. (`sink.committer-cpu` is also supported)
+
+## Changelog Compaction
+
+If Flink's checkpoint interval is short (for example, 30 seconds) and the number of buckets is large,
+each snapshot may produce lots of small changelog files.
+Too many files may put a burden on the distributed storage cluster.
+
+In order to compact small changelog files into large ones, you can set the table option `changelog.compact.parallelism`.
+This option adds a compact operator after the writer operator, which copies small changelog files into larger ones.
+Larger parallelism makes file copying faster.
+However, it also results in a larger number of files.
+As file copying is fast in most storage systems,
+we suggest that you start experimenting with `'changelog.compact.parallelism' = '1'` and increase the value if needed.
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index ea5dd5dfb163..030eb7691f8d 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,6 +26,12 @@
+
+
changelog.compact.parallelism
+ (none) + Integer + Compact several changelog files from the same partition into one file, in order to decrease the number of small files. This property sets the parallelism of the compact operator. More parallelism means faster file copy, however the number of resulting files will also become larger. +
end-input.watermark
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java index 3df81d2d11d3..29352b4e8161 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java @@ -38,7 +38,7 @@ public final class IOUtils { private static final Logger LOG = LoggerFactory.getLogger(IOUtils.class); /** The block size for byte operations in byte. */ - private static final int BLOCKSIZE = 4096; + public static final int BLOCKSIZE = 4096; // ------------------------------------------------------------------------ // Byte copy operations diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 2c12b70a2493..66b4c376238f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -397,6 +397,17 @@ public class FlinkConnectorOptions { .withDescription( "Optional endInput watermark used in case of batch mode or bounded stream."); + public static final ConfigOption CHANGELOG_COMPACT_PARALLELISM = + key("changelog.compact.parallelism") + .intType() + .noDefaultValue() + .withDescription( + "Compact several changelog files from the same partition into one file, " + + "in order to decrease the number of small files. " + + "This property sets the parallelism of the compact operator. " + + "More parallelism means faster file copy, " + + "however the number of resulting files will also become larger."); + public static List> getOptions() { final Field[] fields = FlinkConnectorOptions.class.getFields(); final List> list = new ArrayList<>(fields.length); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java new file mode 100644 index 000000000000..973fd5c647c9 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Operator to compact several changelog files from the same partition into one file, in order to + * reduce the number of small files. + */ +public class ChangelogCompactOperator extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { + + private final FileStoreTable table; + + private transient FileStorePathFactory pathFactory; + private transient long checkpointId; + private transient Map outputStreams; + private transient Map> results; + + public ChangelogCompactOperator(FileStoreTable table) { + this.table = table; + } + + @Override + public void open() throws Exception { + super.open(); + + pathFactory = table.store().pathFactory(); + checkpointId = Long.MIN_VALUE; + outputStreams = new HashMap<>(); + results = new HashMap<>(); + } + + @Override + public void processElement(StreamRecord record) throws Exception { + Committable committable = record.getValue(); + checkpointId = Math.max(checkpointId, committable.checkpointId()); + if (committable.kind() != Committable.Kind.FILE) { + output.collect(record); + return; + } + + CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); + if (message.newFilesIncrement().changelogFiles().isEmpty() + && message.compactIncrement().changelogFiles().isEmpty()) { + output.collect(record); + return; + } + + // copy changelogs from the same partition into one file + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(message.partition(), message.bucket()); + for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) { + copyFile( + dataFilePathFactory.toPath(meta.fileName()), + message.partition(), + message.bucket(), + false, + meta); + } + for (DataFileMeta meta : message.compactIncrement().changelogFiles()) { + copyFile( + dataFilePathFactory.toPath(meta.fileName()), + message.partition(), + message.bucket(), + true, + meta); + } + + // send commit message without changelog files + CommitMessageImpl newMessage = + new CommitMessageImpl( + message.partition(), + message.bucket(), + new DataIncrement( + message.newFilesIncrement().newFiles(), + message.newFilesIncrement().deletedFiles(), + Collections.emptyList()), + new CompactIncrement( + message.compactIncrement().compactBefore(), + 
message.compactIncrement().compactAfter(), + Collections.emptyList()), + message.indexIncrement()); + Committable newCommittable = + new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); + if (record.hasTimestamp()) { + output.collect(new StreamRecord<>(newCommittable, record.getTimestamp())); + } else { + output.collect(new StreamRecord<>(newCommittable)); + } + } + + private void copyFile( + Path path, BinaryRow partition, int bucket, boolean isCompactResult, DataFileMeta meta) + throws Exception { + if (!outputStreams.containsKey(partition)) { + Path outputPath = + new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID()); + outputStreams.put( + partition, + new OutputStream( + outputPath, table.fileIO().newOutputStream(outputPath, false))); + } + + OutputStream outputStream = outputStreams.get(partition); + long offset = outputStream.out.getPos(); + try (SeekableInputStream in = table.fileIO().newInputStream(path)) { + IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false); + } + table.fileIO().deleteQuietly(path); + results.computeIfAbsent(partition, p -> new ArrayList<>()) + .add( + new Result( + bucket, + isCompactResult, + meta, + offset, + outputStream.out.getPos() - offset)); + + if (outputStream.out.getPos() >= table.coreOptions().targetFileSize(false)) { + flushPartition(partition); + } + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + flushAllPartitions(); + } + + @Override + public void endInput() throws Exception { + flushAllPartitions(); + } + + private void flushAllPartitions() throws Exception { + List partitions = new ArrayList<>(outputStreams.keySet()); + for (BinaryRow partition : partitions) { + flushPartition(partition); + } + } + + private void flushPartition(BinaryRow partition) throws Exception { + OutputStream outputStream = outputStreams.get(partition); + outputStream.out.close(); + + Result baseResult = results.get(partition).get(0); + Preconditions.checkArgument(baseResult.offset == 0); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, baseResult.bucket); + // see Java docs of `CompactedChangelogFormatReaderFactory` + String realName = + "compacted-changelog-" + + UUID.randomUUID() + + "$" + + baseResult.bucket + + "-" + + baseResult.length; + table.fileIO() + .rename( + outputStream.path, + dataFilePathFactory.toPath( + realName + + "." + + CompactedChangelogReadOnlyFormat.getIdentifier( + baseResult.meta.fileFormat()))); + + Map> grouped = new HashMap<>(); + for (Result result : results.get(partition)) { + grouped.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result); + } + + for (Map.Entry> entry : grouped.entrySet()) { + List newFilesChangelog = new ArrayList<>(); + List compactChangelog = new ArrayList<>(); + for (Result result : entry.getValue()) { + // see Java docs of `CompactedChangelogFormatReaderFactory` + String name = + (result.offset == 0 + ? realName + : realName + "-" + result.offset + "-" + result.length) + + "." 
+ + CompactedChangelogReadOnlyFormat.getIdentifier( + result.meta.fileFormat()); + if (result.isCompactResult) { + compactChangelog.add(result.meta.rename(name)); + } else { + newFilesChangelog.add(result.meta.rename(name)); + } + } + + CommitMessageImpl newMessage = + new CommitMessageImpl( + partition, + entry.getKey(), + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + newFilesChangelog), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + compactChangelog)); + Committable newCommittable = + new Committable(checkpointId, Committable.Kind.FILE, newMessage); + output.collect(new StreamRecord<>(newCommittable)); + } + + outputStreams.remove(partition); + results.remove(partition); + } + + @Override + public void close() throws Exception { + for (Map.Entry entry : outputStreams.entrySet()) { + OutputStream outputStream = entry.getValue(); + try { + outputStream.out.close(); + } catch (Exception e) { + LOG.warn("Failed to close output stream for file " + outputStream.path, e); + } + table.fileIO().deleteQuietly(outputStream.path); + } + + outputStreams.clear(); + results.clear(); + } + + private static class OutputStream { + + private final Path path; + private final PositionOutputStream out; + + private OutputStream(Path path, PositionOutputStream out) { + this.path = path; + this.out = out; + } + } + + private static class Result { + + private final int bucket; + private final boolean isCompactResult; + private final DataFileMeta meta; + private final long offset; + private final long length; + + private Result( + int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) { + this.bucket = bucket; + this.isCompactResult = isCompactResult; + this.meta = meta; + this.offset = offset; + this.length = length; + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java new file mode 100644 index 000000000000..8d17311518df --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.compact.changelog.format; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileStatus; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.reader.RecordReader; + +import java.io.EOFException; +import java.io.IOException; + +/** + * {@link FormatReaderFactory} for compacted changelog. + * + *

File Name Protocol + * + *

There are two kinds of file name. In the following description, bid1 and + * bid2 are bucket id, off is offset, len1 and len2 + * are lengths. + * + *

    + *
  • bucket-bid1/compacted-changelog-xxx$bid1-len1: This is the real file name. If + * this file name is recorded in manifest file meta, reader should read the bytes of this file + * starting from offset 0 with length len1. + *
  • bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2: This is the fake file + * name. Reader should read the bytes of file + * bucket-bid1/compacted-changelog-xxx$bid1-len1 starting from offset off + * with length len2. + *
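+ *
+ * For example (a hypothetical layout; real names also contain a UUID and a format suffix): if the
+ * compacted file for bucket 1 is bucket-1/compacted-changelog-xxx$1-100, readers of bucket 1 read
+ * bytes [0, 100) of that file. If the changelog of bucket 2 was copied into the same file at
+ * offset 100 with length 50, the manifest of bucket 2 records the fake name
+ * bucket-2/compacted-changelog-xxx$1-100-100-50, and readers decode it back to the real file and
+ * read bytes [100, 150).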
+ */ +public class CompactedChangelogFormatReaderFactory implements FormatReaderFactory { + + private final FormatReaderFactory wrapped; + + public CompactedChangelogFormatReaderFactory(FormatReaderFactory wrapped) { + this.wrapped = wrapped; + } + + @Override + public RecordReader createReader(Context context) throws IOException { + OffsetReadOnlyFileIO fileIO = new OffsetReadOnlyFileIO(context.fileIO()); + long length = decodePath(context.filePath()).length; + + return wrapped.createReader( + new Context() { + + @Override + public FileIO fileIO() { + return fileIO; + } + + @Override + public Path filePath() { + return context.filePath(); + } + + @Override + public long fileSize() { + return length; + } + }); + } + + private static DecodeResult decodePath(Path path) { + String[] nameAndFormat = path.getName().split("\\."); + String[] names = nameAndFormat[0].split("\\$"); + String[] split = names[1].split("-"); + if (split.length == 2) { + return new DecodeResult(path, 0, Long.parseLong(split[1])); + } else { + Path realPath = + new Path( + path.getParent().getParent(), + "bucket-" + + split[0] + + "/" + + names[0] + + "$" + + split[0] + + "-" + + split[1] + + "." + + nameAndFormat[1]); + return new DecodeResult(realPath, Long.parseLong(split[2]), Long.parseLong(split[3])); + } + } + + private static class DecodeResult { + + private final Path path; + private final long offset; + private final long length; + + private DecodeResult(Path path, long offset, long length) { + this.path = path; + this.offset = offset; + this.length = length; + } + } + + private static class OffsetReadOnlyFileIO implements FileIO { + + private final FileIO wrapped; + + private OffsetReadOnlyFileIO(FileIO wrapped) { + this.wrapped = wrapped; + } + + @Override + public boolean isObjectStore() { + return wrapped.isObjectStore(); + } + + @Override + public void configure(CatalogContext context) { + wrapped.configure(context); + } + + @Override + public SeekableInputStream newInputStream(Path path) throws IOException { + DecodeResult result = decodePath(path); + return new OffsetSeekableInputStream( + wrapped.newInputStream(result.path), result.offset, result.length); + } + + @Override + public PositionOutputStream newOutputStream(Path path, boolean overwrite) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + DecodeResult result = decodePath(path); + FileStatus status = wrapped.getFileStatus(result.path); + + return new FileStatus() { + + @Override + public long getLen() { + return result.length; + } + + @Override + public boolean isDir() { + return status.isDir(); + } + + @Override + public Path getPath() { + return path; + } + + @Override + public long getModificationTime() { + return status.getModificationTime(); + } + }; + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean exists(Path path) throws IOException { + return wrapped.exists(decodePath(path).path); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean mkdirs(Path path) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + throw new UnsupportedOperationException(); + } + } + + private static class OffsetSeekableInputStream extends 
SeekableInputStream { + + private final SeekableInputStream wrapped; + private final long offset; + private final long length; + + private OffsetSeekableInputStream(SeekableInputStream wrapped, long offset, long length) + throws IOException { + this.wrapped = wrapped; + this.offset = offset; + this.length = length; + wrapped.seek(offset); + } + + @Override + public void seek(long desired) throws IOException { + wrapped.seek(offset + desired); + } + + @Override + public long getPos() throws IOException { + return wrapped.getPos() - offset; + } + + @Override + public int read() throws IOException { + if (getPos() >= length) { + throw new EOFException(); + } + return wrapped.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + long realLen = Math.min(len, length - getPos()); + return wrapped.read(b, off, (int) realLen); + } + + @Override + public void close() throws IOException { + wrapped.close(); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java new file mode 100644 index 000000000000..39bed81505c6 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog.format; + +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.util.List; + +/** {@link FileFormat} for compacted changelog. 
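+ *
+ * This format is read-only: {@link #createWriterFactory} always throws, because compacted
+ * changelog files are produced by byte-copying already written changelog files rather than by a
+ * format writer. Its identifier is the wrapped format's identifier with the "cc-" prefix, for
+ * example "cc-orc".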
*/ +public class CompactedChangelogReadOnlyFormat extends FileFormat { + + private final FileFormat wrapped; + + protected CompactedChangelogReadOnlyFormat(String formatIdentifier, FileFormat wrapped) { + super(formatIdentifier); + this.wrapped = wrapped; + } + + @Override + public FormatReaderFactory createReaderFactory( + RowType projectedRowType, @Nullable List filters) { + return new CompactedChangelogFormatReaderFactory( + wrapped.createReaderFactory(projectedRowType, filters)); + } + + @Override + public FormatWriterFactory createWriterFactory(RowType type) { + throw new UnsupportedOperationException(); + } + + @Override + public void validateDataFields(RowType rowType) { + wrapped.validateDataFields(rowType); + } + + public static String getIdentifier(String wrappedFormat) { + return "cc-" + wrappedFormat; + } + + static class AbstractFactory implements FileFormatFactory { + + private final String format; + + AbstractFactory(String format) { + this.format = format; + } + + @Override + public String identifier() { + return getIdentifier(format); + } + + @Override + public FileFormat create(FormatContext formatContext) { + return new CompactedChangelogReadOnlyFormat( + getIdentifier(format), FileFormat.fromIdentifier(format, formatContext)); + } + } + + /** {@link FileFormatFactory} for compacted changelog, with orc as the real format. */ + public static class OrcFactory extends AbstractFactory { + + public OrcFactory() { + super("orc"); + } + } + + /** {@link FileFormatFactory} for compacted changelog, with parquet as the real format. */ + public static class ParquetFactory extends AbstractFactory { + + public ParquetFactory() { + super("parquet"); + } + } + + /** {@link FileFormatFactory} for compacted changelog, with avro as the real format. 
*/ + public static class AvroFactory extends AbstractFactory { + + public AvroFactory() { + super("avro"); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index e483e3c19f74..cbf193adfee9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.TagCreationMode; +import org.apache.paimon.flink.compact.changelog.ChangelogCompactOperator; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -54,6 +55,7 @@ import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; import static org.apache.paimon.CoreOptions.createCommitUser; +import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_COMPACT_PARALLELISM; import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT; @@ -226,6 +228,16 @@ public DataStream doWrite( if (options.get(SINK_USE_MANAGED_MEMORY)) { declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); } + + if (options.contains(CHANGELOG_COMPACT_PARALLELISM)) { + written = + written.transform( + "Changelog Compactor", + new CommittableTypeInfo(), + new ChangelogCompactOperator(table)) + .setParallelism(options.get(CHANGELOG_COMPACT_PARALLELISM)); + } + return written; } diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory new file mode 100644 index 000000000000..6e7553d5c668 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$OrcFactory +org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$ParquetFactory +org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$AvroFactory diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index 2bdd90a963d9..879a4a076cb3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -48,6 +49,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -303,9 +305,7 @@ private void innerTestChangelogProducing(List options) throws Exception public void testBatchJobWithConflictAndRestart() throws Exception { TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().allowRestart(10).build(); tEnv.executeSql( - "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" - + getTempDirPath() - + "' )"); + "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + path + "' )"); tEnv.executeSql("USE CATALOG mycat"); tEnv.executeSql( "CREATE TABLE t ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) " @@ -335,6 +335,115 @@ public void testBatchJobWithConflictAndRestart() throws Exception { } } + @Test + @Timeout(120) + public void testChangelogCompact() throws Exception { + TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); + String catalogDdl = + "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + path + "' )"; + bEnv.executeSql(catalogDdl); + bEnv.executeSql("USE CATALOG mycat"); + bEnv.executeSql( + "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) " + + "PARTITIONED BY (pt) " + + "WITH (" + + " 'bucket' = '2',\n" + + " 'changelog-producer' = 'lookup',\n" + + " 'changelog.compact.parallelism' = '1',\n" + + " 'snapshot.num-retained.min' = '3',\n" + + " 'snapshot.num-retained.max' = '3'\n" + + ")"); + + TableEnvironment sEnv = + tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(1000).build(); + sEnv.executeSql(catalogDdl); + sEnv.executeSql("USE CATALOG mycat"); + + List values = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + values.add(String.format("(0, %d, %d)", i, i)); + values.add(String.format("(1, %d, %d)", i, i)); + } + bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await(); + + List compactedChangelogs2 = listAllFilesWithPrefix("compacted-changelog-"); + assertThat(compactedChangelogs2).hasSize(2); + assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); + + List expected = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + expected.add(Row.ofKind(RowKind.INSERT, 0, i, i)); + expected.add(Row.ofKind(RowKind.INSERT, 1, i, i)); + } + assertStreamingResult( + sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"), + expected); + + values.clear(); + for (int i = 0; i < 100; i++) { + values.add(String.format("(0, %d, 
%d)", i, i + 1)); + values.add(String.format("(1, %d, %d)", i, i + 1)); + } + bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await(); + + assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4); + assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); + + for (int i = 0; i < 100; i++) { + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i)); + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i)); + expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1)); + expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1)); + } + assertStreamingResult( + sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"), + expected); + + values.clear(); + for (int i = 0; i < 100; i++) { + values.add(String.format("(0, %d, %d)", i, i + 2)); + values.add(String.format("(1, %d, %d)", i, i + 2)); + } + bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await(); + + assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4); + assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); + LocalFileIO fileIO = LocalFileIO.create(); + for (String p : compactedChangelogs2) { + assertThat(fileIO.exists(new Path(p))).isFalse(); + } + + expected = expected.subList(200, 600); + for (int i = 0; i < 100; i++) { + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i + 1)); + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i + 1)); + expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 2)); + expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 2)); + } + assertStreamingResult( + sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"), + expected); + } + + private List listAllFilesWithPrefix(String prefix) throws Exception { + try (Stream stream = Files.walk(java.nio.file.Paths.get(path))) { + return stream.filter(Files::isRegularFile) + .filter(p -> p.getFileName().toString().startsWith(prefix)) + .map(java.nio.file.Path::toString) + .collect(Collectors.toList()); + } + } + + private void assertStreamingResult(TableResult result, List expected) throws Exception { + List actual = new ArrayList<>(); + try (CloseableIterator it = result.collect()) { + while (actual.size() < expected.size() && it.hasNext()) { + actual.add(it.next()); + } + } + assertThat(actual).hasSameElementsAs(expected); + } + // ------------------------------------------------------------------------ // Random Tests // ------------------------------------------------------------------------ @@ -490,13 +599,16 @@ private void testLookupChangelogProducerRandom( enableFailure, "'bucket' = '4'," + String.format( - "'write-buffer-size' = '%s'," - + "'changelog-producer' = 'lookup'," - + "'lookup-wait' = '%s'," - + "'deletion-vectors.enabled' = '%s'", - random.nextBoolean() ? "4mb" : "8mb", + "'bucket' = '4', " + + "'writer-buffer-size' = '%s', " + + "'changelog-producer' = 'lookup', " + + "'lookup-wait' = '%s', " + + "'deletion-vectors.enabled' = '%s', " + + "'changelog.compact.parallelism' = '%s'", + random.nextBoolean() ? 
"512kb" : "1mb", random.nextBoolean(), - enableDeletionVectors)); + enableDeletionVectors, + random.nextInt(1, 3))); // sleep for a random amount of time to check // if we can first read complete records then read incremental records correctly From 4b61315599c0ccaca2b671c07083011d6b08ea9f Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Fri, 18 Oct 2024 15:46:47 +0800 Subject: [PATCH 2/7] [flink] add coordinate and worker operator for small changelog files compaction --- docs/content/maintenance/write-performance.md | 8 +- .../flink_connector_configuration.html | 6 +- .../paimon/flink/FlinkConnectorOptions.java | 16 +- .../ChangelogCompactCoordinateOperator.java | 177 ++++++++++++ .../changelog/ChangelogCompactTask.java | 255 ++++++++++++++++++ .../ChangelogCompactTaskSerializer.java | 113 ++++++++ .../ChangelogCompactWorkerOperator.java | 54 ++++ .../changelog/ChangelogTaskTypeInfo.java | 85 ++++++ .../apache/paimon/flink/sink/FlinkSink.java | 21 +- .../flink/PrimaryKeyFileStoreTableITCase.java | 104 ++++++- .../ChangelogCompactTaskSerializerTest.java | 94 +++++++ 11 files changed, 899 insertions(+), 34 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md index 4b3b5788b787..1456f79ede5b 100644 --- a/docs/content/maintenance/write-performance.md +++ b/docs/content/maintenance/write-performance.md @@ -167,9 +167,5 @@ If Flink's checkpoint interval is short (for example, 30 seconds) and the number each snapshot may produce lots of small changelog files. Too many files may put a burden on the distributed storage cluster. -In order to compact small changelog files into large ones, you can set the table option `changelog.compact.parallelism`. -This option will add a compact operator after the writer operator, which copies changelog files into large ones. -If the parallelism becomes larger, file copying will become faster. -However, the number of resulting files will also become larger. -As file copying is fast in most storage system, -we suggest that you start experimenting with `'changelog.compact.parallelism' = '1'` and increase the value if needed. +In order to compact small changelog files into large ones, you can set the table option `changelog.precommit-compact = true`. +Default value of this option is false, if true, it will add a compact coordinator and worker operator after the writer operator, which copies changelog files into large ones. 
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index 030eb7691f8d..f7c078c55f74 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -27,10 +27,10 @@ -
changelog.compact.parallelism
+
changelog.precommit-compact
-            (none)
-            Integer
-            Compact several changelog files from the same partition into one file, in order to decrease the number of small files. This property sets the parallelism of the compact operator. More parallelism means faster file copy, however the number of resulting files will also become larger.
+            false
+            Boolean
+            If true, it will add a changelog compact coordinator and worker operator after the writer operator, in order to compact several changelog files from the same partition into large ones, which can decrease the number of small files.
end-input.watermark
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 66b4c376238f..0d806a5d5889 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -397,16 +397,14 @@ public class FlinkConnectorOptions { .withDescription( "Optional endInput watermark used in case of batch mode or bounded stream."); - public static final ConfigOption CHANGELOG_COMPACT_PARALLELISM = - key("changelog.compact.parallelism") - .intType() - .noDefaultValue() + public static final ConfigOption CHANGELOG_PRECOMMIT_COMPACT = + key("changelog.precommit-compact") + .booleanType() + .defaultValue(false) .withDescription( - "Compact several changelog files from the same partition into one file, " - + "in order to decrease the number of small files. " - + "This property sets the parallelism of the compact operator. " - + "More parallelism means faster file copy, " - + "however the number of resulting files will also become larger."); + "If true, it will add a changelog compact coordinator and worker operator after the writer operator," + + "in order to compact several changelog files from the same partition into large ones, " + + "which can decrease the number of small files. "); public static List> getOptions() { final Field[] fields = FlinkConnectorOptions.class.getFields(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java new file mode 100644 index 000000000000..eae5683df9b8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Coordinator operator for compacting changelog files.
+ *
+ *

{@link ChangelogCompactCoordinateOperator} calculates the total size of the changelog files
+ * produced in each partition, based on the {@link Committable} messages emitted by the writer
+ * operator, and emits a {@link ChangelogCompactTask} to {@link ChangelogCompactWorkerOperator}
+ * once that size reaches the target file size, as well as at each checkpoint.
+ */
+public class ChangelogCompactCoordinateOperator
+        extends AbstractStreamOperator<Either<Committable, ChangelogCompactTask>>
+        implements OneInputStreamOperator<Committable, Either<Committable, ChangelogCompactTask>>,
+                BoundedOneInput {
+    private final FileStoreTable table;
+
+    private transient long checkpointId;
+    private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;
+
+    public ChangelogCompactCoordinateOperator(FileStoreTable table) {
+        this.table = table;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        checkpointId = Long.MIN_VALUE;
+        partitionChangelogs = new HashMap<>();
+    }
+
+    public void processElement(StreamRecord<Committable> record) {
+        Committable committable = record.getValue();
+        checkpointId = Math.max(checkpointId, committable.checkpointId());
+        if (committable.kind() != Committable.Kind.FILE) {
+            output.collect(new StreamRecord<>(Either.Left(record.getValue())));
+            return;
+        }
+
+        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+        if (message.newFilesIncrement().changelogFiles().isEmpty()
+                && message.compactIncrement().changelogFiles().isEmpty()) {
+            output.collect(new StreamRecord<>(Either.Left(record.getValue())));
+            return;
+        }
+
+        BinaryRow partition = message.partition();
+        Integer bucket = message.bucket();
+        for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
+            partitionChangelogs
+                    .computeIfAbsent(partition, k -> new PartitionChangelog())
+                    .addNewChangelogFile(bucket, meta);
+            PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
+            if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) {
+                emitPartitionChangelogCompactTask(partition);
+            }
+        }
+        for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
+            partitionChangelogs
+                    .computeIfAbsent(partition, k -> new PartitionChangelog())
+                    .addCompactChangelogFile(bucket, meta);
+            PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
+            if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) {
+                emitPartitionChangelogCompactTask(partition);
+            }
+        }
+
+        CommitMessageImpl newMessage =
+                new CommitMessageImpl(
+                        message.partition(),
+                        message.bucket(),
+                        new DataIncrement(
+                                message.newFilesIncrement().newFiles(),
+                                message.newFilesIncrement().deletedFiles(),
+                                Collections.emptyList()),
+                        new CompactIncrement(
+                                message.compactIncrement().compactBefore(),
+                                message.compactIncrement().compactAfter(),
+                                Collections.emptyList()),
+                        message.indexIncrement());
+        Committable newCommittable =
+                new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
+        output.collect(new StreamRecord<>(Either.Left(newCommittable)));
+    }
+
+    public void prepareSnapshotPreBarrier(long checkpointId) {
+        emitAllPartitionsChangelogCompactTask();
+    }
+
+    public void endInput() {
+        emitAllPartitionsChangelogCompactTask();
+    }
+
+    private void emitPartitionChangelogCompactTask(BinaryRow partition) {
+        PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
+        output.collect(
+                new StreamRecord<>(
+                        Either.Right(
+                                new ChangelogCompactTask(
+                                        checkpointId,
+                                        partition,
+                                        partitionChangelog.newFileChangelogFiles,
+                                        partitionChangelog.compactChangelogFiles))));
+        partitionChangelogs.remove(partition);
+    }
+
+    private void emitAllPartitionsChangelogCompactTask() {
+        List<BinaryRow> partitions = new ArrayList<>(partitionChangelogs.keySet());
+        for (BinaryRow partition : partitions) {
+            emitPartitionChangelogCompactTask(partition);
+        }
+    }
+
+    private static class PartitionChangelog {
+        private long totalFileSize;
+        private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
+        private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;
+
+        public long totalFileSize() {
+            return totalFileSize;
+        }
+
+        public PartitionChangelog() {
+            totalFileSize = 0;
+            newFileChangelogFiles = new HashMap<>();
+            compactChangelogFiles = new HashMap<>();
+        }
+
+        public void addNewChangelogFile(Integer bucket, DataFileMeta file) {
+            totalFileSize += file.fileSize();
+            newFileChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file);
+        }
+
+        public void addCompactChangelogFile(Integer bucket, DataFileMeta file) {
+            totalFileSize += file.fileSize();
+            compactChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
new file mode 100644
index 000000000000..d75ccea9b45a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * {@link ChangelogCompactTask} to compact several changelog files from the same partition into one
+ * file, in order to reduce the number of small files.
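+ *
+ * A task is created by {@link ChangelogCompactCoordinateOperator} and executed by {@link
+ * ChangelogCompactWorkerOperator}. It records, per bucket, the changelog files to copy;
+ * {@link #doCompact} concatenates their bytes into one file and produces new commit messages
+ * whose file names follow the protocol described in the Java docs of
+ * CompactedChangelogFormatReaderFactory.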
+ */ +public class ChangelogCompactTask implements Serializable { + private final long checkpointId; + private final BinaryRow partition; + private final Map> newFileChangelogFiles; + private final Map> compactChangelogFiles; + + private transient OutputStream outputStream; + private final transient List results = new ArrayList<>(); + + public ChangelogCompactTask( + long checkpointId, + BinaryRow partition, + Map> newFileChangelogFiles, + Map> compactChangelogFiles) { + this.checkpointId = checkpointId; + this.partition = partition; + this.newFileChangelogFiles = newFileChangelogFiles; + this.compactChangelogFiles = compactChangelogFiles; + } + + public long checkpointId() { + return checkpointId; + } + + public BinaryRow partition() { + return partition; + } + + public Map> newFileChangelogFiles() { + return newFileChangelogFiles; + } + + public Map> compactChangelogFiles() { + return compactChangelogFiles; + } + + public List doCompact(FileStoreTable table) throws Exception { + FileStorePathFactory pathFactory = table.store().pathFactory(); + + // copy all changelog files to a new big file + for (Map.Entry> entry : newFileChangelogFiles.entrySet()) { + Integer bucket = entry.getKey(); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, bucket); + for (DataFileMeta meta : entry.getValue()) { + copyFile(table, dataFilePathFactory.toPath(meta.fileName()), bucket, false, meta); + } + } + for (Map.Entry> entry : compactChangelogFiles.entrySet()) { + Integer bucket = entry.getKey(); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, bucket); + for (DataFileMeta meta : entry.getValue()) { + copyFile(table, dataFilePathFactory.toPath(meta.fileName()), bucket, true, meta); + } + } + outputStream.out.close(); + + return produceNewCommittables(table, pathFactory); + } + + private void copyFile( + FileStoreTable table, Path path, int bucket, boolean isCompactResult, DataFileMeta meta) + throws Exception { + if (outputStream == null) { + Path outputPath = + new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID()); + outputStream = + new OutputStream(outputPath, table.fileIO().newOutputStream(outputPath, false)); + } + long offset = outputStream.out.getPos(); + try (SeekableInputStream in = table.fileIO().newInputStream(path)) { + IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false); + } + table.fileIO().deleteQuietly(path); + results.add( + new Result( + bucket, isCompactResult, meta, offset, outputStream.out.getPos() - offset)); + } + + private List produceNewCommittables( + FileStoreTable table, FileStorePathFactory pathFactory) throws IOException { + Result baseResult = results.get(0); + Preconditions.checkArgument(baseResult.offset == 0); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, baseResult.bucket); + // see Java docs of `CompactedChangelogFormatReaderFactory` + String realName = + "compacted-changelog-" + + UUID.randomUUID() + + "$" + + baseResult.bucket + + "-" + + baseResult.length; + table.fileIO() + .rename( + outputStream.path, + dataFilePathFactory.toPath( + realName + + "." 
+ + CompactedChangelogReadOnlyFormat.getIdentifier( + baseResult.meta.fileFormat()))); + + List newCommittables = new ArrayList<>(); + + Map> bucketedResults = new HashMap<>(); + for (Result result : results) { + bucketedResults.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result); + } + + for (Map.Entry> entry : bucketedResults.entrySet()) { + List newFilesChangelog = new ArrayList<>(); + List compactChangelog = new ArrayList<>(); + for (Result result : entry.getValue()) { + // see Java docs of `CompactedChangelogFormatReaderFactory` + String name = + (result.offset == 0 + ? realName + : realName + "-" + result.offset + "-" + result.length) + + "." + + CompactedChangelogReadOnlyFormat.getIdentifier( + result.meta.fileFormat()); + if (result.isCompactResult) { + compactChangelog.add(result.meta.rename(name)); + } else { + newFilesChangelog.add(result.meta.rename(name)); + } + } + + CommitMessageImpl newMessage = + new CommitMessageImpl( + partition, + entry.getKey(), + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + newFilesChangelog), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + compactChangelog)); + newCommittables.add(new Committable(checkpointId, Committable.Kind.FILE, newMessage)); + } + return newCommittables; + } + + public int hashCode() { + return Objects.hash(checkpointId, partition, newFileChangelogFiles, compactChangelogFiles); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ChangelogCompactTask that = (ChangelogCompactTask) o; + return checkpointId == that.checkpointId + && Objects.equals(partition, that.partition) + && Objects.equals(newFileChangelogFiles, that.newFileChangelogFiles) + && Objects.equals(compactChangelogFiles, that.compactChangelogFiles); + } + + @Override + public String toString() { + return String.format( + "ChangelogCompactionTask {" + + "partition = %s, " + + "newFileChangelogFiles = %s, " + + "compactChangelogFiles = %s}", + partition, newFileChangelogFiles, compactChangelogFiles); + } + + private static class OutputStream { + + private final Path path; + private final PositionOutputStream out; + + private OutputStream(Path path, PositionOutputStream out) { + this.path = path; + this.out = out; + } + } + + private static class Result { + + private final int bucket; + private final boolean isCompactResult; + private final DataFileMeta meta; + private final long offset; + private final long length; + + private Result( + int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) { + this.bucket = bucket; + this.isCompactResult = isCompactResult; + this.meta = meta; + this.offset = offset; + this.length = length; + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java new file mode 100644 index 000000000000..34f9d035d8d2 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFileMetaSerializer; +import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataOutputView; +import org.apache.paimon.io.DataOutputViewStreamWrapper; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.CollectionUtil; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** Serializer for {@link ChangelogCompactTask}. */ +public class ChangelogCompactTaskSerializer + implements SimpleVersionedSerializer { + private static final int CURRENT_VERSION = 1; + + private final DataFileMetaSerializer dataFileSerializer; + + public ChangelogCompactTaskSerializer() { + this.dataFileSerializer = new DataFileMetaSerializer(); + } + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(ChangelogCompactTask obj) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + serialize(obj, view); + return out.toByteArray(); + } + + @Override + public ChangelogCompactTask deserialize(int version, byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + return deserialize(version, view); + } + + private void serialize(ChangelogCompactTask task, DataOutputView view) throws IOException { + view.writeLong(task.checkpointId()); + serializeBinaryRow(task.partition(), view); + // serialize newFileChangelogFiles map + serializeMap(task.newFileChangelogFiles(), view); + serializeMap(task.compactChangelogFiles(), view); + } + + private ChangelogCompactTask deserialize(int version, DataInputView view) throws IOException { + if (version != getVersion()) { + throw new RuntimeException("Can not deserialize version: " + version); + } + + return new ChangelogCompactTask( + view.readLong(), + deserializeBinaryRow(view), + deserializeMap(view), + deserializeMap(view)); + } + + private void serializeMap(Map> map, DataOutputView view) + throws IOException { + view.writeInt(map.size()); + for (Map.Entry> entry : map.entrySet()) { + view.writeInt(entry.getKey()); + if (entry.getValue() == null) { + throw new IllegalArgumentException( + "serialize error. 
no value for bucket-" + entry.getKey()); + } + dataFileSerializer.serializeList(entry.getValue(), view); + } + } + + private Map> deserializeMap(DataInputView view) throws IOException { + final int size = view.readInt(); + + final Map> map = + CollectionUtil.newHashMapWithExpectedSize(size); + for (int i = 0; i < size; i++) { + map.put(view.readInt(), dataFileSerializer.deserializeList(view)); + } + + return map; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java new file mode 100644 index 000000000000..260c25a31561 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; + +import java.util.List; + +/** + * Receive and process the {@link ChangelogCompactTask}s emitted by {@link + * ChangelogCompactCoordinateOperator}. + */ +public class ChangelogCompactWorkerOperator extends AbstractStreamOperator + implements OneInputStreamOperator, Committable> { + private final FileStoreTable table; + + public ChangelogCompactWorkerOperator(FileStoreTable table) { + this.table = table; + } + + public void processElement(StreamRecord> record) + throws Exception { + + if (record.getValue().isLeft()) { + output.collect(new StreamRecord<>(record.getValue().left())); + } else { + ChangelogCompactTask task = record.getValue().right(); + List committables = task.doCompact(table); + committables.forEach(committable -> output.collect(new StreamRecord<>(committable))); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java new file mode 100644 index 000000000000..5cae899a0704 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** Type information for {@link ChangelogCompactTask}. */ +public class ChangelogTaskTypeInfo extends TypeInformation { + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class getTypeClass() { + return ChangelogCompactTask.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer createSerializer(ExecutionConfig config) { + // we don't need copy for task + return new NoneCopyVersionedSerializerTypeSerializerProxy( + ChangelogCompactTaskSerializer::new) {}; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof ChangelogTaskTypeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof ChangelogTaskTypeInfo; + } + + @Override + public String toString() { + return "ChangelogCompactionTask"; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index cbf193adfee9..6acb5b77c2a9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -21,7 +21,9 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.TagCreationMode; -import org.apache.paimon.flink.compact.changelog.ChangelogCompactOperator; +import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator; +import org.apache.paimon.flink.compact.changelog.ChangelogCompactWorkerOperator; +import org.apache.paimon.flink.compact.changelog.ChangelogTaskTypeInfo; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -32,6 +34,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.ReadableConfig; import 
org.apache.flink.streaming.api.CheckpointingMode; @@ -55,7 +58,7 @@ import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; import static org.apache.paimon.CoreOptions.createCommitUser; -import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_COMPACT_PARALLELISM; +import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT; import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT; @@ -229,13 +232,19 @@ public DataStream doWrite( declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); } - if (options.contains(CHANGELOG_COMPACT_PARALLELISM)) { + if (options.contains(CHANGELOG_PRECOMMIT_COMPACT)) { written = written.transform( - "Changelog Compactor", + "Changelog Compact Coordinator", + new EitherTypeInfo<>( + new CommittableTypeInfo(), new ChangelogTaskTypeInfo()), + new ChangelogCompactCoordinateOperator(table)) + .forceNonParallel() + .transform( + "Changelog Compact Worker", new CommittableTypeInfo(), - new ChangelogCompactOperator(table)) - .setParallelism(options.get(CHANGELOG_COMPACT_PARALLELISM)); + new ChangelogCompactWorkerOperator(table)) + .setParallelism(written.getParallelism()); } return written; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index 879a4a076cb3..3daf9d34de60 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -337,7 +337,7 @@ public void testBatchJobWithConflictAndRestart() throws Exception { @Test @Timeout(120) - public void testChangelogCompact() throws Exception { + public void testChangelogCompactInBatchWrite() throws Exception { TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); String catalogDdl = "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + path + "' )"; @@ -347,9 +347,9 @@ public void testChangelogCompact() throws Exception { "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) " + "PARTITIONED BY (pt) " + "WITH (" - + " 'bucket' = '2',\n" + + " 'bucket' = '10',\n" + " 'changelog-producer' = 'lookup',\n" - + " 'changelog.compact.parallelism' = '1',\n" + + " 'changelog.precommit-compact' = 'true',\n" + " 'snapshot.num-retained.min' = '3',\n" + " 'snapshot.num-retained.max' = '3'\n" + ")"); @@ -360,7 +360,7 @@ public void testChangelogCompact() throws Exception { sEnv.executeSql("USE CATALOG mycat"); List values = new ArrayList<>(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { values.add(String.format("(0, %d, %d)", i, i)); values.add(String.format("(1, %d, %d)", i, i)); } @@ -371,7 +371,7 @@ public void testChangelogCompact() throws Exception { assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); List expected = new ArrayList<>(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { expected.add(Row.ofKind(RowKind.INSERT, 0, i, i)); expected.add(Row.ofKind(RowKind.INSERT, 1, i, i)); } @@ -380,7 +380,7 @@ public void testChangelogCompact() throws Exception { expected); 
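// Note (reviewer sketch): each per-partition compact task writes a single concatenated
// file, and this table has two partitions (pt = 0 and pt = 1), so every batch write in
// this test is expected to add two "compacted-changelog-" files (2 after the first
// write, 4 in total after the second while snapshot retention keeps both).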
values.clear(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { values.add(String.format("(0, %d, %d)", i, i + 1)); values.add(String.format("(1, %d, %d)", i, i + 1)); } @@ -389,7 +389,7 @@ public void testChangelogCompact() throws Exception { assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4); assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i)); expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i)); expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1)); @@ -400,7 +400,7 @@ public void testChangelogCompact() throws Exception { expected); values.clear(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { values.add(String.format("(0, %d, %d)", i, i + 2)); values.add(String.format("(1, %d, %d)", i, i + 2)); } @@ -413,8 +413,8 @@ public void testChangelogCompact() throws Exception { assertThat(fileIO.exists(new Path(p))).isFalse(); } - expected = expected.subList(200, 600); - for (int i = 0; i < 100; i++) { + expected = expected.subList(2000, 6000); + for (int i = 0; i < 1000; i++) { expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i + 1)); expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i + 1)); expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 2)); @@ -425,6 +425,81 @@ public void testChangelogCompact() throws Exception { expected); } + @Test + @Timeout(120) + public void testChangelogCompactInStreamWrite() throws Exception { + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(2000) + .parallelism(4) + .build(); + + sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + sEnv.executeSql("USE CATALOG testCatalog"); + sEnv.executeSql( + "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) " + + "PARTITIONED BY (pt) " + + "WITH (" + + " 'bucket' = '10',\n" + + " 'changelog-producer' = 'lookup',\n" + + " 'changelog.precommit-compact' = 'true'\n" + + ")"); + + Path inputPath = new Path(path, "input"); + LocalFileIO.create().mkdirs(inputPath); + sEnv.executeSql( + "CREATE TABLE `default_catalog`.`default_database`.`s` ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) " + + "WITH ( 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '" + + inputPath + + "', 'source.monitor-interval' = '500ms' )"); + + sEnv.executeSql("INSERT INTO t SELECT * FROM `default_catalog`.`default_database`.`s`"); + CloseableIterator it = sEnv.executeSql("SELECT * FROM t").collect(); + + // write initial data + List values = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + values.add(String.format("(0, %d, %d)", i, i)); + values.add(String.format("(1, %d, %d)", i, i)); + } + sEnv.executeSql( + "INSERT INTO `default_catalog`.`default_database`.`s` VALUES " + + String.join(", ", values)) + .await(); + + List expected = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + expected.add(Row.ofKind(RowKind.INSERT, 0, i, i)); + expected.add(Row.ofKind(RowKind.INSERT, 1, i, i)); + } + assertStreamingResult(it, expected); + + List compactedChangelogs2 = listAllFilesWithPrefix("compacted-changelog-"); + assertThat(compactedChangelogs2).hasSize(2); + assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); + + // write update data + values.clear(); + for (int i = 0; i < 100; i++) { + values.add(String.format("(0, %d, %d)", i, i + 1)); + values.add(String.format("(1, %d, %d)", i, i + 1)); + } + sEnv.executeSql( + 
"INSERT INTO `default_catalog`.`default_database`.`s` VALUES " + + String.join(", ", values)) + .await(); + for (int i = 0; i < 100; i++) { + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i)); + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i)); + expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1)); + expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1)); + } + assertStreamingResult(it, expected.subList(200, 600)); + assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4); + assertThat(listAllFilesWithPrefix("changelog-")).isEmpty(); + } + private List listAllFilesWithPrefix(String prefix) throws Exception { try (Stream stream = Files.walk(java.nio.file.Paths.get(path))) { return stream.filter(Files::isRegularFile) @@ -444,6 +519,15 @@ private void assertStreamingResult(TableResult result, List expected) throw assertThat(actual).hasSameElementsAs(expected); } + private void assertStreamingResult(CloseableIterator it, List expected) { + List actual = new ArrayList<>(); + while (actual.size() < expected.size() && it.hasNext()) { + actual.add(it.next()); + } + + assertThat(actual).hasSameElementsAs(expected); + } + // ------------------------------------------------------------------------ // Random Tests // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java new file mode 100644 index 000000000000..906fac850973 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact.changelog; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row; +import static org.apache.paimon.stats.StatsTestUtils.newSimpleStats; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link ChangelogCompactTaskSerializer}. 
*/ +public class ChangelogCompactTaskSerializerTest { + private final ChangelogCompactTaskSerializer serializer = new ChangelogCompactTaskSerializer(); + + @Test + public void testSerializer() throws Exception { + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(partition); + writer.writeInt(0, 0); + writer.complete(); + + ChangelogCompactTask task = + new ChangelogCompactTask( + 1L, + partition, + new HashMap>() { + { + put(0, newFiles(20)); + put(1, newFiles(20)); + } + }, + new HashMap>() { + { + put(0, newFiles(10)); + put(1, newFiles(10)); + } + }); + ChangelogCompactTask serializeTask = serializer.deserialize(1, serializer.serialize(task)); + assertThat(task).isEqualTo(serializeTask); + } + + private List newFiles(int num) { + List list = new ArrayList<>(); + for (int i = 0; i < num; i++) { + list.add(newFile()); + } + return list; + } + + private DataFileMeta newFile() { + return new DataFileMeta( + UUID.randomUUID().toString(), + 0, + 1, + row(0), + row(0), + newSimpleStats(0, 1), + newSimpleStats(0, 1), + 0, + 1, + 0, + 0, + 0L, + null, + FileSource.APPEND, + null); + } +} From 472490c947944e60aa496225a80b99735c30957b Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Fri, 25 Oct 2024 17:24:41 +0800 Subject: [PATCH 3/7] add side output in coordinator for committable message --- .../ChangelogCompactCoordinateOperator.java | 38 +++++++++---------- .../ChangelogCompactWorkerOperator.java | 16 +++----- .../apache/paimon/flink/sink/FlinkSink.java | 23 +++++++---- 3 files changed, 40 insertions(+), 37 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java index eae5683df9b8..68889531ea98 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java @@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.types.Either; +import org.apache.flink.util.OutputTag; import java.util.ArrayList; import java.util.Collections; @@ -46,17 +46,18 @@ * contained in all buckets within each partition from {@link Committable} message emitted from * writer operator. And emit {@link ChangelogCompactTask} to {@link ChangelogCompactWorkerOperator}. 
*/ -public class ChangelogCompactCoordinateOperator - extends AbstractStreamOperator> - implements OneInputStreamOperator>, - BoundedOneInput { +public class ChangelogCompactCoordinateOperator extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { private final FileStoreTable table; + private final OutputTag committableOutputTag; private transient long checkpointId; private transient Map partitionChangelogs; - public ChangelogCompactCoordinateOperator(FileStoreTable table) { + public ChangelogCompactCoordinateOperator( + FileStoreTable table, OutputTag committableOutputTag) { this.table = table; + this.committableOutputTag = committableOutputTag; } @Override @@ -71,14 +72,14 @@ public void processElement(StreamRecord record) { Committable committable = record.getValue(); checkpointId = Math.max(checkpointId, committable.checkpointId()); if (committable.kind() != Committable.Kind.FILE) { - output.collect(new StreamRecord<>(Either.Left(record.getValue()))); + output.collect(committableOutputTag, record); return; } CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); if (message.newFilesIncrement().changelogFiles().isEmpty() && message.compactIncrement().changelogFiles().isEmpty()) { - output.collect(new StreamRecord<>(Either.Left(record.getValue()))); + output.collect(committableOutputTag, record); return; } @@ -90,7 +91,7 @@ public void processElement(StreamRecord record) { .addNewChangelogFile(bucket, meta); PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) { - emitPartitionChanglogCompactTask(partition); + emitPartitionChangelogCompactTask(partition); } } for (DataFileMeta meta : message.compactIncrement().changelogFiles()) { @@ -99,7 +100,7 @@ public void processElement(StreamRecord record) { .addCompactChangelogFile(bucket, meta); PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) { - emitPartitionChanglogCompactTask(partition); + emitPartitionChangelogCompactTask(partition); } } @@ -118,7 +119,7 @@ public void processElement(StreamRecord record) { message.indexIncrement()); Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); - output.collect(new StreamRecord<>(Either.Left(newCommittable))); + output.collect(committableOutputTag, new StreamRecord<>(newCommittable)); } public void prepareSnapshotPreBarrier(long checkpointId) { @@ -129,23 +130,22 @@ public void endInput() { emitAllPartitionsChanglogCompactTask(); } - private void emitPartitionChanglogCompactTask(BinaryRow partition) { + private void emitPartitionChangelogCompactTask(BinaryRow partition) { PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); output.collect( new StreamRecord<>( - Either.Right( - new ChangelogCompactTask( - checkpointId, - partition, - partitionChangelog.newFileChangelogFiles, - partitionChangelog.compactChangelogFiles)))); + new ChangelogCompactTask( + checkpointId, + partition, + partitionChangelog.newFileChangelogFiles, + partitionChangelog.compactChangelogFiles))); partitionChangelogs.remove(partition); } private void emitAllPartitionsChanglogCompactTask() { List partitions = new ArrayList<>(partitionChangelogs.keySet()); for (BinaryRow partition : partitions) { - emitPartitionChanglogCompactTask(partition); + 
emitPartitionChangelogCompactTask(partition); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java index 260c25a31561..d180e6215cdb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java @@ -24,7 +24,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.types.Either; import java.util.List; @@ -33,22 +32,17 @@ * ChangelogCompactCoordinateOperator}. */ public class ChangelogCompactWorkerOperator extends AbstractStreamOperator - implements OneInputStreamOperator, Committable> { + implements OneInputStreamOperator { private final FileStoreTable table; public ChangelogCompactWorkerOperator(FileStoreTable table) { this.table = table; } - public void processElement(StreamRecord> record) - throws Exception { + public void processElement(StreamRecord record) throws Exception { - if (record.getValue().isLeft()) { - output.collect(new StreamRecord<>(record.getValue().left())); - } else { - ChangelogCompactTask task = record.getValue().right(); - List committables = task.doCompact(table); - committables.forEach(committable -> output.collect(new StreamRecord<>(committable))); - } + ChangelogCompactTask task = record.getValue(); + List committables = task.doCompact(table); + committables.forEach(committable -> output.collect(new StreamRecord<>(committable))); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 6acb5b77c2a9..797bdd6a511c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.TagCreationMode; import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator; +import org.apache.paimon.flink.compact.changelog.ChangelogCompactTask; import org.apache.paimon.flink.compact.changelog.ChangelogCompactWorkerOperator; import org.apache.paimon.flink.compact.changelog.ChangelogTaskTypeInfo; import org.apache.paimon.manifest.ManifestCommittable; @@ -34,7 +35,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.api.java.typeutils.EitherTypeInfo; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.CheckpointingMode; @@ -47,6 +47,7 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.util.OutputTag; import javax.annotation.Nullable; @@ -213,7 +214,7 @@ public 
DataStream doWrite( boolean isStreaming = isStreaming(input); boolean writeOnly = table.coreOptions().writeOnly(); - SingleOutputStreamOperator written = + DataStream written = input.transform( (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " : " @@ -233,18 +234,26 @@ public DataStream doWrite( } if (options.contains(CHANGELOG_PRECOMMIT_COMPACT)) { - written = + final OutputTag committableOutputTag = + new OutputTag( + "coordinator-output-committables", new CommittableTypeInfo()) {}; + SingleOutputStreamOperator coordinatorStream = written.transform( "Changelog Compact Coordinator", - new EitherTypeInfo<>( - new CommittableTypeInfo(), new ChangelogTaskTypeInfo()), - new ChangelogCompactCoordinateOperator(table)) - .forceNonParallel() + new ChangelogTaskTypeInfo(), + new ChangelogCompactCoordinateOperator( + table, committableOutputTag)) + .forceNonParallel(); + DataStream committableStream = + coordinatorStream.getSideOutput(committableOutputTag); + DataStream workerCommittablesStream = + coordinatorStream .transform( "Changelog Compact Worker", new CommittableTypeInfo(), new ChangelogCompactWorkerOperator(table)) .setParallelism(written.getParallelism()); + written = committableStream.union(workerCommittablesStream); } return written; From 7f574af524d4822331b6bb082bd9732df90858a6 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Sun, 27 Oct 2024 19:12:47 +0800 Subject: [PATCH 4/7] fix docs build failure --- .../shortcodes/generated/flink_connector_configuration.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index f7c078c55f74..41e23a16af37 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -28,9 +28,9 @@

changelog.precommit-compact
- (none) + false Boolean - If true, it will compact several changelog files from the same partition into larger ones, in order to decrease the number of small files. + If true, it will add a changelog compact coordinator operator and a worker operator after the writer operator, in order to compact several changelog files from the same partition into larger ones, which decreases the number of small files.
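For example, a table could enable the option the way the ITCase below does; the schema and option values here are illustrative only, and `tEnv` is assumed to be an existing `TableEnvironment`:

    tEnv.executeSql(
            "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
                    + "PARTITIONED BY (pt) "
                    + "WITH ("
                    + " 'bucket' = '10',"
                    + " 'changelog-producer' = 'lookup',"
                    + " 'changelog.precommit-compact' = 'true'"
                    + ")");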
end-input.watermark
From f4ff2a300206fd2884683a46bd33107644936df1 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 28 Oct 2024 15:26:32 +0800 Subject: [PATCH 5/7] Revert "add side output in coordinator for committable message" --- .../ChangelogCompactCoordinateOperator.java | 38 +++++++++---------- .../ChangelogCompactWorkerOperator.java | 16 +++++--- .../apache/paimon/flink/sink/FlinkSink.java | 23 ++++------- 3 files changed, 37 insertions(+), 40 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java index 68889531ea98..eae5683df9b8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java @@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.OutputTag; +import org.apache.flink.types.Either; import java.util.ArrayList; import java.util.Collections; @@ -46,18 +46,17 @@ * contained in all buckets within each partition from {@link Committable} message emitted from * writer operator. And emit {@link ChangelogCompactTask} to {@link ChangelogCompactWorkerOperator}. */ -public class ChangelogCompactCoordinateOperator extends AbstractStreamOperator - implements OneInputStreamOperator, BoundedOneInput { +public class ChangelogCompactCoordinateOperator + extends AbstractStreamOperator> + implements OneInputStreamOperator>, + BoundedOneInput { private final FileStoreTable table; - private final OutputTag committableOutputTag; private transient long checkpointId; private transient Map partitionChangelogs; - public ChangelogCompactCoordinateOperator( - FileStoreTable table, OutputTag committableOutputTag) { + public ChangelogCompactCoordinateOperator(FileStoreTable table) { this.table = table; - this.committableOutputTag = committableOutputTag; } @Override @@ -72,14 +71,14 @@ public void processElement(StreamRecord record) { Committable committable = record.getValue(); checkpointId = Math.max(checkpointId, committable.checkpointId()); if (committable.kind() != Committable.Kind.FILE) { - output.collect(committableOutputTag, record); + output.collect(new StreamRecord<>(Either.Left(record.getValue()))); return; } CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); if (message.newFilesIncrement().changelogFiles().isEmpty() && message.compactIncrement().changelogFiles().isEmpty()) { - output.collect(committableOutputTag, record); + output.collect(new StreamRecord<>(Either.Left(record.getValue()))); return; } @@ -91,7 +90,7 @@ public void processElement(StreamRecord record) { .addNewChangelogFile(bucket, meta); PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) { - emitPartitionChangelogCompactTask(partition); + emitPartitionChanglogCompactTask(partition); } } for (DataFileMeta meta : message.compactIncrement().changelogFiles()) { @@ -100,7 +99,7 @@ public void processElement(StreamRecord record) { .addCompactChangelogFile(bucket, meta); 
PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) { - emitPartitionChangelogCompactTask(partition); + emitPartitionChanglogCompactTask(partition); } } @@ -119,7 +118,7 @@ public void processElement(StreamRecord record) { message.indexIncrement()); Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); - output.collect(committableOutputTag, new StreamRecord<>(newCommittable)); + output.collect(new StreamRecord<>(Either.Left(newCommittable))); } public void prepareSnapshotPreBarrier(long checkpointId) { @@ -130,22 +129,23 @@ public void endInput() { emitAllPartitionsChanglogCompactTask(); } - private void emitPartitionChangelogCompactTask(BinaryRow partition) { + private void emitPartitionChanglogCompactTask(BinaryRow partition) { PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); output.collect( new StreamRecord<>( - new ChangelogCompactTask( - checkpointId, - partition, - partitionChangelog.newFileChangelogFiles, - partitionChangelog.compactChangelogFiles))); + Either.Right( + new ChangelogCompactTask( + checkpointId, + partition, + partitionChangelog.newFileChangelogFiles, + partitionChangelog.compactChangelogFiles)))); partitionChangelogs.remove(partition); } private void emitAllPartitionsChanglogCompactTask() { List partitions = new ArrayList<>(partitionChangelogs.keySet()); for (BinaryRow partition : partitions) { - emitPartitionChangelogCompactTask(partition); + emitPartitionChanglogCompactTask(partition); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java index d180e6215cdb..260c25a31561 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; import java.util.List; @@ -32,17 +33,22 @@ * ChangelogCompactCoordinateOperator}. 
*/ public class ChangelogCompactWorkerOperator extends AbstractStreamOperator - implements OneInputStreamOperator { + implements OneInputStreamOperator, Committable> { private final FileStoreTable table; public ChangelogCompactWorkerOperator(FileStoreTable table) { this.table = table; } - public void processElement(StreamRecord record) throws Exception { + public void processElement(StreamRecord> record) + throws Exception { - ChangelogCompactTask task = record.getValue(); - List committables = task.doCompact(table); - committables.forEach(committable -> output.collect(new StreamRecord<>(committable))); + if (record.getValue().isLeft()) { + output.collect(new StreamRecord<>(record.getValue().left())); + } else { + ChangelogCompactTask task = record.getValue().right(); + List committables = task.doCompact(table); + committables.forEach(committable -> output.collect(new StreamRecord<>(committable))); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 797bdd6a511c..6acb5b77c2a9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.TagCreationMode; import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator; -import org.apache.paimon.flink.compact.changelog.ChangelogCompactTask; import org.apache.paimon.flink.compact.changelog.ChangelogCompactWorkerOperator; import org.apache.paimon.flink.compact.changelog.ChangelogTaskTypeInfo; import org.apache.paimon.manifest.ManifestCommittable; @@ -35,6 +34,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.CheckpointingMode; @@ -47,7 +47,6 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.util.OutputTag; import javax.annotation.Nullable; @@ -214,7 +213,7 @@ public DataStream doWrite( boolean isStreaming = isStreaming(input); boolean writeOnly = table.coreOptions().writeOnly(); - DataStream written = + SingleOutputStreamOperator written = input.transform( (writeOnly ? 
WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " : " @@ -234,26 +233,18 @@ public DataStream doWrite( } if (options.contains(CHANGELOG_PRECOMMIT_COMPACT)) { - final OutputTag committableOutputTag = - new OutputTag( - "coordinator-output-committables", new CommittableTypeInfo()) {}; - SingleOutputStreamOperator coordinatorStream = + written = written.transform( "Changelog Compact Coordinator", - new ChangelogTaskTypeInfo(), - new ChangelogCompactCoordinateOperator( - table, committableOutputTag)) - .forceNonParallel(); - DataStream committableStream = - coordinatorStream.getSideOutput(committableOutputTag); - DataStream workerCommittablesStream = - coordinatorStream + new EitherTypeInfo<>( + new CommittableTypeInfo(), new ChangelogTaskTypeInfo()), + new ChangelogCompactCoordinateOperator(table)) + .forceNonParallel() .transform( "Changelog Compact Worker", new CommittableTypeInfo(), new ChangelogCompactWorkerOperator(table)) .setParallelism(written.getParallelism()); - written = committableStream.union(workerCommittablesStream); } return written; From 660b35182fdb0a87b03157f3ac1815e1d7f2fc44 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 28 Oct 2024 15:49:36 +0800 Subject: [PATCH 6/7] delete unused ChangelogCompactOperator --- .../changelog/ChangelogCompactOperator.java | 299 ------------------ 1 file changed, 299 deletions(-) delete mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java deleted file mode 100644 index 973fd5c647c9..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactOperator.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.flink.compact.changelog; - -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat; -import org.apache.paimon.flink.sink.Committable; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.PositionOutputStream; -import org.apache.paimon.fs.SeekableInputStream; -import org.apache.paimon.io.CompactIncrement; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.io.DataFilePathFactory; -import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.sink.CommitMessageImpl; -import org.apache.paimon.utils.FileStorePathFactory; -import org.apache.paimon.utils.IOUtils; -import org.apache.paimon.utils.Preconditions; - -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -/** - * Operator to compact several changelog files from the same partition into one file, in order to - * reduce the number of small files. - */ -public class ChangelogCompactOperator extends AbstractStreamOperator - implements OneInputStreamOperator, BoundedOneInput { - - private final FileStoreTable table; - - private transient FileStorePathFactory pathFactory; - private transient long checkpointId; - private transient Map outputStreams; - private transient Map> results; - - public ChangelogCompactOperator(FileStoreTable table) { - this.table = table; - } - - @Override - public void open() throws Exception { - super.open(); - - pathFactory = table.store().pathFactory(); - checkpointId = Long.MIN_VALUE; - outputStreams = new HashMap<>(); - results = new HashMap<>(); - } - - @Override - public void processElement(StreamRecord record) throws Exception { - Committable committable = record.getValue(); - checkpointId = Math.max(checkpointId, committable.checkpointId()); - if (committable.kind() != Committable.Kind.FILE) { - output.collect(record); - return; - } - - CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); - if (message.newFilesIncrement().changelogFiles().isEmpty() - && message.compactIncrement().changelogFiles().isEmpty()) { - output.collect(record); - return; - } - - // copy changelogs from the same partition into one file - DataFilePathFactory dataFilePathFactory = - pathFactory.createDataFilePathFactory(message.partition(), message.bucket()); - for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) { - copyFile( - dataFilePathFactory.toPath(meta.fileName()), - message.partition(), - message.bucket(), - false, - meta); - } - for (DataFileMeta meta : message.compactIncrement().changelogFiles()) { - copyFile( - dataFilePathFactory.toPath(meta.fileName()), - message.partition(), - message.bucket(), - true, - meta); - } - - // send commit message without changelog files - CommitMessageImpl newMessage = - new CommitMessageImpl( - message.partition(), - message.bucket(), - new DataIncrement( - message.newFilesIncrement().newFiles(), - message.newFilesIncrement().deletedFiles(), - Collections.emptyList()), - new CompactIncrement( - message.compactIncrement().compactBefore(), - 
message.compactIncrement().compactAfter(), - Collections.emptyList()), - message.indexIncrement()); - Committable newCommittable = - new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); - if (record.hasTimestamp()) { - output.collect(new StreamRecord<>(newCommittable, record.getTimestamp())); - } else { - output.collect(new StreamRecord<>(newCommittable)); - } - } - - private void copyFile( - Path path, BinaryRow partition, int bucket, boolean isCompactResult, DataFileMeta meta) - throws Exception { - if (!outputStreams.containsKey(partition)) { - Path outputPath = - new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID()); - outputStreams.put( - partition, - new OutputStream( - outputPath, table.fileIO().newOutputStream(outputPath, false))); - } - - OutputStream outputStream = outputStreams.get(partition); - long offset = outputStream.out.getPos(); - try (SeekableInputStream in = table.fileIO().newInputStream(path)) { - IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false); - } - table.fileIO().deleteQuietly(path); - results.computeIfAbsent(partition, p -> new ArrayList<>()) - .add( - new Result( - bucket, - isCompactResult, - meta, - offset, - outputStream.out.getPos() - offset)); - - if (outputStream.out.getPos() >= table.coreOptions().targetFileSize(false)) { - flushPartition(partition); - } - } - - @Override - public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { - flushAllPartitions(); - } - - @Override - public void endInput() throws Exception { - flushAllPartitions(); - } - - private void flushAllPartitions() throws Exception { - List partitions = new ArrayList<>(outputStreams.keySet()); - for (BinaryRow partition : partitions) { - flushPartition(partition); - } - } - - private void flushPartition(BinaryRow partition) throws Exception { - OutputStream outputStream = outputStreams.get(partition); - outputStream.out.close(); - - Result baseResult = results.get(partition).get(0); - Preconditions.checkArgument(baseResult.offset == 0); - DataFilePathFactory dataFilePathFactory = - pathFactory.createDataFilePathFactory(partition, baseResult.bucket); - // see Java docs of `CompactedChangelogFormatReaderFactory` - String realName = - "compacted-changelog-" - + UUID.randomUUID() - + "$" - + baseResult.bucket - + "-" - + baseResult.length; - table.fileIO() - .rename( - outputStream.path, - dataFilePathFactory.toPath( - realName - + "." - + CompactedChangelogReadOnlyFormat.getIdentifier( - baseResult.meta.fileFormat()))); - - Map> grouped = new HashMap<>(); - for (Result result : results.get(partition)) { - grouped.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result); - } - - for (Map.Entry> entry : grouped.entrySet()) { - List newFilesChangelog = new ArrayList<>(); - List compactChangelog = new ArrayList<>(); - for (Result result : entry.getValue()) { - // see Java docs of `CompactedChangelogFormatReaderFactory` - String name = - (result.offset == 0 - ? realName - : realName + "-" + result.offset + "-" + result.length) - + "." 
- + CompactedChangelogReadOnlyFormat.getIdentifier( - result.meta.fileFormat()); - if (result.isCompactResult) { - compactChangelog.add(result.meta.rename(name)); - } else { - newFilesChangelog.add(result.meta.rename(name)); - } - } - - CommitMessageImpl newMessage = - new CommitMessageImpl( - partition, - entry.getKey(), - new DataIncrement( - Collections.emptyList(), - Collections.emptyList(), - newFilesChangelog), - new CompactIncrement( - Collections.emptyList(), - Collections.emptyList(), - compactChangelog)); - Committable newCommittable = - new Committable(checkpointId, Committable.Kind.FILE, newMessage); - output.collect(new StreamRecord<>(newCommittable)); - } - - outputStreams.remove(partition); - results.remove(partition); - } - - @Override - public void close() throws Exception { - for (Map.Entry entry : outputStreams.entrySet()) { - OutputStream outputStream = entry.getValue(); - try { - outputStream.out.close(); - } catch (Exception e) { - LOG.warn("Failed to close output stream for file " + outputStream.path, e); - } - table.fileIO().deleteQuietly(outputStream.path); - } - - outputStreams.clear(); - results.clear(); - } - - private static class OutputStream { - - private final Path path; - private final PositionOutputStream out; - - private OutputStream(Path path, PositionOutputStream out) { - this.path = path; - this.out = out; - } - } - - private static class Result { - - private final int bucket; - private final boolean isCompactResult; - private final DataFileMeta meta; - private final long offset; - private final long length; - - private Result( - int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) { - this.bucket = bucket; - this.isCompactResult = isCompactResult; - this.meta = meta; - this.offset = offset; - this.length = length; - } - } -} From 53019bb0bfbcb239a368291363ed59b0d9b3bde6 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Wed, 30 Oct 2024 19:44:06 +0800 Subject: [PATCH 7/7] some fix according to comments --- .../ChangelogCompactCoordinateOperator.java | 12 ++-- .../changelog/ChangelogCompactTask.java | 60 ++++++++++++++----- .../flink/PrimaryKeyFileStoreTableITCase.java | 25 ++++---- 3 files changed, 60 insertions(+), 37 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java index eae5683df9b8..cd0b8716a7d1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.compact.changelog; -import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.io.CompactIncrement; @@ -42,7 +41,7 @@ /** * Coordinator operator for compacting changelog files. * - *

<p>{@link UnawareAppendTableCompactionCoordinator} calculates the file size of changelog files
+ * <p>
{@link ChangelogCompactCoordinateOperator} calculates the file size of changelog files * contained in all buckets within each partition from {@link Committable} message emitted from * writer operator. And emit {@link ChangelogCompactTask} to {@link ChangelogCompactWorkerOperator}. */ @@ -84,12 +83,13 @@ public void processElement(StreamRecord record) { BinaryRow partition = message.partition(); Integer bucket = message.bucket(); + long targetFileSize = table.coreOptions().targetFileSize(false); for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) { partitionChangelogs .computeIfAbsent(partition, k -> new PartitionChangelog()) .addNewChangelogFile(bucket, meta); PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); - if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) { + if (partitionChangelog.totalFileSize >= targetFileSize) { emitPartitionChanglogCompactTask(partition); } } @@ -98,7 +98,7 @@ public void processElement(StreamRecord record) { .computeIfAbsent(partition, k -> new PartitionChangelog()) .addCompactChangelogFile(bucket, meta); PartitionChangelog partitionChangelog = partitionChangelogs.get(partition); - if (partitionChangelog.totalFileSize >= table.coreOptions().targetFileSize(false)) { + if (partitionChangelog.totalFileSize >= targetFileSize) { emitPartitionChanglogCompactTask(partition); } } @@ -154,10 +154,6 @@ private static class PartitionChangelog { private final Map> newFileChangelogFiles; private final Map> compactChangelogFiles; - public long totalFileSize() { - return totalFileSize; - } - public PartitionChangelog() { totalFileSize = 0; newFileChangelogFiles = new HashMap<>(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index d75ccea9b45a..6b95e369074b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -54,9 +54,6 @@ public class ChangelogCompactTask implements Serializable { private final Map> newFileChangelogFiles; private final Map> compactChangelogFiles; - private transient OutputStream outputStream; - private final transient List results = new ArrayList<>(); - public ChangelogCompactTask( long checkpointId, BinaryRow partition, @@ -86,14 +83,23 @@ public Map> compactChangelogFiles() { public List doCompact(FileStoreTable table) throws Exception { FileStorePathFactory pathFactory = table.store().pathFactory(); + OutputStream outputStream = new OutputStream(); + List results = new ArrayList<>(); // copy all changelog files to a new big file for (Map.Entry> entry : newFileChangelogFiles.entrySet()) { - Integer bucket = entry.getKey(); + int bucket = entry.getKey(); DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(partition, bucket); for (DataFileMeta meta : entry.getValue()) { - copyFile(table, dataFilePathFactory.toPath(meta.fileName()), bucket, false, meta); + copyFile( + outputStream, + results, + table, + dataFilePathFactory.toPath(meta.fileName()), + bucket, + false, + meta); } } for (Map.Entry> entry : compactChangelogFiles.entrySet()) { @@ -101,22 +107,34 @@ public List doCompact(FileStoreTable table) throws Exception { DataFilePathFactory dataFilePathFactory = 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index d75ccea9b45a..6b95e369074b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -54,9 +54,6 @@ public class ChangelogCompactTask implements Serializable {
     private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
     private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;
 
-    private transient OutputStream outputStream;
-    private final transient List<Result> results = new ArrayList<>();
-
     public ChangelogCompactTask(
             long checkpointId,
             BinaryRow partition,
@@ -86,14 +83,23 @@ public Map<Integer, List<DataFileMeta>> compactChangelogFiles() {
     }
 
     public List<Committable> doCompact(FileStoreTable table) throws Exception {
         FileStorePathFactory pathFactory = table.store().pathFactory();
+        OutputStream outputStream = new OutputStream();
+        List<Result> results = new ArrayList<>();
 
         // copy all changelog files to a new big file
         for (Map.Entry<Integer, List<DataFileMeta>> entry : newFileChangelogFiles.entrySet()) {
-            Integer bucket = entry.getKey();
+            int bucket = entry.getKey();
             DataFilePathFactory dataFilePathFactory =
                     pathFactory.createDataFilePathFactory(partition, bucket);
             for (DataFileMeta meta : entry.getValue()) {
-                copyFile(table, dataFilePathFactory.toPath(meta.fileName()), bucket, false, meta);
+                copyFile(
+                        outputStream,
+                        results,
+                        table,
+                        dataFilePathFactory.toPath(meta.fileName()),
+                        bucket,
+                        false,
+                        meta);
             }
         }
@@ -101,22 +107,34 @@ public List<Committable> doCompact(FileStoreTable table) throws Exception {
         for (Map.Entry<Integer, List<DataFileMeta>> entry : compactChangelogFiles.entrySet()) {
             Integer bucket = entry.getKey();
             DataFilePathFactory dataFilePathFactory =
                     pathFactory.createDataFilePathFactory(partition, bucket);
             for (DataFileMeta meta : entry.getValue()) {
-                copyFile(table, dataFilePathFactory.toPath(meta.fileName()), bucket, true, meta);
+                copyFile(
+                        outputStream,
+                        results,
+                        table,
+                        dataFilePathFactory.toPath(meta.fileName()),
+                        bucket,
+                        true,
+                        meta);
             }
         }
 
         outputStream.out.close();
-        return produceNewCommittables(table, pathFactory);
+        return produceNewCommittables(results, table, pathFactory, outputStream.path);
     }
 
     private void copyFile(
-            FileStoreTable table, Path path, int bucket, boolean isCompactResult, DataFileMeta meta)
+            OutputStream outputStream,
+            List<Result> results,
+            FileStoreTable table,
+            Path path,
+            int bucket,
+            boolean isCompactResult,
+            DataFileMeta meta)
             throws Exception {
-        if (outputStream == null) {
+        if (!outputStream.isInitialized) {
             Path outputPath =
                     new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
-            outputStream =
-                    new OutputStream(outputPath, table.fileIO().newOutputStream(outputPath, false));
+            outputStream.init(outputPath, table.fileIO().newOutputStream(outputPath, false));
         }
         long offset = outputStream.out.getPos();
         try (SeekableInputStream in = table.fileIO().newInputStream(path)) {
@@ -129,7 +147,11 @@ private void copyFile(
     }
 
     private List<Committable> produceNewCommittables(
-            FileStoreTable table, FileStorePathFactory pathFactory) throws IOException {
+            List<Result> results,
+            FileStoreTable table,
+            FileStorePathFactory pathFactory,
+            Path changelogTempPath)
+            throws IOException {
         Result baseResult = results.get(0);
         Preconditions.checkArgument(baseResult.offset == 0);
         DataFilePathFactory dataFilePathFactory =
@@ -144,7 +166,7 @@ private List<Committable> produceNewCommittables(
                         + baseResult.length;
         table.fileIO()
                 .rename(
-                        outputStream.path,
+                        changelogTempPath,
                         dataFilePathFactory.toPath(
                                 realName
                                         + "."
@@ -226,12 +248,18 @@ public String toString() {
 
     private static class OutputStream {
 
-        private final Path path;
-        private final PositionOutputStream out;
+        private Path path;
+        private PositionOutputStream out;
+        private boolean isInitialized;
+
+        private OutputStream() {
+            this.isInitialized = false;
+        }
 
-        private OutputStream(Path path, PositionOutputStream out) {
+        private void init(Path path, PositionOutputStream out) {
             this.path = path;
             this.out = out;
+            this.isInitialized = true;
         }
     }
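The key refactor above turns outputStream and results from instance fields into locals threaded through copyFile, so a ChangelogCompactTask instance no longer carries mutable state between invocations of doCompact. The bookkeeping itself is simple: append each small file to one output stream and record the (offset, length) pair so a reader can later seek to the right slice of the big file. Below is a minimal sketch under simplified assumptions: in-memory byte arrays stand in for Paimon's FileIO streams, and ConcatSketch and Slice are invented names.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Concatenate several small "files" into one stream, recording where each one
// landed so a reader can later seek to `offset` and read `length` bytes.
public class ConcatSketch {

    public static class Slice {
        public final long offset;
        public final long length;

        public Slice(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    public static List<Slice> concat(List<byte[]> files, ByteArrayOutputStream out)
            throws IOException {
        List<Slice> slices = new ArrayList<>();
        for (byte[] file : files) {
            long offset = out.size(); // stream position before appending this file
            out.write(file);
            slices.add(new Slice(offset, file.length));
        }
        return slices;
    }
}

The copy-then-rename in produceNewCommittables follows the same idea at the file-system level: everything is written to a tmp-compacted-changelog-* path first, then published atomically under its final name.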
"512kb" : "1mb", - random.nextBoolean(), - enableDeletionVectors, - random.nextInt(1, 3))); + String.format( + "'bucket' = '4', " + + "'writer-buffer-size' = '%s', " + + "'changelog-producer' = 'lookup', " + + "'lookup-wait' = '%s', " + + "'deletion-vectors.enabled' = '%s', " + + "'changelog.precommit-compact' = '%s'", + random.nextBoolean() ? "512kb" : "1mb", + random.nextBoolean(), + enableDeletionVectors, + random.nextBoolean())); // sleep for a random amount of time to check // if we can first read complete records then read incremental records correctly