diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index 06528c97168b..448b2aa2d8ef 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -93,6 +93,66 @@ public long targetDataFileSize() { .parse(); } + public String parquetCompressionCodec() { + return confParser + .stringConf() + .option(FlinkWriteOptions.COMPRESSION_CODEC.key()) + .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC) + .tableProperty(TableProperties.PARQUET_COMPRESSION) + .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT) + .parse(); + } + + public String parquetCompressionLevel() { + return confParser + .stringConf() + .option(FlinkWriteOptions.COMPRESSION_LEVEL.key()) + .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL) + .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL) + .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + public String avroCompressionCodec() { + return confParser + .stringConf() + .option(FlinkWriteOptions.COMPRESSION_CODEC.key()) + .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC) + .tableProperty(TableProperties.AVRO_COMPRESSION) + .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT) + .parse(); + } + + public String avroCompressionLevel() { + return confParser + .stringConf() + .option(FlinkWriteOptions.COMPRESSION_LEVEL.key()) + .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL) + .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL) + .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + public String orcCompressionCodec() { + return confParser + .stringConf() + .option(FlinkWriteOptions.COMPRESSION_CODEC.key()) + .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC) + .tableProperty(TableProperties.ORC_COMPRESSION) + 
.defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT) + .parse(); + } + + public String orcCompressionStrategy() { + return confParser + .stringConf() + .option(FlinkWriteOptions.COMPRESSION_STRATEGY.key()) + .flinkConfig(FlinkWriteOptions.COMPRESSION_STRATEGY) + .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY) + .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT) + .parse(); + } + public DistributionMode distributionMode() { String modeName = confParser diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index a3091d5779c7..f3cc52972bfe 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -34,6 +34,18 @@ private FlinkWriteOptions() {} public static final ConfigOption<Long> TARGET_FILE_SIZE_BYTES = ConfigOptions.key("target-file-size-bytes").longType().noDefaultValue(); + // Overrides this table's write.<file-format>.compression-codec + public static final ConfigOption<String> COMPRESSION_CODEC = + ConfigOptions.key("compression-codec").stringType().noDefaultValue(); + + // Overrides this table's write.<file-format>.compression-level + public static final ConfigOption<String> COMPRESSION_LEVEL = + ConfigOptions.key("compression-level").stringType().noDefaultValue(); + + // Overrides this table's write.<file-format>.compression-strategy + public static final ConfigOption<String> COMPRESSION_STRATEGY = + ConfigOptions.key("compression-strategy").stringType().noDefaultValue(); + // Overrides this table's write.upsert.enabled public static final ConfigOption<Boolean> WRITE_UPSERT_ENABLED = ConfigOptions.key("upsert-enabled").booleanType().noDefaultValue(); diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index b10dc278fb65..ead0b757e583 ---
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -18,6 +18,12 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import java.io.IOException; @@ -43,6 +49,7 @@ import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -567,14 +574,57 @@ static IcebergStreamWriter<RowData> createStreamWriter( Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null"); Table serializableTable = SerializableTable.copyOf(table); + FileFormat format = flinkWriteConf.dataFileFormat(); TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory( serializableTable, flinkRowType, flinkWriteConf.targetDataFileSize(), - flinkWriteConf.dataFileFormat(), + format, + writeProperties(table, format, flinkWriteConf), equalityFieldIds, flinkWriteConf.upsertMode()); return new IcebergStreamWriter<>(table.name(), taskWriterFactory); } + + /** + * Based on the {@link FileFormat} overwrites the table level compression properties for the table + * write.
+ * + * @param table The table to get the table level settings + * @param format The FileFormat to use + * @param conf The write configuration + * @return The properties to use for writing + */ + private static Map<String, String> writeProperties( + Table table, FileFormat format, FlinkWriteConf conf) { + Map<String, String> writeProperties = Maps.newHashMap(table.properties()); + + switch (format) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); + String parquetCompressionLevel = conf.parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + + break; + case AVRO: + writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); + String avroCompressionLevel = conf.avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); + } + + break; + case ORC: + writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown file format %s", format)); + } + + return writeProperties; + } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 1c330434d019..634c2dfddaed 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.sink; import java.util.List; +import java.util.Map; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; @@ -57,6 +58,7 @@ public RowDataTaskWriterFactory( RowType flinkSchema, long
targetFileSizeBytes, FileFormat format, + Map<String, String> writeProperties, List<Integer> equalityFieldIds, boolean upsert) { this.table = table; @@ -70,8 +72,7 @@ public RowDataTaskWriterFactory( this.upsert = upsert; if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { - this.appenderFactory = - new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec); + this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec); } else if (upsert) { // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of // the inserted row @@ -82,7 +83,7 @@ public RowDataTaskWriterFactory( new FlinkAppenderFactory( schema, flinkSchema, - table.properties(), + writeProperties, spec, ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), @@ -92,7 +93,7 @@ public RowDataTaskWriterFactory( new FlinkAppenderFactory( schema, flinkSchema, - table.properties(), + writeProperties, spec, ArrayUtil.toIntArray(equalityFieldIds), schema, diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java index 1468879097de..23665b7c9f0f 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java @@ -77,7 +77,13 @@ public RowDataRewriter( RowType flinkSchema = FlinkSchemaUtil.convert(table.schema()); this.taskWriterFactory = new RowDataTaskWriterFactory( - SerializableTable.copyOf(table), flinkSchema, Long.MAX_VALUE, format, null, false); + SerializableTable.copyOf(table), + flinkSchema, + Long.MAX_VALUE, + format, + table.properties(), + null, + false); } public List<DataFile> rewriteDataForTasks( diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java new file mode 100644 index 000000000000..49f472b7325e --- /dev/null +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.common.DynFields; +import org.apache.iceberg.flink.FlinkWriteConf; +import org.apache.iceberg.flink.FlinkWriteOptions; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestCompressionSettings { + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + private Table table; + + private final Map<String, String> initProperties; + + @Parameterized.Parameters(name = "tableProperties = {0}") + public static Object[] parameters() { + return new Object[] { + ImmutableMap.of(), + ImmutableMap.of( + TableProperties.AVRO_COMPRESSION, + "zstd", + TableProperties.AVRO_COMPRESSION_LEVEL, + "3", + TableProperties.PARQUET_COMPRESSION, + "zstd", + TableProperties.PARQUET_COMPRESSION_LEVEL, + "3", + TableProperties.ORC_COMPRESSION, + "zstd", + TableProperties.ORC_COMPRESSION_STRATEGY, + "compression") + }; + } + + public TestCompressionSettings(Map<String, String> initProperties) { + this.initProperties = initProperties; + } + + @Before + public void before() throws IOException { + File folder = tempFolder.newFolder(); + table = SimpleDataUtil.createTable(folder.getAbsolutePath(), initProperties,
false); + } + + @Test + public void testCompressionAvro() throws Exception { + // No override provided + Map<String, String> resultProperties = + appenderProperties( + table, + SimpleDataUtil.FLINK_SCHEMA, + ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "AVRO")); + + if (initProperties.get(TableProperties.AVRO_COMPRESSION) == null) { + Assert.assertEquals( + TableProperties.AVRO_COMPRESSION_DEFAULT, + resultProperties.get(TableProperties.AVRO_COMPRESSION)); + Assert.assertEquals( + TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT, + resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL)); + } else { + Assert.assertEquals( + initProperties.get(TableProperties.AVRO_COMPRESSION), + resultProperties.get(TableProperties.AVRO_COMPRESSION)); + Assert.assertEquals( + initProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL), + resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL)); + } + + // Override compression to snappy and some random level + resultProperties = + appenderProperties( + table, + SimpleDataUtil.FLINK_SCHEMA, + ImmutableMap.of( + FlinkWriteOptions.WRITE_FORMAT.key(), + "AVRO", + FlinkWriteOptions.COMPRESSION_CODEC.key(), + "snappy", + FlinkWriteOptions.COMPRESSION_LEVEL.key(), + "6")); + + Assert.assertEquals("snappy", resultProperties.get(TableProperties.AVRO_COMPRESSION)); + Assert.assertEquals("6", resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL)); + } + + @Test + public void testCompressionParquet() throws Exception { + // No override provided + Map<String, String> resultProperties = + appenderProperties( + table, + SimpleDataUtil.FLINK_SCHEMA, + ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "PARQUET")); + + if (initProperties.get(TableProperties.PARQUET_COMPRESSION) == null) { + Assert.assertEquals( + TableProperties.PARQUET_COMPRESSION_DEFAULT, + resultProperties.get(TableProperties.PARQUET_COMPRESSION)); + Assert.assertEquals( + TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT, +
resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL)); + } else { + Assert.assertEquals( + initProperties.get(TableProperties.PARQUET_COMPRESSION), + resultProperties.get(TableProperties.PARQUET_COMPRESSION)); + Assert.assertEquals( + initProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL), + resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL)); + } + + // Override compression to snappy and some random level + resultProperties = + appenderProperties( + table, + SimpleDataUtil.FLINK_SCHEMA, + ImmutableMap.of( + FlinkWriteOptions.WRITE_FORMAT.key(), + "PARQUET", + FlinkWriteOptions.COMPRESSION_CODEC.key(), + "snappy", + FlinkWriteOptions.COMPRESSION_LEVEL.key(), + "6")); + + Assert.assertEquals("snappy", resultProperties.get(TableProperties.PARQUET_COMPRESSION)); + Assert.assertEquals("6", resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL)); + } + + @Test + public void testCompressionOrc() throws Exception { + // No override provided + Map<String, String> resultProperties = + appenderProperties( + table, + SimpleDataUtil.FLINK_SCHEMA, + ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "ORC")); + + if (initProperties.get(TableProperties.ORC_COMPRESSION) == null) { + Assert.assertEquals( + TableProperties.ORC_COMPRESSION_DEFAULT, + resultProperties.get(TableProperties.ORC_COMPRESSION)); + Assert.assertEquals( + TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT, + resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY)); + } else { + Assert.assertEquals( + initProperties.get(TableProperties.ORC_COMPRESSION), + resultProperties.get(TableProperties.ORC_COMPRESSION)); + Assert.assertEquals( + initProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY), + resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY)); + } + + // Override compression to snappy and a different strategy + resultProperties = + appenderProperties( + table, + SimpleDataUtil.FLINK_SCHEMA, + ImmutableMap.of( + FlinkWriteOptions.WRITE_FORMAT.key(), + "ORC", +
FlinkWriteOptions.COMPRESSION_CODEC.key(), + "snappy", + FlinkWriteOptions.COMPRESSION_STRATEGY.key(), + "speed")); + + Assert.assertEquals("snappy", resultProperties.get(TableProperties.ORC_COMPRESSION)); + Assert.assertEquals("speed", resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY)); + } + + private static OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter( + Table icebergTable, TableSchema flinkSchema, Map<String, String> override) throws Exception { + RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema); + FlinkWriteConf flinkWriteConfig = + new FlinkWriteConf( + icebergTable, override, new org.apache.flink.configuration.Configuration()); + + IcebergStreamWriter<RowData> streamWriter = + FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null); + OneInputStreamOperatorTestHarness<RowData, WriteResult> harness = + new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); + + harness.setup(); + harness.open(); + + return harness; + } + + private static Map<String, String> appenderProperties( + Table table, TableSchema schema, Map<String, String> override) throws Exception { + try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = + createIcebergStreamWriter(table, schema, override)) { + testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); + + testHarness.prepareSnapshotPreBarrier(1L); + DynFields.BoundField<IcebergStreamWriter> operatorField = + DynFields.builder() + .hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator") + .build(testHarness.getOperatorFactory()); + DynFields.BoundField<TaskWriter> writerField = + DynFields.builder() + .hiddenImpl(IcebergStreamWriter.class, "writer") + .build(operatorField.get()); + DynFields.BoundField<FlinkAppenderFactory> appenderField = + DynFields.builder() + .hiddenImpl(BaseTaskWriter.class, "appenderFactory") + .build(writerField.get()); + DynFields.BoundField<Map<String, String>> propsField = + DynFields.builder() + .hiddenImpl(FlinkAppenderFactory.class, "props") + .build(appenderField.get()); + return propsField.get(); + } + } +} diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java index 2e5e7121bb2b..1f8cbfe19152 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java @@ -345,6 +345,7 @@ private TaskWriterFactory createTaskWriterFactory(List equalit FlinkSchemaUtil.convert(table.schema()), 128 * 1024 * 1024, format, + table.properties(), equalityFieldIds, false); } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java index d83dd8530f98..c56a348e7445 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java @@ -237,6 +237,7 @@ private TaskWriter createTaskWriter(long targetFileSize) { (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(), targetFileSize, format, + table.properties(), null, false); taskWriterFactory.initialize(1, 1); diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java index bc63e4a0b282..25ecec23d216 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java @@ -176,6 +176,7 @@ private TaskWriter createTaskWriter( SimpleDataUtil.ROW_TYPE, TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, format, + table.properties(), equalityFieldIds, upsert);