From 4b28f3f2b4a0d6ac79e64defb5b2b257578f24d7 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 17 Mar 2025 14:15:08 +0530 Subject: [PATCH 1/2] Data: Refactor PartitionStatsHandler --- .../iceberg/data/PartitionStatsHandler.java | 108 +++++++++--------- .../data/TestPartitionStatsHandler.java | 48 +++++--- 2 files changed, 82 insertions(+), 74 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java index 7e8eff294fa8..46409185f61b 100644 --- a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java +++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java @@ -61,30 +61,29 @@ public class PartitionStatsHandler { private PartitionStatsHandler() {} - public enum Column { - PARTITION(0), - SPEC_ID(1), - DATA_RECORD_COUNT(2), - DATA_FILE_COUNT(3), - TOTAL_DATA_FILE_SIZE_IN_BYTES(4), - POSITION_DELETE_RECORD_COUNT(5), - POSITION_DELETE_FILE_COUNT(6), - EQUALITY_DELETE_RECORD_COUNT(7), - EQUALITY_DELETE_FILE_COUNT(8), - TOTAL_RECORD_COUNT(9), - LAST_UPDATED_AT(10), - LAST_UPDATED_SNAPSHOT_ID(11); - - private final int id; - - Column(int id) { - this.id = id; - } - - public int id() { - return id; - } - } + public static final int PARTITION_FIELD_ID = 0; + public static final String PARTITION_FIELD_NAME = "partition"; + public static final NestedField SPEC_ID = NestedField.required(1, "SPEC_ID", IntegerType.get()); + public static final NestedField DATA_RECORD_COUNT = + NestedField.required(2, "DATA_RECORD_COUNT", LongType.get()); + public static final NestedField DATA_FILE_COUNT = + NestedField.required(3, "DATA_FILE_COUNT", IntegerType.get()); + public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES = + NestedField.required(4, "TOTAL_DATA_FILE_SIZE_IN_BYTES", LongType.get()); + public static final NestedField POSITION_DELETE_RECORD_COUNT = + NestedField.optional(5, "POSITION_DELETE_RECORD_COUNT", LongType.get()); + public static final NestedField POSITION_DELETE_FILE_COUNT = + NestedField.optional(6, "POSITION_DELETE_FILE_COUNT", IntegerType.get()); + public static final NestedField EQUALITY_DELETE_RECORD_COUNT = + NestedField.optional(7, "EQUALITY_DELETE_RECORD_COUNT", LongType.get()); + public static final NestedField EQUALITY_DELETE_FILE_COUNT = + NestedField.optional(8, "EQUALITY_DELETE_FILE_COUNT", IntegerType.get()); + public static final NestedField TOTAL_RECORD_COUNT = + NestedField.optional(9, "TOTAL_RECORD_COUNT", LongType.get()); + public static final NestedField LAST_UPDATED_AT = + NestedField.optional(10, "LAST_UPDATED_AT", LongType.get()); + public static final NestedField LAST_UPDATED_SNAPSHOT_ID = + NestedField.optional(11, "LAST_UPDATED_SNAPSHOT_ID", LongType.get()); /** * Generates the partition stats file schema based on a combined partition type which considers @@ -97,18 +96,18 @@ public int id() { public static Schema schema(StructType unifiedPartitionType) { Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); return new Schema( - NestedField.required(1, Column.PARTITION.name(), unifiedPartitionType), - NestedField.required(2, Column.SPEC_ID.name(), IntegerType.get()), - NestedField.required(3, Column.DATA_RECORD_COUNT.name(), LongType.get()), - NestedField.required(4, Column.DATA_FILE_COUNT.name(), IntegerType.get()), - NestedField.required(5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), LongType.get()), - NestedField.optional(6, Column.POSITION_DELETE_RECORD_COUNT.name(), LongType.get()), - NestedField.optional(7, Column.POSITION_DELETE_FILE_COUNT.name(), IntegerType.get()), - NestedField.optional(8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), LongType.get()), - NestedField.optional(9, Column.EQUALITY_DELETE_FILE_COUNT.name(), IntegerType.get()), - NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), LongType.get()), - NestedField.optional(11, Column.LAST_UPDATED_AT.name(), LongType.get()), - NestedField.optional(12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), LongType.get())); + NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), + SPEC_ID, + DATA_RECORD_COUNT, + DATA_FILE_COUNT, + TOTAL_DATA_FILE_SIZE_IN_BYTES, + POSITION_DELETE_RECORD_COUNT, + POSITION_DELETE_FILE_COUNT, + EQUALITY_DELETE_RECORD_COUNT, + EQUALITY_DELETE_FILE_COUNT, + TOTAL_RECORD_COUNT, + LAST_UPDATED_AT, + LAST_UPDATED_SNAPSHOT_ID); } /** @@ -250,31 +249,30 @@ private static CloseableIterable dataReader(Schema schema, InputFile private static PartitionStats recordToPartitionStats(StructLike record) { PartitionStats stats = new PartitionStats( - record.get(Column.PARTITION.id(), StructLike.class), - record.get(Column.SPEC_ID.id(), Integer.class)); - stats.set(Column.DATA_RECORD_COUNT.id(), record.get(Column.DATA_RECORD_COUNT.id(), Long.class)); - stats.set(Column.DATA_FILE_COUNT.id(), record.get(Column.DATA_FILE_COUNT.id(), Integer.class)); - stats.set( - Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), - record.get(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), Long.class)); + record.get(PARTITION_FIELD_ID, StructLike.class), + record.get(SPEC_ID.fieldId(), Integer.class)); + stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class)); + stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class)); stats.set( - Column.POSITION_DELETE_RECORD_COUNT.id(), - record.get(Column.POSITION_DELETE_RECORD_COUNT.id(), Long.class)); + TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), + record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class)); stats.set( - Column.POSITION_DELETE_FILE_COUNT.id(), - record.get(Column.POSITION_DELETE_FILE_COUNT.id(), Integer.class)); + POSITION_DELETE_RECORD_COUNT.fieldId(), + record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class)); stats.set( - Column.EQUALITY_DELETE_RECORD_COUNT.id(), - record.get(Column.EQUALITY_DELETE_RECORD_COUNT.id(), Long.class)); + POSITION_DELETE_FILE_COUNT.fieldId(), + record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class)); stats.set( - Column.EQUALITY_DELETE_FILE_COUNT.id(), - record.get(Column.EQUALITY_DELETE_FILE_COUNT.id(), Integer.class)); + EQUALITY_DELETE_RECORD_COUNT.fieldId(), + record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class)); stats.set( - Column.TOTAL_RECORD_COUNT.id(), record.get(Column.TOTAL_RECORD_COUNT.id(), Long.class)); - stats.set(Column.LAST_UPDATED_AT.id(), record.get(Column.LAST_UPDATED_AT.id(), Long.class)); + EQUALITY_DELETE_FILE_COUNT.fieldId(), + record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class)); + stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class)); + stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class)); stats.set( - Column.LAST_UPDATED_SNAPSHOT_ID.id(), - record.get(Column.LAST_UPDATED_SNAPSHOT_ID.id(), Long.class)); + LAST_UPDATED_SNAPSHOT_ID.fieldId(), + record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class)); return stats; } } diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java index 1d84b8e229eb..90d3729b6bb2 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java +++ b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java @@ -18,7 +18,17 @@ */ package org.apache.iceberg.data; -import static org.apache.iceberg.data.PartitionStatsHandler.Column; +import static org.apache.iceberg.data.PartitionStatsHandler.DATA_FILE_COUNT; +import static org.apache.iceberg.data.PartitionStatsHandler.DATA_RECORD_COUNT; +import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT; +import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT; +import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_AT; +import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID; +import static org.apache.iceberg.data.PartitionStatsHandler.PARTITION_FIELD_ID; +import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT; +import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT; +import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES; +import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_RECORD_COUNT; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; @@ -185,7 +195,7 @@ public void testAllDatatypePartitionWriting() throws Exception { Schema dataSchema = PartitionStatsHandler.schema(partitionSchema); PartitionData partitionData = - new PartitionData(dataSchema.findField(Column.PARTITION.name()).type().asStructType()); + new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); partitionData.set(0, true); partitionData.set(1, 42); partitionData.set(2, 42L); @@ -204,9 +214,9 @@ public void testAllDatatypePartitionWriting() throws Exception { partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value()); PartitionStats partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10)); - partitionStats.set(Column.DATA_RECORD_COUNT.id(), RANDOM.nextLong()); - partitionStats.set(Column.DATA_FILE_COUNT.id(), RANDOM.nextInt()); - partitionStats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), 1024L * RANDOM.nextInt(20)); + partitionStats.set(DATA_RECORD_COUNT.fieldId(), RANDOM.nextLong()); + partitionStats.set(DATA_FILE_COUNT.fieldId(), RANDOM.nextInt()); + partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), 1024L * RANDOM.nextInt(20)); List expected = Collections.singletonList(partitionStats); PartitionStatisticsFile statisticsFile = PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected); @@ -243,21 +253,21 @@ public void testOptionalFieldsWriting() throws Exception { ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); for (int i = 0; i < 5; i++) { PartitionData partitionData = - new PartitionData(dataSchema.findField(Column.PARTITION.name()).type().asStructType()); + new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); partitionData.set(0, RANDOM.nextInt()); PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10)); - stats.set(Column.PARTITION.ordinal(), partitionData); - stats.set(Column.DATA_RECORD_COUNT.ordinal(), RANDOM.nextLong()); - stats.set(Column.DATA_FILE_COUNT.ordinal(), RANDOM.nextInt()); - stats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L * RANDOM.nextInt(20)); - stats.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), null); - stats.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), null); - stats.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), null); - stats.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), null); - stats.set(Column.TOTAL_RECORD_COUNT.ordinal(), null); - stats.set(Column.LAST_UPDATED_AT.ordinal(), null); - stats.set(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), null); + stats.set(PARTITION_FIELD_ID, partitionData); + stats.set(DATA_RECORD_COUNT.fieldId(), RANDOM.nextLong()); + stats.set(DATA_FILE_COUNT.fieldId(), RANDOM.nextInt()); + stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), 1024L * RANDOM.nextInt(20)); + stats.set(POSITION_DELETE_RECORD_COUNT.fieldId(), null); + stats.set(POSITION_DELETE_FILE_COUNT.fieldId(), null); + stats.set(EQUALITY_DELETE_RECORD_COUNT.fieldId(), null); + stats.set(EQUALITY_DELETE_FILE_COUNT.fieldId(), null); + stats.set(TOTAL_RECORD_COUNT.fieldId(), null); + stats.set(LAST_UPDATED_AT.fieldId(), null); + stats.set(LAST_UPDATED_SNAPSHOT_ID.fieldId(), null); partitionListBuilder.add(stats); } @@ -338,7 +348,7 @@ public void testPartitionStats() throws Exception { Snapshot snapshot1 = testTable.currentSnapshot(); Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); Types.StructType partitionType = - recordSchema.findField(Column.PARTITION.name()).type().asStructType(); + recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); computeAndValidatePartitionStats( testTable, recordSchema, @@ -402,7 +412,7 @@ public void testPartitionStats() throws Exception { Snapshot snapshot3 = testTable.currentSnapshot(); recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); - partitionType = recordSchema.findField(Column.PARTITION.name()).type().asStructType(); + partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); computeAndValidatePartitionStats( testTable, recordSchema, From 2810b85d1660b53f382c1a8af1a8c393cbcdb3a5 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 17 Mar 2025 16:13:25 +0530 Subject: [PATCH 2/2] smallercase names --- .../iceberg/data/PartitionStatsHandler.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java index 46409185f61b..d44cf6a4e435 100644 --- a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java +++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java @@ -63,27 +63,27 @@ private PartitionStatsHandler() {} public static final int PARTITION_FIELD_ID = 0; public static final String PARTITION_FIELD_NAME = "partition"; - public static final NestedField SPEC_ID = NestedField.required(1, "SPEC_ID", IntegerType.get()); + public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get()); public static final NestedField DATA_RECORD_COUNT = - NestedField.required(2, "DATA_RECORD_COUNT", LongType.get()); + NestedField.required(2, "data_record_count", LongType.get()); public static final NestedField DATA_FILE_COUNT = - NestedField.required(3, "DATA_FILE_COUNT", IntegerType.get()); + NestedField.required(3, "data_file_count", IntegerType.get()); public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES = - NestedField.required(4, "TOTAL_DATA_FILE_SIZE_IN_BYTES", LongType.get()); + NestedField.required(4, "total_data_file_size_in_bytes", LongType.get()); public static final NestedField POSITION_DELETE_RECORD_COUNT = - NestedField.optional(5, "POSITION_DELETE_RECORD_COUNT", LongType.get()); + NestedField.optional(5, "position_delete_record_count", LongType.get()); public static final NestedField POSITION_DELETE_FILE_COUNT = - NestedField.optional(6, "POSITION_DELETE_FILE_COUNT", IntegerType.get()); + NestedField.optional(6, "position_delete_file_count", IntegerType.get()); public static final NestedField EQUALITY_DELETE_RECORD_COUNT = - NestedField.optional(7, "EQUALITY_DELETE_RECORD_COUNT", LongType.get()); + NestedField.optional(7, "equality_delete_record_count", LongType.get()); public static final NestedField EQUALITY_DELETE_FILE_COUNT = - NestedField.optional(8, "EQUALITY_DELETE_FILE_COUNT", IntegerType.get()); + NestedField.optional(8, "equality_delete_file_count", IntegerType.get()); public static final NestedField TOTAL_RECORD_COUNT = - NestedField.optional(9, "TOTAL_RECORD_COUNT", LongType.get()); + NestedField.optional(9, "total_record_count", LongType.get()); public static final NestedField LAST_UPDATED_AT = - NestedField.optional(10, "LAST_UPDATED_AT", LongType.get()); + NestedField.optional(10, "last_updated_at", LongType.get()); public static final NestedField LAST_UPDATED_SNAPSHOT_ID = - NestedField.optional(11, "LAST_UPDATED_SNAPSHOT_ID", LongType.get()); + NestedField.optional(11, "last_updated_snapshot_id", LongType.get()); /** * Generates the partition stats file schema based on a combined partition type which considers