Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/content/maintenance/write-performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,12 @@ You can use fine-grained-resource-management of Flink to increase committer heap
1. Configure Flink Configuration `cluster.fine-grained-resource-management.enabled: true`. (This is default after Flink 1.18)
2. Configure Paimon Table Options: `sink.committer-memory`, for example 300 MB, depends on your `TaskManager`.
(`sink.committer-cpu` is also supported)

## Changelog Compaction

If Flink's checkpoint interval is short (for example, 30 seconds) and the number of buckets is large,
each snapshot may produce lots of small changelog files.
Too many files may put a burden on the distributed storage cluster.

In order to compact small changelog files into large ones, you can set the table option `changelog.precommit-compact = true`.
The default value of this option is false. If it is set to true, a compact coordinator and a worker operator are added after the writer operator, which copy small changelog files into larger ones.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>changelog.precommit-compact</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, it will add a changelog compact coordinator and worker operator after the writer operator, in order to compact several changelog files from the same partition into large ones, which can decrease the number of small files.</td>
</tr>
<tr>
<td><h5>end-input.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class IOUtils {
private static final Logger LOG = LoggerFactory.getLogger(IOUtils.class);

/** The block size for byte operations in byte. */
private static final int BLOCKSIZE = 4096;
public static final int BLOCKSIZE = 4096;

// ------------------------------------------------------------------------
// Byte copy operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,15 @@ public class FlinkConnectorOptions {
.withDescription(
"Optional endInput watermark used in case of batch mode or bounded stream.");

/**
 * Whether to insert a changelog compact coordinator/worker pair after the writer operator to
 * merge small changelog files of the same partition into larger ones.
 */
public static final ConfigOption<Boolean> CHANGELOG_PRECOMMIT_COMPACT =
        key("changelog.precommit-compact")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        // Note: segments are joined verbatim, so each one must end (or the
                        // next begin) with a space to keep the rendered sentence readable.
                        "If true, it will add a changelog compact coordinator and worker operator after the writer operator, "
                                + "in order to compact several changelog files from the same partition into large ones, "
                                + "which can decrease the number of small files.");

public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact.changelog;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Coordinator operator for compacting changelog files.
*
* <p>{@link ChangelogCompactCoordinateOperator} calculates the file size of changelog files
* contained in all buckets within each partition from {@link Committable} message emitted from
* writer operator. And emit {@link ChangelogCompactTask} to {@link ChangelogCompactWorkerOperator}.
*/
public class ChangelogCompactCoordinateOperator
extends AbstractStreamOperator<Either<Committable, ChangelogCompactTask>>
implements OneInputStreamOperator<Committable, Either<Committable, ChangelogCompactTask>>,
BoundedOneInput {
private final FileStoreTable table;

private transient long checkpointId;
private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;

public ChangelogCompactCoordinateOperator(FileStoreTable table) {
this.table = table;
}

@Override
public void open() throws Exception {
super.open();

checkpointId = Long.MIN_VALUE;
partitionChangelogs = new HashMap<>();
}

public void processElement(StreamRecord<Committable> record) {
Committable committable = record.getValue();
checkpointId = Math.max(checkpointId, committable.checkpointId());
if (committable.kind() != Committable.Kind.FILE) {
output.collect(new StreamRecord<>(Either.Left(record.getValue())));
return;
}

CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
if (message.newFilesIncrement().changelogFiles().isEmpty()
&& message.compactIncrement().changelogFiles().isEmpty()) {
output.collect(new StreamRecord<>(Either.Left(record.getValue())));
return;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long targetFileSize = table.coreOptions().targetFileSize(false);

BinaryRow partition = message.partition();
Integer bucket = message.bucket();
long targetFileSize = table.coreOptions().targetFileSize(false);
for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two for statement has lots of some code, you can avoid this.

partitionChangelogs
.computeIfAbsent(partition, k -> new PartitionChangelog())
.addNewChangelogFile(bucket, meta);
PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
if (partitionChangelog.totalFileSize >= targetFileSize) {
emitPartitionChanglogCompactTask(partition);
}
}
for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
partitionChangelogs
.computeIfAbsent(partition, k -> new PartitionChangelog())
.addCompactChangelogFile(bucket, meta);
PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
if (partitionChangelog.totalFileSize >= targetFileSize) {
emitPartitionChanglogCompactTask(partition);
}
}

CommitMessageImpl newMessage =
new CommitMessageImpl(
message.partition(),
message.bucket(),
new DataIncrement(
message.newFilesIncrement().newFiles(),
message.newFilesIncrement().deletedFiles(),
Collections.emptyList()),
new CompactIncrement(
message.compactIncrement().compactBefore(),
message.compactIncrement().compactAfter(),
Collections.emptyList()),
message.indexIncrement());
Committable newCommittable =
new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
output.collect(new StreamRecord<>(Either.Left(newCommittable)));
}

public void prepareSnapshotPreBarrier(long checkpointId) {
emitAllPartitionsChanglogCompactTask();
}

public void endInput() {
emitAllPartitionsChanglogCompactTask();
}

private void emitPartitionChanglogCompactTask(BinaryRow partition) {
PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionChangelog may be null or not?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionChangelog may be null or not?

output.collect(
new StreamRecord<>(
Either.Right(
new ChangelogCompactTask(
checkpointId,
partition,
partitionChangelog.newFileChangelogFiles,
partitionChangelog.compactChangelogFiles))));
partitionChangelogs.remove(partition);
}

private void emitAllPartitionsChanglogCompactTask() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionChangelogs.keySet().forEach(this::emitPartitionChangelogCompactTask);

List<BinaryRow> partitions = new ArrayList<>(partitionChangelogs.keySet());
for (BinaryRow partition : partitions) {
emitPartitionChanglogCompactTask(partition);
}
}

private static class PartitionChangelog {
private long totalFileSize;
private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newChangelogFiles

private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;

public PartitionChangelog() {
totalFileSize = 0;
newFileChangelogFiles = new HashMap<>();
compactChangelogFiles = new HashMap<>();
}

public void addNewChangelogFile(Integer bucket, DataFileMeta file) {
totalFileSize += file.fileSize();
newFileChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file);
}

public void addCompactChangelogFile(Integer bucket, DataFileMeta file) {
totalFileSize += file.fileSize();
compactChangelogFiles.computeIfAbsent(bucket, k -> new ArrayList<>()).add(file);
}
}
}
Loading