Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
package org.apache.iceberg.flink.sink;

import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.util.ScanTaskUtil;

class CommitSummary {
@Internal
public class CommitSummary {
Comment thread
mxm marked this conversation as resolved.

private final AtomicLong dataFilesCount = new AtomicLong();
private final AtomicLong dataFilesRecordCount = new AtomicLong();
Expand All @@ -34,30 +37,35 @@ class CommitSummary {
private final AtomicLong deleteFilesRecordCount = new AtomicLong();
private final AtomicLong deleteFilesByteCount = new AtomicLong();

CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
pendingResults
.values()
public CommitSummary() {}

/**
 * Creates a summary pre-populated from the given checkpoint-keyed write results.
 *
 * @param pendingResults write results keyed by checkpoint id; every value is folded into the
 *     running totals
 */
public CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
  for (WriteResult writeResult : pendingResults.values()) {
    addWriteResult(writeResult);
  }
}

/**
 * Folds every {@code WriteResult} from every checkpoint entry into the running totals.
 *
 * @param pendingResults lists of write results keyed by checkpoint id
 */
public void addAll(NavigableMap<Long, List<WriteResult>> pendingResults) {
  for (List<WriteResult> writeResults : pendingResults.values()) {
    for (WriteResult writeResult : writeResults) {
      addWriteResult(writeResult);
    }
  }
}

private void addWriteResult(WriteResult writeResult) {
dataFilesCount.addAndGet(writeResult.dataFiles().length);
Arrays.stream(writeResult.dataFiles())
.forEach(
dataFile -> {
dataFilesRecordCount.addAndGet(dataFile.recordCount());
dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes());
});
deleteFilesCount.addAndGet(writeResult.deleteFiles().length);
Arrays.stream(writeResult.deleteFiles())
.forEach(
writeResult -> {
dataFilesCount.addAndGet(writeResult.dataFiles().length);
Arrays.stream(writeResult.dataFiles())
.forEach(
dataFile -> {
dataFilesRecordCount.addAndGet(dataFile.recordCount());
dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes());
});
deleteFilesCount.addAndGet(writeResult.deleteFiles().length);
Arrays.stream(writeResult.deleteFiles())
.forEach(
deleteFile -> {
deleteFilesRecordCount.addAndGet(deleteFile.recordCount());
long deleteBytes = ScanTaskUtil.contentSizeInBytes(deleteFile);
deleteFilesByteCount.addAndGet(deleteBytes);
});
deleteFile -> {
deleteFilesRecordCount.addAndGet(deleteFile.recordCount());
long deleteBytes = ScanTaskUtil.contentSizeInBytes(deleteFile);
deleteFilesByteCount.addAndGet(deleteBytes);
});
}

long dataFilesCount() {
public long dataFilesCount() {
return dataFilesCount.get();
}

Expand All @@ -69,7 +77,7 @@ long dataFilesByteCount() {
return dataFilesByteCount.get();
}

long deleteFilesCount() {
public long deleteFilesCount() {
return deleteFilesCount.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.iceberg.flink.sink;

import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class DeltaManifests {
@Internal
public class DeltaManifests {
Comment thread
pvary marked this conversation as resolved.

private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];

Expand Down Expand Up @@ -56,7 +58,7 @@ CharSequence[] referencedDataFiles() {
return referencedDataFiles;
}

List<ManifestFile> manifests() {
public List<ManifestFile> manifests() {
List<ManifestFile> manifests = Lists.newArrayListWithCapacity(2);
if (dataManifest != null) {
manifests.add(dataManifest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
@Internal
public class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
Comment thread
mxm marked this conversation as resolved.
private static final int VERSION_1 = 1;
private static final int VERSION_2 = 2;
private static final byte[] EMPTY_BINARY = new byte[0];

static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
public static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();

@Override
public int getVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FlinkManifestUtil {
public class FlinkManifestUtil {

private static final Logger LOG = LoggerFactory.getLogger(FlinkManifestUtil.class);
private static final int FORMAT_V2 = 2;
Expand Down Expand Up @@ -66,7 +66,7 @@ static List<DataFile> readDataFiles(
}
}

static ManifestOutputFileFactory createOutputFileFactory(
public static ManifestOutputFileFactory createOutputFileFactory(
Supplier<Table> tableSupplier,
Map<String, String> tableProps,
String flinkJobId,
Expand All @@ -83,7 +83,7 @@ static ManifestOutputFileFactory createOutputFileFactory(
* @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
* partition spec
*/
static DeltaManifests writeCompletedFiles(
public static DeltaManifests writeCompletedFiles(
WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
throws IOException {

Expand Down Expand Up @@ -114,7 +114,7 @@ static DeltaManifests writeCompletedFiles(
return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}

static WriteResult readCompletedFiles(
public static WriteResult readCompletedFiles(
DeltaManifests deltaManifests, FileIO io, Map<Integer, PartitionSpec> specsById)
throws IOException {
WriteResult.Builder builder = WriteResult.builder();
Expand All @@ -135,7 +135,7 @@ static WriteResult readCompletedFiles(
return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build();
}

static void deleteCommittedManifests(
public static void deleteCommittedManifests(
Table table, List<ManifestFile> manifests, String newFlinkJobId, long checkpointId) {
for (ManifestFile manifest : manifests) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*
* <p>In both cases only the respective part is serialized.
*/
class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
public class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
private static final int VERSION = 1;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.iceberg.flink.util.ElapsedTimeGauge;

class IcebergFilesCommitterMetrics {
@Internal
public class IcebergFilesCommitterMetrics {
private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
private final AtomicLong lastCommitDurationMs = new AtomicLong();
private final ElapsedTimeGauge elapsedSecondsSinceLastSuccessfulCommit;
Expand All @@ -35,7 +37,7 @@ class IcebergFilesCommitterMetrics {
private final Counter committedDeleteFilesRecordCount;
private final Counter committedDeleteFilesByteCount;

IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) {
public IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) {
MetricGroup committerMetrics =
metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName);
committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get);
Expand All @@ -52,16 +54,16 @@ class IcebergFilesCommitterMetrics {
this.committedDeleteFilesByteCount = committerMetrics.counter("committedDeleteFilesByteCount");
}

void checkpointDuration(long checkpointDurationMs) {
public void checkpointDuration(long checkpointDurationMs) {
lastCheckpointDurationMs.set(checkpointDurationMs);
}

void commitDuration(long commitDurationMs) {
public void commitDuration(long commitDurationMs) {
lastCommitDurationMs.set(commitDurationMs);
}

/** This is called upon a successful commit. */
void updateCommitSummary(CommitSummary stats) {
public void updateCommitSummary(CommitSummary stats) {
elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime();
committedDataFilesCount.inc(stats.dataFilesCount());
committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@
import com.codahale.metrics.SlidingWindowReservoir;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.ScanTaskUtil;

class IcebergStreamWriterMetrics {
@Internal
public class IcebergStreamWriterMetrics {
// 1,024 reservoir size should cost about 8KB, which is quite small.
// It should also produce good accuracy for histogram distribution (like percentiles).
private static final int HISTOGRAM_RESERVOIR_SIZE = 1024;
Expand All @@ -40,7 +42,7 @@ class IcebergStreamWriterMetrics {
private final Histogram dataFilesSizeHistogram;
private final Histogram deleteFilesSizeHistogram;

IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName) {
public IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName) {
MetricGroup writerMetrics =
metrics.addGroup("IcebergStreamWriter").addGroup("table", fullTableName);
this.flushedDataFiles = writerMetrics.counter("flushedDataFiles");
Expand All @@ -63,7 +65,7 @@ class IcebergStreamWriterMetrics {
new DropwizardHistogramWrapper(dropwizardDeleteFilesSizeHistogram));
}

void updateFlushResult(WriteResult result) {
public void updateFlushResult(WriteResult result) {
flushedDataFiles.inc(result.dataFiles().length);
flushedDeleteFiles.inc(result.deleteFiles().length);
flushedReferencedDataFiles.inc(result.referencedDataFiles().length);
Expand All @@ -84,7 +86,15 @@ void updateFlushResult(WriteResult result) {
});
}

void flushDuration(long flushDurationMs) {
public void flushDuration(long flushDurationMs) {
lastFlushDurationMs.set(flushDurationMs);
}

/** Returns the counter tracking the number of flushed data files. */
public Counter getFlushedDataFiles() {
return flushedDataFiles;
}

/** Returns the counter tracking the number of flushed delete files. */
public Counter getFlushedDeleteFiles() {
return flushedDeleteFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
Expand All @@ -30,7 +31,8 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Strings;

class ManifestOutputFileFactory {
@Internal
public class ManifestOutputFileFactory {
// Users could define their own flink manifests directory by setting this value in table
// properties.
@VisibleForTesting static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
Expand Down Expand Up @@ -70,7 +72,7 @@ private String generatePath(long checkpointId) {
fileCount.incrementAndGet()));
}

OutputFile create(long checkpointId) {
public OutputFile create(long checkpointId) {
String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION);
TableOperations ops = ((HasTableOperations) tableSupplier.get()).operations();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,28 @@ public RowDataTaskWriterFactory(
Map<String, String> writeProperties,
List<Integer> equalityFieldIds,
boolean upsert) {
this(
tableSupplier,
flinkSchema,
targetFileSizeBytes,
format,
writeProperties,
equalityFieldIds,
upsert,
tableSupplier.get().schema(),
tableSupplier.get().spec());
}

public RowDataTaskWriterFactory(
SerializableSupplier<Table> tableSupplier,
RowType flinkSchema,
long targetFileSizeBytes,
FileFormat format,
Map<String, String> writeProperties,
List<Integer> equalityFieldIds,
boolean upsert,
Schema schema,
PartitionSpec spec) {
this.tableSupplier = tableSupplier;

Table table;
Expand All @@ -90,9 +112,9 @@ public RowDataTaskWriterFactory(
table = tableSupplier.get();
}

this.schema = table.schema();
this.schema = schema;
this.flinkSchema = flinkSchema;
this.spec = table.spec();
this.spec = spec;
this.targetFileSizeBytes = targetFileSizeBytes;
this.format = format;
this.equalityFieldIds = equalityFieldIds;
Expand Down Expand Up @@ -148,6 +170,7 @@ public void initialize(int taskId, int attemptId) {
OutputFileFactory.builderFor(table, taskId, attemptId)
.format(format)
.ioSupplier(() -> tableSupplier.get().io())
.defaultSpec(spec)
Comment thread
pvary marked this conversation as resolved.
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.InstantiationUtil;
import org.apache.iceberg.io.WriteResult;

class WriteResultSerializer implements SimpleVersionedSerializer<WriteResult> {
@Internal
public class WriteResultSerializer implements SimpleVersionedSerializer<WriteResult> {
private static final int VERSION = 1;

@Override
Expand Down
Loading