diff --git a/docs/spark-configuration.md b/docs/spark-configuration.md index 3a91f211769f..f94efdcc58f4 100644 --- a/docs/spark-configuration.md +++ b/docs/spark-configuration.md @@ -194,6 +194,9 @@ df.write | check-ordering | true | Checks if input schema and table schema are same | | isolation-level | null | Desired isolation level for Dataframe overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. | | validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. | +| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write | +| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write | +| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | CommitMetadata provides an interface to add custom metadata to a snapshot summary during a SQL execution, which can be beneficial for purposes such as auditing or change tracking. If properties start with `snapshot-property.`, then that prefix will be removed from each property. Here is an example: diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 095d408f17d1..0c08587b4b2e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -63,4 +63,9 @@ private SparkSQLProperties() {} // Controls the WAP branch used for write-audit-publish workflow. // When set, new snapshots will be committed to this branch. 
public static final String WAP_BRANCH = "spark.wap.branch"; + + // Controls write compression options + public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec"; + public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level"; + public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy"; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index b1a35090d9d5..ec9825d40bbe 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -21,6 +21,12 @@ import static org.apache.iceberg.DistributionMode.HASH; import static org.apache.iceberg.DistributionMode.NONE; import static org.apache.iceberg.DistributionMode.RANGE; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import java.util.Locale; import java.util.Map; @@ -418,4 +424,96 @@ public String branch() { return branch; } + + public String parquetCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(TableProperties.PARQUET_COMPRESSION) + .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT) + .parse(); + } + + public String parquetCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL) + .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + public String avroCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(TableProperties.AVRO_COMPRESSION) + .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT) + .parse(); + } + + public String avroCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL) + .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + public String orcCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(TableProperties.ORC_COMPRESSION) + .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT) + .parse(); + } + + public String orcCompressionStrategy() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_STRATEGY) + .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) + .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY) + .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT) + .parse(); + } + + public Map<String, String> writeProperties(FileFormat format) { + Map<String, String> writeProperties = Maps.newHashMap(); + + switch (format) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION,
parquetCompressionCodec()); + String parquetCompressionLevel = parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + break; + + case AVRO: + writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); + String avroCompressionLevel = avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); + } + break; + + case ORC: + writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); + break; + + default: + // skip + } + + return writeProperties; + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 91f8ce3b3183..f8cb2c5a1942 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -80,4 +80,9 @@ private SparkWriteOptions() {} // Isolation Level for DataFrame calls. Currently supported by overwritePartitions public static final String ISOLATION_LEVEL = "isolation-level"; + + // Controls write compression options + public static final String COMPRESSION_CODEC = "compression-codec"; + public static final String COMPRESSION_LEVEL = "compression-level"; + public static final String COMPRESSION_STRATEGY = "compression-strategy"; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index d90dc5dafc59..9b075b675565 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -34,6 +34,7 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.SparkAvroWriter; import org.apache.iceberg.spark.data.SparkOrcWriter; @@ -47,6 +48,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> { private StructType dataSparkType; private StructType equalityDeleteSparkType; private StructType positionDeleteSparkType; + private Map<String, String> writeProperties; SparkFileWriterFactory( Table table, @@ -60,7 +62,8 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> { StructType equalityDeleteSparkType, SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema, - StructType positionDeleteSparkType) { + StructType positionDeleteSparkType, + Map<String, String> writeProperties) { super( table, @@ -76,6 +79,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> { this.dataSparkType = dataSparkType; this.equalityDeleteSparkType = equalityDeleteSparkType; this.positionDeleteSparkType = positionDeleteSparkType; + this.writeProperties = writeProperties != null ?
writeProperties : ImmutableMap.of(); } static Builder builderFor(Table table) { @@ -85,11 +89,13 @@ static Builder builderFor(Table table) { @Override protected void configureDataWrite(Avro.DataWriteBuilder builder) { builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType())); + builder.setAll(writeProperties); } @Override protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType())); + builder.setAll(writeProperties); } @Override @@ -102,17 +108,21 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); } + + builder.setAll(writeProperties); } @Override protected void configureDataWrite(Parquet.DataWriteBuilder builder) { builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType)); + builder.setAll(writeProperties); } @Override protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc( msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType)); + builder.setAll(writeProperties); } @Override @@ -120,22 +130,26 @@ protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc( msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)); builder.transformPaths(path -> UTF8String.fromString(path.toString())); + builder.setAll(writeProperties); } @Override protected void configureDataWrite(ORC.DataWriteBuilder builder) { builder.createWriterFunc(SparkOrcWriter::new); + builder.setAll(writeProperties); } @Override protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc(SparkOrcWriter::new); + builder.setAll(writeProperties); } @Override protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc(SparkOrcWriter::new); builder.transformPaths(path -> UTF8String.fromString(path.toString())); + builder.setAll(writeProperties); } private StructType dataSparkType() { @@ -180,6 +194,7 @@ static class Builder { private SortOrder equalityDeleteSortOrder; private Schema positionDeleteRowSchema; private StructType positionDeleteSparkType; + private Map writeProperties; Builder(Table table) { this.table = table; @@ -250,6 +265,11 @@ Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) { return this; } + Builder writeProperties(Map properties) { + this.writeProperties = properties; + return this; + } + SparkFileWriterFactory build() { boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null; boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null; @@ -269,7 +289,8 @@ SparkFileWriterFactory build() { equalityDeleteSparkType, equalityDeleteSortOrder, positionDeleteRowSchema, - positionDeleteSparkType); + positionDeleteSparkType, + writeProperties); } } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 9835a73c4324..152279daaf72 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ 
-21,6 +21,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -74,6 +75,7 @@ public class SparkPositionDeletesRewrite implements Write { private final String fileSetId; private final int specId; private final StructLike partition; + private final Map writeProperties; /** * Constructs a {@link SparkPositionDeletesRewrite}. @@ -106,6 +108,7 @@ public class SparkPositionDeletesRewrite implements Write { this.fileSetId = writeConf.rewrittenFileSetId(); this.specId = specId; this.partition = partition; + this.writeProperties = writeConf.writeProperties(format); } @Override @@ -129,7 +132,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { writeSchema, dsSchema, specId, - partition); + partition, + writeProperties); } @Override @@ -174,6 +178,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final StructType dsSchema; private final int specId; private final StructLike partition; + private final Map writeProperties; PositionDeletesWriterFactory( Broadcast tableBroadcast, @@ -183,7 +188,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { Schema writeSchema, StructType dsSchema, int specId, - StructLike partition) { + StructLike partition, + Map writeProperties) { this.tableBroadcast = tableBroadcast; this.queryId = queryId; this.format = format; @@ -192,6 +198,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.dsSchema = dsSchema; this.specId = specId; this.partition = partition; + this.writeProperties = writeProperties; } @Override @@ -219,6 +226,7 @@ public DataWriter createWriter(int partitionId, long taskId) { SparkFileWriterFactory.builderFor(table) .deleteFileFormat(format) .positionDeleteSparkType(deleteSparkTypeWithoutRow) + .writeProperties(writeProperties) .build(); return new DeleteWriter( diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 9759d25eb9ac..db184bf48ba9 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -104,6 +104,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final Context context; private boolean cleanupOnAbort = true; + private final Map writeProperties; SparkPositionDeltaWrite( SparkSession spark, @@ -126,6 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); + this.writeProperties = writeConf.writeProperties(context.dataFileFormat); } @Override @@ -155,7 +157,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context); + return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties); } @Override @@ -326,11 +328,17 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory { private final Broadcast<Table>
tableBroadcast; private final Command command; private final Context context; + private final Map<String, String> writeProperties; - PositionDeltaWriteFactory(Broadcast<Table>
tableBroadcast, Command command, Context context) { + PositionDeltaWriteFactory( + Broadcast<Table>
tableBroadcast, + Command command, + Context context, + Map writeProperties) { this.tableBroadcast = tableBroadcast; this.command = command; this.context = context; + this.writeProperties = writeProperties; } @Override @@ -356,6 +364,7 @@ public DeltaWriter createWriter(int partitionId, long taskId) { .dataSparkType(context.dataSparkType()) .deleteFileFormat(context.deleteFileFormat()) .positionDeleteSparkType(context.deleteSparkType()) + .writeProperties(writeProperties) .build(); if (command == DELETE) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index b10340d62249..d4a4f22bfd49 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -100,6 +100,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { private final Map extraSnapshotMetadata; private final boolean partitionedFanoutEnabled; private final SparkWriteRequirements writeRequirements; + private final Map writeProperties; private boolean cleanupOnAbort = true; @@ -128,6 +129,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled(); this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); + this.writeProperties = writeConf.writeProperties(format); } @Override @@ -186,7 +188,8 @@ private WriterFactory createWriterFactory() { targetFileSize, writeSchema, dsSchema, - partitionedFanoutEnabled); + partitionedFanoutEnabled, + writeProperties); } private void commitOperation(SnapshotUpdate operation, String description) { @@ -616,6 +619,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr private final StructType dsSchema; private final boolean partitionedFanoutEnabled; private final String queryId; + private final Map writeProperties; protected WriterFactory( Broadcast
tableBroadcast, @@ -625,7 +629,8 @@ protected WriterFactory( long targetFileSize, Schema writeSchema, StructType dsSchema, - boolean partitionedFanoutEnabled) { + boolean partitionedFanoutEnabled, + Map writeProperties) { this.tableBroadcast = tableBroadcast; this.format = format; this.outputSpecId = outputSpecId; @@ -634,6 +639,7 @@ protected WriterFactory( this.dsSchema = dsSchema; this.partitionedFanoutEnabled = partitionedFanoutEnabled; this.queryId = queryId; + this.writeProperties = writeProperties; } @Override @@ -657,6 +663,7 @@ public DataWriter createWriter(int partitionId, long taskId, long e .dataFileFormat(format) .dataSchema(writeSchema) .dataSparkType(dsSchema) + .writeProperties(writeProperties) .build(); if (spec.isUnpartitioned()) { diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java new file mode 100644 index 000000000000..cc3aa9121b3a --- /dev/null +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import static org.apache.iceberg.FileFormat.PARQUET; +import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DELETE_MODE; +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_CODEC; +import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_LEVEL; +import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_STRATEGY; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; + +import java.util.List; +import java.util.Map; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AvroFSInput; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestReader; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.SizeBasedFileRewriter; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.actions.SparkActions; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.assertj.core.api.Assertions; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestCompressionSettings extends SparkCatalogTestBase { + + private static final Configuration CONF = new Configuration(); + private static final String tableName = "testWriteData"; + + private static SparkSession spark = null; + + private final FileFormat format; + private final ImmutableMap properties; + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + @Parameterized.Parameters(name = "format = {0}, properties = {1}") + public static Object[][] parameters() { + return new Object[][] { + {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")}, + {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "gzip")}, + {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", 
COMPRESSION_STRATEGY, "speed")}, + {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")}, + {"avro", ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")} + }; + } + + @BeforeClass + public static void startSpark() { + TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate(); + } + + @Parameterized.AfterParam + public static void clearSourceCache() { + spark.sql(String.format("DROP TABLE IF EXISTS %s", tableName)); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestCompressionSettings.spark; + TestCompressionSettings.spark = null; + currentSpark.stop(); + } + + public TestCompressionSettings(String format, ImmutableMap properties) { + super( + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties()); + this.format = FileFormat.fromString(format); + this.properties = properties; + } + + @Test + public void testWriteDataWithDifferentSetting() throws Exception { + sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName); + Map tableProperties = Maps.newHashMap(); + tableProperties.put(PARQUET_COMPRESSION, "gzip"); + tableProperties.put(AVRO_COMPRESSION, "gzip"); + tableProperties.put(ORC_COMPRESSION, "zlib"); + tableProperties.put(DELETE_MODE, MERGE_ON_READ.modeName()); + tableProperties.put(FORMAT_VERSION, "2"); + sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName, DEFAULT_FILE_FORMAT, format); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, DELETE_DEFAULT_FILE_FORMAT, format); + for (Map.Entry entry : tableProperties.entrySet()) { + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, entry.getKey(), entry.getValue()); + } + + List expectedOrigin = Lists.newArrayList(); + for (int i = 0; i < 1000; i++) { + expectedOrigin.add(new SimpleRecord(i, "hello world" + i)); + } + + Dataset df = spark.createDataFrame(expectedOrigin, SimpleRecord.class); + + for (Map.Entry entry : properties.entrySet()) { + spark.conf().set(entry.getKey(), entry.getValue()); + } + + df.select("id", "data") + .writeTo(tableName) + .option(SparkWriteOptions.WRITE_FORMAT, format.toString()) + .append(); + Table table = catalog.loadTable(TableIdentifier.of("default", tableName)); + List manifestFiles = table.currentSnapshot().dataManifests(table.io()); + try (ManifestReader reader = ManifestFiles.read(manifestFiles.get(0), table.io())) { + DataFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + } + + sql("DELETE from %s where id < 100", tableName); + + table.refresh(); + List deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); + Map specMap = Maps.newHashMap(); + specMap.put(0, PartitionSpec.unpartitioned()); + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + DeleteFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + } + + if (PARQUET.equals(format)) { + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + table.refresh(); + deleteManifestFiles = 
table.currentSnapshot().deleteManifests(table.io()); + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + DeleteFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + } + } + } + + private String getCompressionType(InputFile inputFile) throws Exception { + switch (format) { + case ORC: + OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(CONF).useUTCTimestamp(true); + Reader orcReader = OrcFile.createReader(new Path(inputFile.location()), readerOptions); + return orcReader.getCompressionKind().name(); + case PARQUET: + ParquetMetadata footer = + ParquetFileReader.readFooter(CONF, new Path(inputFile.location()), NO_FILTER); + return footer.getBlocks().get(0).getColumns().get(0).getCodec().name(); + default: + FileContext fc = FileContext.getFileContext(CONF); + GenericDatumReader reader = new GenericDatumReader(); + DataFileReader fileReader = + (DataFileReader) + DataFileReader.openReader( + new AvroFSInput(fc, new Path(inputFile.location())), reader); + return fileReader.getMetaString(DataFileConstants.CODEC); + } + } +}
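
For reviewers, a minimal usage sketch of the knobs added above. The per-write option keys (`compression-codec`, `compression-level`, `compression-strategy`) and the session properties (`spark.sql.iceberg.compression-*`) are the ones introduced in `SparkWriteOptions` and `SparkSQLProperties`; the table name, source data, and codec values are placeholders, not part of this change:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CompressionWriteOptionsExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
    Dataset<Row> df = spark.range(1000).toDF(); // placeholder data

    // Session-level defaults: apply to every Iceberg write in this session
    // unless a per-write option overrides them.
    spark.conf().set("spark.sql.iceberg.compression-codec", "zstd");
    spark.conf().set("spark.sql.iceberg.compression-level", "3");

    // Per-write override: takes precedence over the session conf and the
    // table's write.(fileformat).compression-* properties.
    df.writeTo("db.sample_table") // placeholder table
        .option("compression-codec", "gzip")
        .append();
  }
}
```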
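As a mental model for the new `SparkWriteConf` accessors (not the actual implementation, which goes through Iceberg's `SparkConfParser`), the lookup order for the Parquet codec is roughly the sketch below; the helper class and map parameters are illustrative only:

```java
import java.util.Map;

// Rough restatement of the precedence implemented by parquetCompressionCodec():
// per-write option > Spark session conf > table property > format default.
final class CompressionPrecedenceSketch {

  static String parquetCompressionCodec(
      Map<String, String> writeOptions, // options passed on the DataFrame/SQL write
      Map<String, String> sessionConf,  // Spark session configuration
      Map<String, String> tableProps,   // Iceberg table properties
      String formatDefault) {           // TableProperties.PARQUET_COMPRESSION_DEFAULT in the real code
    if (writeOptions.containsKey("compression-codec")) {
      return writeOptions.get("compression-codec");
    }
    if (sessionConf.containsKey("spark.sql.iceberg.compression-codec")) {
      return sessionConf.get("spark.sql.iceberg.compression-codec");
    }
    // TableProperties.PARQUET_COMPRESSION
    return tableProps.getOrDefault("write.parquet.compression-codec", formatDefault);
  }

  private CompressionPrecedenceSketch() {}
}
```

The same order applies to `compression-level` (Parquet and Avro) and `compression-strategy` (ORC); `writeProperties(FileFormat)` then packs the resolved values into the map that `SparkFileWriterFactory` applies to each writer builder via `setAll`.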