@@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>changelog.precommit-compact.thread-num</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Maximum number of threads to copy bytes from small changelog files. By default, it is the number of processors available to the Java virtual machine.</td>
</tr>
<tr>
<td><h5>end-input.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -108,7 +114,7 @@
<td><h5>precommit-compact</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, it will add a compact coordinator and worker operator after the writer operator, in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files. </td>
<td>If true, it will add a compact coordinator and worker operator after the writer operator, in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files.</td>
</tr>
<tr>
<td><h5>scan.bounded</h5></td>
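For context, the new key pairs with the existing 'precommit-compact' switch documented above. A minimal sketch of setting both from the Flink Table API; the option keys come from this change, while the session setup, schema, and path are illustrative only:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PrecommitCompactOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'precommit-compact' enables the coordinator/worker operators after
        // the writer; 'changelog.precommit-compact.thread-num' caps the copy
        // thread pool (otherwise it defaults to the available processors).
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE t ("
                        + " k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'paimon',"
                        + " 'path' = 'file:///tmp/paimon/t',"
                        + " 'precommit-compact' = 'true',"
                        + " 'changelog.precommit-compact.thread-num' = '8'"
                        + ")");
    }
}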
@@ -138,17 +138,7 @@ private void advanceIfNeeded() {

public static <U> void randomlyOnlyExecute(
ExecutorService executor, Consumer<U> processor, Collection<U> input) {
List<Future<?>> futures = new ArrayList<>(input.size());
ClassLoader cl = Thread.currentThread().getContextClassLoader();
for (U u : input) {
futures.add(
executor.submit(
() -> {
Thread.currentThread().setContextClassLoader(cl);
processor.accept(u);
}));
}
awaitAllFutures(futures);
awaitAllFutures(submitAllTasks(executor, processor, input));
}

public static <U, T> Iterator<T> randomlyExecuteSequentialReturn(
@@ -189,7 +179,22 @@ public Iterator<T> next() {
});
}

private static void awaitAllFutures(List<Future<?>> futures) {
public static <U> List<Future<?>> submitAllTasks(
ExecutorService executor, Consumer<U> processor, Collection<U> input) {
List<Future<?>> futures = new ArrayList<>(input.size());
ClassLoader cl = Thread.currentThread().getContextClassLoader();
for (U u : input) {
futures.add(
executor.submit(
() -> {
Thread.currentThread().setContextClassLoader(cl);
processor.accept(u);
}));
}
return futures;
}

public static void awaitAllFutures(List<Future<?>> futures) {
for (Future<?> future : futures) {
try {
future.get();
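The refactor above splits the old inline loop into two public helpers so that callers can submit work and wait on it independently. A usage sketch, assuming the helpers live in Paimon's ThreadPoolUtils (the enclosing class is not visible in this hunk):

import org.apache.paimon.utils.ThreadPoolUtils; // assumed location of the helpers

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitThenAwaitExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // submitAllTasks captures the caller's context class loader and
            // re-installs it on each pooled thread before running the task.
            List<Future<?>> futures =
                    ThreadPoolUtils.submitAllTasks(
                            executor,
                            item -> System.out.println("copying " + item),
                            Arrays.asList("f1", "f2", "f3"));
            // The caller may do other work here before blocking.
            ThreadPoolUtils.awaitAllFutures(futures);
        } finally {
            executor.shutdown();
        }
    }
}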
@@ -430,7 +430,24 @@ public class FlinkConnectorOptions {
+ "in order to compact several changelog files (for primary key tables) "
+ "or newly created data files (for unaware bucket tables) "
+ "from the same partition into large ones, "
+ "which can decrease the number of small files. ");
+ "which can decrease the number of small files.");

public static final ConfigOption<Integer> CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM =
key("changelog.precommit-compact.thread-num")
.intType()
.noDefaultValue()
.withDescription(
"Maximum number of threads to copy bytes from small changelog files. "
+ "By default is the number of processors available to the Java virtual machine.");

@ExcludeFromDocumentation("Most users won't need to adjust this config")
public static final ConfigOption<MemorySize> CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE =
key("changelog.precommit-compact.buffer-size")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(128))
.withDescription(
"The buffer size for copying bytes from small changelog files. "
+ "The default value is 128 MB.");

public static final ConfigOption<String> SOURCE_OPERATOR_UID_SUFFIX =
key("source.operator-uid.suffix")
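Because changelog.precommit-compact.thread-num is declared with noDefaultValue(), whoever reads it must supply the documented fallback. A sketch of that resolution, assuming a plain Flink Configuration (the surrounding wiring is not part of this diff):

import org.apache.paimon.flink.FlinkConnectorOptions;

import org.apache.flink.configuration.Configuration;

public class ThreadNumResolutionExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // conf.set(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM, 8);

        // get(...) returns null for an option declared with noDefaultValue().
        Integer configured =
                conf.get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM);
        int threadNum =
                configured != null
                        ? configured
                        : Runtime.getRuntime().availableProcessors();
        System.out.println("changelog copy threads: " + threadNum);
    }
}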
@@ -18,12 +18,13 @@

package org.apache.paimon.flink.compact.changelog;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -35,6 +36,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@@ -49,21 +51,22 @@ public class ChangelogCompactCoordinateOperator
extends AbstractStreamOperator<Either<Committable, ChangelogCompactTask>>
implements OneInputStreamOperator<Committable, Either<Committable, ChangelogCompactTask>>,
BoundedOneInput {
private final FileStoreTable table;

private final CoreOptions options;

private transient long checkpointId;
private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;

public ChangelogCompactCoordinateOperator(FileStoreTable table) {
this.table = table;
public ChangelogCompactCoordinateOperator(CoreOptions options) {
this.options = options;
}

@Override
public void open() throws Exception {
super.open();

checkpointId = Long.MIN_VALUE;
partitionChangelogs = new HashMap<>();
partitionChangelogs = new LinkedHashMap<>();
}

public void processElement(StreamRecord<Committable> record) {
@@ -81,10 +84,26 @@ public void processElement(StreamRecord<Committable> record) {
return;
}

// Changelog files are not stored in an LSM tree,
// so we can regard them as files without primary keys.
long targetFileSize = options.targetFileSize(false);
long compactionFileSize =
Math.min(
options.compactionFileSize(false),
options.toConfiguration()
.get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE)
.getBytes());

BinaryRow partition = message.partition();
Integer bucket = message.bucket();
long targetFileSize = table.coreOptions().targetFileSize(false);
List<DataFileMeta> skippedNewChangelogs = new ArrayList<>();
List<DataFileMeta> skippedCompactChangelogs = new ArrayList<>();

for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
if (meta.fileSize() >= compactionFileSize) {
skippedNewChangelogs.add(meta);
continue;
}
partitionChangelogs
.computeIfAbsent(partition, k -> new PartitionChangelog())
.addNewChangelogFile(bucket, meta);
@@ -94,6 +113,10 @@
}
}
for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
if (meta.fileSize() >= compactionFileSize) {
skippedCompactChangelogs.add(meta);
continue;
}
partitionChangelogs
.computeIfAbsent(partition, k -> new PartitionChangelog())
.addCompactChangelogFile(bucket, meta);
@@ -111,11 +134,11 @@ public void processElement(StreamRecord<Committable> record) {
new DataIncrement(
message.newFilesIncrement().newFiles(),
message.newFilesIncrement().deletedFiles(),
Collections.emptyList()),
skippedNewChangelogs),
new CompactIncrement(
message.compactIncrement().compactBefore(),
message.compactIncrement().compactAfter(),
Collections.emptyList()),
skippedCompactChangelogs),
message.indexIncrement());
Committable newCommittable =
new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
@@ -132,15 +155,59 @@ public void endInput() {

private void emitPartitionChangelogCompactTask(BinaryRow partition) {
PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
output.collect(
new StreamRecord<>(
Either.Right(
new ChangelogCompactTask(
checkpointId,
partition,
table.coreOptions().bucket(),
partitionChangelog.newFileChangelogFiles,
partitionChangelog.compactChangelogFiles))));
int numNewChangelogFiles =
partitionChangelog.newFileChangelogFiles.values().stream()
.mapToInt(List::size)
.sum();
int numCompactChangelogFiles =
partitionChangelog.compactChangelogFiles.values().stream()
.mapToInt(List::size)
.sum();
if (numNewChangelogFiles + numCompactChangelogFiles == 1) {
// there is only one changelog file in this partition, so we don't wrap it as a
// compaction task
CommitMessageImpl message;
if (numNewChangelogFiles == 1) {
Map.Entry<Integer, List<DataFileMeta>> entry =
partitionChangelog.newFileChangelogFiles.entrySet().iterator().next();
message =
new CommitMessageImpl(
partition,
entry.getKey(),
options.bucket(),
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
entry.getValue()),
CompactIncrement.emptyIncrement());
} else {
Map.Entry<Integer, List<DataFileMeta>> entry =
partitionChangelog.compactChangelogFiles.entrySet().iterator().next();
message =
new CommitMessageImpl(
partition,
entry.getKey(),
options.bucket(),
DataIncrement.emptyIncrement(),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
entry.getValue()));
}
Committable newCommittable =
new Committable(checkpointId, Committable.Kind.FILE, message);
output.collect(new StreamRecord<>(Either.Left(newCommittable)));
} else {
output.collect(
new StreamRecord<>(
Either.Right(
new ChangelogCompactTask(
checkpointId,
partition,
options.bucket(),
partitionChangelog.newFileChangelogFiles,
partitionChangelog.compactChangelogFiles))));
}
partitionChangelogs.remove(partition);
}

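Distilled from the coordinator changes above: a changelog file whose size is at least min(compaction file size, 'changelog.precommit-compact.buffer-size') bypasses compaction and stays in the commit message, smaller files are grouped per partition into a ChangelogCompactTask, and a partition left with exactly one small file is committed directly rather than wrapped in a single-file task. A self-contained sketch of the threshold rule, with hypothetical sizes and option values:

import java.util.ArrayList;
import java.util.List;

public class ChangelogRoutingSketch {
    public static void main(String[] args) {
        long compactionFileSize = 8L << 20;   // hypothetical option value: 8 MB
        long bufferSize = 128L << 20;         // buffer-size option default: 128 MB
        long threshold = Math.min(compactionFileSize, bufferSize);

        long[] changelogFileSizes = {2L << 20, 64L << 20, 1L << 20};
        List<Long> grouped = new ArrayList<>(); // small -> ChangelogCompactTask
        List<Long> skipped = new ArrayList<>(); // large -> kept in commit message
        for (long size : changelogFileSizes) {
            if (size >= threshold) {
                skipped.add(size);
            } else {
                grouped.add(size);
            }
        }
        System.out.println("grouped=" + grouped + ", skipped=" + skipped);
    }
}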