From 46b1ab16c241b3115c53611afe285bda88ebbd46 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 22:53:42 +0800 Subject: [PATCH 1/4] Spark 3.4: BaseMetadataTable should respect the base table properties --- .../org/apache/iceberg/BaseMetadataTable.java | 2 +- .../spark/source/TestCompressionSettings.java | 27 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 5f7c48e95867..a174da3d3c19 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -166,7 +166,7 @@ public Map sortOrders() { @Override public Map properties() { - return ImmutableMap.of(); + return table.properties(); } @Override diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index cc3aa9121b3a..be5ea8e9def4 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -182,21 +182,20 @@ public void testWriteDataWithDifferentSetting() throws Exception { .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } - if (PARQUET.equals(format)) { - SparkActions.get(spark) - .rewritePositionDeletes(table) - .option(SizeBasedFileRewriter.REWRITE_ALL, "true") - .execute(); - table.refresh(); - deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); - try (ManifestReader reader = - ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { - DeleteFile file = reader.iterator().next(); - InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assertions.assertThat(getCompressionType(inputFile)) - .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); - } + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + table.refresh(); + deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + DeleteFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } + } private String getCompressionType(InputFile inputFile) throws Exception { From 182a73844c40574b114bc98dd347a9b36fe3257e Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 23:03:57 +0800 Subject: [PATCH 2/4] [WIP] Spark 3.4: PositionDeletesTable properties should respect the base table properties --- core/src/main/java/org/apache/iceberg/BaseMetadataTable.java | 2 +- .../main/java/org/apache/iceberg/PositionDeletesTable.java | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index a174da3d3c19..5f7c48e95867 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -166,7 +166,7 @@ public Map sortOrders() { @Override public Map properties() { - return table.properties(); + return ImmutableMap.of(); } @Override diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index f8cb924e536a..0b3ce435a2ae 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -253,6 +253,11 @@ protected CloseableIterable doPlanFiles() { } } + @Override + public Map properties() { + return table().properties(); + } + private CloseableIterable posDeletesScanTasks( ManifestFile manifest, PartitionSpec spec, From b5fce2c1d7c9e4bfada53f5016e9c121926e0131 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 23:13:46 +0800 Subject: [PATCH 3/4] fix --- .../java/org/apache/iceberg/PositionDeletesTable.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index 0b3ce435a2ae..5f1a5b148722 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -93,6 +93,11 @@ public Map specs() { return specs; } + @Override + public Map properties() { + return table().properties(); + } + private Schema calculateSchema() { Types.StructType partitionType = Partitioning.partitionType(table()); Schema result = @@ -253,10 +258,6 @@ protected CloseableIterable doPlanFiles() { } } - @Override - public Map properties() { - return table().properties(); - } private CloseableIterable posDeletesScanTasks( ManifestFile manifest, From f600ed20a6e6be411f968fa8bf5005820b630479 Mon Sep 17 00:00:00 2001 From: roryqi Date: Wed, 30 Aug 2023 23:16:30 +0800 Subject: [PATCH 4/4] fix --- core/src/main/java/org/apache/iceberg/PositionDeletesTable.java | 1 - .../org/apache/iceberg/spark/source/TestCompressionSettings.java | 1 - 2 files changed, 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index 5f1a5b148722..38c778633b2c 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -258,7 +258,6 @@ protected CloseableIterable doPlanFiles() { } } - private CloseableIterable posDeletesScanTasks( ManifestFile manifest, PartitionSpec spec, diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index be5ea8e9def4..4030034bf248 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -195,7 +195,6 @@ public void testWriteDataWithDifferentSetting() throws Exception { Assertions.assertThat(getCompressionType(inputFile)) .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); } - } private String getCompressionType(InputFile inputFile) throws Exception {