Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
import static org.apache.iceberg.DistributionMode.RANGE;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
Expand Down Expand Up @@ -425,7 +431,107 @@ public String branch() {
return branch;
}

public String parquetCompressionCodec() {
public Map<String, String> writeProperties() {
Map<String, String> writeProperties = Maps.newHashMap();
writeProperties.putAll(dataWriteProperties());
writeProperties.putAll(deleteWriteProperties());
return writeProperties;
}

private Map<String, String> dataWriteProperties() {
Map<String, String> writeProperties = Maps.newHashMap();
FileFormat dataFormat = dataFileFormat();

switch (dataFormat) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
String parquetCompressionLevel = parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}
break;

case AVRO:
writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
String avroCompressionLevel = avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
}
break;

case ORC:
writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
break;

default:
// skip
}

return writeProperties;
}

private Map<String, String> deleteWriteProperties() {
Map<String, String> writeProperties = Maps.newHashMap();
FileFormat deleteFormat = deleteFileFormat();

switch (deleteFormat) {
case PARQUET:
setWritePropertyWithFallback(
writeProperties,
DELETE_PARQUET_COMPRESSION,
deleteParquetCompressionCodec(),
parquetCompressionCodec());
setWritePropertyWithFallback(
writeProperties,
DELETE_PARQUET_COMPRESSION_LEVEL,
deleteParquetCompressionLevel(),
parquetCompressionLevel());
break;

case AVRO:
setWritePropertyWithFallback(
writeProperties,
DELETE_AVRO_COMPRESSION,
deleteAvroCompressionCodec(),
avroCompressionCodec());
setWritePropertyWithFallback(
writeProperties,
DELETE_AVRO_COMPRESSION_LEVEL,
deleteAvroCompressionLevel(),
avroCompressionLevel());
break;

case ORC:
setWritePropertyWithFallback(
writeProperties,
DELETE_ORC_COMPRESSION,
deleteOrcCompressionCodec(),
orcCompressionCodec());
setWritePropertyWithFallback(
writeProperties,
DELETE_ORC_COMPRESSION_STRATEGY,
deleteOrcCompressionStrategy(),
orcCompressionStrategy());
break;

default:
// skip
}

return writeProperties;
}

private void setWritePropertyWithFallback(
Map<String, String> writeProperties, String key, String value, String fallbackValue) {
if (value != null) {
writeProperties.put(key, value);
} else if (fallbackValue != null) {
writeProperties.put(key, fallbackValue);
}
}

private String parquetCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
Expand All @@ -435,7 +541,16 @@ public String parquetCompressionCodec() {
.parse();
}

public String parquetCompressionLevel() {
  /**
   * Parquet compression codec for delete files, resolved via {@code confParser} from the write
   * option, the Spark session conf, and the {@code DELETE_PARQUET_COMPRESSION} table property.
   * Returns null when none of them is set; {@link #deleteWriteProperties()} then falls back to the
   * data-file codec.
   */
  private String deleteParquetCompressionCodec() {
    return confParser
        .stringConf()
        .option(SparkWriteOptions.COMPRESSION_CODEC)
        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
        .tableProperty(DELETE_PARQUET_COMPRESSION)
        .parseOptional();
  }

private String parquetCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
Expand All @@ -445,7 +560,16 @@ public String parquetCompressionLevel() {
.parseOptional();
}

public String avroCompressionCodec() {
  /**
   * Parquet compression level for delete files, resolved via {@code confParser} from the write
   * option, the Spark session conf, and the {@code DELETE_PARQUET_COMPRESSION_LEVEL} table
   * property. Returns null when none of them is set; {@link #deleteWriteProperties()} then falls
   * back to the data-file level.
   */
  private String deleteParquetCompressionLevel() {
    return confParser
        .stringConf()
        .option(SparkWriteOptions.COMPRESSION_LEVEL)
        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
        .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL)
        .parseOptional();
  }

private String avroCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
Expand All @@ -455,7 +579,16 @@ public String avroCompressionCodec() {
.parse();
}

public String avroCompressionLevel() {
  /**
   * Avro compression codec for delete files, resolved via {@code confParser} from the write
   * option, the Spark session conf, and the {@code DELETE_AVRO_COMPRESSION} table property.
   * Returns null when none of them is set; {@link #deleteWriteProperties()} then falls back to the
   * data-file codec.
   */
  private String deleteAvroCompressionCodec() {
    return confParser
        .stringConf()
        .option(SparkWriteOptions.COMPRESSION_CODEC)
        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
        .tableProperty(DELETE_AVRO_COMPRESSION)
        .parseOptional();
  }

private String avroCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
Expand All @@ -465,7 +598,16 @@ public String avroCompressionLevel() {
.parseOptional();
}

public String orcCompressionCodec() {
  /**
   * Avro compression level for delete files, resolved via {@code confParser} from the write
   * option, the Spark session conf, and the {@code DELETE_AVRO_COMPRESSION_LEVEL} table property.
   * Returns null when none of them is set; {@link #deleteWriteProperties()} then falls back to the
   * data-file level.
   */
  private String deleteAvroCompressionLevel() {
    return confParser
        .stringConf()
        .option(SparkWriteOptions.COMPRESSION_LEVEL)
        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
        .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL)
        .parseOptional();
  }

private String orcCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
Expand All @@ -475,7 +617,16 @@ public String orcCompressionCodec() {
.parse();
}

public String orcCompressionStrategy() {
  /**
   * ORC compression codec for delete files, resolved via {@code confParser} from the write option,
   * the Spark session conf, and the {@code DELETE_ORC_COMPRESSION} table property. Returns null
   * when none of them is set; {@link #deleteWriteProperties()} then falls back to the data-file
   * codec.
   */
  private String deleteOrcCompressionCodec() {
    return confParser
        .stringConf()
        .option(SparkWriteOptions.COMPRESSION_CODEC)
        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
        .tableProperty(DELETE_ORC_COMPRESSION)
        .parseOptional();
  }

private String orcCompressionStrategy() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
Expand All @@ -485,35 +636,12 @@ public String orcCompressionStrategy() {
.parse();
}

public Map<String, String> writeProperties(FileFormat format) {
Map<String, String> writeProperties = Maps.newHashMap();

switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
String parquetCompressionLevel = parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}
break;

case AVRO:
writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
String avroCompressionLevel = avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel());
}
break;

case ORC:
writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
break;

default:
// skip
}

return writeProperties;
  /**
   * ORC compression strategy for delete files, resolved via {@code confParser} from the write
   * option, the Spark session conf, and the {@code DELETE_ORC_COMPRESSION_STRATEGY} table
   * property. Returns null when none of them is set; {@link #deleteWriteProperties()} then falls
   * back to the data-file strategy.
   */
  private String deleteOrcCompressionStrategy() {
    return confParser
        .stringConf()
        .option(SparkWriteOptions.COMPRESSION_STRATEGY)
        .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
        .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
        .parseOptional();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
this.fileSetId = writeConf.rewrittenFileSetId();
this.specId = specId;
this.partition = partition;
this.writeProperties = writeConf.writeProperties(format);
this.writeProperties = writeConf.writeProperties();
}

@Override
Expand Down Expand Up @@ -221,6 +221,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
.deleteFileFormat(format)
.positionDeleteRowSchema(positionDeleteRowSchema)
.positionDeleteSparkType(deleteSparkType)
.writeProperties(writeProperties)
.build();
SparkFileWriterFactory writerFactoryWithoutRow =
SparkFileWriterFactory.builderFor(table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.writeRequirements = writeConf.positionDeltaRequirements(command);
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
this.writeProperties = writeConf.writeProperties(context.dataFileFormat);
this.writeProperties = writeConf.writeProperties();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
this.writeRequirements = writeRequirements;
this.outputSpecId = writeConf.outputSpecId();
this.writeProperties = writeConf.writeProperties(format);
this.writeProperties = writeConf.writeProperties();
}

@Override
Expand Down
Loading