Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ public static TableMetadata newTableMetadata(
PropertyUtil.propertyAsInt(
properties, TableProperties.FORMAT_VERSION, DEFAULT_TABLE_FORMAT_VERSION);
return newTableMetadata(
schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion);
schema,
spec,
sortOrder,
location,
withPersistedProperties(unreservedProperties(properties)),
formatVersion);
}

public static TableMetadata newTableMetadata(
Expand All @@ -78,7 +83,12 @@ public static TableMetadata newTableMetadata(
PropertyUtil.propertyAsInt(
properties, TableProperties.FORMAT_VERSION, DEFAULT_TABLE_FORMAT_VERSION);
return newTableMetadata(
schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion);
schema,
spec,
sortOrder,
location,
withPersistedProperties(unreservedProperties(properties)),
formatVersion);
}

private static Map<String, String> unreservedProperties(Map<String, String> rawProperties) {
Expand All @@ -87,6 +97,18 @@ private static Map<String, String> unreservedProperties(Map<String, String> rawP
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
 * Merges the explicit compression-codec defaults that are persisted into the metadata of every
 * newly created table with the caller-supplied properties. Caller-supplied values always take
 * precedence over the built-in defaults.
 *
 * @param properties user-provided table properties (reserved keys already removed)
 * @return an immutable map containing the defaults plus the given properties
 */
private static Map<String, String> withPersistedProperties(Map<String, String> properties) {
  Map<String, String> merged = Maps.newHashMap();
  merged.putAll(properties);
  // Since 1.4.0, new tables persist zstd as the Parquet codec for data and delete files
  // unless the user explicitly configured a codec.
  merged.putIfAbsent(
      TableProperties.PARQUET_COMPRESSION,
      TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
  merged.putIfAbsent(
      TableProperties.DELETE_PARQUET_COMPRESSION,
      TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
  return ImmutableMap.copyOf(merged);
}

static TableMetadata newTableMetadata(
Schema schema,
PartitionSpec spec,
Expand Down Expand Up @@ -119,7 +141,7 @@ static TableMetadata newTableMetadata(
int freshSortOrderId = sortOrder.isUnsorted() ? sortOrder.orderId() : INITIAL_SORT_ORDER_ID;
SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, sortOrder);

// Validate the metrics configuration. Note: we only do this on new tables to we don't
// Validate the metrics configuration. Note: we only do this on new tables to not
// break existing tables.
MetricsConfig.fromProperties(properties).validateReferencedColumns(schema);

Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ private TableProperties() {}
public static final String PARQUET_COMPRESSION = "write.parquet.compression-codec";
public static final String DELETE_PARQUET_COMPRESSION = "write.delete.parquet.compression-codec";
public static final String PARQUET_COMPRESSION_DEFAULT = "gzip";
public static final String PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0 = "zstd";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me think. I do like the name, but I also wonder whether it suggests that the new default applies to all existing tables that did not provide an explicit value. That is probably not what happens, since we only use this value for new tables.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

@aokolnychyi aokolnychyi Aug 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can go either way here, up to you @szehon-ho.


public static final String PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level";
public static final String DELETE_PARQUET_COMPRESSION_LEVEL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.BaseSessionCatalog;
Expand Down Expand Up @@ -592,6 +593,14 @@ private Builder(TableIdentifier ident, Schema schema, SessionContext context) {
this.ident = ident;
this.schema = schema;
this.context = context;

// Explicitly set default Parquet compression codecs for new tables
propertiesBuilder.put(
TableProperties.PARQUET_COMPRESSION,
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
propertiesBuilder.put(
TableProperties.DELETE_PARQUET_COMPRESSION,
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
}

@Override
Expand Down
23 changes: 19 additions & 4 deletions core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -1461,10 +1462,10 @@ public void testCreateV2MetadataThroughTableProperty() {
"format version should be configured based on the format-version key",
2,
meta.formatVersion());
Map<String, String> expected = defaultProperties();
expected.put("key", "val");
Assert.assertEquals(
"should not contain format-version in properties",
ImmutableMap.of("key", "val"),
meta.properties());
"should not contain format-version in properties", expected, meta.properties());
}

@Test
Expand All @@ -1490,9 +1491,12 @@ public void testReplaceV1MetadataToV2ThroughTableProperty() {
"format version should be configured based on the format-version key",
2,
meta.formatVersion());
Map<String, String> expected = defaultProperties();
expected.put("key", "val");
expected.put("key2", "val2");
Assert.assertEquals(
"should not contain format-version but should contain old and new properties",
ImmutableMap.of("key", "val", "key2", "val2"),
expected,
meta.properties());
}

Expand Down Expand Up @@ -1595,4 +1599,15 @@ private String createManifestListWithManifestFile(

return localInput(manifestList).location();
}

/**
 * Returns the properties implicitly set on every newly created table (the persisted Parquet
 * compression defaults). The result is mutable so tests can layer expected entries on top.
 */
private Map<String, String> defaultProperties() {
  Map<String, String> props = Maps.newHashMap();
  for (String key :
      new String[] {
        TableProperties.PARQUET_COMPRESSION, TableProperties.DELETE_PARQUET_COMPRESSION
      }) {
    props.put(key, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
  }
  return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -121,12 +121,12 @@ public void testCreateTable() throws TableNotExistException {
Assert.assertEquals(
new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals(Maps.newHashMap(), table.properties());
Assert.assertEquals(defaultProperties(), table.properties());

CatalogTable catalogTable = catalogTable("tl");
Assert.assertEquals(
TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
Assert.assertEquals(defaultProperties(), catalogTable.getOptions());
}

@Test
Expand Down Expand Up @@ -176,7 +176,7 @@ public void testCreateTableIfNotExists() {
sql("CREATE TABLE tl(id BIGINT)");

// Assert that table does exist.
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
Assert.assertEquals(defaultProperties(), table("tl").properties());

sql("DROP TABLE tl");
AssertHelpers.assertThrows(
Expand All @@ -186,9 +186,10 @@ public void testCreateTableIfNotExists() {
() -> table("tl"));

sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
Assert.assertEquals(defaultProperties(), table("tl").properties());

final Map<String, String> expectedProperties = ImmutableMap.of("key", "value");
Map<String, String> expectedProperties = defaultProperties();
expectedProperties.put("key", "value");
table("tl").updateProperties().set("key", "value").commit();
Assert.assertEquals(expectedProperties, table("tl").properties());

Expand All @@ -206,12 +207,12 @@ public void testCreateTableLike() throws TableNotExistException {
Assert.assertEquals(
new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals(Maps.newHashMap(), table.properties());
Assert.assertEquals(defaultProperties(), table.properties());

CatalogTable catalogTable = catalogTable("tl2");
Assert.assertEquals(
TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
Assert.assertEquals(defaultProperties(), catalogTable.getOptions());
}

@Test
Expand All @@ -226,7 +227,7 @@ public void testCreateTableLocation() {
new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals("file:///tmp/location", table.location());
Assert.assertEquals(Maps.newHashMap(), table.properties());
Assert.assertEquals(defaultProperties(), table.properties());
}

@Test
Expand All @@ -242,7 +243,7 @@ public void testCreatePartitionTable() throws TableNotExistException {
table.schema().asStruct());
Assert.assertEquals(
PartitionSpec.builderFor(table.schema()).identity("dt").build(), table.spec());
Assert.assertEquals(Maps.newHashMap(), table.properties());
Assert.assertEquals(defaultProperties(), table.properties());

CatalogTable catalogTable = catalogTable("tl");
Assert.assertEquals(
Expand All @@ -251,7 +252,7 @@ public void testCreatePartitionTable() throws TableNotExistException {
.field("dt", DataTypes.STRING())
.build(),
catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
Assert.assertEquals(defaultProperties(), catalogTable.getOptions());
Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys());
}

Expand Down Expand Up @@ -304,14 +305,14 @@ public void testLoadTransformPartitionTable() throws TableNotExistException {
CatalogTable catalogTable = catalogTable("tl");
Assert.assertEquals(
TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
Assert.assertEquals(defaultProperties(), catalogTable.getOptions());
Assert.assertEquals(Collections.emptyList(), catalogTable.getPartitionKeys());
}

@Test
public void testAlterTable() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')");
Map<String, String> properties = Maps.newHashMap();
Map<String, String> properties = defaultProperties();
properties.put("oldK", "oldV");

// new
Expand All @@ -337,7 +338,7 @@ public void testAlterTable() throws TableNotExistException {
@Test
public void testAlterTableWithPrimaryKey() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')");
Map<String, String> properties = Maps.newHashMap();
Map<String, String> properties = defaultProperties();
properties.put("oldK", "oldV");

// new
Expand Down Expand Up @@ -442,4 +443,15 @@ private CatalogTable catalogTable(String name) throws TableNotExistException {
.get()
.getTable(new ObjectPath(DATABASE, name));
}

/**
 * Returns the properties implicitly present on every newly created table (the persisted Parquet
 * compression defaults). Kept mutable because several tests add expected entries to the result.
 */
private Map<String, String> defaultProperties() {
  Map<String, String> props = Maps.newHashMap();
  for (String key :
      new String[] {
        TableProperties.PARQUET_COMPRESSION, TableProperties.DELETE_PARQUET_COMPRESSION
      }) {
    props.put(key, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
  }
  return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ public void testCompressionParquet() throws Exception {

if (initProperties.get(TableProperties.PARQUET_COMPRESSION) == null) {
Assert.assertEquals(
TableProperties.PARQUET_COMPRESSION_DEFAULT,
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0,
resultProperties.get(TableProperties.PARQUET_COMPRESSION));
Assert.assertEquals(
TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT));
} else {
Assert.assertEquals(
initProperties.get(TableProperties.PARQUET_COMPRESSION),
Expand Down
Loading