From bdf359c5a150e343e458a74f567ec03ff9136922 Mon Sep 17 00:00:00 2001 From: roryqi Date: Sun, 13 Aug 2023 16:02:04 +0800 Subject: [PATCH 01/18] Spark 3.4: Add write options to override the compression properties of the Table --- .../iceberg/spark/SparkSQLProperties.java | 5 ++ .../apache/iceberg/spark/SparkWriteConf.java | 60 +++++++++++++++++++ .../iceberg/spark/SparkWriteOptions.java | 6 ++ .../spark/source/SparkFileWriterFactory.java | 29 ++++++++- .../iceberg/spark/source/SparkWrite.java | 42 ++++++++++++- 5 files changed, 138 insertions(+), 4 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 095d408f17d1..660b90291f51 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -63,4 +63,9 @@ private SparkSQLProperties() {} // Controls the WAP branch used for write-audit-publish workflow. // When set, new snapshots will be committed to this branch. public static final String WAP_BRANCH = "spark.wap.branch"; + + // Controls write compress options + public static final String COMPRESSION_CODEC = "spark.sql.iceberg.write.compression-codec"; + public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.write.compression-level"; + public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.write.compression-strategy"; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index b1a35090d9d5..85d1412b3e90 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -418,4 +418,64 @@ public String branch() { return branch; } + + public String parquetCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(TableProperties.PARQUET_COMPRESSION) + .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT) + .parse(); + } + + public String parquetCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL) + .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + public String avroCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(TableProperties.AVRO_COMPRESSION) + .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT) + .parse(); + } + + public String avroCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL) + .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + public String orcCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(TableProperties.ORC_COMPRESSION) + .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT) + .parse(); + } + + public String orcCompressionStrategy() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_STRATEGY) + .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) + .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY) + .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT) + .parse(); + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 91f8ce3b3183..f4114f35d2b0 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -80,4 +80,10 @@ private SparkWriteOptions() {} // Isolation Level for DataFrame calls. Currently supported by overwritePartitions public static final String ISOLATION_LEVEL = "isolation-level"; + + + // Controls write compress options + public static final String COMPRESSION_CODEC = "compression-codec"; + public static final String COMPRESSION_LEVEL = "compression-level"; + public static final String COMPRESSION_STRATEGY = "compression-strategy"; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index d90dc5dafc59..fe041b6f43dc 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -34,6 +34,7 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.SparkAvroWriter; import org.apache.iceberg.spark.data.SparkOrcWriter; @@ -47,6 +48,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { private StructType dataSparkType; private StructType equalityDeleteSparkType; private StructType positionDeleteSparkType; + private Map writeProperties; SparkFileWriterFactory( Table table, @@ -60,7 +62,8 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { StructType equalityDeleteSparkType, SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema, - StructType positionDeleteSparkType) { + StructType positionDeleteSparkType, + Map writeProperties) { super( table, @@ -76,6 +79,12 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { this.dataSparkType = dataSparkType; this.equalityDeleteSparkType = equalityDeleteSparkType; this.positionDeleteSparkType = positionDeleteSparkType; + + if (writeProperties != null) { + this.writeProperties = writeProperties; + } else { + this.writeProperties = ImmutableMap.of(); + } } static Builder builderFor(Table table) { @@ -85,11 +94,13 @@ static Builder builderFor(Table table) { @Override protected void configureDataWrite(Avro.DataWriteBuilder builder) { builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType())); + builder.setAll(writeProperties); } @Override protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType())); + builder.setAll(writeProperties); } @Override @@ -102,17 +113,20 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); } + builder.setAll(writeProperties); } @Override protected void configureDataWrite(Parquet.DataWriteBuilder builder) { builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType)); + builder.setAll(writeProperties); } @Override protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc( msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType)); + builder.setAll(writeProperties); } @Override @@ -120,22 +134,26 @@ protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc( msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)); builder.transformPaths(path -> UTF8String.fromString(path.toString())); + builder.setAll(writeProperties); } @Override protected void configureDataWrite(ORC.DataWriteBuilder builder) { builder.createWriterFunc(SparkOrcWriter::new); + builder.setAll(writeProperties); } @Override protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc(SparkOrcWriter::new); + builder.setAll(writeProperties); } @Override protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc(SparkOrcWriter::new); builder.transformPaths(path -> UTF8String.fromString(path.toString())); + builder.setAll(writeProperties); } private StructType dataSparkType() { @@ -180,6 +198,7 @@ static class Builder { private SortOrder equalityDeleteSortOrder; private Schema positionDeleteRowSchema; private StructType positionDeleteSparkType; + private Map writeProperties; Builder(Table table) { this.table = table; @@ -250,6 +269,11 @@ Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) { return this; } + Builder writeProperties(Map properties) { + this.writeProperties = properties; + return this; + } + SparkFileWriterFactory build() { boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null; boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null; @@ -269,7 +293,8 @@ SparkFileWriterFactory build() { equalityDeleteSparkType, equalityDeleteSortOrder, positionDeleteRowSchema, - positionDeleteSparkType); + positionDeleteSparkType, + writeProperties); } } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index b10340d62249..f93097408a3e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -20,6 +20,12 @@ import static org.apache.iceberg.IsolationLevel.SERIALIZABLE; import static org.apache.iceberg.IsolationLevel.SNAPSHOT; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import java.io.IOException; import java.util.Arrays; @@ -54,6 +60,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; @@ -178,6 +185,32 @@ private WriterFactory createWriterFactory() { // broadcast the table metadata as the writer factory will be sent to executors Broadcast tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + Map writeProperties = Maps.newHashMap(); + switch (format) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, writeConf.parquetCompressionCodec()); + String parquetCompressionLevel = writeConf.parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + + break; + case AVRO: + writeProperties.put(AVRO_COMPRESSION, writeConf.avroCompressionCodec()); + String avroCompressionLevel = writeConf.avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, writeConf.avroCompressionLevel()); + } + + break; + case ORC: + writeProperties.put(ORC_COMPRESSION, writeConf.orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, writeConf.orcCompressionStrategy()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown file format %s", format)); + } + return new WriterFactory( tableBroadcast, queryId, @@ -186,7 +219,8 @@ private WriterFactory createWriterFactory() { targetFileSize, writeSchema, dsSchema, - partitionedFanoutEnabled); + partitionedFanoutEnabled, + writeProperties); } private void commitOperation(SnapshotUpdate operation, String description) { @@ -616,6 +650,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr private final StructType dsSchema; private final boolean partitionedFanoutEnabled; private final String queryId; + private final Map writeProperties; protected WriterFactory( Broadcast
tableBroadcast, @@ -625,7 +660,8 @@ protected WriterFactory( long targetFileSize, Schema writeSchema, StructType dsSchema, - boolean partitionedFanoutEnabled) { + boolean partitionedFanoutEnabled, + Map writeProperties) { this.tableBroadcast = tableBroadcast; this.format = format; this.outputSpecId = outputSpecId; @@ -634,6 +670,7 @@ protected WriterFactory( this.dsSchema = dsSchema; this.partitionedFanoutEnabled = partitionedFanoutEnabled; this.queryId = queryId; + this.writeProperties = writeProperties; } @Override @@ -657,6 +694,7 @@ public DataWriter createWriter(int partitionId, long taskId, long e .dataFileFormat(format) .dataSchema(writeSchema) .dataSparkType(dsSchema) + .writeProperties(writeProperties) .build(); if (spec.isUnpartitioned()) { From 8b8affc58e044f3b473cbdf7ac9e066359efa5cf Mon Sep 17 00:00:00 2001 From: roryqi Date: Sun, 13 Aug 2023 16:04:48 +0800 Subject: [PATCH 02/18] add ut --- .../spark/source/TestCompressionSettings.java | 180 ++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java new file mode 100644 index 000000000000..821b14ddcf95 --- /dev/null +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_CODEC; +import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_LEVEL; +import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_STRATEGY; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; + +import java.io.File; +import java.util.List; +import java.util.Map; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AvroFSInput; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestReader; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.types.Types; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestCompressionSettings { + + private static final Configuration CONF = new Configuration(); + private final FileFormat format; + + private final ImmutableMap properties; + private static SparkSession spark = null; + private static final Schema SCHEMA = + new Schema( + optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get())); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + @Parameterized.Parameters(name = "format = {0}, properties = {1}") + public static Object[] parameters() { + return new Object[] { + new Object[] {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")}, + new Object[] {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "gzip")}, + new Object[] { + "orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed") + }, + new Object[] { + "orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression") + }, + new Object[] {"avro", ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")} + }; + } + + @BeforeClass + public static void startSpark() { + TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate(); + } + + @Parameterized.AfterParam + public static void clearSourceCache() { + ManualSource.clearTables(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestCompressionSettings.spark; + TestCompressionSettings.spark = null; + currentSpark.stop(); + } + + public TestCompressionSettings(String format, ImmutableMap properties) { + this.format = FileFormat.fromString(format); + this.properties = properties; + } + + @Test + public void testWriteDataWithDifferentSetting() throws Exception { + File parent = temp.newFolder(format.toString()); + File location = new File(parent, "test"); + HadoopTables tables = new HadoopTables(CONF); + Map tableProperties = Maps.newHashMap(); + tableProperties.put(PARQUET_COMPRESSION, "gzip"); + tableProperties.put(AVRO_COMPRESSION, "gzip"); + tableProperties.put(ORC_COMPRESSION, "zlib"); + Table table = + tables.create(SCHEMA, PartitionSpec.unpartitioned(), tableProperties, location.toString()); + List expectedOrigin = Lists.newArrayList(); + for (int i = 0; i < 1000; i++) { + expectedOrigin.add(new SimpleRecord(1, "hello world" + i)); + } + Dataset df = spark.createDataFrame(expectedOrigin, SimpleRecord.class); + + for (Map.Entry entry : properties.entrySet()) { + spark.conf().set(entry.getKey(), entry.getValue()); + } + + df.select("id", "data") + .write() + .format("iceberg") + .option(SparkWriteOptions.WRITE_FORMAT, format.toString()) + .mode(SaveMode.Append) + .save(location.toString()); + + List manifestFiles = table.currentSnapshot().dataManifests(table.io()); + try (ManifestReader reader = ManifestFiles.read(manifestFiles.get(0), table.io())) { + DataFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assert.assertEquals( + getCompressionType(inputFile).toLowerCase(), properties.get(COMPRESSION_CODEC)); + } + } + + private String getCompressionType(InputFile inputFile) throws Exception { + switch (format) { + case ORC: + OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(CONF).useUTCTimestamp(true); + Reader orcReader = OrcFile.createReader(new Path(inputFile.location()), readerOptions); + return orcReader.getCompressionKind().name(); + case PARQUET: + ParquetMetadata footer = + ParquetFileReader.readFooter(CONF, new Path(inputFile.location()), NO_FILTER); + return footer.getBlocks().get(0).getColumns().get(0).getCodec().name(); + default: + FileContext fc = FileContext.getFileContext(CONF); + GenericDatumReader reader = new GenericDatumReader(); + DataFileReader fileReader = + (DataFileReader) + DataFileReader.openReader( + new AvroFSInput(fc, new Path(inputFile.location())), reader); + return fileReader.getMetaString(DataFileConstants.CODEC); + } + } +} From 49063cbc8a1094e17423f3a5ecaa0c9dfe173199 Mon Sep 17 00:00:00 2001 From: roryqi Date: Sun, 13 Aug 2023 16:14:45 +0800 Subject: [PATCH 03/18] Fix code style --- .../main/java/org/apache/iceberg/spark/SparkWriteOptions.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index f4114f35d2b0..f8cb2c5a1942 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -81,7 +81,6 @@ private SparkWriteOptions() {} // Isolation Level for DataFrame calls. Currently supported by overwritePartitions public static final String ISOLATION_LEVEL = "isolation-level"; - // Controls write compress options public static final String COMPRESSION_CODEC = "compression-codec"; public static final String COMPRESSION_LEVEL = "compression-level"; From d08226b181f28997d910d37c38ac43284006dbf2 Mon Sep 17 00:00:00 2001 From: roryqi Date: Sun, 13 Aug 2023 19:45:04 +0800 Subject: [PATCH 04/18] address comments --- docs/spark-configuration.md | 3 +++ .../apache/iceberg/spark/source/SparkFileWriterFactory.java | 1 + .../apache/iceberg/spark/source/TestCompressionSettings.java | 1 + 3 files changed, 5 insertions(+) diff --git a/docs/spark-configuration.md b/docs/spark-configuration.md index 3a91f211769f..f94efdcc58f4 100644 --- a/docs/spark-configuration.md +++ b/docs/spark-configuration.md @@ -194,6 +194,9 @@ df.write | check-ordering | true | Checks if input schema and table schema are same | | isolation-level | null | Desired isolation level for Dataframe overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. | | validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. | +| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write | +| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write | +| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | CommitMetadata provides an interface to add custom metadata to a snapshot summary during a SQL execution, which can be beneficial for purposes such as auditing or change tracking. If properties start with `snapshot-property.`, then that prefix will be removed from each property. Here is an example: diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index fe041b6f43dc..5288bc6acf86 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -113,6 +113,7 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); } + builder.setAll(writeProperties); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 821b14ddcf95..affd89257e46 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -135,6 +135,7 @@ public void testWriteDataWithDifferentSetting() throws Exception { for (int i = 0; i < 1000; i++) { expectedOrigin.add(new SimpleRecord(1, "hello world" + i)); } + Dataset df = spark.createDataFrame(expectedOrigin, SimpleRecord.class); for (Map.Entry entry : properties.entrySet()) { From e2ac0c3d4d7da916b69d64ad5f93635928370e12 Mon Sep 17 00:00:00 2001 From: roryqi Date: Sun, 13 Aug 2023 19:55:37 +0800 Subject: [PATCH 05/18] fix --- .../apache/iceberg/spark/source/TestCompressionSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index affd89257e46..e3d488ea4ebd 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -135,7 +135,7 @@ public void testWriteDataWithDifferentSetting() throws Exception { for (int i = 0; i < 1000; i++) { expectedOrigin.add(new SimpleRecord(1, "hello world" + i)); } - + Dataset df = spark.createDataFrame(expectedOrigin, SimpleRecord.class); for (Map.Entry entry : properties.entrySet()) { From bb11538fea7c9bb1a7d58edfd0e47dd59df7de8d Mon Sep 17 00:00:00 2001 From: roryqi Date: Mon, 14 Aug 2023 10:25:57 +0800 Subject: [PATCH 06/18] address comments --- .../spark/source/TestCompressionSettings.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index e3d488ea4ebd..719859838cfd 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -60,6 +60,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.assertj.core.api.Assertions; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -73,14 +74,15 @@ public class TestCompressionSettings { private static final Configuration CONF = new Configuration(); - private final FileFormat format; - - private final ImmutableMap properties; - private static SparkSession spark = null; private static final Schema SCHEMA = new Schema( optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get())); + private static SparkSession spark = null; + + private final FileFormat format; + private final ImmutableMap properties; + @Rule public TemporaryFolder temp = new TemporaryFolder(); @Parameterized.Parameters(name = "format = {0}, properties = {1}") @@ -153,8 +155,8 @@ public void testWriteDataWithDifferentSetting() throws Exception { try (ManifestReader reader = ManifestFiles.read(manifestFiles.get(0), table.io())) { DataFile file = reader.iterator().next(); InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assert.assertEquals( - getCompressionType(inputFile).toLowerCase(), properties.get(COMPRESSION_CODEC)); + Assertions.assertThat( + getCompressionType(inputFile)).isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } } From 86458e957156ed4165d7080407c6aa514b055fe4 Mon Sep 17 00:00:00 2001 From: roryqi Date: Mon, 14 Aug 2023 10:29:47 +0800 Subject: [PATCH 07/18] fix --- .../apache/iceberg/spark/source/TestCompressionSettings.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 719859838cfd..03004e4eb2e3 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -62,7 +62,6 @@ import org.apache.spark.sql.SparkSession; import org.assertj.core.api.Assertions; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -155,8 +154,8 @@ public void testWriteDataWithDifferentSetting() throws Exception { try (ManifestReader reader = ManifestFiles.read(manifestFiles.get(0), table.io())) { DataFile file = reader.iterator().next(); InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assertions.assertThat( - getCompressionType(inputFile)).isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } } From 923e03a56f2cfc0c3e411da34ca366a35b0e4495 Mon Sep 17 00:00:00 2001 From: roryqi Date: Mon, 14 Aug 2023 16:54:30 +0800 Subject: [PATCH 08/18] Use junit5 --- .../spark/source/TestCompressionSettings.java | 75 +++++++++---------- 1 file changed, 34 insertions(+), 41 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 03004e4eb2e3..671a893ab3f6 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -18,6 +18,9 @@ */ package org.apache.iceberg.spark.source; +import static org.apache.iceberg.FileFormat.AVRO; +import static org.apache.iceberg.FileFormat.ORC; +import static org.apache.iceberg.FileFormat.PARQUET; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; @@ -30,6 +33,7 @@ import java.io.File; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.apache.avro.file.DataFileConstants; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; @@ -61,15 +65,14 @@ import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.assertj.core.api.Assertions; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + public class TestCompressionSettings { private static final Configuration CONF = new Configuration(); @@ -79,51 +82,41 @@ public class TestCompressionSettings { private static SparkSession spark = null; - private final FileFormat format; - private final ImmutableMap properties; - - @Rule public TemporaryFolder temp = new TemporaryFolder(); - - @Parameterized.Parameters(name = "format = {0}, properties = {1}") - public static Object[] parameters() { - return new Object[] { - new Object[] {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")}, - new Object[] {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "gzip")}, - new Object[] { - "orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed") - }, - new Object[] { - "orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression") - }, - new Object[] {"avro", ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")} - }; + @TempDir private File temp; + + private static Stream parameters() { + return Stream.of( + Arguments.of(PARQUET, ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")), + Arguments.of(PARQUET, ImmutableMap.of(COMPRESSION_CODEC, "gzip")), + Arguments.of( + ORC, ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed")), + Arguments.of( + ORC, ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")), + Arguments.of(AVRO, ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3"))); } - @BeforeClass + @BeforeAll public static void startSpark() { TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate(); } - @Parameterized.AfterParam - public static void clearSourceCache() { + @AfterEach + public void clearSourceCache() { ManualSource.clearTables(); } - @AfterClass + @AfterAll public static void stopSpark() { SparkSession currentSpark = TestCompressionSettings.spark; TestCompressionSettings.spark = null; currentSpark.stop(); } - public TestCompressionSettings(String format, ImmutableMap properties) { - this.format = FileFormat.fromString(format); - this.properties = properties; - } - - @Test - public void testWriteDataWithDifferentSetting() throws Exception { - File parent = temp.newFolder(format.toString()); + @ParameterizedTest(name = "format = {0}, properties = {1}") + @MethodSource("parameters") + public void testWriteDataWithDifferentSetting( + FileFormat format, ImmutableMap properties) throws Exception { + File parent = temp; File location = new File(parent, "test"); HadoopTables tables = new HadoopTables(CONF); Map tableProperties = Maps.newHashMap(); @@ -154,12 +147,12 @@ public void testWriteDataWithDifferentSetting() throws Exception { try (ManifestReader reader = ManifestFiles.read(manifestFiles.get(0), table.io())) { DataFile file = reader.iterator().next(); InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assertions.assertThat(getCompressionType(inputFile)) + Assertions.assertThat(getCompressionType(format, inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } } - private String getCompressionType(InputFile inputFile) throws Exception { + private String getCompressionType(FileFormat format, InputFile inputFile) throws Exception { switch (format) { case ORC: OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(CONF).useUTCTimestamp(true); From b607d7af0be1314cc8d98fd945e145ee5a142fc7 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 16 Aug 2023 10:38:25 +0800 Subject: [PATCH 09/18] fix --- .../iceberg/spark/SparkSQLProperties.java | 6 +- .../spark/source/SparkFileWriterFactory.java | 7 +- .../source/SparkPositionDeletesRewrite.java | 13 +++- .../spark/source/SparkPositionDeltaWrite.java | 9 ++- .../iceberg/spark/source/SparkWrite.java | 33 +-------- .../spark/source/WritePropertiesUtil.java | 68 +++++++++++++++++++ 6 files changed, 91 insertions(+), 45 deletions(-) create mode 100644 spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 660b90291f51..0c08587b4b2e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -65,7 +65,7 @@ private SparkSQLProperties() {} public static final String WAP_BRANCH = "spark.wap.branch"; // Controls write compress options - public static final String COMPRESSION_CODEC = "spark.sql.iceberg.write.compression-codec"; - public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.write.compression-level"; - public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.write.compression-strategy"; + public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec"; + public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level"; + public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy"; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index 5288bc6acf86..9b075b675565 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -79,12 +79,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { this.dataSparkType = dataSparkType; this.equalityDeleteSparkType = equalityDeleteSparkType; this.positionDeleteSparkType = positionDeleteSparkType; - - if (writeProperties != null) { - this.writeProperties = writeProperties; - } else { - this.writeProperties = ImmutableMap.of(); - } + this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of(); } static Builder builderFor(Table table) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 9835a73c4324..dca15ef42a6c 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; + import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -74,6 +76,7 @@ public class SparkPositionDeletesRewrite implements Write { private final String fileSetId; private final int specId; private final StructLike partition; + private final Map writeProperties; /** * Constructs a {@link SparkPositionDeletesRewrite}. @@ -106,6 +109,7 @@ public class SparkPositionDeletesRewrite implements Write { this.fileSetId = writeConf.rewrittenFileSetId(); this.specId = specId; this.partition = partition; + this.writeProperties = WritePropertiesUtil.writeProperties(format, writeConf); } @Override @@ -129,7 +133,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { writeSchema, dsSchema, specId, - partition); + partition, + writeProperties); } @Override @@ -174,6 +179,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final StructType dsSchema; private final int specId; private final StructLike partition; + private final Map writeProperties; PositionDeletesWriterFactory( Broadcast
tableBroadcast, @@ -183,7 +189,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { Schema writeSchema, StructType dsSchema, int specId, - StructLike partition) { + StructLike partition, + Map writeProperties) { this.tableBroadcast = tableBroadcast; this.queryId = queryId; this.format = format; @@ -192,6 +199,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.dsSchema = dsSchema; this.specId = specId; this.partition = partition; + this.writeProperties = writeProperties; } @Override @@ -219,6 +227,7 @@ public DataWriter createWriter(int partitionId, long taskId) { SparkFileWriterFactory.builderFor(table) .deleteFileFormat(format) .positionDeleteSparkType(deleteSparkTypeWithoutRow) + .writeProperties(writeProperties) .build(); return new DeleteWriter( diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 9759d25eb9ac..58cb44acfc4d 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -104,6 +104,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final Context context; private boolean cleanupOnAbort = true; + private final Map writeProperties; SparkPositionDeltaWrite( SparkSession spark, @@ -126,6 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); + this.writeProperties = WritePropertiesUtil.writeProperties(context.dataFileFormat, writeConf); } @Override @@ -155,7 +157,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context); + return new PositionDeltaWriteFactory(tableBroadcast, command, context, ); } @Override @@ -326,11 +328,13 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory { private final Broadcast
tableBroadcast; private final Command command; private final Context context; + private final Map writeProperties; - PositionDeltaWriteFactory(Broadcast
tableBroadcast, Command command, Context context) { + PositionDeltaWriteFactory(Broadcast
tableBroadcast, Command command, Context context, Map writeProperties) { this.tableBroadcast = tableBroadcast; this.command = command; this.context = context; + this.writeProperties = writeProperties; } @Override @@ -356,6 +360,7 @@ public DeltaWriter createWriter(int partitionId, long taskId) { .dataSparkType(context.dataSparkType()) .deleteFileFormat(context.deleteFileFormat()) .positionDeleteSparkType(context.deleteSparkType()) + .writeProperties(writeProperties) .build(); if (command == DELETE) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index f93097408a3e..f11d3fcfbf81 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -20,12 +20,6 @@ import static org.apache.iceberg.IsolationLevel.SERIALIZABLE; import static org.apache.iceberg.IsolationLevel.SNAPSHOT; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import java.io.IOException; import java.util.Arrays; @@ -185,31 +179,6 @@ private WriterFactory createWriterFactory() { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - Map writeProperties = Maps.newHashMap(); - switch (format) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, writeConf.parquetCompressionCodec()); - String parquetCompressionLevel = writeConf.parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - - break; - case AVRO: - writeProperties.put(AVRO_COMPRESSION, writeConf.avroCompressionCodec()); - String avroCompressionLevel = writeConf.avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, writeConf.avroCompressionLevel()); - } - - break; - case ORC: - writeProperties.put(ORC_COMPRESSION, writeConf.orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, writeConf.orcCompressionStrategy()); - break; - default: - throw new IllegalArgumentException(String.format("Unknown file format %s", format)); - } return new WriterFactory( tableBroadcast, @@ -220,7 +189,7 @@ private WriterFactory createWriterFactory() { writeSchema, dsSchema, partitionedFanoutEnabled, - writeProperties); + WritePropertiesUtil.writeProperties(format, writeConf)); } private void commitOperation(SnapshotUpdate operation, String description) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java new file mode 100644 index 000000000000..48d912a2a3e5 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.relocated.com.google.common.collect.Maps +import org.apache.iceberg.spark.SparkWriteConf; + +import java.util.Map; + + +public class WritePropertiesUtil { + + private WritePropertiesUtil() {} + + public static Map writeProperties(FileFormat format, SparkWriteConf writeConf) { + Map writeProperties = Maps.newHashMap(); + switch (format) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, writeConf.parquetCompressionCodec()); + String parquetCompressionLevel = writeConf.parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + + break; + case AVRO: + writeProperties.put(AVRO_COMPRESSION, writeConf.avroCompressionCodec()); + String avroCompressionLevel = writeConf.avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, writeConf.avroCompressionLevel()); + } + + break; + case ORC: + writeProperties.put(ORC_COMPRESSION, writeConf.orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, writeConf.orcCompressionStrategy()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown file format %s", format)); + } + return writeProperties; + } +} From 083e84fceb373063280e46f064f255ae80ba7675 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 16 Aug 2023 11:07:25 +0800 Subject: [PATCH 10/18] fix --- .../iceberg/spark/source/SparkPositionDeletesRewrite.java | 1 - .../iceberg/spark/source/SparkPositionDeltaWrite.java | 8 ++++++-- .../java/org/apache/iceberg/spark/source/SparkWrite.java | 1 - .../apache/iceberg/spark/source/WritePropertiesUtil.java | 7 ++----- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index dca15ef42a6c..df0bd7eb7e49 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; - import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 58cb44acfc4d..3ebd98d95059 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -157,7 +157,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new PositionDeltaWriteFactory(tableBroadcast, command, context, ); + return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties); } @Override @@ -330,7 +330,11 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory { private final Context context; private final Map writeProperties; - PositionDeltaWriteFactory(Broadcast
tableBroadcast, Command command, Context context, Map writeProperties) { + PositionDeltaWriteFactory( + Broadcast
tableBroadcast, + Command command, + Context context, + Map writeProperties) { this.tableBroadcast = tableBroadcast; this.command = command; this.context = context; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index f11d3fcfbf81..87416c15e900 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -54,7 +54,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java index 48d912a2a3e5..c892cc574696 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iceberg.spark.source; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; @@ -26,13 +25,11 @@ import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import java.util.Map; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.relocated.com.google.common.collect.Maps +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkWriteConf; -import java.util.Map; - - public class WritePropertiesUtil { private WritePropertiesUtil() {} From 102c119ef3b78bfed902b8abc9e11a02d2641f71 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 16 Aug 2023 15:06:00 +0800 Subject: [PATCH 11/18] fix --- .../spark/source/TestCompressionSettings.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 671a893ab3f6..5594c7b4b509 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -21,7 +21,9 @@ import static org.apache.iceberg.FileFormat.AVRO; import static org.apache.iceberg.FileFormat.ORC; import static org.apache.iceberg.FileFormat.PARQUET; +import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_MODE; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_CODEC; @@ -54,6 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.orc.OrcFile; @@ -64,6 +67,12 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.expressions.FieldReference; +import org.apache.spark.sql.connector.expressions.LiteralValue; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.connector.expressions.filter.AlwaysTrue; +import org.apache.spark.sql.connector.expressions.filter.Predicate; +import org.apache.spark.sql.types.DataTypes; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -123,6 +132,7 @@ public void testWriteDataWithDifferentSetting( tableProperties.put(PARQUET_COMPRESSION, "gzip"); tableProperties.put(AVRO_COMPRESSION, "gzip"); tableProperties.put(ORC_COMPRESSION, "zlib"); + tableProperties.put(DELETE_MODE, MERGE_ON_READ.modeName()); Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), tableProperties, location.toString()); List expectedOrigin = Lists.newArrayList(); @@ -150,6 +160,15 @@ public void testWriteDataWithDifferentSetting( Assertions.assertThat(getCompressionType(format, inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } + Predicate[] predicates = new Predicate[] { new AlwaysTrue() }; + new SparkTable(table, false).newRowLevelOperationBuilder(); + List deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); + try (ManifestReader reader = ManifestFiles.read(deleteManifestFiles.get(0), table.io())) { + DataFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(format, inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + } } private String getCompressionType(FileFormat format, InputFile inputFile) throws Exception { From faf8f5d2234d551b8b5b6cb8ec4c2cdae5f9ee84 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 16 Aug 2023 19:06:34 +0800 Subject: [PATCH 12/18] fix --- .../spark/source/TestCompressionSettings.java | 155 ++++++++++-------- 1 file changed, 89 insertions(+), 66 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 5594c7b4b509..2ee2b165092a 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -18,24 +18,22 @@ */ package org.apache.iceberg.spark.source; -import static org.apache.iceberg.FileFormat.AVRO; -import static org.apache.iceberg.FileFormat.ORC; import static org.apache.iceberg.FileFormat.PARQUET; import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DELETE_MODE; +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_CODEC; import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_LEVEL; import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_STRATEGY; -import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; -import java.io.File; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import org.apache.avro.file.DataFileConstants; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; @@ -44,100 +42,108 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestReader; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.actions.SizeBasedFileRewriter; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkCatalogTestBase; import org.apache.iceberg.spark.SparkWriteOptions; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.spark.actions.SparkActions; import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.connector.expressions.FieldReference; -import org.apache.spark.sql.connector.expressions.LiteralValue; -import org.apache.spark.sql.connector.expressions.NamedReference; -import org.apache.spark.sql.connector.expressions.filter.AlwaysTrue; -import org.apache.spark.sql.connector.expressions.filter.Predicate; -import org.apache.spark.sql.types.DataTypes; import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.io.TempDir; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -public class TestCompressionSettings { +@RunWith(Parameterized.class) +public class TestCompressionSettings extends SparkCatalogTestBase { private static final Configuration CONF = new Configuration(); - private static final Schema SCHEMA = - new Schema( - optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get())); + private static final String tableName = "testWriteData"; private static SparkSession spark = null; - @TempDir private File temp; - - private static Stream parameters() { - return Stream.of( - Arguments.of(PARQUET, ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")), - Arguments.of(PARQUET, ImmutableMap.of(COMPRESSION_CODEC, "gzip")), - Arguments.of( - ORC, ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed")), - Arguments.of( - ORC, ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")), - Arguments.of(AVRO, ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3"))); + private final FileFormat format; + private final ImmutableMap properties; + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + @Parameterized.Parameters(name = "format = {0}, properties = {1}") + public static Object[][] parameters() { + return new Object[][] { + {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")}, + {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "gzip")}, + {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed")}, + {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")}, + {"avro", ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")} + }; } - @BeforeAll + @BeforeClass public static void startSpark() { TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate(); } - @AfterEach - public void clearSourceCache() { - ManualSource.clearTables(); + @Parameterized.AfterParam + public static void clearSourceCache() { + spark.sql(String.format("DROP TABLE IF EXISTS %s", tableName)); } - @AfterAll + @AfterClass public static void stopSpark() { SparkSession currentSpark = TestCompressionSettings.spark; TestCompressionSettings.spark = null; currentSpark.stop(); } - @ParameterizedTest(name = "format = {0}, properties = {1}") - @MethodSource("parameters") - public void testWriteDataWithDifferentSetting( - FileFormat format, ImmutableMap properties) throws Exception { - File parent = temp; - File location = new File(parent, "test"); - HadoopTables tables = new HadoopTables(CONF); + public TestCompressionSettings(String format, ImmutableMap properties) { + super( + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties()); + this.format = FileFormat.fromString(format); + this.properties = properties; + } + + @Test + public void testWriteDataWithDifferentSetting() throws Exception { + sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName); Map tableProperties = Maps.newHashMap(); tableProperties.put(PARQUET_COMPRESSION, "gzip"); tableProperties.put(AVRO_COMPRESSION, "gzip"); tableProperties.put(ORC_COMPRESSION, "zlib"); tableProperties.put(DELETE_MODE, MERGE_ON_READ.modeName()); - Table table = - tables.create(SCHEMA, PartitionSpec.unpartitioned(), tableProperties, location.toString()); + tableProperties.put(FORMAT_VERSION, "2"); + sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName, DEFAULT_FILE_FORMAT, format); + sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName, DELETE_DEFAULT_FILE_FORMAT, format); + for (Map.Entry entry : tableProperties.entrySet()) { + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, entry.getKey(), entry.getValue()); + } List expectedOrigin = Lists.newArrayList(); for (int i = 0; i < 1000; i++) { - expectedOrigin.add(new SimpleRecord(1, "hello world" + i)); + expectedOrigin.add(new SimpleRecord(i, "hello world" + i)); } Dataset df = spark.createDataFrame(expectedOrigin, SimpleRecord.class); @@ -147,31 +153,48 @@ public void testWriteDataWithDifferentSetting( } df.select("id", "data") - .write() - .format("iceberg") + .writeTo(tableName) .option(SparkWriteOptions.WRITE_FORMAT, format.toString()) - .mode(SaveMode.Append) - .save(location.toString()); - + .append(); + Table table = catalog.loadTable(TableIdentifier.of("default", tableName)); List manifestFiles = table.currentSnapshot().dataManifests(table.io()); try (ManifestReader reader = ManifestFiles.read(manifestFiles.get(0), table.io())) { DataFile file = reader.iterator().next(); InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assertions.assertThat(getCompressionType(format, inputFile)) + Assertions.assertThat(getCompressionType(inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } - Predicate[] predicates = new Predicate[] { new AlwaysTrue() }; - new SparkTable(table, false).newRowLevelOperationBuilder(); + sql("DELETE from %s where id < 100", tableName); + + table.refresh(); List deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); - try (ManifestReader reader = ManifestFiles.read(deleteManifestFiles.get(0), table.io())) { - DataFile file = reader.iterator().next(); + Map specMap = Maps.newHashMap(); + specMap.put(0, PartitionSpec.unpartitioned()); + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + DeleteFile file = reader.iterator().next(); InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assertions.assertThat(getCompressionType(format, inputFile)) - .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + } + if (PARQUET.equals(format)) { + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + table.refresh(); + deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + DeleteFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + } } } - private String getCompressionType(FileFormat format, InputFile inputFile) throws Exception { + private String getCompressionType(InputFile inputFile) throws Exception { switch (format) { case ORC: OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(CONF).useUTCTimestamp(true); From 371ac029325bd6ae13f28512454210d97012fed9 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 16 Aug 2023 19:10:43 +0800 Subject: [PATCH 13/18] fix --- .../iceberg/spark/source/TestCompressionSettings.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 2ee2b165092a..cc3aa9121b3a 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -135,12 +135,15 @@ public void testWriteDataWithDifferentSetting() throws Exception { tableProperties.put(DELETE_MODE, MERGE_ON_READ.modeName()); tableProperties.put(FORMAT_VERSION, "2"); sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName, DEFAULT_FILE_FORMAT, format); - sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName, DELETE_DEFAULT_FILE_FORMAT, format); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, DELETE_DEFAULT_FILE_FORMAT, format); for (Map.Entry entry : tableProperties.entrySet()) { sql( "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName, entry.getKey(), entry.getValue()); } + List expectedOrigin = Lists.newArrayList(); for (int i = 0; i < 1000; i++) { expectedOrigin.add(new SimpleRecord(i, "hello world" + i)); @@ -164,6 +167,7 @@ public void testWriteDataWithDifferentSetting() throws Exception { Assertions.assertThat(getCompressionType(inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } + sql("DELETE from %s where id < 100", tableName); table.refresh(); @@ -177,6 +181,7 @@ public void testWriteDataWithDifferentSetting() throws Exception { Assertions.assertThat(getCompressionType(inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } + if (PARQUET.equals(format)) { SparkActions.get(spark) .rewritePositionDeletes(table) @@ -185,7 +190,7 @@ public void testWriteDataWithDifferentSetting() throws Exception { table.refresh(); deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); try (ManifestReader reader = - ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { DeleteFile file = reader.iterator().next(); InputFile inputFile = table.io().newInputFile(file.path().toString()); Assertions.assertThat(getCompressionType(inputFile)) From 9321807913705d453d7c56c45859d7dcf258579c Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 16 Aug 2023 19:56:50 +0800 Subject: [PATCH 14/18] fix --- .../org/apache/iceberg/spark/source/WritePropertiesUtil.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java index c892cc574696..003e9b779dd5 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java @@ -60,6 +60,7 @@ public static Map writeProperties(FileFormat format, SparkWriteC default: throw new IllegalArgumentException(String.format("Unknown file format %s", format)); } + return writeProperties; } } From c6218f7a04c77635aa61c60d23e5fb1e7f77680e Mon Sep 17 00:00:00 2001 From: roryqi Date: Sat, 26 Aug 2023 16:56:46 +0800 Subject: [PATCH 15/18] fix --- .../apache/iceberg/spark/SparkWriteConf.java | 37 +++++++++++ .../source/SparkPositionDeletesRewrite.java | 2 +- .../spark/source/SparkPositionDeltaWrite.java | 2 +- .../iceberg/spark/source/SparkWrite.java | 2 +- .../spark/source/WritePropertiesUtil.java | 66 ------------------- 5 files changed, 40 insertions(+), 69 deletions(-) delete mode 100644 spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 85d1412b3e90..070601bcfd93 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -21,6 +21,12 @@ import static org.apache.iceberg.DistributionMode.HASH; import static org.apache.iceberg.DistributionMode.NONE; import static org.apache.iceberg.DistributionMode.RANGE; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import java.util.Locale; import java.util.Map; @@ -478,4 +484,35 @@ public String orcCompressionStrategy() { .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT) .parse(); } + + public Map writeProperties(FileFormat format) { + Map writeProperties = Maps.newHashMap(); + + switch (format) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); + String parquetCompressionLevel = parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + + break; + case AVRO: + writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); + String avroCompressionLevel = avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel()); + } + + break; + case ORC: + writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown file format %s", format)); + } + + return writeProperties; + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index df0bd7eb7e49..152279daaf72 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -108,7 +108,7 @@ public class SparkPositionDeletesRewrite implements Write { this.fileSetId = writeConf.rewrittenFileSetId(); this.specId = specId; this.partition = partition; - this.writeProperties = WritePropertiesUtil.writeProperties(format, writeConf); + this.writeProperties = writeConf.writeProperties(format); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 3ebd98d95059..db184bf48ba9 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -127,7 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); - this.writeProperties = WritePropertiesUtil.writeProperties(context.dataFileFormat, writeConf); + this.writeProperties = writeConf.writeProperties(context.dataFileFormat); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 87416c15e900..6a6cad7f99f5 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -188,7 +188,7 @@ private WriterFactory createWriterFactory() { writeSchema, dsSchema, partitionedFanoutEnabled, - WritePropertiesUtil.writeProperties(format, writeConf)); + writeConf.writeProperties(format)); } private void commitOperation(SnapshotUpdate operation, String description) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java deleted file mode 100644 index 003e9b779dd5..000000000000 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark.source; - -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; - -import java.util.Map; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.SparkWriteConf; - -public class WritePropertiesUtil { - - private WritePropertiesUtil() {} - - public static Map writeProperties(FileFormat format, SparkWriteConf writeConf) { - Map writeProperties = Maps.newHashMap(); - switch (format) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, writeConf.parquetCompressionCodec()); - String parquetCompressionLevel = writeConf.parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - - break; - case AVRO: - writeProperties.put(AVRO_COMPRESSION, writeConf.avroCompressionCodec()); - String avroCompressionLevel = writeConf.avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, writeConf.avroCompressionLevel()); - } - - break; - case ORC: - writeProperties.put(ORC_COMPRESSION, writeConf.orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, writeConf.orcCompressionStrategy()); - break; - default: - throw new IllegalArgumentException(String.format("Unknown file format %s", format)); - } - - return writeProperties; - } -} From 91a88a9857a5796a8e39763aa3f260507609a930 Mon Sep 17 00:00:00 2001 From: roryqi Date: Tue, 29 Aug 2023 10:11:49 +0800 Subject: [PATCH 16/18] address comment --- .../main/java/org/apache/iceberg/spark/SparkWriteConf.java | 7 ++++--- .../java/org/apache/iceberg/spark/source/SparkWrite.java | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 070601bcfd93..ec9825d40bbe 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -495,22 +495,23 @@ public Map writeProperties(FileFormat format) { if (parquetCompressionLevel != null) { writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); } - break; + case AVRO: writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); String avroCompressionLevel = avroCompressionLevel(); if (avroCompressionLevel != null) { writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel()); } - break; + case ORC: writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); break; + default: - throw new IllegalArgumentException(String.format("Unknown file format %s", format)); + // skip } return writeProperties; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 6a6cad7f99f5..15881098e7a3 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -100,6 +100,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { private final Map extraSnapshotMetadata; private final boolean partitionedFanoutEnabled; private final SparkWriteRequirements writeRequirements; + private final Map writeProperties; private boolean cleanupOnAbort = true; @@ -128,6 +129,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled(); this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); + this.writeProperties = writeConf.writeProperties(); } @Override @@ -178,7 +180,6 @@ private WriterFactory createWriterFactory() { // broadcast the table metadata as the writer factory will be sent to executors Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); - return new WriterFactory( tableBroadcast, queryId, @@ -188,7 +189,7 @@ private WriterFactory createWriterFactory() { writeSchema, dsSchema, partitionedFanoutEnabled, - writeConf.writeProperties(format)); + writeProperties); } private void commitOperation(SnapshotUpdate operation, String description) { From f0e295ad6e7a1712eb6ba481e4637ea777a130d5 Mon Sep 17 00:00:00 2001 From: roryqi Date: Tue, 29 Aug 2023 10:19:58 +0800 Subject: [PATCH 17/18] trigger ci From 2c0308c2e37ff45ef9c52d42e9f33366b25233ad Mon Sep 17 00:00:00 2001 From: roryqi Date: Tue, 29 Aug 2023 10:36:32 +0800 Subject: [PATCH 18/18] fix --- .../main/java/org/apache/iceberg/spark/source/SparkWrite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 15881098e7a3..d4a4f22bfd49 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -129,7 +129,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled(); this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); - this.writeProperties = writeConf.writeProperties(); + this.writeProperties = writeConf.writeProperties(format); } @Override