From 49dd484ca072b3b69b9760778d85aa1d1ae6cd00 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 11 Aug 2023 18:14:16 -0700 Subject: [PATCH 1/4] Core: use ZSTD compression parquet by default for new tables --- .../org/apache/iceberg/TableMetadata.java | 28 +++++++++++-- .../org/apache/iceberg/TableProperties.java | 1 + .../iceberg/rest/RESTSessionCatalog.java | 9 +++++ .../org/apache/iceberg/TestTableMetadata.java | 23 +++++++++-- .../iceberg/flink/TestFlinkCatalogTable.java | 40 ++++++++++++------- .../flink/sink/TestCompressionSettings.java | 4 +- .../iceberg/flink/TestFlinkCatalogTable.java | 40 ++++++++++++------- .../flink/sink/TestCompressionSettings.java | 2 +- .../iceberg/flink/TestFlinkCatalogTable.java | 40 ++++++++++++------- .../flink/sink/TestCompressionSettings.java | 4 +- .../apache/iceberg/hive/TestHiveCatalog.java | 18 +++++++++ .../org/apache/iceberg/mr/TestCatalogs.java | 23 +++++++++-- .../TestHiveIcebergStorageHandlerNoScan.java | 10 ++++- .../TestMetadataTableReadableMetrics.java | 27 ++++++++----- .../TestMetadataTableReadableMetrics.java | 17 +++++--- 15 files changed, 213 insertions(+), 73 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index a6f1d428f41a..0b2db1f7765c 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -68,7 +68,12 @@ public static TableMetadata newTableMetadata( PropertyUtil.propertyAsInt( properties, TableProperties.FORMAT_VERSION, DEFAULT_TABLE_FORMAT_VERSION); return newTableMetadata( - schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion); + schema, + spec, + sortOrder, + location, + withPersistedProperties(unreservedProperties(properties)), + formatVersion); } public static TableMetadata newTableMetadata( @@ -78,7 +83,12 @@ public static TableMetadata newTableMetadata( PropertyUtil.propertyAsInt( properties, 
TableProperties.FORMAT_VERSION, DEFAULT_TABLE_FORMAT_VERSION); return newTableMetadata( - schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion); + schema, + spec, + sortOrder, + location, + withPersistedProperties(unreservedProperties(properties)), + formatVersion); } private static Map unreservedProperties(Map rawProperties) { @@ -87,6 +97,18 @@ private static Map unreservedProperties(Map rawP .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } + private static Map withPersistedProperties(Map properties) { + Map persistedProperties = Maps.newHashMap(); + persistedProperties.put( + TableProperties.PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + persistedProperties.put( + TableProperties.DELETE_PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + persistedProperties.putAll(properties); + return ImmutableMap.copyOf(persistedProperties); + } + static TableMetadata newTableMetadata( Schema schema, PartitionSpec spec, @@ -119,7 +141,7 @@ static TableMetadata newTableMetadata( int freshSortOrderId = sortOrder.isUnsorted() ? sortOrder.orderId() : INITIAL_SORT_ORDER_ID; SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, sortOrder); - // Validate the metrics configuration. Note: we only do this on new tables to we don't + // Validate the metrics configuration. Note: we only do this on new tables to not // break existing tables. 
MetricsConfig.fromProperties(properties).validateReferencedColumns(schema); diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index a9116bc57f83..3b46b67abbd2 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -143,6 +143,7 @@ private TableProperties() {} public static final String PARQUET_COMPRESSION = "write.parquet.compression-codec"; public static final String DELETE_PARQUET_COMPRESSION = "write.delete.parquet.compression-codec"; public static final String PARQUET_COMPRESSION_DEFAULT = "gzip"; + public static final String PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0 = "zstd"; public static final String PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level"; public static final String DELETE_PARQUET_COMPRESSION_LEVEL = diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 72547eec3486..7dc72a136a34 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -47,6 +47,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.Transaction; import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.BaseSessionCatalog; @@ -592,6 +593,14 @@ private Builder(TableIdentifier ident, Schema schema, SessionContext context) { this.ident = ident; this.schema = schema; this.context = context; + + // Explicitly set default Parquet compression codecs for new tables + propertiesBuilder.put( + TableProperties.PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + propertiesBuilder.put( + TableProperties.DELETE_PARQUET_COMPRESSION, + 
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); } @Override diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index 9a75beb59d2a..b1652bffcd86 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -54,6 +54,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.transforms.Transforms; import org.apache.iceberg.types.Types; @@ -1461,10 +1462,10 @@ public void testCreateV2MetadataThroughTableProperty() { "format version should be configured based on the format-version key", 2, meta.formatVersion()); + Map expected = defaultProperties(); + expected.put("key", "val"); Assert.assertEquals( - "should not contain format-version in properties", - ImmutableMap.of("key", "val"), - meta.properties()); + "should not contain format-version in properties", expected, meta.properties()); } @Test @@ -1490,9 +1491,12 @@ public void testReplaceV1MetadataToV2ThroughTableProperty() { "format version should be configured based on the format-version key", 2, meta.formatVersion()); + Map expected = defaultProperties(); + expected.put("key", "val"); + expected.put("key2", "val2"); Assert.assertEquals( "should not contain format-version but should contain old and new properties", - ImmutableMap.of("key", "val", "key2", "val2"), + expected, meta.properties()); } @@ -1595,4 +1599,15 @@ private String createManifestListWithManifestFile( return localInput(manifestList).location(); } + + private Map defaultProperties() { + Map properties = Maps.newHashMap(); + properties.put( + 
TableProperties.PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + properties.put( + TableProperties.DELETE_PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + return properties; + } } diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index d907a58ec2bc..d5330b5296ce 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -44,11 +44,11 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -121,12 +121,12 @@ public void testCreateTable() throws TableNotExistException { Assert.assertEquals( new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), table.schema().asStruct()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); CatalogTable catalogTable = catalogTable("tl"); Assert.assertEquals( TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); } 
@Test @@ -176,7 +176,7 @@ public void testCreateTableIfNotExists() { sql("CREATE TABLE tl(id BIGINT)"); // Assert that table does exist. - Assert.assertEquals(Maps.newHashMap(), table("tl").properties()); + Assert.assertEquals(defaultProperties(), table("tl").properties()); sql("DROP TABLE tl"); AssertHelpers.assertThrows( @@ -186,9 +186,10 @@ public void testCreateTableIfNotExists() { () -> table("tl")); sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)"); - Assert.assertEquals(Maps.newHashMap(), table("tl").properties()); + Assert.assertEquals(defaultProperties(), table("tl").properties()); - final Map expectedProperties = ImmutableMap.of("key", "value"); + Map expectedProperties = defaultProperties(); + expectedProperties.put("key", "value"); table("tl").updateProperties().set("key", "value").commit(); Assert.assertEquals(expectedProperties, table("tl").properties()); @@ -206,12 +207,12 @@ public void testCreateTableLike() throws TableNotExistException { Assert.assertEquals( new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), table.schema().asStruct()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); CatalogTable catalogTable = catalogTable("tl2"); Assert.assertEquals( TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); } @Test @@ -226,7 +227,7 @@ public void testCreateTableLocation() { new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), table.schema().asStruct()); Assert.assertEquals("file:///tmp/location", table.location()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); } @Test @@ -242,7 +243,7 @@ public void testCreatePartitionTable() throws TableNotExistException { 
table.schema().asStruct()); Assert.assertEquals( PartitionSpec.builderFor(table.schema()).identity("dt").build(), table.spec()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); CatalogTable catalogTable = catalogTable("tl"); Assert.assertEquals( @@ -251,7 +252,7 @@ public void testCreatePartitionTable() throws TableNotExistException { .field("dt", DataTypes.STRING()) .build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys()); } @@ -304,14 +305,14 @@ public void testLoadTransformPartitionTable() throws TableNotExistException { CatalogTable catalogTable = catalogTable("tl"); Assert.assertEquals( TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); Assert.assertEquals(Collections.emptyList(), catalogTable.getPartitionKeys()); } @Test public void testAlterTable() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); - Map properties = Maps.newHashMap(); + Map properties = defaultProperties(); properties.put("oldK", "oldV"); // new @@ -337,7 +338,7 @@ public void testAlterTable() throws TableNotExistException { @Test public void testAlterTableWithPrimaryKey() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map properties = Maps.newHashMap(); + Map properties = defaultProperties(); properties.put("oldK", "oldV"); // new @@ -442,4 +443,15 @@ private CatalogTable catalogTable(String name) throws TableNotExistException { .get() .getTable(new ObjectPath(DATABASE, name)); } + + private Map defaultProperties() { + Map 
properties = Maps.newHashMap(); + properties.put( + TableProperties.PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + properties.put( + TableProperties.DELETE_PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + return properties; + } } diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java index 49f472b7325e..4d174866ca68 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -134,10 +134,10 @@ public void testCompressionParquet() throws Exception { if (initProperties.get(TableProperties.PARQUET_COMPRESSION) == null) { Assert.assertEquals( - TableProperties.PARQUET_COMPRESSION_DEFAULT, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0, resultProperties.get(TableProperties.PARQUET_COMPRESSION)); Assert.assertEquals( TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT, resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL)); } else { Assert.assertEquals( initProperties.get(TableProperties.PARQUET_COMPRESSION), diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index d907a58ec2bc..d5330b5296ce 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -44,11 +44,11 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; 
import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -121,12 +121,12 @@ public void testCreateTable() throws TableNotExistException { Assert.assertEquals( new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), table.schema().asStruct()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); CatalogTable catalogTable = catalogTable("tl"); Assert.assertEquals( TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); } @Test @@ -176,7 +176,7 @@ public void testCreateTableIfNotExists() { sql("CREATE TABLE tl(id BIGINT)"); // Assert that table does exist. 
- Assert.assertEquals(Maps.newHashMap(), table("tl").properties()); + Assert.assertEquals(defaultProperties(), table("tl").properties()); sql("DROP TABLE tl"); AssertHelpers.assertThrows( @@ -186,9 +186,10 @@ public void testCreateTableIfNotExists() { () -> table("tl")); sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)"); - Assert.assertEquals(Maps.newHashMap(), table("tl").properties()); + Assert.assertEquals(defaultProperties(), table("tl").properties()); - final Map expectedProperties = ImmutableMap.of("key", "value"); + Map expectedProperties = defaultProperties(); + expectedProperties.put("key", "value"); table("tl").updateProperties().set("key", "value").commit(); Assert.assertEquals(expectedProperties, table("tl").properties()); @@ -206,12 +207,12 @@ public void testCreateTableLike() throws TableNotExistException { Assert.assertEquals( new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), table.schema().asStruct()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); CatalogTable catalogTable = catalogTable("tl2"); Assert.assertEquals( TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); } @Test @@ -226,7 +227,7 @@ public void testCreateTableLocation() { new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), table.schema().asStruct()); Assert.assertEquals("file:///tmp/location", table.location()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); } @Test @@ -242,7 +243,7 @@ public void testCreatePartitionTable() throws TableNotExistException { table.schema().asStruct()); Assert.assertEquals( PartitionSpec.builderFor(table.schema()).identity("dt").build(), table.spec()); - 
Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); CatalogTable catalogTable = catalogTable("tl"); Assert.assertEquals( @@ -251,7 +252,7 @@ public void testCreatePartitionTable() throws TableNotExistException { .field("dt", DataTypes.STRING()) .build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys()); } @@ -304,14 +305,14 @@ public void testLoadTransformPartitionTable() throws TableNotExistException { CatalogTable catalogTable = catalogTable("tl"); Assert.assertEquals( TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); Assert.assertEquals(Collections.emptyList(), catalogTable.getPartitionKeys()); } @Test public void testAlterTable() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); - Map properties = Maps.newHashMap(); + Map properties = defaultProperties(); properties.put("oldK", "oldV"); // new @@ -337,7 +338,7 @@ public void testAlterTable() throws TableNotExistException { @Test public void testAlterTableWithPrimaryKey() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map properties = Maps.newHashMap(); + Map properties = defaultProperties(); properties.put("oldK", "oldV"); // new @@ -442,4 +443,15 @@ private CatalogTable catalogTable(String name) throws TableNotExistException { .get() .getTable(new ObjectPath(DATABASE, name)); } + + private Map defaultProperties() { + Map properties = Maps.newHashMap(); + properties.put( + TableProperties.PARQUET_COMPRESSION, + 
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + properties.put( + TableProperties.DELETE_PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + return properties; + } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java index 49f472b7325e..3f1172a19cc0 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -134,7 +134,7 @@ public void testCompressionParquet() throws Exception { if (initProperties.get(TableProperties.PARQUET_COMPRESSION) == null) { Assert.assertEquals( - TableProperties.PARQUET_COMPRESSION_DEFAULT, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0, resultProperties.get(TableProperties.PARQUET_COMPRESSION)); Assert.assertEquals( TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT, diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index db83cea1e536..6bf06218dc9d 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -43,11 +43,11 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -121,12 +121,12 @@ public void testCreateTable() throws TableNotExistException { Assert.assertEquals( new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), table.schema().asStruct()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); CatalogTable catalogTable = catalogTable("tl"); Assert.assertEquals( TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); } @Test @@ -176,7 +176,7 @@ public void testCreateTableIfNotExists() { sql("CREATE TABLE tl(id BIGINT)"); // Assert that table does exist. - Assert.assertEquals(Maps.newHashMap(), table("tl").properties()); + Assert.assertEquals(defaultProperties(), table("tl").properties()); sql("DROP TABLE tl"); Assertions.assertThatThrownBy(() -> table("tl")) @@ -184,9 +184,10 @@ public void testCreateTableIfNotExists() { .hasMessage("Table does not exist: " + getFullQualifiedTableName("tl")); sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)"); - Assert.assertEquals(Maps.newHashMap(), table("tl").properties()); + Assert.assertEquals(defaultProperties(), table("tl").properties()); - final Map expectedProperties = ImmutableMap.of("key", "value"); + Map expectedProperties = defaultProperties(); + expectedProperties.put("key", "value"); table("tl").updateProperties().set("key", "value").commit(); Assert.assertEquals(expectedProperties, table("tl").properties()); @@ -204,12 +205,12 @@ public void testCreateTableLike() throws TableNotExistException { Assert.assertEquals( new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), 
table.schema().asStruct()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); CatalogTable catalogTable = catalogTable("tl2"); Assert.assertEquals( TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); } @Test @@ -224,7 +225,7 @@ public void testCreateTableLocation() { new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), table.schema().asStruct()); Assert.assertEquals("file:///tmp/location", table.location()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); } @Test @@ -240,7 +241,7 @@ public void testCreatePartitionTable() throws TableNotExistException { table.schema().asStruct()); Assert.assertEquals( PartitionSpec.builderFor(table.schema()).identity("dt").build(), table.spec()); - Assert.assertEquals(Maps.newHashMap(), table.properties()); + Assert.assertEquals(defaultProperties(), table.properties()); CatalogTable catalogTable = catalogTable("tl"); Assert.assertEquals( @@ -249,7 +250,7 @@ public void testCreatePartitionTable() throws TableNotExistException { .field("dt", DataTypes.STRING()) .build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys()); } @@ -301,14 +302,14 @@ public void testLoadTransformPartitionTable() throws TableNotExistException { CatalogTable catalogTable = catalogTable("tl"); Assert.assertEquals( TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema()); - Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions()); + 
Assert.assertEquals(defaultProperties(), catalogTable.getOptions()); Assert.assertEquals(Collections.emptyList(), catalogTable.getPartitionKeys()); } @Test public void testAlterTable() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); - Map properties = Maps.newHashMap(); + Map properties = defaultProperties(); properties.put("oldK", "oldV"); // new @@ -334,7 +335,7 @@ public void testAlterTable() throws TableNotExistException { @Test public void testAlterTableWithPrimaryKey() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map properties = Maps.newHashMap(); + Map properties = defaultProperties(); properties.put("oldK", "oldV"); // new @@ -439,4 +440,15 @@ private CatalogTable catalogTable(String name) throws TableNotExistException { .get() .getTable(new ObjectPath(DATABASE, name)); } + + private Map defaultProperties() { + Map properties = Maps.newHashMap(); + properties.put( + TableProperties.PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + properties.put( + TableProperties.DELETE_PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + return properties; + } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java index 49f472b7325e..f5bf2378a56f 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -134,11 +134,11 @@ public void testCompressionParquet() throws Exception { if (initProperties.get(TableProperties.PARQUET_COMPRESSION) == null) { Assert.assertEquals( - TableProperties.PARQUET_COMPRESSION_DEFAULT, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0, 
resultProperties.get(TableProperties.PARQUET_COMPRESSION)); Assert.assertEquals( TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT, - resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL)); + resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL)); } else { Assert.assertEquals( initProperties.get(TableProperties.PARQUET_COMPRESSION), diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 6b06c33a730b..7320d3382672 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -124,6 +124,15 @@ public void testCreateTableBuilder() throws Exception { assertThat(table.spec().fields()).hasSize(1); assertThat(table.properties()).containsEntry("key1", "value1"); assertThat(table.properties()).containsEntry("key2", "value2"); + // Default Parquet compression is explicitly set for new tables + assertThat(table.properties()) + .containsEntry( + TableProperties.PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + assertThat(table.properties()) + .containsEntry( + TableProperties.DELETE_PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); } finally { catalog.dropTable(tableIdent); } @@ -146,6 +155,15 @@ public void testCreateTableWithCaching() throws Exception { assertThat(table.spec().fields()).hasSize(1); assertThat(table.properties()).containsEntry("key1", "value1"); assertThat(table.properties()).containsEntry("key2", "value2"); + // Default Parquet compression is explicitly set for new tables + assertThat(table.properties()) + .containsEntry( + TableProperties.PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + assertThat(table.properties()) + .containsEntry( + TableProperties.DELETE_PARQUET_COMPRESSION, + 
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); } finally { cachingCatalog.dropTable(tableIdent); } diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java index dba1994aec96..49602240df21 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java +++ b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java @@ -21,7 +21,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import java.io.IOException; -import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -32,12 +32,14 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -126,7 +128,9 @@ public void testCreateDropTableToLocation() throws IOException { Assert.assertEquals(properties.getProperty("location"), table.location()); Assert.assertEquals(SchemaParser.toJson(SCHEMA), SchemaParser.toJson(table.schema())); Assert.assertEquals(PartitionSpecParser.toJson(SPEC), PartitionSpecParser.toJson(table.spec())); - Assert.assertEquals(Collections.singletonMap("dummy", "test"), table.properties()); + Map expected = defaultProperties(); + expected.put("dummy", "test"); + Assert.assertEquals(expected, table.properties()); Assertions.assertThatThrownBy(() -> Catalogs.dropTable(conf, new Properties())) .isInstanceOf(NullPointerException.class) @@ -178,7 
+182,9 @@ public void testCreateDropTableToCatalog() throws IOException { Assert.assertEquals(SchemaParser.toJson(SCHEMA), SchemaParser.toJson(table.schema())); Assert.assertEquals(PartitionSpecParser.toJson(SPEC), PartitionSpecParser.toJson(table.spec())); - Assert.assertEquals(Collections.singletonMap("dummy", "test"), table.properties()); + Map expected = defaultProperties(); + expected.put("dummy", "test"); + Assert.assertEquals(expected, table.properties()); Assertions.assertThatThrownBy(() -> Catalogs.dropTable(conf, new Properties())) .isInstanceOf(NullPointerException.class) @@ -293,4 +299,15 @@ private void setCustomCatalogProperties(String catalogName, String warehouseLoca CustomHadoopCatalog.class.getName()); conf.set(InputFormatConfig.CATALOG_NAME, catalogName); } + + private Map defaultProperties() { + Map properties = Maps.newHashMap(); + properties.put( + TableProperties.PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + properties.put( + TableProperties.DELETE_PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + return properties; + } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index 775ecbffe109..67f5383d51f2 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -759,6 +759,12 @@ public void testIcebergAndHmsTableProperties() throws Exception { expectedIcebergProperties.put("custom_property", "initial_val"); expectedIcebergProperties.put("EXTERNAL", "TRUE"); expectedIcebergProperties.put("storage_handler", HiveIcebergStorageHandler.class.getName()); + expectedIcebergProperties.put( + TableProperties.PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + expectedIcebergProperties.put( 
+ TableProperties.DELETE_PARQUET_COMPRESSION, + TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); // Check the HMS table parameters org.apache.hadoop.hive.metastore.api.Table hmsTable = @@ -779,7 +785,7 @@ public void testIcebergAndHmsTableProperties() throws Exception { Assert.assertEquals(expectedIcebergProperties, icebergTable.properties()); if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) { - Assert.assertEquals(13, hmsParams.size()); + Assert.assertEquals(15, hmsParams.size()); Assert.assertEquals("initial_val", hmsParams.get("custom_property")); Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE)); Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL")); @@ -825,7 +831,7 @@ public void testIcebergAndHmsTableProperties() throws Exception { if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) { Assert.assertEquals( - 16, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop + 18, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop Assert.assertEquals("true", hmsParams.get("new_prop_1")); Assert.assertEquals("false", hmsParams.get("new_prop_2")); Assert.assertEquals("new_val", hmsParams.get("custom_property")); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java index 416d5eed5b65..5497017460e2 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java @@ -43,6 +43,7 @@ import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.SparkTestBaseWithCatalog; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; 
import org.junit.After; @@ -129,7 +130,7 @@ private Table createPrimitiveTable() throws IOException { return table; } - private void createNestedTable() throws IOException { + private Pair createNestedTable() throws IOException { Table table = catalog.createTable( TableIdentifier.of(Namespace.of(database()), tableName()), @@ -145,6 +146,7 @@ private void createNestedTable() throws IOException { DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records); table.newAppend().appendFile(dataFile).commit(); + return Pair.of(table, dataFile); } @After @@ -324,15 +326,22 @@ public void testSelectNestedValues() throws Exception { @Test public void testNestedValues() throws Exception { - createNestedTable(); - - Object[] leafDoubleCol = row(53L, 3L, 1L, 1L, 0.0D, 0.0D); - Object[] leafLongCol = row(54L, 3L, 1L, null, 0L, 1L); + Pair table = createNestedTable(); + int longColId = + table.first().schema().findField("nestedStructCol.leafStructCol.leafLongCol").fieldId(); + int doubleColId = + table.first().schema().findField("nestedStructCol.leafStructCol.leafDoubleCol").fieldId(); + + Object[] leafDoubleCol = + row(table.second().columnSizes().get(doubleColId), 3L, 1L, 1L, 0.0D, 0.0D); + Object[] leafLongCol = row(table.second().columnSizes().get(longColId), 3L, 1L, null, 0L, 1L); Object[] metrics = row(leafDoubleCol, leafLongCol); - assertEquals( - "Row should match", - ImmutableList.of(new Object[] {metrics}), - sql("SELECT readable_metrics FROM %s.files", tableName)); + List expected = ImmutableList.of(new Object[] {metrics}); + String sql = "SELECT readable_metrics FROM %s.%s"; + List filesReadableMetrics = sql(String.format(sql, tableName, "files")); + List entriesReadableMetrics = sql(String.format(sql, tableName, "entries")); + assertEquals("Row should match for files table", expected, filesReadableMetrics); + assertEquals("Row should match for entries table", expected, entriesReadableMetrics); } } diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java index f65da4574284..9075257fa9f1 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadableMetrics.java @@ -43,6 +43,7 @@ import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.SparkTestBaseWithCatalog; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.junit.After; @@ -129,7 +130,7 @@ private Table createPrimitiveTable() throws IOException { return table; } - private void createNestedTable() throws IOException { + private Pair createNestedTable() throws IOException { Table table = catalog.createTable( TableIdentifier.of(Namespace.of(database()), tableName()), @@ -145,6 +146,7 @@ private void createNestedTable() throws IOException { DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records); table.newAppend().appendFile(dataFile).commit(); + return Pair.of(table, dataFile); } @After @@ -351,10 +353,15 @@ public void testSelectNestedValues() throws Exception { @Test public void testNestedValues() throws Exception { - createNestedTable(); - - Object[] leafDoubleCol = row(53L, 3L, 1L, 1L, 0.0D, 0.0D); - Object[] leafLongCol = row(54L, 3L, 1L, null, 0L, 1L); + Pair table = createNestedTable(); + int longColId = + table.first().schema().findField("nestedStructCol.leafStructCol.leafLongCol").fieldId(); + int doubleColId = + table.first().schema().findField("nestedStructCol.leafStructCol.leafDoubleCol").fieldId(); + + Object[] leafDoubleCol = + row(table.second().columnSizes().get(doubleColId), 3L, 1L, 1L, 0.0D, 0.0D); + Object[] leafLongCol = 
row(table.second().columnSizes().get(longColId), 3L, 1L, null, 0L, 1L); Object[] metrics = row(leafDoubleCol, leafLongCol); List expected = ImmutableList.of(new Object[] {metrics}); From ff146aaecbc1ea1f7b0cb356407a9d4635378b65 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 30 Aug 2023 17:18:06 -0700 Subject: [PATCH 2/4] Properly override delete file compression --- .../src/main/java/org/apache/iceberg/spark/SparkWriteConf.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index df3e2051f771..33a97829a520 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -431,7 +431,7 @@ public String branch() { return branch; } - public Map writeProperties() { + public Map writeProperties(FileFormat format) { Map writeProperties = Maps.newHashMap(); writeProperties.putAll(dataWriteProperties()); writeProperties.putAll(deleteWriteProperties()); @@ -445,6 +445,7 @@ private Map dataWriteProperties() { switch (dataFormat) { case PARQUET: writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); + writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec()); String parquetCompressionLevel = parquetCompressionLevel(); if (parquetCompressionLevel != null) { writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); From 0e970dd937ca5d5d54b8d99a030bf3bcd59289c8 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 31 Aug 2023 14:12:50 -0700 Subject: [PATCH 3/4] Comment out delete file test under fix --- .../org/apache/iceberg/spark/SparkWriteConf.java | 3 +-- .../spark/source/TestCompressionSettings.java | 15 ++++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 33a97829a520..a47c07e2711a 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -430,7 +430,7 @@ public String branch() { return branch; } - + public Map writeProperties(FileFormat format) { Map writeProperties = Maps.newHashMap(); writeProperties.putAll(dataWriteProperties()); @@ -445,7 +445,6 @@ private Map dataWriteProperties() { switch (dataFormat) { case PARQUET: writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec()); - writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec()); String parquetCompressionLevel = parquetCompressionLevel(); if (parquetCompressionLevel != null) { writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 760e735acee9..41f33cfebb6f 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -180,13 +180,14 @@ public void testWriteDataWithDifferentSetting() throws Exception { List deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); Map specMap = Maps.newHashMap(); specMap.put(0, PartitionSpec.unpartitioned()); - try (ManifestReader reader = - ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { - DeleteFile file = reader.iterator().next(); - InputFile inputFile = table.io().newInputFile(file.path().toString()); - Assertions.assertThat(getCompressionType(inputFile)) - 
.isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); - } + // TODO compression write conf currently does not apply to position deletes + // try (ManifestReader reader = + // ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + // DeleteFile file = reader.iterator().next(); + // InputFile inputFile = table.io().newInputFile(file.path().toString()); + // Assertions.assertThat(getCompressionType(inputFile)) + // .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + // } if (PARQUET.equals(format)) { SparkActions.get(spark) From 3e8b22743c4bde83261ff08acc2819c93d4609ac Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 7 Sep 2023 10:34:53 +0800 Subject: [PATCH 4/4] Re-enable commented out test after fix of #8438 --- .../org/apache/iceberg/spark/SparkWriteConf.java | 4 ++-- .../spark/source/TestCompressionSettings.java | 15 +++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index a47c07e2711a..df3e2051f771 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -430,8 +430,8 @@ public String branch() { return branch; } - - public Map writeProperties(FileFormat format) { + + public Map writeProperties() { Map writeProperties = Maps.newHashMap(); writeProperties.putAll(dataWriteProperties()); writeProperties.putAll(deleteWriteProperties()); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 41f33cfebb6f..760e735acee9 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -180,14 +180,13 @@ public void testWriteDataWithDifferentSetting() throws Exception { List deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io()); Map specMap = Maps.newHashMap(); specMap.put(0, PartitionSpec.unpartitioned()); - // TODO compression write conf currently does not apply to position deletes - // try (ManifestReader reader = - // ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { - // DeleteFile file = reader.iterator().next(); - // InputFile inputFile = table.io().newInputFile(file.path().toString()); - // Assertions.assertThat(getCompressionType(inputFile)) - // .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); - // } + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) { + DeleteFile file = reader.iterator().next(); + InputFile inputFile = table.io().newInputFile(file.path().toString()); + Assertions.assertThat(getCompressionType(inputFile)) + .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC)); + } if (PARQUET.equals(format)) { SparkActions.get(spark)