diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 6fa8d7f96562..336d40cca0e3 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.spark.sql.functions.lit;
@@ -43,9 +45,11 @@
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
@@ -68,6 +72,7 @@
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTableWithFilters;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.catalyst.plans.logical.RowLevelWrite;
+import org.apache.spark.sql.execution.SparkPlan;
 import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
@@ -103,6 +108,68 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+    } else {
+      // MoR DELETE requests the deleted records to be clustered by `_spec_id` and `_partition`
+      // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+    }
+
+    Assert.assertEquals(
+        "Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget()));
+  }
+
   @Test
   public void testDeleteWithoutScanningTable() throws Exception {
     createAndInitPartitionedTable();
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 4ec78ec38532..18d42ca6ae85 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.MERGE_MODE;
 import static org.apache.iceberg.TableProperties.MERGE_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 import static org.apache.spark.sql.functions.lit;
@@ -95,6 +97,82 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS source");
   }
 
+  @Test
+  public void testSkewMerge() {
+    createAndInitTable("id INT, salary INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    String[] records = new String[100];
+    for (int index = 0; index < 100; index++) {
+      records[index] = String.format("{ \"id\": %d, \"salary\": 100, \"dep\": \"hr\" }", index);
+    }
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            MERGE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    spark.range(0, 100).createOrReplaceTempView("source");
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 4 to distribute rows into 4 reducers during the join
+    // set the min coalesce partition size small enough to avoid coalescing
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "4",
+            SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), "100",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan(
+                  "MERGE INTO %s t USING source "
+                      + "ON t.id = source.id "
+                      + "WHEN MATCHED THEN "
+                      + " UPDATE SET salary = -1 ",
+                  commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW MERGE would perform a join on `id` and then cluster records by `dep`
+      // the first shuffle distributes records into 4 shuffle partitions so that rows can be merged
+      // after existing and new rows are merged, the data is clustered by `dep`
+      // each task with merged data contains records for the same table partition
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 1 task processing all 4 shuffle blocks
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+    } else {
+      // MoR MERGE would perform a join on `id` and then cluster data based on the partition
+      // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+    }
+
+    Assert.assertEquals(
+        "Row count must match",
+        400L,
+        scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget()));
+  }
+
   @Test
   public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() {
     createAndInitTable(
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 776fbb960055..ccfd83c73303 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -24,7 +24,9 @@
 import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
 import static org.apache.iceberg.SnapshotSummary.DELETED_FILES_PROP;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
@@ -44,6 +46,7 @@
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
@@ -64,6 +67,7 @@
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.SparkPlan;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
@@ -98,6 +102,71 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS deleted_employee");
   }
 
+  @Test
+  public void testSkewUpdate() {
+    createAndInitTable("id INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    String[] records = new String[100];
+    for (int index = 0; index < 100; index++) {
+      records[index] = String.format("{ \"id\": %d, \"dep\": \"hr\" }", index);
+    }
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            UPDATE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW UPDATE requests the updated records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+    } else {
+      // MoR UPDATE requests the deleted records to be clustered by `_spec_id` and `_partition`
+      // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+    }
+
+    Assert.assertEquals(
+        "Row count must match",
+        200L,
+        scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget()));
+  }
+
   @Test
   public void testExplain() {
     createAndInitTable("id INT, dep STRING");
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 74d46339eed3..416eb6a9eebf 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -135,6 +135,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;
+  }
+
   @Override
   public SortOrder[] requiredOrdering() {
     return requiredOrdering;
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a080fcead13b..9e3a15e7387b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -137,6 +137,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;
+  }
+
   @Override
   public SortOrder[] requiredOrdering() {
     return requiredOrdering;
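
For context, the two `distributionStrictlyRequired()` overrides above are what let Spark satisfy the write distribution with a rebalance shuffle (the `REBALANCE_PARTITIONS_BY_COL` node the new tests assert on) instead of a strict repartition, so AQE may split skewed shuffle partitions and coalesce small ones. A minimal sketch of that DSv2 contract in Spark 3.4 follows; it is not part of the patch, and the class name `SkewTolerantWrite` and the `dep` clustering column are purely illustrative assumptions.

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.connector.write.Write;

// Hypothetical sketch only (not part of this patch): a DSv2 write that treats its
// required distribution as a preference rather than a hard constraint.
abstract class SkewTolerantWrite implements Write, RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // cluster incoming rows by a partition column ("dep" is an illustrative name)
    return Distributions.clustered(new Expression[] {Expressions.column("dep")});
  }

  @Override
  public boolean distributionStrictlyRequired() {
    // false means Spark may plan a rebalance shuffle for the clustering,
    // which AQE can split when partitions are skewed or coalesce when they are small
    return false;
  }

  @Override
  public SortOrder[] requiredOrdering() {
    // no ordering requirement in this sketch
    return new SortOrder[0];
  }
}

With the distribution reported as non-strict, the settings exercised in the tests (SQLConf.ADAPTIVE_EXECUTION_ENABLED, SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED, and a small SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) are what allow AQE to break each oversized shuffle block into its own write task, which is why the tests expect 4 added (delete) files instead of 1 or 2.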