Merged
108 changes: 53 additions & 55 deletions data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
@@ -61,30 +61,29 @@ public class PartitionStatsHandler {
 
   private PartitionStatsHandler() {}
 
-  public enum Column {
-    PARTITION(0),
-    SPEC_ID(1),
-    DATA_RECORD_COUNT(2),
-    DATA_FILE_COUNT(3),
-    TOTAL_DATA_FILE_SIZE_IN_BYTES(4),
-    POSITION_DELETE_RECORD_COUNT(5),
-    POSITION_DELETE_FILE_COUNT(6),
-    EQUALITY_DELETE_RECORD_COUNT(7),
-    EQUALITY_DELETE_FILE_COUNT(8),
-    TOTAL_RECORD_COUNT(9),
-    LAST_UPDATED_AT(10),
-    LAST_UPDATED_SNAPSHOT_ID(11);
-
-    private final int id;
-
-    Column(int id) {
-      this.id = id;
-    }
-
-    public int id() {
-      return id;
-    }
-  }
+  public static final int PARTITION_FIELD_ID = 0;
Contributor:
Why is this starting with 0? In the previous code the id started with 1.
Member Author:
PartitionStats.set(pos, value) was previously based on the enum ordinal, which starts from zero.

Now that there is no ordinal, I used the field id instead, since writing field_id - 1 everywhere would look odd. I also considered updating PartitionStats.set(pos, value) to start from 1, but StructLike positions start from 0 everywhere else, so I didn't go down that path.

Member Author:
Also, since this is an unreleased feature, modifying it won't cause compatibility issues.
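An aside to make the thread's point concrete: a minimal, hypothetical sketch (not part of this diff; the class and variable names are invented) of why ids starting at 0 let a field id double as a 0-based StructLike position:

// Hypothetical illustration only. With PARTITION_FIELD_ID = 0 and the
// remaining ids running 1..11, a field id is directly usable as a 0-based
// StructLike position, so call sites need no "fieldId() - 1" adjustment.
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.NestedField;

class FieldIdAsPosition {
  static final int PARTITION_FIELD_ID = 0; // occupies position 0
  static final NestedField SPEC_ID =
      NestedField.required(1, "spec_id", IntegerType.get()); // position 1

  public static void main(String[] args) {
    // stats.set(SPEC_ID.fieldId(), value) writes to position 1 directly.
    System.out.println(SPEC_ID.fieldId()); // prints 1
  }
}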

+  public static final String PARTITION_FIELD_NAME = "partition";
+  public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get());
+  public static final NestedField DATA_RECORD_COUNT =
+      NestedField.required(2, "data_record_count", LongType.get());
+  public static final NestedField DATA_FILE_COUNT =
+      NestedField.required(3, "data_file_count", IntegerType.get());
+  public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
+      NestedField.required(4, "total_data_file_size_in_bytes", LongType.get());
+  public static final NestedField POSITION_DELETE_RECORD_COUNT =
+      NestedField.optional(5, "position_delete_record_count", LongType.get());
+  public static final NestedField POSITION_DELETE_FILE_COUNT =
+      NestedField.optional(6, "position_delete_file_count", IntegerType.get());
+  public static final NestedField EQUALITY_DELETE_RECORD_COUNT =
+      NestedField.optional(7, "equality_delete_record_count", LongType.get());
+  public static final NestedField EQUALITY_DELETE_FILE_COUNT =
+      NestedField.optional(8, "equality_delete_file_count", IntegerType.get());
+  public static final NestedField TOTAL_RECORD_COUNT =
+      NestedField.optional(9, "total_record_count", LongType.get());
+  public static final NestedField LAST_UPDATED_AT =
+      NestedField.optional(10, "last_updated_at", LongType.get());
+  public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
+      NestedField.optional(11, "last_updated_snapshot_id", LongType.get());

   /**
    * Generates the partition stats file schema based on a combined partition type which considers
@@ -97,18 +96,18 @@ public int id() {
   public static Schema schema(StructType unifiedPartitionType) {
     Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
     return new Schema(
-        NestedField.required(1, Column.PARTITION.name(), unifiedPartitionType),
-        NestedField.required(2, Column.SPEC_ID.name(), IntegerType.get()),
-        NestedField.required(3, Column.DATA_RECORD_COUNT.name(), LongType.get()),
-        NestedField.required(4, Column.DATA_FILE_COUNT.name(), IntegerType.get()),
-        NestedField.required(5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), LongType.get()),
-        NestedField.optional(6, Column.POSITION_DELETE_RECORD_COUNT.name(), LongType.get()),
-        NestedField.optional(7, Column.POSITION_DELETE_FILE_COUNT.name(), IntegerType.get()),
-        NestedField.optional(8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), LongType.get()),
-        NestedField.optional(9, Column.EQUALITY_DELETE_FILE_COUNT.name(), IntegerType.get()),
-        NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), LongType.get()),
-        NestedField.optional(11, Column.LAST_UPDATED_AT.name(), LongType.get()),
-        NestedField.optional(12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), LongType.get()));
+        NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
+        SPEC_ID,
+        DATA_RECORD_COUNT,
+        DATA_FILE_COUNT,
+        TOTAL_DATA_FILE_SIZE_IN_BYTES,
+        POSITION_DELETE_RECORD_COUNT,
+        POSITION_DELETE_FILE_COUNT,
+        EQUALITY_DELETE_RECORD_COUNT,
+        EQUALITY_DELETE_FILE_COUNT,
+        TOTAL_RECORD_COUNT,
+        LAST_UPDATED_AT,
+        LAST_UPDATED_SNAPSHOT_ID);
   }

/**
@@ -250,31 +249,30 @@ private static CloseableIterable<StructLike> dataReader(Schema schema, InputFile
   private static PartitionStats recordToPartitionStats(StructLike record) {
     PartitionStats stats =
         new PartitionStats(
-            record.get(Column.PARTITION.id(), StructLike.class),
-            record.get(Column.SPEC_ID.id(), Integer.class));
-    stats.set(Column.DATA_RECORD_COUNT.id(), record.get(Column.DATA_RECORD_COUNT.id(), Long.class));
-    stats.set(Column.DATA_FILE_COUNT.id(), record.get(Column.DATA_FILE_COUNT.id(), Integer.class));
-    stats.set(
-        Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(),
-        record.get(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), Long.class));
+            record.get(PARTITION_FIELD_ID, StructLike.class),
+            record.get(SPEC_ID.fieldId(), Integer.class));
+    stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class));
+    stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class));
     stats.set(
-        Column.POSITION_DELETE_RECORD_COUNT.id(),
-        record.get(Column.POSITION_DELETE_RECORD_COUNT.id(), Long.class));
+        TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(),
+        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class));
     stats.set(
-        Column.POSITION_DELETE_FILE_COUNT.id(),
-        record.get(Column.POSITION_DELETE_FILE_COUNT.id(), Integer.class));
+        POSITION_DELETE_RECORD_COUNT.fieldId(),
+        record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class));
     stats.set(
-        Column.EQUALITY_DELETE_RECORD_COUNT.id(),
-        record.get(Column.EQUALITY_DELETE_RECORD_COUNT.id(), Long.class));
+        POSITION_DELETE_FILE_COUNT.fieldId(),
+        record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class));
     stats.set(
-        Column.EQUALITY_DELETE_FILE_COUNT.id(),
-        record.get(Column.EQUALITY_DELETE_FILE_COUNT.id(), Integer.class));
+        EQUALITY_DELETE_RECORD_COUNT.fieldId(),
+        record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class));
     stats.set(
-        Column.TOTAL_RECORD_COUNT.id(), record.get(Column.TOTAL_RECORD_COUNT.id(), Long.class));
-    stats.set(Column.LAST_UPDATED_AT.id(), record.get(Column.LAST_UPDATED_AT.id(), Long.class));
+        EQUALITY_DELETE_FILE_COUNT.fieldId(),
+        record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class));
+    stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class));
+    stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class));
     stats.set(
-        Column.LAST_UPDATED_SNAPSHOT_ID.id(),
-        record.get(Column.LAST_UPDATED_SNAPSHOT_ID.id(), Long.class));
+        LAST_UPDATED_SNAPSHOT_ID.fieldId(),
+        record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class));
     return stats;
   }
 }
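For context, a short usage sketch of the new public surface (hedged: StatsSchemaUsage and its table parameter are invented for illustration; the call pattern mirrors the test changes below):

import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.PartitionStatsHandler;
import org.apache.iceberg.types.Types.StructType;

class StatsSchemaUsage {
  // Resolve the unified partition struct type from the stats file schema,
  // using the new PARTITION_FIELD_ID constant instead of Column.PARTITION.name().
  static StructType partitionType(Table table) {
    Schema statsSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
    return statsSchema.findField(PartitionStatsHandler.PARTITION_FIELD_ID).type().asStructType();
  }
}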
@@ -18,7 +18,17 @@
  */
 package org.apache.iceberg.data;
 
-import static org.apache.iceberg.data.PartitionStatsHandler.Column;
+import static org.apache.iceberg.data.PartitionStatsHandler.DATA_FILE_COUNT;
+import static org.apache.iceberg.data.PartitionStatsHandler.DATA_RECORD_COUNT;
+import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT;
+import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT;
+import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_AT;
+import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID;
+import static org.apache.iceberg.data.PartitionStatsHandler.PARTITION_FIELD_ID;
+import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT;
+import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT;
+import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES;
+import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_RECORD_COUNT;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -185,7 +195,7 @@ public void testAllDatatypePartitionWriting() throws Exception {
     Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
 
     PartitionData partitionData =
-        new PartitionData(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
+        new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
     partitionData.set(0, true);
     partitionData.set(1, 42);
     partitionData.set(2, 42L);
@@ -204,9 +214,9 @@ public void testAllDatatypePartitionWriting() throws Exception {
     partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value());
 
     PartitionStats partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10));
-    partitionStats.set(Column.DATA_RECORD_COUNT.id(), RANDOM.nextLong());
-    partitionStats.set(Column.DATA_FILE_COUNT.id(), RANDOM.nextInt());
-    partitionStats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), 1024L * RANDOM.nextInt(20));
+    partitionStats.set(DATA_RECORD_COUNT.fieldId(), RANDOM.nextLong());
+    partitionStats.set(DATA_FILE_COUNT.fieldId(), RANDOM.nextInt());
+    partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), 1024L * RANDOM.nextInt(20));
     List<PartitionStats> expected = Collections.singletonList(partitionStats);
     PartitionStatisticsFile statisticsFile =
         PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected);
@@ -243,21 +253,21 @@ public void testOptionalFieldsWriting() throws Exception {
     ImmutableList.Builder<PartitionStats> partitionListBuilder = ImmutableList.builder();
     for (int i = 0; i < 5; i++) {
       PartitionData partitionData =
-          new PartitionData(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
+          new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
       partitionData.set(0, RANDOM.nextInt());
 
       PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10));
-      stats.set(Column.PARTITION.ordinal(), partitionData);
-      stats.set(Column.DATA_RECORD_COUNT.ordinal(), RANDOM.nextLong());
-      stats.set(Column.DATA_FILE_COUNT.ordinal(), RANDOM.nextInt());
-      stats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L * RANDOM.nextInt(20));
-      stats.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), null);
-      stats.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), null);
-      stats.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), null);
-      stats.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), null);
-      stats.set(Column.TOTAL_RECORD_COUNT.ordinal(), null);
-      stats.set(Column.LAST_UPDATED_AT.ordinal(), null);
-      stats.set(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), null);
+      stats.set(PARTITION_FIELD_ID, partitionData);
+      stats.set(DATA_RECORD_COUNT.fieldId(), RANDOM.nextLong());
+      stats.set(DATA_FILE_COUNT.fieldId(), RANDOM.nextInt());
+      stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), 1024L * RANDOM.nextInt(20));
+      stats.set(POSITION_DELETE_RECORD_COUNT.fieldId(), null);
+      stats.set(POSITION_DELETE_FILE_COUNT.fieldId(), null);
+      stats.set(EQUALITY_DELETE_RECORD_COUNT.fieldId(), null);
+      stats.set(EQUALITY_DELETE_FILE_COUNT.fieldId(), null);
+      stats.set(TOTAL_RECORD_COUNT.fieldId(), null);
+      stats.set(LAST_UPDATED_AT.fieldId(), null);
+      stats.set(LAST_UPDATED_SNAPSHOT_ID.fieldId(), null);
 
       partitionListBuilder.add(stats);
     }
@@ -338,7 +348,7 @@ public void testPartitionStats() throws Exception {
     Snapshot snapshot1 = testTable.currentSnapshot();
     Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
     Types.StructType partitionType =
-        recordSchema.findField(Column.PARTITION.name()).type().asStructType();
+        recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
     computeAndValidatePartitionStats(
         testTable,
         recordSchema,
@@ -402,7 +412,7 @@ public void testPartitionStats() throws Exception {
     Snapshot snapshot3 = testTable.currentSnapshot();
 
     recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
-    partitionType = recordSchema.findField(Column.PARTITION.name()).type().asStructType();
+    partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
     computeAndValidatePartitionStats(
         testTable,
         recordSchema,