From ead88cf882e302615ec8f5cb5336ac17d96724c6 Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Mon, 24 Oct 2022 17:56:03 +0200
Subject: [PATCH 1/2] Flink compression properties

---
 docs/flink-getting-started.md                 |  17 +-
 .../apache/iceberg/flink/FlinkConfParser.java |   1 -
 .../apache/iceberg/flink/FlinkWriteConf.java  |  60 +++++
 .../iceberg/flink/FlinkWriteOptions.java      |  12 +
 .../apache/iceberg/flink/sink/FlinkSink.java  |  53 +++-
 .../flink/sink/RowDataTaskWriterFactory.java  |   9 +-
 .../iceberg/flink/source/RowDataRewriter.java |   8 +-
 .../flink/sink/TestCompressionSettings.java   | 254 ++++++++++++++++++
 .../flink/sink/TestDeltaTaskWriter.java       |   1 +
 .../iceberg/flink/sink/TestTaskWriters.java   |   1 +
 .../flink/source/TestProjectMetaColumn.java   |   1 +
 11 files changed, 403 insertions(+), 14 deletions(-)
 create mode 100644 flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java

diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 1ca421e0fcd1..c6a2e0d49037 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -700,13 +700,16 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                       | Description                                                                                                  |
-|------------------------| -------------------------- |------------------------------------------------------------------------------------------------------------|
-| write-format           | Table write.format.default    | File format to use for this write operation; parquet, avro, or orc                                          |
-| target-file-size-bytes | As per table property         | Overrides this table's write.target-file-size-bytes                                                          |
-| upsert-enabled         | Table write.upsert.enabled    | Overrides this table's write.upsert.enabled                                                                  |
-| overwrite-enabled      | false                         | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream.   |
-| distribution-mode      | Table write.distribution-mode | Overrides this table's write.distribution-mode                                                               |
+| Flink option           | Default                                    | Description                                                                                                  |
+|------------------------|--------------------------------------------|--------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc                                          |
+| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes                                                          |
+| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                                                                  |
+| overwrite-enabled      | false                                      | Overwrites the table's data; overwrite mode shouldn't be enabled when writing with an UPSERT data stream.    |
+| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode                                                               |
+| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write                                                      |
+| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write                         |
+| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write                                   |
 
 ## Inspecting tables
 
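The new options use the same SQL-hint mechanism as the `upsert-enabled` example above. A minimal sketch, assuming a Parquet table; the table name, codec, and level are placeholders:

```sql
INSERT INTO sample /*+ OPTIONS('compression-codec'='zstd', 'compression-level'='5') */
SELECT id, data FROM source_table;
```

A hint overrides the corresponding `write.(fileformat).*` table property for that statement only.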
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
index 83fa09de544c..1f409fcfae59 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -139,7 +139,6 @@ public StringConfParser defaultValue(String value) {
     }
 
     public String parse() {
-      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
       return parse(Function.identity(), defaultValue);
     }
 
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 06528c97168b..ff295c8afa4b 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -93,6 +93,66 @@ public long targetDataFileSize() {
         .parse();
   }
 
+  public String parquetCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
+        .tableProperty(TableProperties.PARQUET_COMPRESSION)
+        .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
+        .parse();
+  }
+
+  public String parquetCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
+        .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
+        .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
+        .parse();
+  }
+
+  public String avroCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
+        .tableProperty(TableProperties.AVRO_COMPRESSION)
+        .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
+        .parse();
+  }
+
+  public String avroCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
+        .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
+        .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
+        .parse();
+  }
+
+  public String orcCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
+        .tableProperty(TableProperties.ORC_COMPRESSION)
+        .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
+        .parse();
+  }
+
+  public String orcCompressionStrategy() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_STRATEGY.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_STRATEGY)
+        .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
+        .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
+        .parse();
+  }
+
   public DistributionMode distributionMode() {
     String modeName =
         confParser
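Each accessor resolves with the same precedence: the per-write option first, then the Flink configuration, then the table property, then the hard-coded default. For the DataStream API the override would be supplied through the sink builder; a minimal sketch, assuming the builder's `set(...)` passthrough that feeds these key/value pairs into `FlinkWriteConf`:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

class SinkCompressionSketch {
  // Per-job compression override; "zstd" and "5" are placeholder values.
  static void append(DataStream<RowData> input, TableLoader tableLoader) {
    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .set(FlinkWriteOptions.COMPRESSION_CODEC.key(), "zstd") // "compression-codec"
        .set(FlinkWriteOptions.COMPRESSION_LEVEL.key(), "5")    // "compression-level"
        .append();
  }
}
```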
      ConfigOptions.key("target-file-size-bytes").longType().noDefaultValue();
 
+  // Overrides this table's write.<fileformat>.compression-codec
+  public static final ConfigOption<String> COMPRESSION_CODEC =
+      ConfigOptions.key("compression-codec").stringType().noDefaultValue();
+
+  // Overrides this table's write.<fileformat>.compression-level
+  public static final ConfigOption<String> COMPRESSION_LEVEL =
+      ConfigOptions.key("compression-level").stringType().noDefaultValue();
+
+  // Overrides this table's write.<fileformat>.compression-strategy
+  public static final ConfigOption<String> COMPRESSION_STRATEGY =
+      ConfigOptions.key("compression-strategy").stringType().noDefaultValue();
+
   // Overrides this table's write.upsert.enabled
   public static final ConfigOption<Boolean> WRITE_UPSERT_ENABLED =
       ConfigOptions.key("upsert-enabled").booleanType().noDefaultValue();
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 172484fc113a..e9b08d10cbde 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -18,6 +18,12 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
 import java.io.IOException;
@@ -43,6 +49,7 @@
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -564,14 +571,58 @@ static IcebergStreamWriter<RowData> createStreamWriter(
     Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
 
     Table serializableTable = SerializableTable.copyOf(table);
+    FileFormat format = flinkWriteConf.dataFileFormat();
     TaskWriterFactory<RowData> taskWriterFactory =
         new RowDataTaskWriterFactory(
             serializableTable,
             flinkRowType,
             flinkWriteConf.targetDataFileSize(),
-            flinkWriteConf.dataFileFormat(),
+            format,
+            writeProperties(table, format, flinkWriteConf),
             equalityFieldIds,
             flinkWriteConf.upsertMode());
 
     return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
   }
+
+  /**
+   * Based on the {@link FileFormat}, overrides the table-level compression properties for this
+   * table write.
+   *
+   * @param table The table to get the table-level settings from
+   * @param format The FileFormat to use
+   * @param conf The write configuration
+   * @return The properties to use for writing
+   */
+  private static Map<String, String> writeProperties(
+      Table table, FileFormat format, FlinkWriteConf conf) {
+    Map<String, String> writeProperties = Maps.newHashMap(table.properties());
+
+    String level;
+    switch (format) {
+      case PARQUET:
+        writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
+        level = conf.parquetCompressionLevel();
+        if (level != null) {
+          writeProperties.put(PARQUET_COMPRESSION_LEVEL, level);
+        }
+
+        break;
+      case AVRO:
+        writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
+        level = conf.avroCompressionLevel();
+        if (level != null) {
+          writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel());
+        }
+
+        break;
+      case ORC:
+        writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
+        writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
+        break;
+      default:
+        throw new IllegalArgumentException(String.format("Unknown file format %s", format));
+    }
+
+    return writeProperties;
+  }
 }
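To make the override behavior concrete, a small hypothetical check built on the same `FlinkWriteConf` constructor the new tests use; it assumes a table created with `write.parquet.compression-codec=gzip`:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class CompressionPrecedenceSketch {
  // Table property says gzip; the per-job write option wins for this write only.
  static String resolvedParquetCodec(Table table) {
    FlinkWriteConf conf =
        new FlinkWriteConf(
            table,
            ImmutableMap.of(FlinkWriteOptions.COMPRESSION_CODEC.key(), "zstd"),
            new Configuration());
    return conf.parquetCompressionCodec(); // "zstd", not the table's "gzip"
  }
}
```

Unrelated table properties pass through `writeProperties()` untouched; only the keys for the active file format are overwritten.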
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 1c330434d019..634c2dfddaed 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.sink;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -57,6 +58,7 @@ public RowDataTaskWriterFactory(
       RowType flinkSchema,
       long targetFileSizeBytes,
       FileFormat format,
+      Map<String, String> writeProperties,
       List<Integer> equalityFieldIds,
       boolean upsert) {
     this.table = table;
@@ -70,8 +72,7 @@ public RowDataTaskWriterFactory(
     this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
-      this.appenderFactory =
-          new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
     } else if (upsert) {
       // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
       // the inserted row
@@ -82,7 +83,7 @@ public RowDataTaskWriterFactory(
           new FlinkAppenderFactory(
               schema,
               flinkSchema,
-              table.properties(),
+              writeProperties,
               spec,
               ArrayUtil.toIntArray(equalityFieldIds),
               TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)),
@@ -92,7 +93,7 @@ public RowDataTaskWriterFactory(
           new FlinkAppenderFactory(
               schema,
               flinkSchema,
-              table.properties(),
+              writeProperties,
               spec,
               ArrayUtil.toIntArray(equalityFieldIds),
               schema,
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 1468879097de..23665b7c9f0f 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -77,7 +77,13 @@ public RowDataRewriter(
     RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
     this.taskWriterFactory =
         new RowDataTaskWriterFactory(
-            SerializableTable.copyOf(table), flinkSchema, Long.MAX_VALUE, format, null, false);
+            SerializableTable.copyOf(table),
+            flinkSchema,
+            Long.MAX_VALUE,
+            format,
+            table.properties(),
+            null,
+            false);
   }
 
   public List<DataFile> rewriteDataForTasks(
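With the constructor change, callers now state the write properties explicitly: `RowDataRewriter` forwards the plain `table.properties()`, while `FlinkSink` passes the merged map from `writeProperties()`. A minimal sketch of the updated constructor contract, with assumed placeholder arguments:

```java
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.io.TaskWriter;

class TaskWriterSketch {
  static TaskWriter<RowData> newWriter(Table table) {
    RowDataTaskWriterFactory factory =
        new RowDataTaskWriterFactory(
            SerializableTable.copyOf(table),
            FlinkSchemaUtil.convert(table.schema()),
            TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, // target file size
            FileFormat.PARQUET,
            table.properties(), // or a map with per-job compression overrides applied
            null,               // no equality field ids
            false);             // not upsert
    factory.initialize(1, 1);   // (taskId, attemptId), mirroring the tests
    return factory.create();
  }
}
```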
+ */ +package org.apache.iceberg.flink.sink; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.common.DynFields; +import org.apache.iceberg.flink.FlinkWriteConf; +import org.apache.iceberg.flink.FlinkWriteOptions; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestCompressionSettings { + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + private Table table; + + private final Map initProperties; + + @Parameterized.Parameters(name = "tableProperties = {0}") + public static Object[] parameters() { + return new Object[] { + ImmutableMap.of(), + ImmutableMap.of( + TableProperties.AVRO_COMPRESSION, + "zstd", + TableProperties.AVRO_COMPRESSION_LEVEL, + "3", + TableProperties.PARQUET_COMPRESSION, + "zstd", + TableProperties.PARQUET_COMPRESSION_LEVEL, + "3", + TableProperties.ORC_COMPRESSION, + "zstd", + TableProperties.ORC_COMPRESSION_STRATEGY, + "compression") + }; + } + + public TestCompressionSettings(Map initProperties) { + this.initProperties = initProperties; + } + + @Before + public void before() throws IOException { + File folder = tempFolder.newFolder(); + table = SimpleDataUtil.createTable(folder.getAbsolutePath(), initProperties, false); + } + + @Test + public void testCompressionAvro() throws Exception { + // No override provided + Map resultProperties = + appenderProperties( + table, + SimpleDataUtil.FLINK_SCHEMA, + ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "AVRO")); + + if (initProperties.get(TableProperties.AVRO_COMPRESSION) == null) { + Assert.assertEquals( + TableProperties.AVRO_COMPRESSION_DEFAULT, + resultProperties.get(TableProperties.AVRO_COMPRESSION)); + Assert.assertEquals( + TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT, + resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL)); + } else { + Assert.assertEquals( + initProperties.get(TableProperties.AVRO_COMPRESSION), + resultProperties.get(TableProperties.AVRO_COMPRESSION)); + Assert.assertEquals( + initProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL), + resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL)); + } + + // Override compression to snappy and some random level + resultProperties = + appenderProperties( + table, + SimpleDataUtil.FLINK_SCHEMA, + ImmutableMap.of( + FlinkWriteOptions.WRITE_FORMAT.key(), + "AVRO", + FlinkWriteOptions.COMPRESSION_CODEC.key(), + "snappy", + FlinkWriteOptions.COMPRESSION_LEVEL.key(), + "6")); + + Assert.assertEquals("snappy", resultProperties.get(TableProperties.AVRO_COMPRESSION)); + Assert.assertEquals("6", resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL)); + } + + @Test + public void testCompressionParquet() throws Exception { + // No override provided + Map resultProperties = + appenderProperties( + table, + 
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "PARQUET"));
+
+    if (initProperties.get(TableProperties.PARQUET_COMPRESSION) == null) {
+      Assert.assertEquals(
+          TableProperties.PARQUET_COMPRESSION_DEFAULT,
+          resultProperties.get(TableProperties.PARQUET_COMPRESSION));
+      Assert.assertEquals(
+          TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
+          resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
+    } else {
+      Assert.assertEquals(
+          initProperties.get(TableProperties.PARQUET_COMPRESSION),
+          resultProperties.get(TableProperties.PARQUET_COMPRESSION));
+      Assert.assertEquals(
+          initProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL),
+          resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
+    }
+
+    // Override compression to snappy and some random level
+    resultProperties =
+        appenderProperties(
+            table,
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(
+                FlinkWriteOptions.WRITE_FORMAT.key(),
+                "PARQUET",
+                FlinkWriteOptions.COMPRESSION_CODEC.key(),
+                "snappy",
+                FlinkWriteOptions.COMPRESSION_LEVEL.key(),
+                "6"));
+
+    Assert.assertEquals("snappy", resultProperties.get(TableProperties.PARQUET_COMPRESSION));
+    Assert.assertEquals("6", resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
+  }
+
+  @Test
+  public void testCompressionOrc() throws Exception {
+    // No override provided
+    Map<String, String> resultProperties =
+        appenderProperties(
+            table,
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "ORC"));
+
+    if (initProperties.get(TableProperties.ORC_COMPRESSION) == null) {
+      Assert.assertEquals(
+          TableProperties.ORC_COMPRESSION_DEFAULT,
+          resultProperties.get(TableProperties.ORC_COMPRESSION));
+      Assert.assertEquals(
+          TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT,
+          resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY));
+    } else {
+      Assert.assertEquals(
+          initProperties.get(TableProperties.ORC_COMPRESSION),
+          resultProperties.get(TableProperties.ORC_COMPRESSION));
+      Assert.assertEquals(
+          initProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY),
+          resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY));
+    }
+
+    // Override compression to snappy and a different strategy
+    resultProperties =
+        appenderProperties(
+            table,
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(
+                FlinkWriteOptions.WRITE_FORMAT.key(),
+                "ORC",
+                FlinkWriteOptions.COMPRESSION_CODEC.key(),
+                "snappy",
+                FlinkWriteOptions.COMPRESSION_STRATEGY.key(),
+                "speed"));
+
+    Assert.assertEquals("snappy", resultProperties.get(TableProperties.ORC_COMPRESSION));
+    Assert.assertEquals("speed", resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY));
+  }
+
+  private static OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
+      Table icebergTable, TableSchema flinkSchema, Map<String, String> override) throws Exception {
+    RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema);
+    FlinkWriteConf flinkWriteConfig =
+        new FlinkWriteConf(
+            icebergTable, override, new org.apache.flink.configuration.Configuration());
+
+    IcebergStreamWriter<RowData> streamWriter =
+        FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null);
+    OneInputStreamOperatorTestHarness<RowData, WriteResult> harness =
+        new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0);
+
+    harness.setup();
+    harness.open();
+
+    return harness;
+  }
+
+  private static Map<String, String> appenderProperties(
+      Table table, TableSchema schema, Map<String, String> override) throws Exception {
+    try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness =
+        createIcebergStreamWriter(table, schema, override)) {
+      testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
+
+      testHarness.prepareSnapshotPreBarrier(1L);
+      DynFields.BoundField<IcebergStreamWriter> operatorField =
+          DynFields.builder()
+              .hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator")
+              .build(testHarness.getOperatorFactory());
+      DynFields.BoundField<TaskWriter> writerField =
+          DynFields.builder()
+              .hiddenImpl(IcebergStreamWriter.class, "writer")
+              .build(operatorField.get());
+      DynFields.BoundField<FlinkAppenderFactory> appenderField =
+          DynFields.builder()
+              .hiddenImpl(BaseTaskWriter.class, "appenderFactory")
+              .build(writerField.get());
+      DynFields.BoundField<Map<String, String>> propsField =
+          DynFields.builder()
+              .hiddenImpl(FlinkAppenderFactory.class, "props")
+              .build(appenderField.get());
+      return propsField.get();
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 2e5e7121bb2b..1f8cbfe19152 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -345,6 +345,7 @@ private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) {
         FlinkSchemaUtil.convert(table.schema()),
         128 * 1024 * 1024,
         format,
+        table.properties(),
         equalityFieldIds,
         false);
   }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 04ec662fb8eb..112dbb511310 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -234,6 +234,7 @@ private TaskWriter<RowData> createTaskWriter(long targetFileSize) {
             (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(),
             targetFileSize,
             format,
+            table.properties(),
             null,
             false);
     taskWriterFactory.initialize(1, 1);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
index bc63e4a0b282..25ecec23d216 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
@@ -176,6 +176,7 @@ private TaskWriter<RowData> createTaskWriter(
             SimpleDataUtil.ROW_TYPE,
             TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
             format,
+            table.properties(),
             equalityFieldIds,
             upsert);

From 7a6af5101615d263cd5cd8aa0c47fdc891a29b6b Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Wed, 26 Oct 2022 09:11:41 +0200
Subject: [PATCH 2/2] Review comments

---
 .../org/apache/iceberg/flink/FlinkConfParser.java     |  1 +
 .../java/org/apache/iceberg/flink/FlinkWriteConf.java |  4 ++--
 .../java/org/apache/iceberg/flink/sink/FlinkSink.java | 11 +++++------
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
index 1f409fcfae59..83fa09de544c 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -139,6 +139,7 @@ public StringConfParser defaultValue(String value) {
     }
 
     public String parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
       return parse(Function.identity(), defaultValue);
     }
 
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index ff295c8afa4b..448b2aa2d8ef 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -110,7 +110,7 @@ public String parquetCompressionLevel() {
         .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
         .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
         .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
-        .parse();
+        .parseOptional();
   }
 
   public String avroCompressionCodec() {
@@ -130,7 +130,7 @@ public String avroCompressionLevel() {
         .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
         .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
         .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
-        .parse();
+        .parseOptional();
   }
 
   public String orcCompressionCodec() {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index e9b08d10cbde..81706e582413 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -597,20 +597,19 @@ private static Map<String, String> writeProperties(
       Table table, FileFormat format, FlinkWriteConf conf) {
     Map<String, String> writeProperties = Maps.newHashMap(table.properties());
 
-    String level;
     switch (format) {
       case PARQUET:
         writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
-        level = conf.parquetCompressionLevel();
-        if (level != null) {
-          writeProperties.put(PARQUET_COMPRESSION_LEVEL, level);
+        String parquetCompressionLevel = conf.parquetCompressionLevel();
+        if (parquetCompressionLevel != null) {
+          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
         }
 
         break;
       case AVRO:
         writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
-        level = conf.avroCompressionLevel();
-        if (level != null) {
+        String avroCompressionLevel = conf.avroCompressionLevel();
+        if (avroCompressionLevel != null) {
           writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel());
         }
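The net effect of the review round: compression codec and strategy still always resolve to a concrete value, while the compression level may legitimately be absent, so it is parsed with `parseOptional()` and the null-default guard returns to `parse()`. A hedged illustration, assuming a table with no compression properties set:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class CompressionLevelSketch {
  static void demo(Table table) {
    // Nothing set anywhere: parseOptional() resolves to null instead of failing,
    // and writeProperties() simply leaves the level key out of the map.
    FlinkWriteConf unset = new FlinkWriteConf(table, ImmutableMap.of(), new Configuration());
    String level = unset.parquetCompressionLevel(); // null, no exception

    // An explicit write option still resolves as before.
    FlinkWriteConf set =
        new FlinkWriteConf(
            table,
            ImmutableMap.of(FlinkWriteOptions.COMPRESSION_LEVEL.key(), "6"),
            new Configuration());
    String overridden = set.parquetCompressionLevel(); // "6"
  }
}
```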