@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@Fork(1)
@State(Scope.Benchmark)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.SingleShotTime)
public class DynamicRecordSerializerDeserializerBenchmark {
private static final int SAMPLE_SIZE = 100_000;
private static final Schema SCHEMA =
new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "name2", Types.StringType.get()),
Types.NestedField.required(3, "name3", Types.StringType.get()),
Types.NestedField.required(4, "name4", Types.StringType.get()),
Types.NestedField.required(5, "name5", Types.StringType.get()),
Types.NestedField.required(6, "name6", Types.StringType.get()),
Types.NestedField.required(7, "name7", Types.StringType.get()),
Types.NestedField.required(8, "name8", Types.StringType.get()),
Types.NestedField.required(9, "name9", Types.StringType.get()));

private List<DynamicRecordInternal> rows = Lists.newArrayListWithExpectedSize(SAMPLE_SIZE);
private DynamicRecordInternalType type;

public static void main(String[] args) throws RunnerException {
Options options =
new OptionsBuilder()
.include(DynamicRecordSerializerDeserializerBenchmark.class.getSimpleName())
.build();
new Runner(options).run();
}

@Setup
public void setupBenchmark() throws IOException {
List<Record> records = RandomGenericData.generate(SCHEMA, SAMPLE_SIZE, 1L);
this.rows =
records.stream()
.map(
r ->
new DynamicRecordInternal(
"t",
"main",
SCHEMA,
RowDataConverter.convert(SCHEMA, r),
PartitionSpec.unpartitioned(),
1,
false,
Collections.emptySet()))
.collect(Collectors.toList());

File warehouse = Files.createTempFile("perf-bench", null).toFile();
CatalogLoader catalogLoader =
CatalogLoader.hadoop(
"hadoop",
new Configuration(),
ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getPath()));
this.type = new DynamicRecordInternalType(catalogLoader, true, 100);
}

@Benchmark
@Threads(1)
public void testSerialize(Blackhole blackhole) throws IOException {
TypeSerializer<DynamicRecordInternal> serializer =
type.createSerializer((SerializerConfig) null);
DataOutputSerializer outputView = new DataOutputSerializer(1024);
for (int i = 0; i < SAMPLE_SIZE; ++i) {
serializer.serialize(rows.get(i), outputView);
}
}

@Benchmark
@Threads(1)
public void testSerializeAndDeserialize(Blackhole blackhole) throws IOException {
TypeSerializer<DynamicRecordInternal> serializer =
type.createSerializer((SerializerConfig) null);

DataOutputSerializer outputView = new DataOutputSerializer(1024);
for (int i = 0; i < SAMPLE_SIZE; ++i) {
serializer.serialize(rows.get(i), outputView);
serializer.deserialize(new DataInputDeserializer(outputView.getSharedBuffer()));
}
}
}
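
The two benchmark methods above exercise the standard Flink TypeSerializer round trip. For reference, a minimal self-contained sketch of that pattern using Flink's built-in StringSerializer; the classes and methods here are stock Flink APIs, and the serializer produced by DynamicRecordInternalType follows the same contract:

import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class SerializerRoundTripSketch {
  public static void main(String[] args) throws IOException {
    TypeSerializer<String> serializer = StringSerializer.INSTANCE;

    // Serialize into Flink's binary output view, as testSerialize does.
    DataOutputSerializer outputView = new DataOutputSerializer(1024);
    serializer.serialize("hello iceberg", outputView);

    // Deserialize from a view over the written bytes, as
    // testSerializeAndDeserialize does with the shared buffer.
    DataInputDeserializer inputView =
        new DataInputDeserializer(outputView.getCopyOfBuffer());
    System.out.println(serializer.deserialize(inputView));
  }
}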
@@ -28,6 +28,7 @@
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

@Internal
@@ -43,6 +44,12 @@ public FlinkConfParser(Table table, Map<String, String> options, ReadableConfig
this.readableConfig = readableConfig;
}

FlinkConfParser(Map<String, String> options, ReadableConfig readableConfig) {
this.tableProperties = ImmutableMap.of();
this.options = options;
this.readableConfig = readableConfig;
}

public BooleanConfParser booleanConf() {
return new BooleanConfParser();
}
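
The new constructor backs the parser with an empty table-properties map so it can run before any table exists. Lookups then resolve roughly as: per-sink write options first, then the Flink ReadableConfig, then the (here empty) table properties, then the default. A hypothetical illustration of that order, not the actual FlinkConfParser internals:

import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// Conceptual sketch only: names below are illustrative, not the real
// FlinkConfParser API.
public class LookupOrderSketch {
  static final ConfigOption<String> FORMAT =
      ConfigOptions.key("write-format").stringType().defaultValue("parquet");

  static String resolve(Map<String, String> options, Configuration flinkConf) {
    // 1. per-sink write options win; 2. the Flink ReadableConfig;
    // 3. table properties would come next, but are empty in the
    //    table-less parser; 4. the ConfigOption default.
    String fromOptions = options.get(FORMAT.key());
    return fromOptions != null ? fromOptions : flinkConf.get(FORMAT);
  }

  public static void main(String[] args) {
    System.out.println(resolve(ImmutableMap.of(), new Configuration())); // parquet
  }
}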
@@ -55,6 +55,10 @@ public FlinkWriteConf(
this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
}

public FlinkWriteConf(Map<String, String> writeOptions, ReadableConfig readableConfig) {
this.confParser = new FlinkConfParser(writeOptions, readableConfig);
}

public boolean overwriteMode() {
return confParser
.booleanConf()
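
The matching table-less FlinkWriteConf constructor lets the dynamic sink evaluate write options before any concrete table is known. A hypothetical usage sketch; the "write-format" option key is an assumption about FlinkWriteOptions, not verified here:

import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class TableLessWriteConfExample {
  public static void main(String[] args) {
    // No Table in scope: lookups fall back to the Flink config and
    // hard-coded defaults instead of table properties.
    // Configuration implements ReadableConfig.
    FlinkWriteConf conf =
        new FlinkWriteConf(
            ImmutableMap.of("write-format", "parquet"), new Configuration());
    FileFormat format = conf.dataFileFormat(); // expected: PARQUET
    System.out.println(format);
  }
}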
@@ -18,12 +18,6 @@
*/
package org.apache.iceberg.flink.sink;

import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;

import java.io.IOException;
@@ -708,51 +702,10 @@ static IcebergStreamWriter<RowData> createStreamWriter(
flinkRowType,
flinkWriteConf.targetDataFileSize(),
format,
writeProperties(initTable, format, flinkWriteConf),
SinkUtil.writeProperties(format, flinkWriteConf, initTable),
equalityFieldIds,
flinkWriteConf.upsertMode());

return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory);
}

/**
* Based on the {@link FileFormat} overwrites the table level compression properties for the table
* write.
*
* @param table The table to get the table level settings
* @param format The FileFormat to use
* @param conf The write configuration
* @return The properties to use for writing
*/
private static Map<String, String> writeProperties(
Table table, FileFormat format, FlinkWriteConf conf) {
Map<String, String> writeProperties = Maps.newHashMap(table.properties());

switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
String parquetCompressionLevel = conf.parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}

break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
String avroCompressionLevel = conf.avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel());
}

break;
case ORC:
writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
break;
default:
throw new IllegalArgumentException(String.format("Unknown file format %s", format));
}

return writeProperties;
}
}
@@ -18,12 +18,6 @@
*/
package org.apache.iceberg.flink.sink;

import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;

import java.io.IOException;
@@ -637,7 +631,7 @@ IcebergSink build() {
table,
snapshotSummary,
uidSuffix,
writeProperties(table, flinkWriteConf.dataFileFormat(), flinkWriteConf),
SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, table),
toFlinkRowType(table.schema(), tableSchema),
tableSupplier,
flinkWriteConf,
@@ -715,47 +709,6 @@ private static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema
}
}

/**
* Based on the {@link FileFormat} overwrites the table level compression properties for the table
* write.
*
* @param table The table to get the table level settings
* @param format The FileFormat to use
* @param conf The write configuration
* @return The properties to use for writing
*/
private static Map<String, String> writeProperties(
Table table, FileFormat format, FlinkWriteConf conf) {
Map<String, String> writeProperties = Maps.newHashMap(table.properties());

switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
String parquetCompressionLevel = conf.parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}

break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
String avroCompressionLevel = conf.avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel());
}

break;
case ORC:
writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
break;
default:
throw new IllegalArgumentException(String.format("Unknown file format %s", format));
}

return writeProperties;
}

private DataStream<RowData> distributeDataStream(DataStream<RowData> input) {
DistributionMode mode = flinkWriteConf.distributionMode();
Schema schema = table.schema();
@@ -18,17 +18,30 @@
*/
package org.apache.iceberg.flink.sink;

import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;

import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SinkUtil {
@Internal
public class SinkUtil {

private static final long INITIAL_CHECKPOINT_ID = -1L;

@@ -90,4 +103,48 @@ static long getMaxCommittedCheckpointId(

return lastCommittedCheckpointId;
}

/**
* Overrides the table-level compression properties for the write based on the {@link FileFormat}.
*
* @param format The FileFormat to use
* @param conf The write configuration
* @param table The table to get the table level settings
* @return The properties to use for writing
*/
public static Map<String, String> writeProperties(
FileFormat format, FlinkWriteConf conf, @Nullable Table table) {
Map<String, String> writeProperties = Maps.newHashMap();
if (table != null) {
writeProperties.putAll(table.properties());
}

switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
String parquetCompressionLevel = conf.parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}

break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
String avroCompressionLevel = conf.avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
}

break;
case ORC:
writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
break;
default:
throw new IllegalArgumentException(String.format("Unknown file format %s", format));
}

return writeProperties;
}
}
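
Since table is now @Nullable, writeProperties can be exercised without a catalog-backed table, which is what the dynamic sink needs before target tables exist. A minimal sketch assuming the "compression-codec" write-option key and the standard write.parquet.compression-codec table property name:

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class WritePropertiesExample {
  public static void main(String[] args) {
    FlinkWriteConf conf =
        new FlinkWriteConf(
            ImmutableMap.of("compression-codec", "zstd"), new Configuration());
    // Passing null skips the table-properties seeding and keeps only the
    // format-specific compression overrides derived from the write conf,
    // e.g. write.parquet.compression-codec -> zstd.
    Map<String, String> props =
        SinkUtil.writeProperties(FileFormat.PARQUET, conf, null);
    props.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}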
@@ -325,7 +325,8 @@ private void commitDeltaTxn(
}
}

private void commitOperation(
@VisibleForTesting
void commitOperation(
Table table,
String branch,
SnapshotUpdate<?> operation,