From 46b1ab16c241b3115c53611afe285bda88ebbd46 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 22:53:42 +0800 Subject: [PATCH 01/22] Spark 3.4: BaseMetadataTable should respect the base table properties --- .../org/apache/iceberg/BaseMetadataTable.java | 2 +- .../spark/source/TestCompressionSettings.java | 27 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 5f7c48e95867..a174da3d3c19 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -166,7 +166,7 @@ public Map sortOrders() { @Override public Map properties() { - return ImmutableMap.of(); + return table.properties(); } @Override diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index cc3aa9121b3a..be5ea8e9def4 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -182,21 +182,20 @@ public void testWriteDataWithDifferentSetting() throws Exception { .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } - if (PARQUET.equals(format)) { - SparkActions.get(spark) - .rewritePositionDeletes(table) - .option(SizeBasedFileRewriter.REWRITE_ALL, "true") - .execute(); - table.refresh(); - deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); - try (ManifestReader reader = - ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { - DeleteFile file = reader.iterator().next(); - InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assertions.assertThat(getCompressionType(inputFile)) - .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); - } + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + table.refresh(); + deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + DeleteFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } + } private String getCompressionType(InputFile inputFile) throws Exception { From 182a73844c40574b114bc98dd347a9b36fe3257e Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 23:03:57 +0800 Subject: [PATCH 02/22] [WIP] Spark 3.4: PositionDeletesTable properties should respect the base table properties --- core/src/main/java/org/apache/iceberg/BaseMetadataTable.java | 2 +- .../main/java/org/apache/iceberg/PositionDeletesTable.java | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index a174da3d3c19..5f7c48e95867 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -166,7 +166,7 @@ public Map sortOrders() { @Override public Map properties() { - return table.properties(); + return ImmutableMap.of(); } @Override diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index f8cb924e536a..0b3ce435a2ae 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -253,6 +253,11 @@ protected CloseableIterable doPlanFiles() { } } + @Override + public Map properties() { + return table().properties(); + } + private CloseableIterable posDeletesScanTasks( ManifestFile manifest, PartitionSpec spec, From b5fce2c1d7c9e4bfada53f5016e9c121926e0131 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 23:13:46 +0800 Subject: [PATCH 03/22] fix --- .../java/org/apache/iceberg/PositionDeletesTable.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index 0b3ce435a2ae..5f1a5b148722 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -93,6 +93,11 @@ public Map specs() { return specs; } + @Override + public Map properties() { + return table().properties(); + } + private Schema calculateSchema() { Types.StructType partitionType = Partitioning.partitionType(table()); Schema result = @@ -253,10 +258,6 @@ protected CloseableIterable doPlanFiles() { } } - @Override - public Map properties() { - return table().properties(); - } private CloseableIterable posDeletesScanTasks( ManifestFile manifest, From f600ed20a6e6be411f968fa8bf5005820b630479 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 23:16:30 +0800 Subject: [PATCH 04/22] fix --- core/src/main/java/org/apache/iceberg/PositionDeletesTable.java | 1 - .../org/apache/iceberg/spark/source/TestCompressionSettings.java | 1 - 2 files changed, 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index 5f1a5b148722..38c778633b2c 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -258,7 +258,6 @@ protected CloseableIterable doPlanFiles() { } } - private CloseableIterable posDeletesScanTasks( ManifestFile manifest, PartitionSpec spec, diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index be5ea8e9def4..4030034bf248 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -195,7 +195,6 @@ public void testWriteDataWithDifferentSetting() throws Exception { Assertions.assertThat(getCompressionType(inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } - } private String getCompressionType(InputFile inputFile) throws Exception { From c8363fb08afe58d157072d9a9344d998e41989a4 Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 11:03:16 +0800 Subject: [PATCH 05/22] Spark 3.4: Fix the delete file compression option --- .../apache/iceberg/spark/SparkWriteConf.java | 156 ++++++++++++++---- .../source/SparkPositionDeletesRewrite.java | 3 +- .../spark/source/SparkPositionDeltaWrite.java | 2 +- .../iceberg/spark/source/SparkWrite.java | 2 +- .../spark/source/TestCompressionSettings.java | 6 + 5 files changed, 136 insertions(+), 33 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index ec9825d40bbe..b9dd5cf4f585 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -23,6 +23,12 @@ import static org.apache.iceberg.DistributionMode.RANGE; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; @@ -425,7 +431,7 @@ public String branch() { return branch; } - public String parquetCompressionCodec() { + private String parquetCompressionCodec() { return confParser .stringConf() .option(SparkWriteOptions.COMPRESSION_CODEC) @@ -435,7 +441,17 @@ public String parquetCompressionCodec() { .parse(); } - public String parquetCompressionLevel() { + private String deleteParquetCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(DELETE_PARQUET_COMPRESSION) + .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT) + .parseOptional(); + } + + private String parquetCompressionLevel() { return confParser .stringConf() .option(SparkWriteOptions.COMPRESSION_LEVEL) @@ -445,7 +461,17 @@ public String parquetCompressionLevel() { .parseOptional(); } - public String avroCompressionCodec() { + private String deleteParquetCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL) + .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + private String avroCompressionCodec() { return confParser .stringConf() .option(SparkWriteOptions.COMPRESSION_CODEC) @@ -455,7 +481,17 @@ public String avroCompressionCodec() { .parse(); } - public String avroCompressionLevel() { + private String deleteAvroCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(DELETE_AVRO_COMPRESSION) + .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT) + .parse(); + } + + private String avroCompressionLevel() { return confParser .stringConf() .option(SparkWriteOptions.COMPRESSION_LEVEL) @@ -465,7 +501,17 @@ public String avroCompressionLevel() { .parseOptional(); } - public String orcCompressionCodec() { + private String deleteAvroCompressionLevel() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_LEVEL) + .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) + .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL) + .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT) + .parseOptional(); + } + + private String orcCompressionCodec() { return confParser .stringConf() .option(SparkWriteOptions.COMPRESSION_CODEC) @@ -475,7 +521,17 @@ public String orcCompressionCodec() { .parse(); } - public String orcCompressionStrategy() { + private String deleteOrcCompressionCodec() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_CODEC) + .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) + .tableProperty(DELETE_ORC_COMPRESSION) + .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT) + .parse(); + } + + private String orcCompressionStrategy() { return confParser .stringConf() .option(SparkWriteOptions.COMPRESSION_STRATEGY) @@ -485,33 +541,73 @@ public String orcCompressionStrategy() { .parse(); } - public Map writeProperties(FileFormat format) { + private String deleteOrcCompressionStrategy() { + return confParser + .stringConf() + .option(SparkWriteOptions.COMPRESSION_STRATEGY) + .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) + .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY) + .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT) + .parse(); + } + + public Map writeProperties(FileFormat dataFormat, FileFormat deleteFormat) { Map writeProperties = Maps.newHashMap(); - switch (format) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); - String parquetCompressionLevel = parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - break; - - case AVRO: - writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); - String avroCompressionLevel = avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel()); - } - break; - - case ORC: - writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); - break; + if (dataFormat != null) { + switch (dataFormat) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); + String parquetCompressionLevel = parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + break; - default: - // skip + case AVRO: + writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); + String avroCompressionLevel = avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); + } + break; + + case ORC: + writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); + break; + + default: + // skip + } + } + + if (deleteFormat != null) { + switch (deleteFormat) { + case PARQUET: + writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec()); + String parquetCompressionLevel = deleteParquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + break; + + case AVRO: + writeProperties.put(DELETE_AVRO_COMPRESSION, avroCompressionCodec()); + String avroCompressionLevel = deleteAvroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, avroCompressionLevel); + } + break; + + case ORC: + writeProperties.put(DELETE_ORC_COMPRESSION, deleteOrcCompressionCodec()); + writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, deleteOrcCompressionStrategy()); + break; + + default: + // skip + } } return writeProperties; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 152279daaf72..3370a7d384ad 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -108,7 +108,7 @@ public class SparkPositionDeletesRewrite implements Write { this.fileSetId = writeConf.rewrittenFileSetId(); this.specId = specId; this.partition = partition; - this.writeProperties = writeConf.writeProperties(format); + this.writeProperties = writeConf.writeProperties(null, format); } @Override @@ -221,6 +221,7 @@ public DataWriter createWriter(int partitionId, long taskId) { .deleteFileFormat(format) .positionDeleteRowSchema(positionDeleteRowSchema) .positionDeleteSparkType(deleteSparkType) + .writeProperties(writeProperties) .build(); SparkFileWriterFactory writerFactoryWithoutRow = SparkFileWriterFactory.builderFor(table) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index b80d28368543..b430d6b59549 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -127,7 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); - this.writeProperties = writeConf.writeProperties(context.dataFileFormat); + this.writeProperties = writeConf.writeProperties(context.dataFileFormat, context.deleteFileFormat); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index d4a4f22bfd49..239b85d0549c 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -129,7 +129,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled(); this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); - this.writeProperties = writeConf.writeProperties(format); + this.writeProperties = writeConf.writeProperties(format, null); } @Override diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 4030034bf248..9721b625cf39 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -22,8 +22,11 @@ import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DELETE_MODE; +import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.FORMAT_VERSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; @@ -132,6 +135,9 @@ public void testWriteDataWithDifferentSetting() throws Exception { tableProperties.put(PARQUET_COMPRESSION, "gzip"); tableProperties.put(AVRO_COMPRESSION, "gzip"); tableProperties.put(ORC_COMPRESSION, "zlib"); + tableProperties.put(DELETE_PARQUET_COMPRESSION, "gzip"); + tableProperties.put(DELETE_AVRO_COMPRESSION, "zlib"); + tableProperties.put(DELETE_ORC_COMPRESSION, "zlib"); tableProperties.put(DELETE_MODE, MERGE_ON_READ.modeName()); tableProperties.put(FORMAT_VERSION, "2"); sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName, DEFAULT_FILE_FORMAT, format); From c0e8ff1c3b01c16d2945931444ce1770a83ec97c Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 11:04:39 +0800 Subject: [PATCH 06/22] Revert "fix" This reverts commit b5fce2c1d7c9e4bfada53f5016e9c121926e0131. --- .../main/java/org/apache/iceberg/PositionDeletesTable.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index 38c778633b2c..f8cb924e536a 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -93,11 +93,6 @@ public Map specs() { return specs; } - @Override - public Map properties() { - return table().properties(); - } - private Schema calculateSchema() { Types.StructType partitionType = Partitioning.partitionType(table()); Schema result = From e57b0f3c58f8aa029d6c506c76cb46a5f1566b11 Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 11:08:00 +0800 Subject: [PATCH 07/22] fix --- .../spark/source/TestCompressionSettings.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 9721b625cf39..402bffa381a4 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -187,19 +187,20 @@ public void testWriteDataWithDifferentSetting() throws Exception { Assertions.assertThat(getCompressionType(inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } - - SparkActions.get(spark) - .rewritePositionDeletes(table) - .option(SizeBasedFileRewriter.REWRITE_ALL, "true") - .execute(); - table.refresh(); - deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); - try (ManifestReader reader = - ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { - DeleteFile file = reader.iterator().next(); - InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assertions.assertThat(getCompressionType(inputFile)) - .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + if (PARQUET.equals(format)) { + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + table.refresh(); + deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + DeleteFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + } } } From a7cea948ad605d1d74bc7cea8794935a359a5a57 Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 11:09:34 +0800 Subject: [PATCH 08/22] fix --- .../apache/iceberg/spark/source/TestCompressionSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 402bffa381a4..c219580cb3c5 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -195,7 +195,7 @@ public void testWriteDataWithDifferentSetting() throws Exception { table.refresh(); deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); try (ManifestReader reader = - ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { DeleteFile file = reader.iterator().next(); InputFile inputFile = table.io().newInputFile(file.path().toString()); Assertions.assertThat(getCompressionType(inputFile)) From 621c1dc332ccc497a99dbce739db70efdd25efd2 Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 11:09:56 +0800 Subject: [PATCH 09/22] fix --- .../org/apache/iceberg/spark/source/TestCompressionSettings.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index c219580cb3c5..94669869479d 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -187,6 +187,7 @@ public void testWriteDataWithDifferentSetting() throws Exception { Assertions.assertThat(getCompressionType(inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } + if (PARQUET.equals(format)) { SparkActions.get(spark) .rewritePositionDeletes(table) From eeb264c4d6185cff7c7940a002aeaeba1c4a730f Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 11:15:13 +0800 Subject: [PATCH 10/22] fix --- .../apache/iceberg/spark/source/SparkPositionDeltaWrite.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index b430d6b59549..b65ab16d0448 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -127,7 +127,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); - this.writeProperties = writeConf.writeProperties(context.dataFileFormat, context.deleteFileFormat); + this.writeProperties = + writeConf.writeProperties(context.dataFileFormat, context.deleteFileFormat); } @Override From 67b3ce816968cd337e832cf168a0cf6f05a6979f Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 11:44:40 +0800 Subject: [PATCH 11/22] fix --- .../apache/iceberg/spark/SparkWriteConf.java | 155 ++++++++++-------- 1 file changed, 87 insertions(+), 68 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index b9dd5cf4f585..9b4fa0b7d0f6 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -74,6 +74,9 @@ public class SparkWriteConf { private static final Logger LOG = LoggerFactory.getLogger(SparkWriteConf.class); + private static final String DELETE_COMPRESSION_CODEC_DEFAULT_VALUE = null; + private static final String DELETE_COMPRESSION_LEVEL_DEFAULT = null; + private static final String DELETE_COMPRESSION_STRATEGY_DEFAULT = null; private final Table table; private final String branch; @@ -431,6 +434,84 @@ public String branch() { return branch; } + public Map writeProperties(FileFormat dataFormat, FileFormat deleteFormat) { + Map writeProperties = Maps.newHashMap(); + + if (dataFormat != null) { + switch (dataFormat) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); + String parquetCompressionLevel = parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + break; + + case AVRO: + writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); + String avroCompressionLevel = avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); + } + break; + + case ORC: + writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); + break; + + default: + // skip + } + } + + if (deleteFormat != null) { + switch (deleteFormat) { + case PARQUET: + String parquetCodec = deleteParquetCompressionCodec(); + writeProperties.put( + DELETE_PARQUET_COMPRESSION, + parquetCodec != null ? parquetCodec : parquetCompressionCodec()); + String deleteParquetCompressionLevel = deleteParquetCompressionLevel(); + if (deleteParquetCompressionLevel != null) { + writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); + } else if (parquetCompressionCodec() != null) { + writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel()); + } + break; + + case AVRO: + String avroCodec = deleteAvroCompressionCodec(); + writeProperties.put( + DELETE_AVRO_COMPRESSION, avroCodec != null ? avroCodec : avroCompressionCodec()); + String deleteAvroCompressionLevel = deleteAvroCompressionLevel(); + if (deleteAvroCompressionLevel != null) { + writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel); + } else if (avroCompressionLevel() != null) { + writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, avroCompressionLevel()); + } + break; + + case ORC: + String orcCodec = deleteOrcCompressionCodec(); + writeProperties.put( + DELETE_ORC_COMPRESSION, orcCodec != null ? orcCodec : orcCompressionCodec()); + String strategy = deleteOrcCompressionStrategy(); + if (strategy != null) { + writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, strategy); + } else { + writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); + } + break; + + default: + // skip + } + } + + return writeProperties; + } + private String parquetCompressionCodec() { return confParser .stringConf() @@ -447,7 +528,7 @@ private String deleteParquetCompressionCodec() { .option(SparkWriteOptions.COMPRESSION_CODEC) .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) .tableProperty(DELETE_PARQUET_COMPRESSION) - .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT) + .defaultValue(DELETE_COMPRESSION_CODEC_DEFAULT_VALUE) .parseOptional(); } @@ -467,7 +548,7 @@ private String deleteParquetCompressionLevel() { .option(SparkWriteOptions.COMPRESSION_LEVEL) .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL) - .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT) + .defaultValue(DELETE_COMPRESSION_STRATEGY_DEFAULT) .parseOptional(); } @@ -487,7 +568,7 @@ private String deleteAvroCompressionCodec() { .option(SparkWriteOptions.COMPRESSION_CODEC) .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) .tableProperty(DELETE_AVRO_COMPRESSION) - .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT) + .defaultValue(DELETE_COMPRESSION_CODEC_DEFAULT_VALUE) .parse(); } @@ -507,7 +588,7 @@ private String deleteAvroCompressionLevel() { .option(SparkWriteOptions.COMPRESSION_LEVEL) .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL) - .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT) + .defaultValue(DELETE_COMPRESSION_LEVEL_DEFAULT) .parseOptional(); } @@ -527,7 +608,7 @@ private String deleteOrcCompressionCodec() { .option(SparkWriteOptions.COMPRESSION_CODEC) .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) .tableProperty(DELETE_ORC_COMPRESSION) - .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT) + .defaultValue(DELETE_COMPRESSION_CODEC_DEFAULT_VALUE) .parse(); } @@ -547,69 +628,7 @@ private String deleteOrcCompressionStrategy() { .option(SparkWriteOptions.COMPRESSION_STRATEGY) .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY) - .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT) + .defaultValue(DELETE_COMPRESSION_STRATEGY_DEFAULT) .parse(); } - - public Map writeProperties(FileFormat dataFormat, FileFormat deleteFormat) { - Map writeProperties = Maps.newHashMap(); - - if (dataFormat != null) { - switch (dataFormat) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); - String parquetCompressionLevel = parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - break; - - case AVRO: - writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); - String avroCompressionLevel = avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); - } - break; - - case ORC: - writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); - break; - - default: - // skip - } - } - - if (deleteFormat != null) { - switch (deleteFormat) { - case PARQUET: - writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec()); - String parquetCompressionLevel = deleteParquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - break; - - case AVRO: - writeProperties.put(DELETE_AVRO_COMPRESSION, avroCompressionCodec()); - String avroCompressionLevel = deleteAvroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, avroCompressionLevel); - } - break; - - case ORC: - writeProperties.put(DELETE_ORC_COMPRESSION, deleteOrcCompressionCodec()); - writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, deleteOrcCompressionStrategy()); - break; - - default: - // skip - } - } - - return writeProperties; - } } From 98f44940177e2333f68d5bfe3100aeea408e41c9 Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 12:01:37 +0800 Subject: [PATCH 12/22] fix --- .../org/apache/iceberg/spark/SparkWriteConf.java | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 9b4fa0b7d0f6..304523ee82e7 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -74,9 +74,6 @@ public class SparkWriteConf { private static final Logger LOG = LoggerFactory.getLogger(SparkWriteConf.class); - private static final String DELETE_COMPRESSION_CODEC_DEFAULT_VALUE = null; - private static final String DELETE_COMPRESSION_LEVEL_DEFAULT = null; - private static final String DELETE_COMPRESSION_STRATEGY_DEFAULT = null; private final Table table; private final String branch; @@ -528,7 +525,6 @@ private String deleteParquetCompressionCodec() { .option(SparkWriteOptions.COMPRESSION_CODEC) .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) .tableProperty(DELETE_PARQUET_COMPRESSION) - .defaultValue(DELETE_COMPRESSION_CODEC_DEFAULT_VALUE) .parseOptional(); } @@ -548,7 +544,6 @@ private String deleteParquetCompressionLevel() { .option(SparkWriteOptions.COMPRESSION_LEVEL) .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL) - .defaultValue(DELETE_COMPRESSION_STRATEGY_DEFAULT) .parseOptional(); } @@ -568,8 +563,7 @@ private String deleteAvroCompressionCodec() { .option(SparkWriteOptions.COMPRESSION_CODEC) .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) .tableProperty(DELETE_AVRO_COMPRESSION) - .defaultValue(DELETE_COMPRESSION_CODEC_DEFAULT_VALUE) - .parse(); + .parseOptional(); } private String avroCompressionLevel() { @@ -588,7 +582,6 @@ private String deleteAvroCompressionLevel() { .option(SparkWriteOptions.COMPRESSION_LEVEL) .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL) - .defaultValue(DELETE_COMPRESSION_LEVEL_DEFAULT) .parseOptional(); } @@ -608,8 +601,7 @@ private String deleteOrcCompressionCodec() { .option(SparkWriteOptions.COMPRESSION_CODEC) .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) .tableProperty(DELETE_ORC_COMPRESSION) - .defaultValue(DELETE_COMPRESSION_CODEC_DEFAULT_VALUE) - .parse(); + .parseOptional(); } private String orcCompressionStrategy() { @@ -628,7 +620,6 @@ private String deleteOrcCompressionStrategy() { .option(SparkWriteOptions.COMPRESSION_STRATEGY) .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY) - .defaultValue(DELETE_COMPRESSION_STRATEGY_DEFAULT) - .parse(); + .parseOptional(); } } From 1c17d4926cc403acb18ba120c8756bc58ceab57c Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 12:19:19 +0800 Subject: [PATCH 13/22] fix --- .../apache/iceberg/spark/SparkWriteConf.java | 132 +++++++++--------- .../source/SparkPositionDeletesRewrite.java | 2 +- .../spark/source/SparkPositionDeltaWrite.java | 3 +- .../iceberg/spark/source/SparkWrite.java | 2 +- 4 files changed, 68 insertions(+), 71 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 304523ee82e7..5e3132746067 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -431,79 +431,77 @@ public String branch() { return branch; } - public Map writeProperties(FileFormat dataFormat, FileFormat deleteFormat) { + public Map writeProperties() { Map writeProperties = Maps.newHashMap(); - if (dataFormat != null) { - switch (dataFormat) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); - String parquetCompressionLevel = parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - break; - - case AVRO: - writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); - String avroCompressionLevel = avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); - } - break; + FileFormat dataFormat = dataFileFormat(); + switch (dataFormat) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); + String parquetCompressionLevel = parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + break; + + case AVRO: + writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec()); + String avroCompressionLevel = avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); + } + break; + + case ORC: + writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); + break; - case ORC: - writeProperties.put(ORC_COMPRESSION, orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); - break; - - default: - // skip - } + default: + // skip } - if (deleteFormat != null) { - switch (deleteFormat) { - case PARQUET: - String parquetCodec = deleteParquetCompressionCodec(); - writeProperties.put( - DELETE_PARQUET_COMPRESSION, - parquetCodec != null ? parquetCodec : parquetCompressionCodec()); - String deleteParquetCompressionLevel = deleteParquetCompressionLevel(); - if (deleteParquetCompressionLevel != null) { - writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); - } else if (parquetCompressionCodec() != null) { - writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel()); - } - break; - - case AVRO: - String avroCodec = deleteAvroCompressionCodec(); - writeProperties.put( - DELETE_AVRO_COMPRESSION, avroCodec != null ? avroCodec : avroCompressionCodec()); - String deleteAvroCompressionLevel = deleteAvroCompressionLevel(); - if (deleteAvroCompressionLevel != null) { - writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel); - } else if (avroCompressionLevel() != null) { - writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, avroCompressionLevel()); - } - break; - - case ORC: - String orcCodec = deleteOrcCompressionCodec(); - writeProperties.put( - DELETE_ORC_COMPRESSION, orcCodec != null ? orcCodec : orcCompressionCodec()); - String strategy = deleteOrcCompressionStrategy(); - if (strategy != null) { - writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, strategy); - } else { - writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); - } - break; + FileFormat deleteFormat = deleteFileFormat(); + switch (deleteFormat) { + case PARQUET: + String parquetCodec = deleteParquetCompressionCodec(); + writeProperties.put( + DELETE_PARQUET_COMPRESSION, + parquetCodec != null ? parquetCodec : parquetCompressionCodec()); + String deleteParquetCompressionLevel = deleteParquetCompressionLevel(); + if (deleteParquetCompressionLevel != null) { + writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); + } else if (parquetCompressionCodec() != null) { + writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel()); + } + break; + + case AVRO: + String avroCodec = deleteAvroCompressionCodec(); + writeProperties.put( + DELETE_AVRO_COMPRESSION, avroCodec != null ? avroCodec : avroCompressionCodec()); + String deleteAvroCompressionLevel = deleteAvroCompressionLevel(); + if (deleteAvroCompressionLevel != null) { + writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel); + } else if (avroCompressionLevel() != null) { + writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, avroCompressionLevel()); + } + break; + + case ORC: + String orcCodec = deleteOrcCompressionCodec(); + writeProperties.put( + DELETE_ORC_COMPRESSION, orcCodec != null ? orcCodec : orcCompressionCodec()); + String strategy = deleteOrcCompressionStrategy(); + if (strategy != null) { + writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, strategy); + } else { + writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); + } + break; - default: - // skip - } + default: + // skip } return writeProperties; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 3370a7d384ad..d0769eaa5f4e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -108,7 +108,7 @@ public class SparkPositionDeletesRewrite implements Write { this.fileSetId = writeConf.rewrittenFileSetId(); this.specId = specId; this.partition = partition; - this.writeProperties = writeConf.writeProperties(null, format); + this.writeProperties = writeConf.writeProperties(); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index b65ab16d0448..9fea33948b3e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -127,8 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); - this.writeProperties = - writeConf.writeProperties(context.dataFileFormat, context.deleteFileFormat); + this.writeProperties = writeConf.writeProperties(); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 239b85d0549c..15881098e7a3 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -129,7 +129,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled(); this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); - this.writeProperties = writeConf.writeProperties(format, null); + this.writeProperties = writeConf.writeProperties(); } @Override From e319ed5ac5b618611a8056db8f271aff907a0a94 Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 13:00:06 +0800 Subject: [PATCH 14/22] fix --- .../org/apache/iceberg/spark/SparkWriteConf.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 5e3132746067..17ad1afcc948 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -433,6 +433,13 @@ public String branch() { public Map writeProperties() { Map writeProperties = Maps.newHashMap(); + writeProperties.putAll(dataWriteProperties()); + writeProperties.putAll(deleteWriteProperties()); + return writeProperties; + } + + private Map dataWriteProperties() { + Map writeProperties = Maps.newHashMap(); FileFormat dataFormat = dataFileFormat(); switch (dataFormat) { @@ -461,6 +468,12 @@ public Map writeProperties() { // skip } + return writeProperties; + } + + private Map deleteWriteProperties() { + Map writeProperties = Maps.newHashMap(); + FileFormat deleteFormat = deleteFileFormat(); switch (deleteFormat) { case PARQUET: From 070fdda10e757d91531aba7880fcea9a15bf6912 Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 13:45:20 +0800 Subject: [PATCH 15/22] fix --- .../src/main/java/org/apache/iceberg/spark/SparkWriteConf.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 17ad1afcc948..ece406a8d92c 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -484,7 +484,7 @@ private Map deleteWriteProperties() { String deleteParquetCompressionLevel = deleteParquetCompressionLevel(); if (deleteParquetCompressionLevel != null) { writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); - } else if (parquetCompressionCodec() != null) { + } else if (parquetCompressionLevel() != null) { writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel()); } break; From 53f71735abaa7b45fb517a2f7f09d306941b691f Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 15:32:48 +0800 Subject: [PATCH 16/22] fix --- .../iceberg/spark/TestSparkWriteConf.java | 257 ++++++++++++++++++ .../spark/source/TestCompressionSettings.java | 2 +- 2 files changed, 258 insertions(+), 1 deletion(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 1dcdc1fc6595..7b417cfe33c1 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -18,13 +18,29 @@ */ package org.apache.iceberg.spark; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE; +import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE; +import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_CODEC; +import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_LEVEL; import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE; import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE; import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE; @@ -32,6 +48,8 @@ import java.util.Map; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.UpdateProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.After; import org.junit.Assert; @@ -164,6 +182,245 @@ public void testSparkWriteConfDistributionModeWithEverything() { }); } + @Test + public void testSparkWriteConfWriteProperties() { + Object[][] propertiesSuites = + new Object[][] { + { + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "3"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + TableProperties.PARQUET_COMPRESSION, + "gzip", + TableProperties.DELETE_PARQUET_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "3", + DELETE_PARQUET_COMPRESSION_LEVEL, + "3") + }, + { + ImmutableMap.of( + COMPRESSION_CODEC, "zstd", SparkSQLProperties.COMPRESSION_STRATEGY, "compression"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "zlib", + DELETE_ORC_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "zstd", + ORC_COMPRESSION, + "zstd", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression", + ORC_COMPRESSION_STRATEGY, + "compression") + }, + { + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "9"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "gzip", + DELETE_AVRO_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "9") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "5") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION_STRATEGY, + "speed") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "9") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + TableProperties.PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "6"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "6") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION, + "zstd", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "zstd", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION, + "zstd", + DELETE_AVRO_COMPRESSION_LEVEL, + "16"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "16") + } + }; + + for (Object[] propertiesSuite : propertiesSuites) { + withSQLConf( + (Map) propertiesSuite[0], + () -> { + Table table = validationCatalog.loadTable(tableIdent); + Map writeOptions = ImmutableMap.of(); + Map tableProperties = (Map) propertiesSuite[1]; + UpdateProperties updateProperties = table.updateProperties(); + for (Map.Entry entry : tableProperties.entrySet()) { + updateProperties.set(entry.getKey(), entry.getValue()); + } + + updateProperties.commit(); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + Map writeProperties = writeConf.writeProperties(); + Map expectedProperties = (Map) propertiesSuite[2]; + Assert.assertEquals(expectedProperties.size(), writeConf.writeProperties().size()); + for (Map.Entry entry : writeProperties.entrySet()) { + Assert.assertEquals(entry.getValue(), expectedProperties.get(entry.getKey())); + } + + table.refresh(); + updateProperties = table.updateProperties(); + for (Map.Entry entry : tableProperties.entrySet()) { + updateProperties.remove(entry.getKey()); + } + + updateProperties.commit(); + }); + } + } + private void checkMode(DistributionMode expectedMode, SparkWriteConf writeConf) { Assert.assertEquals(expectedMode, writeConf.distributionMode()); Assert.assertEquals(expectedMode, writeConf.copyOnWriteDistributionMode(DELETE)); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 94669869479d..760e735acee9 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -136,7 +136,7 @@ public void testWriteDataWithDifferentSetting() throws Exception { tableProperties.put(AVRO_COMPRESSION, "gzip"); tableProperties.put(ORC_COMPRESSION, "zlib"); tableProperties.put(DELETE_PARQUET_COMPRESSION, "gzip"); - tableProperties.put(DELETE_AVRO_COMPRESSION, "zlib"); + tableProperties.put(DELETE_AVRO_COMPRESSION, "gzip"); tableProperties.put(DELETE_ORC_COMPRESSION, "zlib"); tableProperties.put(DELETE_MODE, MERGE_ON_READ.modeName()); tableProperties.put(FORMAT_VERSION, "2"); From f440e53fe5d7fa872ca24df5e65a85dff0cd851c Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 16:36:01 +0800 Subject: [PATCH 17/22] fix --- .../iceberg/spark/TestSparkWriteConf.java | 482 +++++++++--------- 1 file changed, 248 insertions(+), 234 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 7b417cfe33c1..a32bff75f245 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -184,243 +184,257 @@ public void testSparkWriteConfDistributionModeWithEverything() { @Test public void testSparkWriteConfWriteProperties() { - Object[][] propertiesSuites = - new Object[][] { - { - ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "3"), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "parquet", - DELETE_DEFAULT_FILE_FORMAT, - "parquet", - TableProperties.PARQUET_COMPRESSION, - "gzip", - TableProperties.DELETE_PARQUET_COMPRESSION, - "snappy"), - ImmutableMap.of( - DELETE_PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "3", - DELETE_PARQUET_COMPRESSION_LEVEL, - "3") - }, - { - ImmutableMap.of( - COMPRESSION_CODEC, "zstd", SparkSQLProperties.COMPRESSION_STRATEGY, "compression"), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "orc", - DELETE_DEFAULT_FILE_FORMAT, - "orc", - ORC_COMPRESSION, - "zlib", - DELETE_ORC_COMPRESSION, - "snappy"), - ImmutableMap.of( - DELETE_ORC_COMPRESSION, - "zstd", - ORC_COMPRESSION, - "zstd", - DELETE_ORC_COMPRESSION_STRATEGY, - "compression", - ORC_COMPRESSION_STRATEGY, - "compression") - }, - { - ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "9"), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "avro", - DELETE_DEFAULT_FILE_FORMAT, - "avro", - AVRO_COMPRESSION, - "gzip", - DELETE_AVRO_COMPRESSION, - "snappy"), - ImmutableMap.of( - DELETE_AVRO_COMPRESSION, - "zstd", - AVRO_COMPRESSION, - "zstd", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION_LEVEL, - "9") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "parquet", - DELETE_DEFAULT_FILE_FORMAT, - "parquet", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5"), - ImmutableMap.of( - DELETE_PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5", - DELETE_PARQUET_COMPRESSION_LEVEL, - "5") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "orc", - DELETE_DEFAULT_FILE_FORMAT, - "orc", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed"), - ImmutableMap.of( - DELETE_ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed", - DELETE_ORC_COMPRESSION_STRATEGY, - "speed") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "avro", - DELETE_DEFAULT_FILE_FORMAT, - "avro", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9"), - ImmutableMap.of( - DELETE_AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION_LEVEL, - "9") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "parquet", - DELETE_DEFAULT_FILE_FORMAT, - "parquet", - TableProperties.PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5", - DELETE_PARQUET_COMPRESSION_LEVEL, - "6"), - ImmutableMap.of( - DELETE_PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5", - DELETE_PARQUET_COMPRESSION_LEVEL, - "6") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "orc", - DELETE_DEFAULT_FILE_FORMAT, - "orc", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed", - DELETE_ORC_COMPRESSION, - "zstd", - DELETE_ORC_COMPRESSION_STRATEGY, - "compression"), - ImmutableMap.of( - DELETE_ORC_COMPRESSION, - "zstd", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed", - DELETE_ORC_COMPRESSION_STRATEGY, - "compression") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "avro", - DELETE_DEFAULT_FILE_FORMAT, - "avro", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION, - "zstd", - DELETE_AVRO_COMPRESSION_LEVEL, - "16"), - ImmutableMap.of( - DELETE_AVRO_COMPRESSION, - "zstd", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION_LEVEL, - "16") - } - }; - - for (Object[] propertiesSuite : propertiesSuites) { - withSQLConf( - (Map) propertiesSuite[0], - () -> { - Table table = validationCatalog.loadTable(tableIdent); - Map writeOptions = ImmutableMap.of(); - Map tableProperties = (Map) propertiesSuite[1]; - UpdateProperties updateProperties = table.updateProperties(); - for (Map.Entry entry : tableProperties.entrySet()) { - updateProperties.set(entry.getKey(), entry.getValue()); - } - - updateProperties.commit(); - - SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); - Map writeProperties = writeConf.writeProperties(); - Map expectedProperties = (Map) propertiesSuite[2]; - Assert.assertEquals(expectedProperties.size(), writeConf.writeProperties().size()); - for (Map.Entry entry : writeProperties.entrySet()) { - Assert.assertEquals(entry.getValue(), expectedProperties.get(entry.getKey())); - } - - table.refresh(); - updateProperties = table.updateProperties(); - for (Map.Entry entry : tableProperties.entrySet()) { - updateProperties.remove(entry.getKey()); - } - - updateProperties.commit(); - }); + for (Object[] propertiesSuite : getOverridePropertiesSuites()) { + testWritePropertiesBySuite(propertiesSuite); + } + + for (Object[] propertiesSuite : getNonOverridePropertiesSuite()) { + testWritePropertiesBySuite(propertiesSuite); } } + private void testWritePropertiesBySuite(Object[] propertiesSuite) { + withSQLConf( + (Map) propertiesSuite[0], + () -> { + Table table = validationCatalog.loadTable(tableIdent); + Map writeOptions = ImmutableMap.of(); + Map tableProperties = (Map) propertiesSuite[1]; + UpdateProperties updateProperties = table.updateProperties(); + for (Map.Entry entry : tableProperties.entrySet()) { + updateProperties.set(entry.getKey(), entry.getValue()); + } + + updateProperties.commit(); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + Map writeProperties = writeConf.writeProperties(); + Map expectedProperties = (Map) propertiesSuite[2]; + Assert.assertEquals(expectedProperties.size(), writeConf.writeProperties().size()); + for (Map.Entry entry : writeProperties.entrySet()) { + Assert.assertEquals(entry.getValue(), expectedProperties.get(entry.getKey())); + } + + table.refresh(); + updateProperties = table.updateProperties(); + for (Map.Entry entry : tableProperties.entrySet()) { + updateProperties.remove(entry.getKey()); + } + + updateProperties.commit(); + }); + } + + private Object[][] getOverridePropertiesSuites() { + return new Object[][] { + { + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "3"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + TableProperties.PARQUET_COMPRESSION, + "gzip", + TableProperties.DELETE_PARQUET_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "3", + DELETE_PARQUET_COMPRESSION_LEVEL, + "3") + }, + { + ImmutableMap.of( + COMPRESSION_CODEC, "zstd", SparkSQLProperties.COMPRESSION_STRATEGY, "compression"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "zlib", + DELETE_ORC_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "zstd", + ORC_COMPRESSION, + "zstd", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression", + ORC_COMPRESSION_STRATEGY, + "compression") + }, + { + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "9"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "gzip", + DELETE_AVRO_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "9") + }, + }; + } + + private Object[][] getNonOverridePropertiesSuite() { + return new Object[][] { + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "5") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION_STRATEGY, + "speed") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "9") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + TableProperties.PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "6"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "6") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION, + "zstd", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "zstd", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION, + "zstd", + DELETE_AVRO_COMPRESSION_LEVEL, + "16"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "16") + } + }; + } + private void checkMode(DistributionMode expectedMode, SparkWriteConf writeConf) { Assert.assertEquals(expectedMode, writeConf.distributionMode()); Assert.assertEquals(expectedMode, writeConf.copyOnWriteDistributionMode(DELETE)); From 12436d4fda0ef0824d2b5d370e2c67af60392a82 Mon Sep 17 00:00:00 2001 From: roryqi Date: Fri, 1 Sep 2023 07:16:13 +0800 Subject: [PATCH 18/22] fix --- .../apache/iceberg/spark/SparkWriteConf.java | 65 +++++++++++-------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index ece406a8d92c..914c02155b92 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -477,40 +477,42 @@ private Map deleteWriteProperties() { FileFormat deleteFormat = deleteFileFormat(); switch (deleteFormat) { case PARQUET: - String parquetCodec = deleteParquetCompressionCodec(); - writeProperties.put( + setWritePropertyWithFallback( + writeProperties, DELETE_PARQUET_COMPRESSION, - parquetCodec != null ? parquetCodec : parquetCompressionCodec()); - String deleteParquetCompressionLevel = deleteParquetCompressionLevel(); - if (deleteParquetCompressionLevel != null) { - writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); - } else if (parquetCompressionLevel() != null) { - writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel()); - } + deleteParquetCompressionCodec(), + parquetCompressionCodec()); + setWritePropertyWithFallback( + writeProperties, + DELETE_PARQUET_COMPRESSION_LEVEL, + deleteParquetCompressionLevel(), + parquetCompressionLevel()); break; case AVRO: - String avroCodec = deleteAvroCompressionCodec(); - writeProperties.put( - DELETE_AVRO_COMPRESSION, avroCodec != null ? avroCodec : avroCompressionCodec()); - String deleteAvroCompressionLevel = deleteAvroCompressionLevel(); - if (deleteAvroCompressionLevel != null) { - writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel); - } else if (avroCompressionLevel() != null) { - writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, avroCompressionLevel()); - } + setWritePropertyWithFallback( + writeProperties, + DELETE_AVRO_COMPRESSION, + deleteAvroCompressionCodec(), + avroCompressionCodec()); + setWritePropertyWithFallback( + writeProperties, + DELETE_AVRO_COMPRESSION_LEVEL, + deleteAvroCompressionLevel(), + avroCompressionLevel()); break; case ORC: - String orcCodec = deleteOrcCompressionCodec(); - writeProperties.put( - DELETE_ORC_COMPRESSION, orcCodec != null ? orcCodec : orcCompressionCodec()); - String strategy = deleteOrcCompressionStrategy(); - if (strategy != null) { - writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, strategy); - } else { - writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, orcCompressionStrategy()); - } + setWritePropertyWithFallback( + writeProperties, + DELETE_ORC_COMPRESSION, + deleteOrcCompressionCodec(), + orcCompressionCodec()); + setWritePropertyWithFallback( + writeProperties, + DELETE_ORC_COMPRESSION_STRATEGY, + deleteOrcCompressionStrategy(), + orcCompressionStrategy()); break; default: @@ -520,6 +522,15 @@ private Map deleteWriteProperties() { return writeProperties; } + private void setWritePropertyWithFallback( + Map writeProperties, String key, String value, String fallbackValue) { + if (value != null) { + writeProperties.put(key, value); + } else if (fallbackValue != null) { + writeProperties.put(key, fallbackValue); + } + } + private String parquetCompressionCodec() { return confParser .stringConf() From 7f9c1450243e15d0dde82f90f685d7397c83c313 Mon Sep 17 00:00:00 2001 From: roryqi Date: Fri, 1 Sep 2023 12:12:43 +0800 Subject: [PATCH 19/22] fix --- .../src/main/java/org/apache/iceberg/spark/SparkWriteConf.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 914c02155b92..14847a97bf1a 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -442,6 +442,7 @@ private Map dataWriteProperties() { Map writeProperties = Maps.newHashMap(); FileFormat dataFormat = dataFileFormat(); + switch (dataFormat) { case PARQUET: writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); @@ -475,6 +476,7 @@ private Map deleteWriteProperties() { Map writeProperties = Maps.newHashMap(); FileFormat deleteFormat = deleteFileFormat(); + switch (deleteFormat) { case PARQUET: setWritePropertyWithFallback( From 5ba3feffd00214dff754b9455d6bf7fe4ae849cf Mon Sep 17 00:00:00 2001 From: roryqi Date: Fri, 1 Sep 2023 12:18:00 +0800 Subject: [PATCH 20/22] fix --- .../src/main/java/org/apache/iceberg/spark/SparkWriteConf.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 14847a97bf1a..df3e2051f771 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -440,7 +440,6 @@ public Map writeProperties() { private Map dataWriteProperties() { Map writeProperties = Maps.newHashMap(); - FileFormat dataFormat = dataFileFormat(); switch (dataFormat) { @@ -474,7 +473,6 @@ private Map dataWriteProperties() { private Map deleteWriteProperties() { Map writeProperties = Maps.newHashMap(); - FileFormat deleteFormat = deleteFileFormat(); switch (deleteFormat) { From 8c49de0c6dfccd1a44382137fa60014f34a16e4e Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 6 Sep 2023 20:21:45 +0800 Subject: [PATCH 21/22] fix --- .../iceberg/spark/TestSparkWriteConf.java | 443 +++++++++--------- 1 file changed, 226 insertions(+), 217 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index a32bff75f245..6c9f4af76928 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -183,22 +183,240 @@ public void testSparkWriteConfDistributionModeWithEverything() { } @Test - public void testSparkWriteConfWriteProperties() { - for (Object[] propertiesSuite : getOverridePropertiesSuites()) { - testWritePropertiesBySuite(propertiesSuite); + public void testSparkConfOverride() { + Object[][] propertiesSuites = + new Object[][] { + { + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "3"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + TableProperties.PARQUET_COMPRESSION, + "gzip", + TableProperties.DELETE_PARQUET_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "3", + DELETE_PARQUET_COMPRESSION_LEVEL, + "3") + }, + { + ImmutableMap.of( + COMPRESSION_CODEC, "zstd", SparkSQLProperties.COMPRESSION_STRATEGY, "compression"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "zlib", + DELETE_ORC_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "zstd", + ORC_COMPRESSION, + "zstd", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression", + ORC_COMPRESSION_STRATEGY, + "compression") + }, + { + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "9"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "gzip", + DELETE_AVRO_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "9") + }, + }; + for (Object[] propertiesSuite : propertiesSuites) { + testWriteProperties(propertiesSuite); } + } - for (Object[] propertiesSuite : getNonOverridePropertiesSuite()) { - testWritePropertiesBySuite(propertiesSuite); + @Test + public void testDataPropsDefaultsAsDeleteProps() { + Object[][] propertiesSuites = + new Object[][] { + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "5") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION_STRATEGY, + "speed") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "9") + } + }; + for (Object[] propertiesSuite : propertiesSuites) { + testWriteProperties(propertiesSuite); + } + } + + @Test + public void testDeleteFileWriteConf() { + Object[][] propertiesSuites = + new Object[][] { + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + TableProperties.PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "6"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "6") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION, + "zstd", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "zstd", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression") + }, + { + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION, + "zstd", + DELETE_AVRO_COMPRESSION_LEVEL, + "16"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "16") + } + }; + for (Object[] propertiesSuite : propertiesSuites) { + testWriteProperties(propertiesSuite); } } - private void testWritePropertiesBySuite(Object[] propertiesSuite) { + private void testWriteProperties(Object[] propertiesSuite) { withSQLConf( (Map) propertiesSuite[0], () -> { Table table = validationCatalog.loadTable(tableIdent); - Map writeOptions = ImmutableMap.of(); Map tableProperties = (Map) propertiesSuite[1]; UpdateProperties updateProperties = table.updateProperties(); for (Map.Entry entry : tableProperties.entrySet()) { @@ -207,6 +425,7 @@ private void testWritePropertiesBySuite(Object[] propertiesSuite) { updateProperties.commit(); + Map writeOptions = ImmutableMap.of(); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); Map writeProperties = writeConf.writeProperties(); Map expectedProperties = (Map) propertiesSuite[2]; @@ -225,216 +444,6 @@ private void testWritePropertiesBySuite(Object[] propertiesSuite) { }); } - private Object[][] getOverridePropertiesSuites() { - return new Object[][] { - { - ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "3"), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "parquet", - DELETE_DEFAULT_FILE_FORMAT, - "parquet", - TableProperties.PARQUET_COMPRESSION, - "gzip", - TableProperties.DELETE_PARQUET_COMPRESSION, - "snappy"), - ImmutableMap.of( - DELETE_PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "3", - DELETE_PARQUET_COMPRESSION_LEVEL, - "3") - }, - { - ImmutableMap.of( - COMPRESSION_CODEC, "zstd", SparkSQLProperties.COMPRESSION_STRATEGY, "compression"), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "orc", - DELETE_DEFAULT_FILE_FORMAT, - "orc", - ORC_COMPRESSION, - "zlib", - DELETE_ORC_COMPRESSION, - "snappy"), - ImmutableMap.of( - DELETE_ORC_COMPRESSION, - "zstd", - ORC_COMPRESSION, - "zstd", - DELETE_ORC_COMPRESSION_STRATEGY, - "compression", - ORC_COMPRESSION_STRATEGY, - "compression") - }, - { - ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "9"), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "avro", - DELETE_DEFAULT_FILE_FORMAT, - "avro", - AVRO_COMPRESSION, - "gzip", - DELETE_AVRO_COMPRESSION, - "snappy"), - ImmutableMap.of( - DELETE_AVRO_COMPRESSION, - "zstd", - AVRO_COMPRESSION, - "zstd", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION_LEVEL, - "9") - }, - }; - } - - private Object[][] getNonOverridePropertiesSuite() { - return new Object[][] { - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "parquet", - DELETE_DEFAULT_FILE_FORMAT, - "parquet", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5"), - ImmutableMap.of( - DELETE_PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5", - DELETE_PARQUET_COMPRESSION_LEVEL, - "5") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "orc", - DELETE_DEFAULT_FILE_FORMAT, - "orc", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed"), - ImmutableMap.of( - DELETE_ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed", - DELETE_ORC_COMPRESSION_STRATEGY, - "speed") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "avro", - DELETE_DEFAULT_FILE_FORMAT, - "avro", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9"), - ImmutableMap.of( - DELETE_AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION_LEVEL, - "9") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "parquet", - DELETE_DEFAULT_FILE_FORMAT, - "parquet", - TableProperties.PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5", - DELETE_PARQUET_COMPRESSION_LEVEL, - "6"), - ImmutableMap.of( - DELETE_PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5", - DELETE_PARQUET_COMPRESSION_LEVEL, - "6") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "orc", - DELETE_DEFAULT_FILE_FORMAT, - "orc", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed", - DELETE_ORC_COMPRESSION, - "zstd", - DELETE_ORC_COMPRESSION_STRATEGY, - "compression"), - ImmutableMap.of( - DELETE_ORC_COMPRESSION, - "zstd", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed", - DELETE_ORC_COMPRESSION_STRATEGY, - "compression") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "avro", - DELETE_DEFAULT_FILE_FORMAT, - "avro", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION, - "zstd", - DELETE_AVRO_COMPRESSION_LEVEL, - "16"), - ImmutableMap.of( - DELETE_AVRO_COMPRESSION, - "zstd", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION_LEVEL, - "16") - } - }; - } - private void checkMode(DistributionMode expectedMode, SparkWriteConf writeConf) { Assert.assertEquals(expectedMode, writeConf.distributionMode()); Assert.assertEquals(expectedMode, writeConf.copyOnWriteDistributionMode(DELETE)); From 028fd20303a185b274b132f8e5465d40ca9bc249 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 6 Sep 2023 22:28:50 +0800 Subject: [PATCH 22/22] fix --- .../iceberg/spark/TestSparkWriteConf.java | 425 +++++++++--------- 1 file changed, 209 insertions(+), 216 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 6c9f4af76928..a7c22b446813 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -45,12 +45,14 @@ import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE; import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE; +import java.util.List; import java.util.Map; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -184,240 +186,231 @@ public void testSparkWriteConfDistributionModeWithEverything() { @Test public void testSparkConfOverride() { - Object[][] propertiesSuites = - new Object[][] { - { - ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "3"), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "parquet", - DELETE_DEFAULT_FILE_FORMAT, - "parquet", - TableProperties.PARQUET_COMPRESSION, - "gzip", - TableProperties.DELETE_PARQUET_COMPRESSION, - "snappy"), - ImmutableMap.of( - DELETE_PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "3", - DELETE_PARQUET_COMPRESSION_LEVEL, - "3") - }, - { - ImmutableMap.of( - COMPRESSION_CODEC, "zstd", SparkSQLProperties.COMPRESSION_STRATEGY, "compression"), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "orc", - DELETE_DEFAULT_FILE_FORMAT, - "orc", - ORC_COMPRESSION, - "zlib", - DELETE_ORC_COMPRESSION, - "snappy"), - ImmutableMap.of( - DELETE_ORC_COMPRESSION, - "zstd", - ORC_COMPRESSION, - "zstd", - DELETE_ORC_COMPRESSION_STRATEGY, - "compression", - ORC_COMPRESSION_STRATEGY, - "compression") - }, - { - ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "9"), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "avro", - DELETE_DEFAULT_FILE_FORMAT, - "avro", - AVRO_COMPRESSION, - "gzip", - DELETE_AVRO_COMPRESSION, - "snappy"), - ImmutableMap.of( - DELETE_AVRO_COMPRESSION, - "zstd", - AVRO_COMPRESSION, - "zstd", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION_LEVEL, - "9") - }, - }; - for (Object[] propertiesSuite : propertiesSuites) { + List>> propertiesSuites = + Lists.newArrayList( + Lists.newArrayList( + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "3"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + TableProperties.PARQUET_COMPRESSION, + "gzip", + TableProperties.DELETE_PARQUET_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "3", + DELETE_PARQUET_COMPRESSION_LEVEL, + "3")), + Lists.newArrayList( + ImmutableMap.of( + COMPRESSION_CODEC, + "zstd", + SparkSQLProperties.COMPRESSION_STRATEGY, + "compression"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "zlib", + DELETE_ORC_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "zstd", + ORC_COMPRESSION, + "zstd", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression", + ORC_COMPRESSION_STRATEGY, + "compression")), + Lists.newArrayList( + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "9"), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "gzip", + DELETE_AVRO_COMPRESSION, + "snappy"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "9"))); + for (List> propertiesSuite : propertiesSuites) { testWriteProperties(propertiesSuite); } } @Test public void testDataPropsDefaultsAsDeleteProps() { - Object[][] propertiesSuites = - new Object[][] { - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "parquet", - DELETE_DEFAULT_FILE_FORMAT, - "parquet", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5"), - ImmutableMap.of( - DELETE_PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5", - DELETE_PARQUET_COMPRESSION_LEVEL, - "5") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "orc", - DELETE_DEFAULT_FILE_FORMAT, - "orc", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed"), - ImmutableMap.of( - DELETE_ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed", - DELETE_ORC_COMPRESSION_STRATEGY, - "speed") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "avro", - DELETE_DEFAULT_FILE_FORMAT, - "avro", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9"), - ImmutableMap.of( - DELETE_AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION_LEVEL, - "9") - } - }; - for (Object[] propertiesSuite : propertiesSuites) { + List>> propertiesSuites = + Lists.newArrayList( + Lists.newArrayList( + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "5")), + Lists.newArrayList( + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION_STRATEGY, + "speed")), + Lists.newArrayList( + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "9"))); + for (List> propertiesSuite : propertiesSuites) { testWriteProperties(propertiesSuite); } } @Test public void testDeleteFileWriteConf() { - Object[][] propertiesSuites = - new Object[][] { - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "parquet", - DELETE_DEFAULT_FILE_FORMAT, - "parquet", - TableProperties.PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5", - DELETE_PARQUET_COMPRESSION_LEVEL, - "6"), - ImmutableMap.of( - DELETE_PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION, - "zstd", - PARQUET_COMPRESSION_LEVEL, - "5", - DELETE_PARQUET_COMPRESSION_LEVEL, - "6") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "orc", - DELETE_DEFAULT_FILE_FORMAT, - "orc", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed", - DELETE_ORC_COMPRESSION, - "zstd", - DELETE_ORC_COMPRESSION_STRATEGY, - "compression"), - ImmutableMap.of( - DELETE_ORC_COMPRESSION, - "zstd", - ORC_COMPRESSION, - "snappy", - ORC_COMPRESSION_STRATEGY, - "speed", - DELETE_ORC_COMPRESSION_STRATEGY, - "compression") - }, - { - ImmutableMap.of(), - ImmutableMap.of( - DEFAULT_FILE_FORMAT, - "avro", - DELETE_DEFAULT_FILE_FORMAT, - "avro", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION, - "zstd", - DELETE_AVRO_COMPRESSION_LEVEL, - "16"), - ImmutableMap.of( - DELETE_AVRO_COMPRESSION, - "zstd", - AVRO_COMPRESSION, - "snappy", - AVRO_COMPRESSION_LEVEL, - "9", - DELETE_AVRO_COMPRESSION_LEVEL, - "16") - } - }; - for (Object[] propertiesSuite : propertiesSuites) { + List>> propertiesSuites = + Lists.newArrayList( + Lists.newArrayList( + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "parquet", + DELETE_DEFAULT_FILE_FORMAT, + "parquet", + TableProperties.PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "6"), + ImmutableMap.of( + DELETE_PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION, + "zstd", + PARQUET_COMPRESSION_LEVEL, + "5", + DELETE_PARQUET_COMPRESSION_LEVEL, + "6")), + Lists.newArrayList( + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "orc", + DELETE_DEFAULT_FILE_FORMAT, + "orc", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION, + "zstd", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression"), + ImmutableMap.of( + DELETE_ORC_COMPRESSION, + "zstd", + ORC_COMPRESSION, + "snappy", + ORC_COMPRESSION_STRATEGY, + "speed", + DELETE_ORC_COMPRESSION_STRATEGY, + "compression")), + Lists.newArrayList( + ImmutableMap.of(), + ImmutableMap.of( + DEFAULT_FILE_FORMAT, + "avro", + DELETE_DEFAULT_FILE_FORMAT, + "avro", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION, + "zstd", + DELETE_AVRO_COMPRESSION_LEVEL, + "16"), + ImmutableMap.of( + DELETE_AVRO_COMPRESSION, + "zstd", + AVRO_COMPRESSION, + "snappy", + AVRO_COMPRESSION_LEVEL, + "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "16"))); + for (List> propertiesSuite : propertiesSuites) { testWriteProperties(propertiesSuite); } } - private void testWriteProperties(Object[] propertiesSuite) { + private void testWriteProperties(List> propertiesSuite) { withSQLConf( - (Map) propertiesSuite[0], + propertiesSuite.get(0), () -> { Table table = validationCatalog.loadTable(tableIdent); - Map tableProperties = (Map) propertiesSuite[1]; + Map tableProperties = propertiesSuite.get(1); UpdateProperties updateProperties = table.updateProperties(); for (Map.Entry entry : tableProperties.entrySet()) { updateProperties.set(entry.getKey(), entry.getValue()); @@ -428,7 +421,7 @@ private void testWriteProperties(Object[] propertiesSuite) { Map writeOptions = ImmutableMap.of(); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); Map writeProperties = writeConf.writeProperties(); - Map expectedProperties = (Map) propertiesSuite[2]; + Map expectedProperties = propertiesSuite.get(2); Assert.assertEquals(expectedProperties.size(), writeConf.writeProperties().size()); for (Map.Entry entry : writeProperties.entrySet()) { Assert.assertEquals(entry.getValue(), expectedProperties.get(entry.getKey()));