From da26f80d9d8ed490b5dbc0dfcc206774731fe918 Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Tue, 4 Mar 2025 11:18:07 +0530
Subject: [PATCH 1/2] Spark-3.5: Add spark action to compute partition stats

---
 .../iceberg/actions/ActionsProvider.java      |   6 +
 .../actions/ComputePartitionStats.java        |  40 +++
 .../actions/BaseComputePartitionStats.java    |  39 +++
 .../ComputePartitionStatsSparkAction.java     | 102 +++++++
 .../iceberg/spark/actions/SparkActions.java   |   6 +
 .../TestComputePartitionStatsAction.java      | 286 ++++++++++++++++++
 6 files changed, 479 insertions(+)
 create mode 100644 api/src/main/java/org/apache/iceberg/actions/ComputePartitionStats.java
 create mode 100644 core/src/main/java/org/apache/iceberg/actions/BaseComputePartitionStats.java
 create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputePartitionStatsSparkAction.java
 create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java

diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index 61750d83fc79..240e34113721 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -77,6 +77,12 @@ default ComputeTableStats computeTableStats(Table table) {
         this.getClass().getName() + " does not implement computeTableStats");
   }
 
+  /** Instantiates an action to compute partition stats. */
+  default ComputePartitionStats computePartitionStats(Table table) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " does not implement computePartitionStats");
+  }
+
   /** Instantiates an action to rewrite all absolute paths in table metadata. */
   default RewriteTablePath rewriteTablePath(Table table) {
     throw new UnsupportedOperationException(
diff --git a/api/src/main/java/org/apache/iceberg/actions/ComputePartitionStats.java b/api/src/main/java/org/apache/iceberg/actions/ComputePartitionStats.java
new file mode 100644
index 000000000000..6aa95196345e
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/actions/ComputePartitionStats.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.PartitionStatisticsFile;
+
+/** An action that computes and writes the partition statistics of an Iceberg table. */
+public interface ComputePartitionStats
+    extends Action<ComputePartitionStats, ComputePartitionStats.Result> {
+  /**
+   * Choose the table snapshot for which to compute partition stats; by default, the current
+   * snapshot is used.
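+   *
+   * <p>A usage sketch (illustrative only; {@code actions} is an {@link ActionsProvider}
+   * implementation and {@code table} a loaded table):
+   *
+   * <pre>
+   *   ComputePartitionStats.Result result =
+   *       actions.computePartitionStats(table).snapshot(snapshotId).execute();
+   *   PartitionStatisticsFile statsFile = result.statisticsFile();
+   * </pre>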
+   *
+   * @param snapshotId long ID of the snapshot for which stats need to be computed
+   * @return this for method chaining
+   */
+  ComputePartitionStats snapshot(long snapshotId);
+
+  /** The result of partition statistics collection. */
+  interface Result {
+
+    /** Returns statistics file or none if no statistics were collected. */
+    PartitionStatisticsFile statisticsFile();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseComputePartitionStats.java b/core/src/main/java/org/apache/iceberg/actions/BaseComputePartitionStats.java
new file mode 100644
index 000000000000..cd0dbacceaf6
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseComputePartitionStats.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import javax.annotation.Nullable;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.immutables.value.Value;
+
+@Value.Enclosing
+@SuppressWarnings("ImmutablesStyle")
+@Value.Style(
+    typeImmutableEnclosing = "ImmutableComputePartitionStats",
+    visibilityString = "PUBLIC",
+    builderVisibilityString = "PUBLIC")
+interface BaseComputePartitionStats extends ComputePartitionStats {
+
+  @Value.Immutable
+  interface Result extends ComputePartitionStats.Result {
+    @Override
+    @Nullable
+    PartitionStatisticsFile statisticsFile();
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputePartitionStatsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputePartitionStatsSparkAction.java
new file mode 100644
index 000000000000..69fbf2ab63d6
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ComputePartitionStatsSparkAction.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.PartitionStatsHandler;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ComputePartitionStats;
+import org.apache.iceberg.actions.ImmutableComputePartitionStats;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An action that computes partition stats incrementally: it starts after the latest snapshot that
+ * already has a partition statistics file and continues up to the given snapshot (the current
+ * snapshot if none is specified), then merges the new stats with the existing ones and writes the
+ * combined result into a {@link PartitionStatisticsFile}. Performs a full computation if no
+ * previous statistics file exists. Also registers the resulting {@link PartitionStatisticsFile}
+ * in the table metadata.
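+ *
+ * <p>A usage sketch (illustrative only; assumes a SparkSession {@code spark} and a loaded {@code
+ * table}):
+ *
+ * <pre>
+ *   PartitionStatisticsFile statsFile =
+ *       SparkActions.get(spark).computePartitionStats(table).execute().statisticsFile();
+ * </pre>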
+ */
+public class ComputePartitionStatsSparkAction
+    extends BaseSparkAction<ComputePartitionStatsSparkAction> implements ComputePartitionStats {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputePartitionStatsSparkAction.class);
+  private static final Result EMPTY_RESULT =
+      ImmutableComputePartitionStats.Result.builder().build();
+
+  private final Table table;
+  private Snapshot snapshot;
+
+  ComputePartitionStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputePartitionStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public ComputePartitionStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    if (snapshot == null) {
+      LOG.info("No snapshot to compute partition stats for table {}", table.name());
+      return EMPTY_RESULT;
+    }
+
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-PARTITION-STATS", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    LOG.info("Computing partition stats for {} (snapshot {})", table.name(), snapshot.snapshotId());
+    PartitionStatisticsFile statisticsFile;
+    try {
+      statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table, snapshot.snapshotId());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (statisticsFile == null) {
+      return EMPTY_RESULT;
+    }
+
+    table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
+    return ImmutableComputePartitionStats.Result.builder().statisticsFile(statisticsFile).build();
+  }
+
+  private String jobDesc() {
+    return String.format(
+        "Computing partition stats for %s (snapshot=%s)", table.name(), snapshot.snapshotId());
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index aa4ef987e788..b7361c336a69 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -20,6 +20,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.ComputePartitionStats;
 import org.apache.iceberg.actions.ComputeTableStats;
 import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
 import org.apache.iceberg.spark.Spark3Util;
@@ -104,6 +105,11 @@ public ComputeTableStats computeTableStats(Table table) {
     return new ComputeTableStatsSparkAction(spark, table);
   }
 
+  @Override
+  public ComputePartitionStats computePartitionStats(Table table) {
+    return new ComputePartitionStatsSparkAction(spark, table);
+  }
+
   @Override
   public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
     return new RemoveDanglingDeletesSparkAction(spark, table);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
new file mode 100644
index 000000000000..aec75fdd4287
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.PartitionStats;
+import org.apache.iceberg.PartitionStatsHandler;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.groups.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestComputePartitionStatsAction extends CatalogTestBase {
+
+  @TestTemplate
+  public void testEmptyTable() {
+    createPartitionTable();
+    Table table = validationCatalog.loadTable(tableIdent);
+    SparkActions actions = SparkActions.get();
+    ComputePartitionStatsSparkAction.Result result =
+        actions.computePartitionStats(table).execute();
+    assertThat(result.statisticsFile()).isNull();
+  }
+
+  @TestTemplate
+  public void testEmptyBranch() {
+    createPartitionTable();
+    Table table = validationCatalog.loadTable(tableIdent);
+    table.manageSnapshots().createBranch("b1").commit();
+    SparkActions actions = SparkActions.get();
+    ComputePartitionStatsSparkAction.Result result =
+        actions
+            .computePartitionStats(table)
+            .snapshot(table.refs().get("b1").snapshotId())
+            .execute();
+    assertThat(result.statisticsFile()).isNull();
+  }
+
+  @TestTemplate
+  public void testInvalidSnapshot() {
+    createPartitionTable();
+    Table table = validationCatalog.loadTable(tableIdent);
+    SparkActions actions = SparkActions.get();
+    assertThatThrownBy(() -> actions.computePartitionStats(table).snapshot(42L).execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Snapshot not found: 42");
+  }
+
+  @TestTemplate
+  public void testPartitionStatsCompute() throws IOException {
+    createPartitionTable();
+    // foo, A -> 4 records,
+    // foo, B -> 2 records,
+    // bar, A -> 2 records,
+    // bar, B -> 1 record
+    sql(
+        "INSERT into %s values (0, 'foo', 'A'), (1, 'foo', 'A'), (2, 'foo', 'B'), (3, 'foo', 'B')",
+        tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot1 = table.currentSnapshot();
+    sql("INSERT into %s values(4, 'bar', 'A'), (5, 'bar', 'A'), (6, 'bar', 'B')", tableName);
+    table.refresh();
+    Snapshot snapshot2 = table.currentSnapshot();
+    sql("INSERT into %s values(7, 'foo', 'A'), (8, 'foo', 'A')", tableName);
+    // snapshot3 is unused for partition stats as the same partition is modified by snapshot4
+
+    // delete one record of foo, A
+    spark.sql("DELETE FROM " + tableName + " WHERE c1=1").show(200, false);
+    table.refresh();
+    Snapshot snapshot4 = table.currentSnapshot();
+
+    assertThat(table.partitionStatisticsFiles()).isEmpty();
+
+    SparkActions actions = SparkActions.get();
+    PartitionStatisticsFile statisticsFile =
+        actions.computePartitionStats(table).execute().statisticsFile();
+    assertThat(statisticsFile.fileSizeInBytes()).isGreaterThan(0);
+    assertThat(statisticsFile.snapshotId()).isEqualTo(snapshot4.snapshotId());
+    // check table metadata registration
+    assertThat(table.partitionStatisticsFiles()).containsExactly(statisticsFile);
+
+    Types.StructType partitionType = Partitioning.partitionType(table);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionType);
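+    // Each expected tuple below follows the field order asserted in validatePartitionStats:
+    // partition, spec ID, data record count, data file count, total data file size, position
+    // delete record count, position delete file count, equality delete record count, equality
+    // delete file count, total record count (null in these tests), last updated millis, and
+    // last updated snapshot ID.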
+    validatePartitionStats(
+        statisticsFile,
+        dataSchema,
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "A"),
+            0,
+            4L, // total 4 records for this partition
+            2,
+            datafileSize("foo", "A"),
+            1L, // position delete record from the delete operation
+            1, // position delete file from the delete operation
+            0L,
+            0,
+            null,
+            snapshot4.timestampMillis(), // last modified by snapshot4
+            snapshot4.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "B"),
+            0,
+            2L,
+            1,
+            datafileSize("foo", "B"),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot1.timestampMillis(), // added by snapshot1
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "A"),
+            0,
+            2L,
+            1,
+            datafileSize("bar", "A"),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot2.timestampMillis(), // added by snapshot2
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "B"),
+            0,
+            1L,
+            1,
+            datafileSize("bar", "B"),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()));
+  }
+
+  @TestTemplate
+  public void testPartitionStatsComputeOnSnapshot() throws IOException {
+    createPartitionTable();
+    // foo, A -> 2 records,
+    // foo, B -> 1 record,
+    // bar, A -> 2 records
+    sql("INSERT into %s values (0, 'foo', 'A'), (1, 'foo', 'A'), (2, 'foo', 'B')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot1 = table.currentSnapshot();
+    sql("INSERT into %s values(3, 'bar', 'A'), (4, 'bar', 'A')", tableName);
+    table.refresh();
+
+    assertThat(table.partitionStatisticsFiles()).isEmpty();
+
+    SparkActions actions = SparkActions.get();
+    PartitionStatisticsFile statisticsFile =
+        actions
+            .computePartitionStats(table)
+            .snapshot(snapshot1.snapshotId())
+            .execute()
+            .statisticsFile();
+    assertThat(statisticsFile.fileSizeInBytes()).isGreaterThan(0);
+    // should be mapped to snapshot1 instead of the latest snapshot
+    assertThat(statisticsFile.snapshotId()).isEqualTo(snapshot1.snapshotId());
+    // check table metadata registration
+    assertThat(table.partitionStatisticsFiles()).containsExactly(statisticsFile);
+
+    Types.StructType partitionType = Partitioning.partitionType(table);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionType);
+    // should contain stats only for the partitions of snapshot1 (no entry for partition bar, A)
+    validatePartitionStats(
+        statisticsFile,
+        dataSchema,
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "A"),
+            0,
+            2L,
+            1,
+            datafileSize("foo", "A"),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "B"),
+            0,
+            1L,
+            1,
+            datafileSize("foo", "B"),
+            0L,
+            0,
+            0L,
+            0,
+            null,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()));
+  }
+
+  private long datafileSize(String col1, String col2) {
+    return (long)
+        sql(
+                "SELECT sum(file_size_in_bytes) FROM %s.data_files WHERE partition.c2 = '%s' AND partition.c3 = '%s'",
+                tableName, col1, col2)
+            .get(0)[0];
+  }
+
+  private void createPartitionTable() {
+    sql(
+        "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg PARTITIONED BY (c2, c3) TBLPROPERTIES('write.delete.mode'='merge-on-read')",
+        tableName);
+  }
+
+  private static void validatePartitionStats(
+      PartitionStatisticsFile result, Schema recordSchema, Tuple... expectedValues)
+      throws IOException {
+    // read the partition entries from the stats file
+    List<PartitionStats> partitionStats;
+    try (CloseableIterable<PartitionStats> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            recordSchema, Files.localInput(result.path()))) {
+      partitionStats = Lists.newArrayList(recordIterator);
+    }
+
+    assertThat(partitionStats)
+        .extracting(
+            PartitionStats::partition,
+            PartitionStats::specId,
+            PartitionStats::dataRecordCount,
+            PartitionStats::dataFileCount,
+            PartitionStats::totalDataFileSizeInBytes,
+            PartitionStats::positionDeleteRecordCount,
+            PartitionStats::positionDeleteFileCount,
+            PartitionStats::equalityDeleteRecordCount,
+            PartitionStats::equalityDeleteFileCount,
+            PartitionStats::totalRecords,
+            PartitionStats::lastUpdatedAt,
+            PartitionStats::lastUpdatedSnapshotId)
+        .containsExactlyInAnyOrder(expectedValues);
+  }
+
+  private static StructLike partitionRecord(
+      Types.StructType partitionType, String val1, String val2) {
+    GenericRecord record = GenericRecord.create(partitionType);
+    record.set(0, val1);
+    record.set(1, val2);
+    return record;
+  }
+
+  @AfterEach
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+}

From fc9859d8ed74ba605366cbd722aa236a09e225b9 Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Tue, 4 Mar 2025 14:02:29 +0530
Subject: [PATCH 2/2] Spark-3.5: Add procedure to compute partition stats

---
 .../TestComputePartitionStatsProcedure.java   | 121 ++++++++++++++++++
 .../ComputePartitionStatsProcedure.java       | 118 +++++++++++++++++
 .../spark/procedures/SparkProcedures.java     |   1 +
 3 files changed, 240 insertions(+)
 create mode 100644 spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
 create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java

diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
new file mode 100644
index 000000000000..2387a4696493
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestComputePartitionStatsProcedure extends ExtensionsTestBase {
+
+  @AfterEach
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testProcedureOnEmptyTable() {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+    List<Object[]> result =
+        sql("CALL %s.system.compute_partition_stats('%s')", catalogName, tableIdent);
+    assertThat(result).isEmpty();
+  }
+
+  @TestTemplate
+  public void testProcedureWithPositionalArgs() throws NoSuchTableException, ParseException {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
+    List<Object[]> output =
+        sql("CALL %s.system.compute_partition_stats('%s')", catalogName, tableIdent);
+    assertThat(output.get(0)).isNotEmpty();
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    assertThat(table.partitionStatisticsFiles()).hasSize(1);
+    PartitionStatisticsFile statisticsFile = table.partitionStatisticsFiles().get(0);
+    assertThat(statisticsFile.path()).isEqualTo(output.get(0)[0].toString());
+    assertThat(statisticsFile.snapshotId()).isEqualTo(table.currentSnapshot().snapshotId());
+    assertThat(new File(statisticsFile.path().replace("file:", ""))).exists();
+  }
+
+  @TestTemplate
+  public void testProcedureWithNamedArgs() throws NoSuchTableException, ParseException {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
+    sql("ALTER TABLE %s CREATE BRANCH `b1`", tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    long branchSnapshotId = table.refs().get("b1").snapshotId();
+    sql("INSERT INTO TABLE %s VALUES (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h')", tableName);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.compute_partition_stats(table => '%s', snapshot_id => %s)",
+            catalogName, tableIdent, branchSnapshotId);
+    table.refresh();
+    assertThat(table.partitionStatisticsFiles()).hasSize(1);
+    PartitionStatisticsFile statisticsFile = table.partitionStatisticsFiles().get(0);
+    assertThat(statisticsFile.path()).isEqualTo(output.get(0)[0].toString());
+    // should be from the branch's snapshot instead of the latest snapshot of the table
+    assertThat(statisticsFile.snapshotId()).isEqualTo(branchSnapshotId);
+    assertThat(new File(statisticsFile.path().replace("file:", ""))).exists();
+  }
+
+  @TestTemplate
+  public void testProcedureWithInvalidSnapshotId() {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.compute_partition_stats(table => '%s', snapshot_id => 42)",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Snapshot not found: 42");
+  }
+
+  @TestTemplate
+  public void testProcedureWithInvalidTable() {
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.compute_partition_stats(table => '%s')",
+                    catalogName, TableIdentifier.of(Namespace.of("default"), "abcd")))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Couldn't load table");
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java
new file mode 100644
index 000000000000..f7c81cfa6465
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ComputePartitionStats;
+import org.apache.iceberg.actions.ComputePartitionStats.Result;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A procedure that computes partition stats incrementally: it starts after the latest snapshot
+ * that already has a partition statistics file and continues up to the given snapshot (the
+ * current snapshot if none is specified), then merges the new stats with the existing ones and
+ * writes the combined result into a {@link PartitionStatisticsFile}. Performs a full computation
+ * if no previous statistics file exists. Also registers the resulting {@link
+ * PartitionStatisticsFile} in the table metadata.
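+ *
+ * <p>Example invocations (illustrative; the table name and snapshot ID are placeholders):
+ *
+ * <pre>
+ *   CALL catalog_name.system.compute_partition_stats('db.sample')
+ *   CALL catalog_name.system.compute_partition_stats(table => 'db.sample', snapshot_id => 123)
+ * </pre>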
+ *
+ * @see SparkActions#computePartitionStats(Table)
+ */
+public class ComputePartitionStatsProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      ProcedureParameter.optional("snapshot_id", DataTypes.LongType);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "partition_statistics_file", DataTypes.StringType, true, Metadata.empty())
+          });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<ComputePartitionStatsProcedure>() {
+      @Override
+      protected ComputePartitionStatsProcedure doBuild() {
+        return new ComputePartitionStatsProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private ComputePartitionStatsProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          ComputePartitionStats action = actions().computePartitionStats(table);
+          if (snapshotId != null) {
+            action.snapshot(snapshotId);
+          }
+
+          return toOutputRows(action.execute());
+        });
+  }
+
+  private InternalRow[] toOutputRows(Result result) {
+    PartitionStatisticsFile statisticsFile = result.statisticsFile();
+    if (statisticsFile != null) {
+      InternalRow row = newInternalRow(UTF8String.fromString(statisticsFile.path()));
+      return new InternalRow[] {row};
+    } else {
+      return new InternalRow[0];
+    }
+  }
+
+  @Override
+  public String description() {
+    return "ComputePartitionStatsProcedure";
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 353970443025..82f44996c8e1 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -62,6 +62,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
     mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
     mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder);
+    mapBuilder.put("compute_partition_stats", ComputePartitionStatsProcedure::builder);
     mapBuilder.put("rewrite_table_path", RewriteTablePathProcedure::builder);
     return mapBuilder.build();
   }