Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions docs/flink-getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -700,13 +700,16 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```

| Flink option | Default | Description |
|------------------------| -------------------------- |------------------------------------------------------------------------------------------------------------|
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
| overwrite-enabled | false | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
| Flink option | Default | Description |
|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------|
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
| overwrite-enabled      | false                                      | Overwrites the table's data; overwrite mode should not be enabled when configuring an UPSERT data stream.    |
| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |


## Inspecting tables.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,66 @@ public long targetDataFileSize() {
.parse();
}

/**
 * Returns the Parquet compression codec to use for this write.
 *
 * <p>Resolution order: the {@code compression-codec} write option, then the Flink configuration,
 * then the table's {@code write.parquet.compression-codec} property, falling back to the
 * table-property default. Never null ({@code parse()} resolves to the default).
 */
public String parquetCompressionCodec() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
      .tableProperty(TableProperties.PARQUET_COMPRESSION)
      .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
      .parse();
}

/**
 * Returns the Parquet compression level to use for this write, or {@code null} when no level is
 * configured anywhere.
 *
 * <p>Resolution order: the {@code compression-level} write option, then the Flink configuration,
 * then the table's {@code write.parquet.compression-level} property, then the default. Uses
 * {@code parseOptional()}, so callers must handle a null result.
 */
public String parquetCompressionLevel() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
      .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
      .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
      .parseOptional();
}

/**
 * Returns the Avro compression codec to use for this write.
 *
 * <p>Resolution order: the {@code compression-codec} write option, then the Flink configuration,
 * then the table's {@code write.avro.compression-codec} property, falling back to the
 * table-property default. Never null ({@code parse()} resolves to the default).
 */
public String avroCompressionCodec() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
      .tableProperty(TableProperties.AVRO_COMPRESSION)
      .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
      .parse();
}

/**
 * Returns the Avro compression level to use for this write, or {@code null} when no level is
 * configured anywhere.
 *
 * <p>Resolution order: the {@code compression-level} write option, then the Flink configuration,
 * then the table's {@code write.avro.compression-level} property, then the default. Uses
 * {@code parseOptional()}, so callers must handle a null result.
 */
public String avroCompressionLevel() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
      .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
      .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
      .parseOptional();
}

/**
 * Returns the ORC compression codec to use for this write.
 *
 * <p>Resolution order: the {@code compression-codec} write option, then the Flink configuration,
 * then the table's {@code write.orc.compression-codec} property, falling back to the
 * table-property default. Never null ({@code parse()} resolves to the default).
 */
public String orcCompressionCodec() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
      .tableProperty(TableProperties.ORC_COMPRESSION)
      .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
      .parse();
}

/**
 * Returns the ORC compression strategy to use for this write.
 *
 * <p>Resolution order: the {@code compression-strategy} write option, then the Flink
 * configuration, then the table's {@code write.orc.compression-strategy} property, falling back
 * to the table-property default. Never null ({@code parse()} resolves to the default).
 */
public String orcCompressionStrategy() {
  return confParser
      .stringConf()
      .option(FlinkWriteOptions.COMPRESSION_STRATEGY.key())
      .flinkConfig(FlinkWriteOptions.COMPRESSION_STRATEGY)
      .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
      .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
      .parse();
}

public DistributionMode distributionMode() {
String modeName =
confParser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ private FlinkWriteOptions() {}
public static final ConfigOption<Long> TARGET_FILE_SIZE_BYTES =
ConfigOptions.key("target-file-size-bytes").longType().noDefaultValue();

// Overrides this table's write.<FILE_FORMAT>.compression-codec for this write.
// Applies to the format selected for the write (parquet, avro, or orc).
public static final ConfigOption<String> COMPRESSION_CODEC =
    ConfigOptions.key("compression-codec").stringType().noDefaultValue();

// Overrides this table's write.<FILE_FORMAT>.compression-level for this write.
// Only consulted for Parquet and Avro writes.
public static final ConfigOption<String> COMPRESSION_LEVEL =
    ConfigOptions.key("compression-level").stringType().noDefaultValue();

// Overrides this table's write.<FILE_FORMAT>.compression-strategy for this write.
// Only consulted for ORC writes.
public static final ConfigOption<String> COMPRESSION_STRATEGY =
    ConfigOptions.key("compression-strategy").stringType().noDefaultValue();

// Overrides this table's write.upsert.enabled
public static final ConfigOption<Boolean> WRITE_UPSERT_ENABLED =
ConfigOptions.key("upsert-enabled").booleanType().noDefaultValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
*/
package org.apache.iceberg.flink.sink;

import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;

import java.io.IOException;
Expand All @@ -43,6 +49,7 @@
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -564,14 +571,57 @@ static IcebergStreamWriter<RowData> createStreamWriter(
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

Table serializableTable = SerializableTable.copyOf(table);
FileFormat format = flinkWriteConf.dataFileFormat();
TaskWriterFactory<RowData> taskWriterFactory =
new RowDataTaskWriterFactory(
serializableTable,
flinkRowType,
flinkWriteConf.targetDataFileSize(),
flinkWriteConf.dataFileFormat(),
format,
writeProperties(table, format, flinkWriteConf),
equalityFieldIds,
flinkWriteConf.upsertMode());
return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}

/**
* Based on the {@link FileFormat} overwrites the table level compression properties for the table
* write.
*
* @param table The table to get the table level settings
* @param format The FileFormat to use
* @param conf The write configuration
* @return The properties to use for writing
*/
private static Map<String, String> writeProperties(
Table table, FileFormat format, FlinkWriteConf conf) {
Map<String, String> writeProperties = Maps.newHashMap(table.properties());

switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
String parquetCompressionLevel = conf.parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}

break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
String avroCompressionLevel = conf.avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can use level in this line.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate when the blocks in the switch are not independent.
I refactored to use a different variable instead inside each block

}

break;
case ORC:
writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
break;
default:
throw new IllegalArgumentException(String.format("Unknown file format %s", format));
}

return writeProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.flink.sink;

import java.util.List;
import java.util.Map;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -57,6 +58,7 @@ public RowDataTaskWriterFactory(
RowType flinkSchema,
long targetFileSizeBytes,
FileFormat format,
Map<String, String> writeProperties,
List<Integer> equalityFieldIds,
boolean upsert) {
this.table = table;
Expand All @@ -70,8 +72,7 @@ public RowDataTaskWriterFactory(
this.upsert = upsert;

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory =
new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
} else if (upsert) {
// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
// the inserted row
Expand All @@ -82,7 +83,7 @@ public RowDataTaskWriterFactory(
new FlinkAppenderFactory(
schema,
flinkSchema,
table.properties(),
writeProperties,
spec,
ArrayUtil.toIntArray(equalityFieldIds),
TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)),
Expand All @@ -92,7 +93,7 @@ public RowDataTaskWriterFactory(
new FlinkAppenderFactory(
schema,
flinkSchema,
table.properties(),
writeProperties,
spec,
ArrayUtil.toIntArray(equalityFieldIds),
schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ public RowDataRewriter(
RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
this.taskWriterFactory =
new RowDataTaskWriterFactory(
SerializableTable.copyOf(table), flinkSchema, Long.MAX_VALUE, format, null, false);
SerializableTable.copyOf(table),
flinkSchema,
Long.MAX_VALUE,
format,
table.properties(),
null,
false);
}

public List<DataFile> rewriteDataForTasks(
Expand Down
Loading