diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
new file mode 100644
index 000000000000..cd0b8716a7d1
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Coordinator operator for compacting changelog files.
+ *
+ * <p>{@link ChangelogCompactCoordinateOperator} accumulates the sizes of the changelog files
+ * contained in all buckets within each partition, based on the {@link Committable} messages
+ * emitted by the writer operators, and emits {@link ChangelogCompactTask}s to the {@link
+ * ChangelogCompactWorkerOperator}.
+ */
+public class ChangelogCompactCoordinateOperator
+ extends AbstractStreamOperator<Either<Committable, ChangelogCompactTask>>
+ implements OneInputStreamOperator<Committable, Either<Committable, ChangelogCompactTask>>,
+ BoundedOneInput {
+ private final FileStoreTable table;
+
+ private transient long checkpointId;
+ private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;
+
+ public ChangelogCompactCoordinateOperator(FileStoreTable table) {
+ this.table = table;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ checkpointId = Long.MIN_VALUE;
+ partitionChangelogs = new HashMap<>();
+ }
+
+ public void processElement(StreamRecord<Committable> record) {
+ Committable committable = record.getValue();
+ checkpointId = Math.max(checkpointId, committable.checkpointId());
+ if (committable.kind() != Committable.Kind.FILE) {
+ output.collect(new StreamRecord<>(Either.Left(record.getValue())));
+ return;
+ }
+
+ CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+ if (message.newFilesIncrement().changelogFiles().isEmpty()
+ && message.compactIncrement().changelogFiles().isEmpty()) {
+ output.collect(new StreamRecord<>(Either.Left(record.getValue())));
+ return;
+ }
+
+ BinaryRow partition = message.partition();
+ Integer bucket = message.bucket();
+ long targetFileSize = table.coreOptions().targetFileSize(false);
+ for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
+ partitionChangelogs
+ .computeIfAbsent(partition, k -> new PartitionChangelog())
+ .addNewChangelogFile(bucket, meta);
+ PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
+ if (partitionChangelog.totalFileSize >= targetFileSize) {
+ emitPartitionChangelogCompactTask(partition);
+ }
+ }
+ for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
+ partitionChangelogs
+ .computeIfAbsent(partition, k -> new PartitionChangelog())
+ .addCompactChangelogFile(bucket, meta);
+ PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
+ if (partitionChangelog.totalFileSize >= targetFileSize) {
+ emitPartitionChangelogCompactTask(partition);
+ }
+ }
+
+ CommitMessageImpl newMessage =
+ new CommitMessageImpl(
+ message.partition(),
+ message.bucket(),
+ new DataIncrement(
+ message.newFilesIncrement().newFiles(),
+ message.newFilesIncrement().deletedFiles(),
+ Collections.emptyList()),
+ new CompactIncrement(
+ message.compactIncrement().compactBefore(),
+ message.compactIncrement().compactAfter(),
+ Collections.emptyList()),
+ message.indexIncrement());
+ Committable newCommittable =
+ new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
+ output.collect(new StreamRecord<>(Either.Left(newCommittable)));
+ }
+
+ public void prepareSnapshotPreBarrier(long checkpointId) {
+ emitAllPartitionsChangelogCompactTask();
+ }
+
+ public void endInput() {
+ emitAllPartitionsChangelogCompactTask();
+ }
+
+ private void emitPartitionChangelogCompactTask(BinaryRow partition) {
+ PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
+ output.collect(
+ new StreamRecord<>(
+ Either.Right(
+ new ChangelogCompactTask(
+ checkpointId,
+ partition,
+ partitionChangelog.newFileChangelogFiles,
+ partitionChangelog.compactChangelogFiles))));
+ partitionChangelogs.remove(partition);
+ }
+
+ private void emitAllPartitionsChangelogCompactTask() {
+ List<BinaryRow> partitions = new ArrayList<>(partitionChangelogs.keySet());
+ for (BinaryRow partition : partitions) {
+ emitPartitionChangelogCompactTask(partition);
+ }
+ }
+
+ private static class PartitionChangelog {
+ private long totalFileSize;
+ private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
+ private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;
+
+ public PartitionChangelog() {
+ totalFileSize = 0;
+ newFileChangelogFiles = new HashMap<>();
+ compactChangelogFiles = new HashMap<>();
+ }
+
+ public void addNewChangelogFile(Integer bucket, DataFileMeta file) {
+ totalFileSize += file.fileSize();
+ newFileChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file);
+ }
+
+ public void addCompactChangelogFile(Integer bucket, DataFileMeta file) {
+ totalFileSize += file.fileSize();
+ compactChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file);
+ }
+ }
+}
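The coordinator above buffers changelog file metadata per partition and flushes a task once the accumulated size reaches the table's target file size (and unconditionally at checkpoint barriers and at end of input). A minimal, self-contained sketch of this accumulate-then-flush pattern, with illustrative names only (not part of the patch):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: buffer file sizes per key and flush a key once its
    // total reaches a threshold, mirroring how the coordinator buffers
    // changelog files per partition until targetFileSize is reached.
    public class AccumulateThenFlush {
        private static final long THRESHOLD = 128;

        private final Map<String, List<Long>> buffered = new HashMap<>();
        private final Map<String, Long> totals = new HashMap<>();

        public void add(String partition, long fileSize) {
            buffered.computeIfAbsent(partition, k -> new ArrayList<>()).add(fileSize);
            if (totals.merge(partition, fileSize, Long::sum) >= THRESHOLD) {
                flush(partition);
            }
        }

        private void flush(String partition) {
            // The real operator emits a ChangelogCompactTask downstream here.
            System.out.println("flush " + partition + ": " + buffered.remove(partition));
            totals.remove(partition);
        }

        public static void main(String[] args) {
            AccumulateThenFlush acc = new AccumulateThenFlush();
            acc.add("pt=0", 100);
            acc.add("pt=0", 50); // crosses THRESHOLD, triggers flush
        }
    }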
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
new file mode 100644
index 000000000000..6b95e369074b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * {@link ChangelogCompactTask} to compact several changelog files from the same partition into one
+ * file, in order to reduce the number of small files.
+ */
+public class ChangelogCompactTask implements Serializable {
+ private final long checkpointId;
+ private final BinaryRow partition;
+ private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
+ private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;
+
+ public ChangelogCompactTask(
+ long checkpointId,
+ BinaryRow partition,
+ Map<Integer, List<DataFileMeta>> newFileChangelogFiles,
+ Map<Integer, List<DataFileMeta>> compactChangelogFiles) {
+ this.checkpointId = checkpointId;
+ this.partition = partition;
+ this.newFileChangelogFiles = newFileChangelogFiles;
+ this.compactChangelogFiles = compactChangelogFiles;
+ }
+
+ public long checkpointId() {
+ return checkpointId;
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ public Map<Integer, List<DataFileMeta>> newFileChangelogFiles() {
+ return newFileChangelogFiles;
+ }
+
+ public Map<Integer, List<DataFileMeta>> compactChangelogFiles() {
+ return compactChangelogFiles;
+ }
+
+ public List<Committable> doCompact(FileStoreTable table) throws Exception {
+ FileStorePathFactory pathFactory = table.store().pathFactory();
+ OutputStream outputStream = new OutputStream();
+ List<Result> results = new ArrayList<>();
+
+ // copy all changelog files to a new big file
+ for (Map.Entry<Integer, List<DataFileMeta>> entry : newFileChangelogFiles.entrySet()) {
+ int bucket = entry.getKey();
+ DataFilePathFactory dataFilePathFactory =
+ pathFactory.createDataFilePathFactory(partition, bucket);
+ for (DataFileMeta meta : entry.getValue()) {
+ copyFile(
+ outputStream,
+ results,
+ table,
+ dataFilePathFactory.toPath(meta.fileName()),
+ bucket,
+ false,
+ meta);
+ }
+ }
+ for (Map.Entry<Integer, List<DataFileMeta>> entry : compactChangelogFiles.entrySet()) {
+ int bucket = entry.getKey();
+ DataFilePathFactory dataFilePathFactory =
+ pathFactory.createDataFilePathFactory(partition, bucket);
+ for (DataFileMeta meta : entry.getValue()) {
+ copyFile(
+ outputStream,
+ results,
+ table,
+ dataFilePathFactory.toPath(meta.fileName()),
+ bucket,
+ true,
+ meta);
+ }
+ }
+ outputStream.out.close();
+
+ return produceNewCommittables(results, table, pathFactory, outputStream.path);
+ }
+
+ private void copyFile(
+ OutputStream outputStream,
+ List<Result> results,
+ FileStoreTable table,
+ Path path,
+ int bucket,
+ boolean isCompactResult,
+ DataFileMeta meta)
+ throws Exception {
+ if (!outputStream.isInitialized) {
+ Path outputPath =
+ new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
+ outputStream.init(outputPath, table.fileIO().newOutputStream(outputPath, false));
+ }
+ long offset = outputStream.out.getPos();
+ try (SeekableInputStream in = table.fileIO().newInputStream(path)) {
+ IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false);
+ }
+ table.fileIO().deleteQuietly(path);
+ results.add(
+ new Result(
+ bucket, isCompactResult, meta, offset, outputStream.out.getPos() - offset));
+ }
+
+ private List<Committable> produceNewCommittables(
+ List<Result> results,
+ FileStoreTable table,
+ FileStorePathFactory pathFactory,
+ Path changelogTempPath)
+ throws IOException {
+ Result baseResult = results.get(0);
+ Preconditions.checkArgument(baseResult.offset == 0);
+ DataFilePathFactory dataFilePathFactory =
+ pathFactory.createDataFilePathFactory(partition, baseResult.bucket);
+ // see Java docs of `CompactedChangelogFormatReaderFactory`
+ String realName =
+ "compacted-changelog-"
+ + UUID.randomUUID()
+ + "$"
+ + baseResult.bucket
+ + "-"
+ + baseResult.length;
+ table.fileIO()
+ .rename(
+ changelogTempPath,
+ dataFilePathFactory.toPath(
+ realName
+ + "."
+ + CompactedChangelogReadOnlyFormat.getIdentifier(
+ baseResult.meta.fileFormat())));
+
+ List<Committable> newCommittables = new ArrayList<>();
+
+ Map<Integer, List<Result>> bucketedResults = new HashMap<>();
+ for (Result result : results) {
+ bucketedResults.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result);
+ }
+
+ for (Map.Entry<Integer, List<Result>> entry : bucketedResults.entrySet()) {
+ List<DataFileMeta> newFilesChangelog = new ArrayList<>();
+ List<DataFileMeta> compactChangelog = new ArrayList<>();
+ for (Result result : entry.getValue()) {
+ // see Java docs of `CompactedChangelogFormatReaderFactory`
+ String name =
+ (result.offset == 0
+ ? realName
+ : realName + "-" + result.offset + "-" + result.length)
+ + "."
+ + CompactedChangelogReadOnlyFormat.getIdentifier(
+ result.meta.fileFormat());
+ if (result.isCompactResult) {
+ compactChangelog.add(result.meta.rename(name));
+ } else {
+ newFilesChangelog.add(result.meta.rename(name));
+ }
+ }
+
+ CommitMessageImpl newMessage =
+ new CommitMessageImpl(
+ partition,
+ entry.getKey(),
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ newFilesChangelog),
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ compactChangelog));
+ newCommittables.add(new Committable(checkpointId, Committable.Kind.FILE, newMessage));
+ }
+ return newCommittables;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(checkpointId, partition, newFileChangelogFiles, compactChangelogFiles);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ChangelogCompactTask that = (ChangelogCompactTask) o;
+ return checkpointId == that.checkpointId
+ && Objects.equals(partition, that.partition)
+ && Objects.equals(newFileChangelogFiles, that.newFileChangelogFiles)
+ && Objects.equals(compactChangelogFiles, that.compactChangelogFiles);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ChangelogCompactionTask {"
+ + "partition = %s, "
+ + "newFileChangelogFiles = %s, "
+ + "compactChangelogFiles = %s}",
+ partition, newFileChangelogFiles, compactChangelogFiles);
+ }
+
+ private static class OutputStream {
+
+ private Path path;
+ private PositionOutputStream out;
+ private boolean isInitialized;
+
+ private OutputStream() {
+ this.isInitialized = false;
+ }
+
+ private void init(Path path, PositionOutputStream out) {
+ this.path = path;
+ this.out = out;
+ this.isInitialized = true;
+ }
+ }
+
+ private static class Result {
+
+ private final int bucket;
+ private final boolean isCompactResult;
+ private final DataFileMeta meta;
+ private final long offset;
+ private final long length;
+
+ private Result(
+ int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) {
+ this.bucket = bucket;
+ this.isCompactResult = isCompactResult;
+ this.meta = meta;
+ this.offset = offset;
+ this.length = length;
+ }
+ }
+}
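To make the renaming in produceNewCommittables concrete: the bucket whose first result starts at offset 0 keeps the real file name, and every other result gets the real name suffixed with its offset and length. A worked illustration with made-up UUID, bucket, offset and length values:

    // Made-up values for illustration; "cc-orc" is the wrapped-format
    // identifier produced by CompactedChangelogReadOnlyFormat.getIdentifier.
    public class NameShapes {
        public static void main(String[] args) {
            String realName = "compacted-changelog-1234abcd$0-1024"; // bucket 0, length 1024
            String baseFile = realName + ".cc-orc"; // offset 0, keeps the real name
            String fakeFile = realName + "-1024-512" + ".cc-orc"; // offset 1024, length 512
            System.out.println(baseFile); // stored under bucket-0/
            System.out.println(fakeFile); // listed under bucket-1/ but redirects to baseFile
        }
    }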
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
new file mode 100644
index 000000000000..34f9d035d8d2
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.io.DataInputDeserializer;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.CollectionUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Serializer for {@link ChangelogCompactTask}. */
+public class ChangelogCompactTaskSerializer
+ implements SimpleVersionedSerializer<ChangelogCompactTask> {
+ private static final int CURRENT_VERSION = 1;
+
+ private final DataFileMetaSerializer dataFileSerializer;
+
+ public ChangelogCompactTaskSerializer() {
+ this.dataFileSerializer = new DataFileMetaSerializer();
+ }
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(ChangelogCompactTask obj) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+ serialize(obj, view);
+ return out.toByteArray();
+ }
+
+ @Override
+ public ChangelogCompactTask deserialize(int version, byte[] serialized) throws IOException {
+ DataInputDeserializer view = new DataInputDeserializer(serialized);
+ return deserialize(version, view);
+ }
+
+ private void serialize(ChangelogCompactTask task, DataOutputView view) throws IOException {
+ view.writeLong(task.checkpointId());
+ serializeBinaryRow(task.partition(), view);
+ // serialize newFileChangelogFiles map
+ serializeMap(task.newFileChangelogFiles(), view);
+ serializeMap(task.compactChangelogFiles(), view);
+ }
+
+ private ChangelogCompactTask deserialize(int version, DataInputView view) throws IOException {
+ if (version != getVersion()) {
+ throw new RuntimeException("Can not deserialize version: " + version);
+ }
+
+ return new ChangelogCompactTask(
+ view.readLong(),
+ deserializeBinaryRow(view),
+ deserializeMap(view),
+ deserializeMap(view));
+ }
+
+ private void serializeMap(Map<Integer, List<DataFileMeta>> map, DataOutputView view)
+ throws IOException {
+ view.writeInt(map.size());
+ for (Map.Entry<Integer, List<DataFileMeta>> entry : map.entrySet()) {
+ view.writeInt(entry.getKey());
+ if (entry.getValue() == null) {
+ throw new IllegalArgumentException(
+ "Cannot serialize changelog compact task: no file list for bucket " + entry.getKey());
+ }
+ dataFileSerializer.serializeList(entry.getValue(), view);
+ }
+ }
+
+ private Map<Integer, List<DataFileMeta>> deserializeMap(DataInputView view) throws IOException {
+ final int size = view.readInt();
+
+ final Map<Integer, List<DataFileMeta>> map =
+ CollectionUtil.newHashMapWithExpectedSize(size);
+ for (int i = 0; i < size; i++) {
+ map.put(view.readInt(), dataFileSerializer.deserializeList(view));
+ }
+
+ return map;
+ }
+}
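A quick round-trip sketch for the serializer above. It assumes BinaryRow.EMPTY_ROW is acceptable as a partition value, which is enough to exercise the wire format; the unit test at the end of this patch builds a fully populated task instead:

    import org.apache.paimon.data.BinaryRow;

    import java.util.HashMap;

    public class RoundTrip {
        public static void main(String[] args) throws Exception {
            // Empty maps keep the sketch short; real tasks carry per-bucket file lists.
            ChangelogCompactTask task =
                    new ChangelogCompactTask(1L, BinaryRow.EMPTY_ROW, new HashMap<>(), new HashMap<>());
            ChangelogCompactTaskSerializer serializer = new ChangelogCompactTaskSerializer();
            ChangelogCompactTask copy =
                    serializer.deserialize(serializer.getVersion(), serializer.serialize(task));
            System.out.println(task.equals(copy)); // expected: true
        }
    }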
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
new file mode 100644
index 000000000000..260c25a31561
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.List;
+
+/**
+ * Receive and process the {@link ChangelogCompactTask}s emitted by {@link
+ * ChangelogCompactCoordinateOperator}.
+ */
+public class ChangelogCompactWorkerOperator extends AbstractStreamOperator<Committable>
+ implements OneInputStreamOperator<Either<Committable, ChangelogCompactTask>, Committable> {
+ private final FileStoreTable table;
+
+ public ChangelogCompactWorkerOperator(FileStoreTable table) {
+ this.table = table;
+ }
+
+ public void processElement(StreamRecord<Either<Committable, ChangelogCompactTask>> record)
+ throws Exception {
+
+ if (record.getValue().isLeft()) {
+ output.collect(new StreamRecord<>(record.getValue().left()));
+ } else {
+ ChangelogCompactTask task = record.getValue().right();
+ List<Committable> committables = task.doCompact(table);
+ committables.forEach(committable -> output.collect(new StreamRecord<>(committable)));
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
new file mode 100644
index 000000000000..5cae899a0704
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/** Type information for {@link ChangelogCompactTask}. */
+public class ChangelogTaskTypeInfo extends TypeInformation<ChangelogCompactTask> {
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ public Class<ChangelogCompactTask> getTypeClass() {
+ return ChangelogCompactTask.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<ChangelogCompactTask> createSerializer(ExecutionConfig config) {
+ // tasks don't need to be copied between operators
+ return new NoneCopyVersionedSerializerTypeSerializerProxy<ChangelogCompactTask>(
+ ChangelogCompactTaskSerializer::new) {};
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof ChangelogTaskTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof ChangelogTaskTypeInfo;
+ }
+
+ @Override
+ public String toString() {
+ return "ChangelogCompactionTask";
+ }
+}
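For context, this type information is paired with CommittableTypeInfo through Flink's EitherTypeInfo when the coordinator is wired into the sink; a condensed view of the FlinkSink change further below:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.EitherTypeInfo;
    import org.apache.flink.types.Either;
    import org.apache.paimon.flink.sink.Committable;
    import org.apache.paimon.flink.sink.CommittableTypeInfo;

    // The coordinator's output carries either a pass-through Committable
    // or a ChangelogCompactTask for the worker.
    TypeInformation<Either<Committable, ChangelogCompactTask>> outputType =
            new EitherTypeInfo<>(new CommittableTypeInfo(), new ChangelogTaskTypeInfo());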
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
new file mode 100644
index 000000000000..8d17311518df
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog.format;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.reader.RecordReader;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * {@link FormatReaderFactory} for compacted changelog.
+ *
+ * <p><b>File Name Protocol</b>
+ *
+ * <p>There are two kinds of file names. In the following description, <code>bid1</code> and
+ * <code>bid2</code> are bucket ids, <code>off</code> is an offset, and <code>len1</code> and
+ * <code>len2</code> are lengths.
+ *
+ * <ul>
+ * <li><code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code>: This is the real file name. If
+ * this file name is recorded in the manifest file meta, the reader should read the bytes of this
+ * file starting from offset 0 with length <code>len1</code>.
+ * <li><code>bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2</code>: This is a fake file
+ * name. The reader should instead read the bytes of file
+ * <code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code> starting from offset
+ * <code>off</code> with length <code>len2</code>.
+ * </ul>
+ */
+public class CompactedChangelogFormatReaderFactory implements FormatReaderFactory {
+
+ private final FormatReaderFactory wrapped;
+
+ public CompactedChangelogFormatReaderFactory(FormatReaderFactory wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Context context) throws IOException {
+ OffsetReadOnlyFileIO fileIO = new OffsetReadOnlyFileIO(context.fileIO());
+ long length = decodePath(context.filePath()).length;
+
+ return wrapped.createReader(
+ new Context() {
+
+ @Override
+ public FileIO fileIO() {
+ return fileIO;
+ }
+
+ @Override
+ public Path filePath() {
+ return context.filePath();
+ }
+
+ @Override
+ public long fileSize() {
+ return length;
+ }
+ });
+ }
+
+ private static DecodeResult decodePath(Path path) {
+ String[] nameAndFormat = path.getName().split("\\.");
+ String[] names = nameAndFormat[0].split("\\$");
+ String[] split = names[1].split("-");
+ if (split.length == 2) {
+ return new DecodeResult(path, 0, Long.parseLong(split[1]));
+ } else {
+ Path realPath =
+ new Path(
+ path.getParent().getParent(),
+ "bucket-"
+ + split[0]
+ + "/"
+ + names[0]
+ + "$"
+ + split[0]
+ + "-"
+ + split[1]
+ + "."
+ + nameAndFormat[1]);
+ return new DecodeResult(realPath, Long.parseLong(split[2]), Long.parseLong(split[3]));
+ }
+ }
+
+ private static class DecodeResult {
+
+ private final Path path;
+ private final long offset;
+ private final long length;
+
+ private DecodeResult(Path path, long offset, long length) {
+ this.path = path;
+ this.offset = offset;
+ this.length = length;
+ }
+ }
+
+ private static class OffsetReadOnlyFileIO implements FileIO {
+
+ private final FileIO wrapped;
+
+ private OffsetReadOnlyFileIO(FileIO wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public boolean isObjectStore() {
+ return wrapped.isObjectStore();
+ }
+
+ @Override
+ public void configure(CatalogContext context) {
+ wrapped.configure(context);
+ }
+
+ @Override
+ public SeekableInputStream newInputStream(Path path) throws IOException {
+ DecodeResult result = decodePath(path);
+ return new OffsetSeekableInputStream(
+ wrapped.newInputStream(result.path), result.offset, result.length);
+ }
+
+ @Override
+ public PositionOutputStream newOutputStream(Path path, boolean overwrite)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ DecodeResult result = decodePath(path);
+ FileStatus status = wrapped.getFileStatus(result.path);
+
+ return new FileStatus() {
+
+ @Override
+ public long getLen() {
+ return result.length;
+ }
+
+ @Override
+ public boolean isDir() {
+ return status.isDir();
+ }
+
+ @Override
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public long getModificationTime() {
+ return status.getModificationTime();
+ }
+ };
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ return wrapped.exists(decodePath(path).path);
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class OffsetSeekableInputStream extends SeekableInputStream {
+
+ private final SeekableInputStream wrapped;
+ private final long offset;
+ private final long length;
+
+ private OffsetSeekableInputStream(SeekableInputStream wrapped, long offset, long length)
+ throws IOException {
+ this.wrapped = wrapped;
+ this.offset = offset;
+ this.length = length;
+ wrapped.seek(offset);
+ }
+
+ @Override
+ public void seek(long desired) throws IOException {
+ wrapped.seek(offset + desired);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return wrapped.getPos() - offset;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (getPos() >= length) {
+ throw new EOFException();
+ }
+ return wrapped.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ long realLen = Math.min(len, length - getPos());
+ return wrapped.read(b, off, (int) realLen);
+ }
+
+ @Override
+ public void close() throws IOException {
+ wrapped.close();
+ }
+ }
+}
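A worked decoding example, using made-up names that follow the file name protocol described in the Javadoc above:

    // Real name (two '-'-separated fields after '$'): read the whole file.
    //   decodePath("bucket-0/compacted-changelog-xxx$0-1024.cc-orc")
    //       -> same path, offset 0, length 1024
    // Fake name (four fields after '$'): redirect to the real file at an offset.
    //   decodePath("bucket-1/compacted-changelog-xxx$0-1024-1024-512.cc-orc")
    //       -> bucket-0/compacted-changelog-xxx$0-1024.cc-orc, offset 1024, length 512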
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java
new file mode 100644
index 000000000000..39bed81505c6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog.format;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** {@link FileFormat} for compacted changelog. */
+public class CompactedChangelogReadOnlyFormat extends FileFormat {
+
+ private final FileFormat wrapped;
+
+ protected CompactedChangelogReadOnlyFormat(String formatIdentifier, FileFormat wrapped) {
+ super(formatIdentifier);
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public FormatReaderFactory createReaderFactory(
+ RowType projectedRowType, @Nullable List<Predicate> filters) {
+ return new CompactedChangelogFormatReaderFactory(
+ wrapped.createReaderFactory(projectedRowType, filters));
+ }
+
+ @Override
+ public FormatWriterFactory createWriterFactory(RowType type) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void validateDataFields(RowType rowType) {
+ wrapped.validateDataFields(rowType);
+ }
+
+ public static String getIdentifier(String wrappedFormat) {
+ return "cc-" + wrappedFormat;
+ }
+
+ static class AbstractFactory implements FileFormatFactory {
+
+ private final String format;
+
+ AbstractFactory(String format) {
+ this.format = format;
+ }
+
+ @Override
+ public String identifier() {
+ return getIdentifier(format);
+ }
+
+ @Override
+ public FileFormat create(FormatContext formatContext) {
+ return new CompactedChangelogReadOnlyFormat(
+ getIdentifier(format), FileFormat.fromIdentifier(format, formatContext));
+ }
+ }
+
+ /** {@link FileFormatFactory} for compacted changelog, with orc as the real format. */
+ public static class OrcFactory extends AbstractFactory {
+
+ public OrcFactory() {
+ super("orc");
+ }
+ }
+
+ /** {@link FileFormatFactory} for compacted changelog, with parquet as the real format. */
+ public static class ParquetFactory extends AbstractFactory {
+
+ public ParquetFactory() {
+ super("parquet");
+ }
+ }
+
+ /** {@link FileFormatFactory} for compacted changelog, with avro as the real format. */
+ public static class AvroFactory extends AbstractFactory {
+
+ public AvroFactory() {
+ super("avro");
+ }
+ }
+}
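The identifier mapping is a plain prefix, so the wrapped formats registered in the services file below resolve as:

    CompactedChangelogReadOnlyFormat.getIdentifier("orc");     // -> "cc-orc"
    CompactedChangelogReadOnlyFormat.getIdentifier("parquet"); // -> "cc-parquet"
    CompactedChangelogReadOnlyFormat.getIdentifier("avro");    // -> "cc-avro"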
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index e483e3c19f74..6acb5b77c2a9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -21,6 +21,9 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.TagCreationMode;
+import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator;
+import org.apache.paimon.flink.compact.changelog.ChangelogCompactWorkerOperator;
+import org.apache.paimon.flink.compact.changelog.ChangelogTaskTypeInfo;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -31,6 +34,7 @@
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
@@ -54,6 +58,7 @@
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.createCommitUser;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -226,6 +231,22 @@ public DataStream<Committable> doWrite(
if (options.get(SINK_USE_MANAGED_MEMORY)) {
declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
+
+ if (options.get(CHANGELOG_PRECOMMIT_COMPACT)) {
+ written =
+ written.transform(
+ "Changelog Compact Coordinator",
+ new EitherTypeInfo<>(
+ new CommittableTypeInfo(), new ChangelogTaskTypeInfo()),
+ new ChangelogCompactCoordinateOperator(table))
+ .forceNonParallel()
+ .transform(
+ "Changelog Compact Worker",
+ new CommittableTypeInfo(),
+ new ChangelogCompactWorkerOperator(table))
+ .setParallelism(written.getParallelism());
+ }
+
return written;
}
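As exercised by the new tests below, this rewrite only takes effect when the table enables the changelog.precommit-compact option; a minimal table DDL, mirroring the test setup:

    tEnv.executeSql(
            "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
                    + "PARTITIONED BY (pt) WITH ("
                    + " 'changelog-producer' = 'lookup',"
                    + " 'changelog.precommit-compact' = 'true'"
                    + ")");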
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
new file mode 100644
index 000000000000..6e7553d5c668
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$OrcFactory
+org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$ParquetFactory
+org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$AvroFactory
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 2bdd90a963d9..9a9abd8f8d7e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -39,6 +39,7 @@
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -48,6 +49,7 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -303,9 +305,7 @@ private void innerTestChangelogProducing(List<String> options) throws Exception
public void testBatchJobWithConflictAndRestart() throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().allowRestart(10).build();
tEnv.executeSql(
- "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '"
- + getTempDirPath()
- + "' )");
+ "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + path + "' )");
tEnv.executeSql("USE CATALOG mycat");
tEnv.executeSql(
"CREATE TABLE t ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) "
@@ -335,6 +335,199 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
}
}
+ @Test
+ @Timeout(120)
+ public void testChangelogCompactInBatchWrite() throws Exception {
+ TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
+ String catalogDdl =
+ "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + path + "' )";
+ bEnv.executeSql(catalogDdl);
+ bEnv.executeSql("USE CATALOG mycat");
+ bEnv.executeSql(
+ "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+ + "PARTITIONED BY (pt) "
+ + "WITH ("
+ + " 'bucket' = '10',\n"
+ + " 'changelog-producer' = 'lookup',\n"
+ + " 'changelog.precommit-compact' = 'true',\n"
+ + " 'snapshot.num-retained.min' = '3',\n"
+ + " 'snapshot.num-retained.max' = '3'\n"
+ + ")");
+
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(1000).build();
+ sEnv.executeSql(catalogDdl);
+ sEnv.executeSql("USE CATALOG mycat");
+
+ List<String> values = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ values.add(String.format("(0, %d, %d)", i, i));
+ values.add(String.format("(1, %d, %d)", i, i));
+ }
+ bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+
+ List compactedChangelogs2 = listAllFilesWithPrefix("compacted-changelog-");
+ assertThat(compactedChangelogs2).hasSize(2);
+ assertThat(listAllFilesWithPrefix("changelog-")).isEmpty();
+
+ List<Row> expected = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ expected.add(Row.ofKind(RowKind.INSERT, 0, i, i));
+ expected.add(Row.ofKind(RowKind.INSERT, 1, i, i));
+ }
+ assertStreamingResult(
+ sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"),
+ expected);
+
+ values.clear();
+ for (int i = 0; i < 1000; i++) {
+ values.add(String.format("(0, %d, %d)", i, i + 1));
+ values.add(String.format("(1, %d, %d)", i, i + 1));
+ }
+ bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+
+ assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4);
+ assertThat(listAllFilesWithPrefix("changelog-")).isEmpty();
+
+ for (int i = 0; i < 1000; i++) {
+ expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i));
+ expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i));
+ expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1));
+ expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1));
+ }
+ assertStreamingResult(
+ sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"),
+ expected);
+
+ values.clear();
+ for (int i = 0; i < 1000; i++) {
+ values.add(String.format("(0, %d, %d)", i, i + 2));
+ values.add(String.format("(1, %d, %d)", i, i + 2));
+ }
+ bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+
+ assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4);
+ assertThat(listAllFilesWithPrefix("changelog-")).isEmpty();
+ LocalFileIO fileIO = LocalFileIO.create();
+ for (String p : compactedChangelogs) {
+ assertThat(fileIO.exists(new Path(p))).isFalse();
+ }
+
+ expected = expected.subList(2000, 6000);
+ for (int i = 0; i < 1000; i++) {
+ expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i + 1));
+ expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i + 1));
+ expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 2));
+ expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 2));
+ }
+ assertStreamingResult(
+ sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"),
+ expected);
+ }
+
+ @Test
+ @Timeout(120)
+ public void testChangelogCompactInStreamWrite() throws Exception {
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(2000)
+ .parallelism(4)
+ .build();
+
+ sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ sEnv.executeSql("USE CATALOG testCatalog");
+ sEnv.executeSql(
+ "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+ + "PARTITIONED BY (pt) "
+ + "WITH ("
+ + " 'bucket' = '10',\n"
+ + " 'changelog-producer' = 'lookup',\n"
+ + " 'changelog.precommit-compact' = 'true'\n"
+ + ")");
+
+ Path inputPath = new Path(path, "input");
+ LocalFileIO.create().mkdirs(inputPath);
+ sEnv.executeSql(
+ "CREATE TABLE `default_catalog`.`default_database`.`s` ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) "
+ + "WITH ( 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '"
+ + inputPath
+ + "', 'source.monitor-interval' = '500ms' )");
+
+ sEnv.executeSql("INSERT INTO t SELECT * FROM `default_catalog`.`default_database`.`s`");
+ CloseableIterator it = sEnv.executeSql("SELECT * FROM t").collect();
+
+ // write initial data
+ List<String> values = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ values.add(String.format("(0, %d, %d)", i, i));
+ values.add(String.format("(1, %d, %d)", i, i));
+ }
+ sEnv.executeSql(
+ "INSERT INTO `default_catalog`.`default_database`.`s` VALUES "
+ + String.join(", ", values))
+ .await();
+
+ List<Row> expected = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ expected.add(Row.ofKind(RowKind.INSERT, 0, i, i));
+ expected.add(Row.ofKind(RowKind.INSERT, 1, i, i));
+ }
+ assertStreamingResult(it, expected);
+
+ List compactedChangelogs2 = listAllFilesWithPrefix("compacted-changelog-");
+ assertThat(compactedChangelogs2).hasSize(2);
+ assertThat(listAllFilesWithPrefix("changelog-")).isEmpty();
+
+ // write update data
+ values.clear();
+ for (int i = 0; i < 100; i++) {
+ values.add(String.format("(0, %d, %d)", i, i + 1));
+ values.add(String.format("(1, %d, %d)", i, i + 1));
+ }
+ sEnv.executeSql(
+ "INSERT INTO `default_catalog`.`default_database`.`s` VALUES "
+ + String.join(", ", values))
+ .await();
+ for (int i = 0; i < 100; i++) {
+ expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i));
+ expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i));
+ expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1));
+ expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1));
+ }
+ assertStreamingResult(it, expected.subList(200, 600));
+ assertThat(listAllFilesWithPrefix("compacted-changelog-")).hasSize(4);
+ assertThat(listAllFilesWithPrefix("changelog-")).isEmpty();
+ }
+
+ private List<String> listAllFilesWithPrefix(String prefix) throws Exception {
+ try (Stream<java.nio.file.Path> stream = Files.walk(java.nio.file.Paths.get(path))) {
+ return stream.filter(Files::isRegularFile)
+ .filter(p -> p.getFileName().toString().startsWith(prefix))
+ .map(java.nio.file.Path::toString)
+ .collect(Collectors.toList());
+ }
+ }
+
+ private void assertStreamingResult(TableResult result, List<Row> expected) throws Exception {
+ List<Row> actual = new ArrayList<>();
+ try (CloseableIterator<Row> it = result.collect()) {
+ while (actual.size() < expected.size() && it.hasNext()) {
+ actual.add(it.next());
+ }
+ }
+ assertThat(actual).hasSameElementsAs(expected);
+ }
+
+ private void assertStreamingResult(CloseableIterator<Row> it, List<Row> expected) {
+ List<Row> actual = new ArrayList<>();
+ while (actual.size() < expected.size() && it.hasNext()) {
+ actual.add(it.next());
+ }
+
+ assertThat(actual).hasSameElementsAs(expected);
+ }
+
// ------------------------------------------------------------------------
// Random Tests
// ------------------------------------------------------------------------
@@ -488,15 +681,17 @@ private void testLookupChangelogProducerRandom(
tEnv,
numProducers,
enableFailure,
- "'bucket' = '4',"
- + String.format(
- "'write-buffer-size' = '%s',"
- + "'changelog-producer' = 'lookup',"
- + "'lookup-wait' = '%s',"
- + "'deletion-vectors.enabled' = '%s'",
- random.nextBoolean() ? "4mb" : "8mb",
- random.nextBoolean(),
- enableDeletionVectors));
+ String.format(
+ "'bucket' = '4', "
+ + "'writer-buffer-size' = '%s', "
+ + "'changelog-producer' = 'lookup', "
+ + "'lookup-wait' = '%s', "
+ + "'deletion-vectors.enabled' = '%s', "
+ + "'changelog.precommit-compact' = '%s'",
+ random.nextBoolean() ? "512kb" : "1mb",
+ random.nextBoolean(),
+ enableDeletionVectors,
+ random.nextBoolean()));
// sleep for a random amount of time to check
// if we can first read complete records then read incremental records correctly
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
new file mode 100644
index 000000000000..906fac850973
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
+import static org.apache.paimon.stats.StatsTestUtils.newSimpleStats;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ChangelogCompactTaskSerializer}. */
+public class ChangelogCompactTaskSerializerTest {
+ private final ChangelogCompactTaskSerializer serializer = new ChangelogCompactTaskSerializer();
+
+ @Test
+ public void testSerializer() throws Exception {
+ BinaryRow partition = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(partition);
+ writer.writeInt(0, 0);
+ writer.complete();
+
+ ChangelogCompactTask task =
+ new ChangelogCompactTask(
+ 1L,
+ partition,
+ new HashMap<Integer, List<DataFileMeta>>() {
+ {
+ put(0, newFiles(20));
+ put(1, newFiles(20));
+ }
+ },
+ new HashMap<Integer, List<DataFileMeta>>() {
+ {
+ put(0, newFiles(10));
+ put(1, newFiles(10));
+ }
+ });
+ ChangelogCompactTask serializeTask = serializer.deserialize(1, serializer.serialize(task));
+ assertThat(task).isEqualTo(serializeTask);
+ }
+
+ private List<DataFileMeta> newFiles(int num) {
+ List<DataFileMeta> list = new ArrayList<>();
+ for (int i = 0; i < num; i++) {
+ list.add(newFile());
+ }
+ return list;
+ }
+
+ private DataFileMeta newFile() {
+ return new DataFileMeta(
+ UUID.randomUUID().toString(),
+ 0,
+ 1,
+ row(0),
+ row(0),
+ newSimpleStats(0, 1),
+ newSimpleStats(0, 1),
+ 0,
+ 1,
+ 0,
+ 0,
+ 0L,
+ null,
+ FileSource.APPEND,
+ null);
+ }
+}