Skip to content

Conversation

@tsreaper
Copy link
Contributor

Purpose

In #4380 we introduce pre-commit compact for changelog files. Multiple changelog files from the same partition will be merged into one big file in one worker parallelism to decrease the number of small files.

However, when the number of changelog files to merge is large (while each file itself is small enough), the copying process will be slow, because opening these many files from the filesystem takes a lot of time.

In this PR, we add a thread pool to the worker operator, so that when performing pre-commit compact for changelogs, we can copy the bytes with multiple threads, thus speeding up the process.

Tests

Existing IT cases should cover this change. This PR also adds a unit test for the coordinator operator.

API and Format

No format changes.

Documentation

Document is also updated.

.intType()
.noDefaultValue()
.withDescription(
"Maximum number of threads to copy bytes form small changelog files. "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

form -> from

int numThreads =
options.getOptional(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM)
.orElse(Runtime.getRuntime().availableProcessors());
LOG.info("Creating thread poll of size {} for changelog compaction.", numThreads);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

poll -> pool

private void readFully() {
try {
result = IOUtils.readFully(table.fileIO().newInputStream(path), true);
table.fileIO().deleteQuietly(path);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If job failover after table.fileIO().deleteQuietly(path); and before copy all files into a new big file.
Is there a risk of file loss here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If job fails then no changelog will be committed, thus no risk of file loss.

@wwj6591812
Copy link
Contributor

+1

ThreadPoolUtils.randomlyExecuteSequentialReturn(
executor,
t -> {
// Total lengths of all bytes will not exceed `targetFileSize * 2`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel it is better to use workers and queue and consumer. Even max targetFileSize * 2, if target file size is 1GB, this is too still large.

Workers and queue is safer.

@JingsongLi
Copy link
Contributor

+1

@JingsongLi JingsongLi merged commit 241ac76 into apache:master Apr 6, 2025
19 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants