Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,66 @@ public long targetDataFileSize() {
.parse();
}

/**
 * Returns the Parquet compression codec to use for this write.
 *
 * <p>Resolved by {@code confParser} from the per-write option
 * {@code compression-codec}, the Flink configuration, and the table property
 * {@code write.parquet.compression-codec}, falling back to the table-property
 * default. (The precedence among these sources is determined by ConfParser —
 * presumably write option first; confirm against ConfParser.)
 *
 * @return the resolved codec name; never null because a default is supplied
 */
public String parquetCompressionCodec() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
      .tableProperty(TableProperties.PARQUET_COMPRESSION)
      .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
      .parse();
}

/**
 * Returns the Parquet compression level to use for this write, or null if unset.
 *
 * <p>Resolved from the per-write option {@code compression-level}, the Flink
 * configuration, and the table property {@code write.parquet.compression-level}.
 * Uses {@code parseOptional()} rather than {@code parse()}, so the result may be
 * null (presumably the level default is null; confirm in TableProperties).
 *
 * @return the resolved compression level, or null when none is configured
 */
public String parquetCompressionLevel() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
      .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
      .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
      .parseOptional();
}

/**
 * Returns the Avro compression codec to use for this write.
 *
 * <p>Resolved from the shared per-write option {@code compression-codec} (the
 * same option key is reused across file formats), the Flink configuration, and
 * the table property {@code write.avro.compression-codec}, falling back to the
 * table-property default.
 *
 * @return the resolved codec name; never null because a default is supplied
 */
public String avroCompressionCodec() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
      .tableProperty(TableProperties.AVRO_COMPRESSION)
      .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
      .parse();
}

/**
 * Returns the Avro compression level to use for this write, or null if unset.
 *
 * <p>Resolved from the shared per-write option {@code compression-level}, the
 * Flink configuration, and the table property {@code write.avro.compression-level}.
 * Uses {@code parseOptional()}, so the result may be null when no source sets a
 * value (presumably the level default is null; confirm in TableProperties).
 *
 * @return the resolved compression level, or null when none is configured
 */
public String avroCompressionLevel() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
      .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
      .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
      .parseOptional();
}

/**
 * Returns the ORC compression codec to use for this write.
 *
 * <p>Resolved from the shared per-write option {@code compression-codec}, the
 * Flink configuration, and the table property {@code write.orc.compression-codec},
 * falling back to the table-property default.
 *
 * @return the resolved codec name; never null because a default is supplied
 */
public String orcCompressionCodec() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
      .tableProperty(TableProperties.ORC_COMPRESSION)
      .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
      .parse();
}

/**
 * Returns the ORC compression strategy to use for this write.
 *
 * <p>Resolved from the per-write option {@code compression-strategy}, the Flink
 * configuration, and the table property {@code write.orc.compression-strategy},
 * falling back to the table-property default. Unlike the level getters this uses
 * {@code parse()}, so a non-null value is always returned.
 *
 * @return the resolved compression strategy; never null
 */
public String orcCompressionStrategy() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_STRATEGY.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_STRATEGY)
      .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
      .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
      .parse();
}

public DistributionMode distributionMode() {
String modeName =
confParser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ private FlinkWriteOptions() {}
// Target size in bytes for generated data files. No default here; when unset,
// resolution presumably falls back to the corresponding table property — confirm
// against FlinkWriteConf.
public static final ConfigOption<Long> TARGET_FILE_SIZE_BYTES =
    ConfigOptions.key("target-file-size-bytes").longType().noDefaultValue();

// Overrides this table's write.<FILE_FORMAT>.compression-codec.
// A single option key is shared by all file formats; it is mapped to the
// format-specific table property at write time.
public static final ConfigOption<String> COMPRESSION_CODEC =
    ConfigOptions.key("compression-codec").stringType().noDefaultValue();

// Overrides this table's write.<FILE_FORMAT>.compression-level
// (applies to Parquet and Avro; ORC has no level property here).
public static final ConfigOption<String> COMPRESSION_LEVEL =
    ConfigOptions.key("compression-level").stringType().noDefaultValue();

// Overrides this table's write.<FILE_FORMAT>.compression-strategy
// (ORC only).
public static final ConfigOption<String> COMPRESSION_STRATEGY =
    ConfigOptions.key("compression-strategy").stringType().noDefaultValue();

// Overrides this table's write.upsert.enabled
public static final ConfigOption<Boolean> WRITE_UPSERT_ENABLED =
    ConfigOptions.key("upsert-enabled").booleanType().noDefaultValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
*/
package org.apache.iceberg.flink.sink;

import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;

import java.io.IOException;
Expand All @@ -43,6 +49,7 @@
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -567,14 +574,57 @@ static IcebergStreamWriter<RowData> createStreamWriter(
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

Table serializableTable = SerializableTable.copyOf(table);
FileFormat format = flinkWriteConf.dataFileFormat();
TaskWriterFactory<RowData> taskWriterFactory =
new RowDataTaskWriterFactory(
serializableTable,
flinkRowType,
flinkWriteConf.targetDataFileSize(),
flinkWriteConf.dataFileFormat(),
format,
writeProperties(table, format, flinkWriteConf),
equalityFieldIds,
flinkWriteConf.upsertMode());
return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}

/**
 * Based on the {@link FileFormat}, overrides the table-level compression properties
 * for this write.
 *
 * <p>Starts from a copy of the table's properties and overlays the format-specific
 * compression codec/level/strategy resolved by the write configuration, so the
 * writer sees the per-write overrides without mutating the table itself.
 *
 * @param table The table to get the table level settings
 * @param format The FileFormat to use
 * @param conf The write configuration
 * @return The properties to use for writing
 * @throws IllegalArgumentException if the file format is not PARQUET, AVRO, or ORC
 */
private static Map<String, String> writeProperties(
    Table table, FileFormat format, FlinkWriteConf conf) {
  // Copy so the per-write overrides never leak back into the table's properties.
  Map<String, String> writeProperties = Maps.newHashMap(table.properties());

  switch (format) {
    case PARQUET:
      writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
      String parquetCompressionLevel = conf.parquetCompressionLevel();
      // Level is optional (may resolve to null); only override when present.
      if (parquetCompressionLevel != null) {
        writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
      }

      break;
    case AVRO:
      writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
      String avroCompressionLevel = conf.avroCompressionLevel();
      if (avroCompressionLevel != null) {
        // Reuse the already-parsed local instead of calling
        // conf.avroCompressionLevel() a second time (consistent with the
        // PARQUET branch and avoids a redundant config parse).
        writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
      }

      break;
    case ORC:
      // ORC has both a codec and a strategy, and both always resolve to a value.
      writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
      writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
      break;
    default:
      throw new IllegalArgumentException(String.format("Unknown file format %s", format));
  }

  return writeProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.flink.sink;

import java.util.List;
import java.util.Map;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -57,6 +58,7 @@ public RowDataTaskWriterFactory(
RowType flinkSchema,
long targetFileSizeBytes,
FileFormat format,
Map<String, String> writeProperties,
List<Integer> equalityFieldIds,
boolean upsert) {
this.table = table;
Expand All @@ -70,8 +72,7 @@ public RowDataTaskWriterFactory(
this.upsert = upsert;

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory =
new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
} else if (upsert) {
// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
// the inserted row
Expand All @@ -82,7 +83,7 @@ public RowDataTaskWriterFactory(
new FlinkAppenderFactory(
schema,
flinkSchema,
table.properties(),
writeProperties,
spec,
ArrayUtil.toIntArray(equalityFieldIds),
TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)),
Expand All @@ -92,7 +93,7 @@ public RowDataTaskWriterFactory(
new FlinkAppenderFactory(
schema,
flinkSchema,
table.properties(),
writeProperties,
spec,
ArrayUtil.toIntArray(equalityFieldIds),
schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ public RowDataRewriter(
RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
this.taskWriterFactory =
new RowDataTaskWriterFactory(
SerializableTable.copyOf(table), flinkSchema, Long.MAX_VALUE, format, null, false);
SerializableTable.copyOf(table),
flinkSchema,
Long.MAX_VALUE,
format,
table.properties(),
null,
false);
}

public List<DataFile> rewriteDataForTasks(
Expand Down
Loading