From 46b1ab16c241b3115c53611afe285bda88ebbd46 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 22:53:42 +0800 Subject: [PATCH 1/6] Spark 3.4: BaseMetadataTable should respect the base table properties --- .../org/apache/iceberg/BaseMetadataTable.java | 2 +- .../spark/source/TestCompressionSettings.java | 27 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 5f7c48e95867..a174da3d3c19 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -166,7 +166,7 @@ public Map sortOrders() { @Override public Map properties() { - return ImmutableMap.of(); + return table.properties(); } @Override diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index cc3aa9121b3a..be5ea8e9def4 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -182,21 +182,20 @@ public void testWriteDataWithDifferentSetting() throws Exception { .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } - if (PARQUET.equals(format)) { - SparkActions.get(spark) - .rewritePositionDeletes(table) - .option(SizeBasedFileRewriter.REWRITE_ALL, "true") - .execute(); - table.refresh(); - deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); - try (ManifestReader reader = - ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { - DeleteFile file = reader.iterator().next(); - InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assertions.assertThat(getCompressionType(inputFile)) - .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); - } + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + table.refresh(); + deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + DeleteFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } + } private String getCompressionType(InputFile inputFile) throws Exception { From bc51208474f5e294d6cb2b46f4f06befdbdc0eed Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 23:16:59 +0800 Subject: [PATCH 2/6] fix --- .../org/apache/iceberg/spark/source/TestCompressionSettings.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index be5ea8e9def4..4030034bf248 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -195,7 +195,6 @@ public void testWriteDataWithDifferentSetting() throws Exception { Assertions.assertThat(getCompressionType(inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } - } private String getCompressionType(InputFile inputFile) throws Exception { From a854c873fdcafe27ab0e73fdbf09e042bd4000d7 Mon Sep 17 00:00:00 2001 From: roryqi Date: Thu, 31 Aug 2023 00:24:54 +0800 Subject: [PATCH 3/6] trigger From f9ebf653f82658cebf781ece9ed58670699f2786 Mon Sep 17 00:00:00 2001 From: roryqi Date: Fri, 1 Sep 2023 07:57:15 +0800 Subject: [PATCH 4/6] Other method use table() instead of table, keep consistent --- core/src/main/java/org/apache/iceberg/BaseMetadataTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index a174da3d3c19..49f46f780f5d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -166,7 +166,7 @@ public Map sortOrders() { @Override public Map properties() { - return table.properties(); + return table().properties(); } @Override From 9d892807b2190e5667dfadb3a5347a4f5f6d8899 Mon Sep 17 00:00:00 2001 From: roryqi Date: Tue, 12 Sep 2023 12:28:08 +0800 Subject: [PATCH 5/6] fix --- .../org/apache/iceberg/PositionDeletesTable.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index f8cb924e536a..dd01c789c8a2 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -21,9 +21,11 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.ManifestEvaluator; @@ -93,6 +95,16 @@ public Map specs() { return specs; } + @Override + public Map properties() { + // The write properties are needed by PositionDeletesRewriteAction, + // these properties should respect the ones of BaseTable. + return Collections.unmodifiableMap( + table().properties().entrySet().stream() + .filter(entry -> entry.getKey().startsWith("write.")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + private Schema calculateSchema() { Types.StructType partitionType = Partitioning.partitionType(table()); Schema result = From be279f7528fca503fdf6da6f1b0f8aa65e4f337a Mon Sep 17 00:00:00 2001 From: roryqi Date: Tue, 12 Sep 2023 12:33:21 +0800 Subject: [PATCH 6/6] fix --- core/src/main/java/org/apache/iceberg/BaseMetadataTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 49f46f780f5d..5f7c48e95867 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -166,7 +166,7 @@ public Map sortOrders() { @Override public Map properties() { - return table().properties(); + return ImmutableMap.of(); } @Override