@@ -19,10 +19,12 @@
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.spark.sql.functions.lit;

@@ -43,9 +45,11 @@
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
@@ -68,6 +72,7 @@
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTableWithFilters;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.RowLevelWrite;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
@@ -103,6 +108,68 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS parquet_table");
}

@Test
public void testSkewDelete() throws Exception {
Review comment from @aokolnychyi (Contributor, Author), May 4, 2023:
Tests for CoW row-level operations already cover SparkWrite, which is used in normal writes. There is not much logic on the Iceberg side; the rest is covered by Spark tests.

createAndInitPartitionedTable();

Employee[] employees = new Employee[100];
for (int index = 0; index < 100; index++) {
employees[index] = new Employee(index, "hr");
}
append(tableName, employees);
append(tableName, employees);
append(tableName, employees);
append(tableName, employees);

// set the open file cost large enough to produce a separate scan task per file
// use hash distribution to trigger a shuffle
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
DELETE_DISTRIBUTION_MODE,
DistributionMode.HASH.modeName());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();

// enable AQE and set the advisory partition size small enough to trigger a split
// set the number of shuffle partitions to 2 to only have 2 reducers
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
() -> {
SparkPlan plan =
executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
});

Table table = validationCatalog.loadTable(tableIdent);
Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);

if (mode(table) == COPY_ON_WRITE) {
// CoW DELETE requests the remaining records to be clustered by `_file`
// each task contains only 1 file and therefore writes only 1 shuffle block
// that means 4 shuffle blocks are distributed among 2 reducers
// AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
// otherwise, there would be 2 tasks processing 2 shuffle blocks each
Review thread on lines +157 to +158:
Contributor: [doubt] Should we also add a UT where coalescing is happening?
@aokolnychyi (Contributor, Author): I was planning to do so in a separate PR. This change focuses on skew.
(A hypothetical coalesce-oriented configuration is sketched after this test method.)

validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
} else {
// MoR DELETE requests the deleted records to be clustered by `_spec_id` and `_partition`
// all tasks belong to the same partition and therefore write only 1 shuffle block per task
// that means there are 4 shuffle blocks, all assigned to the same reducer
// AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
// otherwise, there would be 1 task processing 4 shuffle blocks
validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
}

Assert.assertEquals(
"Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget()));
}
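
Following up on the coalesce question in the review thread above: the sketch below shows, hypothetically, how the same test scaffolding (withSQLConf, executeAndKeepPlan, commitTarget, and the SQLConf constants already imported in this file) could be pointed at coalescing instead of splitting by using a large advisory partition size. This is an illustration only, not part of the PR, and it does not assert the resulting file counts.

// Hypothetical sketch (not part of this PR): a coalesce-oriented variant of the
// configuration above. With a large advisory partition size, AQE's rebalance is
// expected to merge the small shuffle blocks into fewer write tasks rather than
// split skewed ones.
withSQLConf(
    ImmutableMap.of(
        SQLConf.SHUFFLE_PARTITIONS().key(), "4",
        SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
        SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
    () -> {
      SparkPlan plan =
          executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
      // the rebalance operator should still appear in the plan
      Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
    });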

@Test
public void testDeleteWithoutScanningTable() throws Exception {
createAndInitPartitionedTable();
@@ -19,10 +19,12 @@
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.MERGE_MODE;
import static org.apache.iceberg.TableProperties.MERGE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import static org.apache.spark.sql.functions.lit;
@@ -95,6 +97,82 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS source");
}

@Test
public void testSkewMerge() {
createAndInitTable("id INT, salary INT, dep STRING");
sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);

String[] records = new String[100];
for (int index = 0; index < 100; index++) {
records[index] = String.format("{ \"id\": %d, \"salary\": 100, \"dep\": \"hr\" }", index);
}
append(tableName, records);
append(tableName, records);
append(tableName, records);
append(tableName, records);

// set the open file cost large enough to produce a separate scan task per file
// use hash distribution to trigger a shuffle
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
MERGE_DISTRIBUTION_MODE,
DistributionMode.HASH.modeName());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();

spark.range(0, 100).createOrReplaceTempView("source");

// enable AQE and set the advisory partition size small enough to trigger a split
// set the number of shuffle partitions to 4 so the join shuffle uses 4 reducers
// set the min coalesce partition size small enough to avoid coalescing
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "4",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), "100",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
() -> {
SparkPlan plan =
executeAndKeepPlan(
"MERGE INTO %s t USING source "
+ "ON t.id = source.id "
+ "WHEN MATCHED THEN "
+ " UPDATE SET salary = -1 ",
commitTarget());
Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
});

Table table = validationCatalog.loadTable(tableIdent);
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);

if (mode(table) == COPY_ON_WRITE) {
// CoW MERGE would perform a join on `id` and then cluster records by `dep`
// the first shuffle distributes records into 4 shuffle partitions so that rows can be merged
// after existing and new rows are merged, the data is clustered by `dep`
// each task with merged data contains records for the same table partition
// that means there are 4 shuffle blocks, all assigned to the same reducer
// AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
// otherwise, there would be 1 task processing all 4 shuffle blocks
validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
} else {
// MoR MERGE would perform a join on `id` and then cluster data based on the partition
// all tasks belong to the same partition and therefore write only 1 shuffle block per task
// that means there are 4 shuffle blocks, all assigned to the same reducer
// AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
// otherwise, there would be 1 task processing 4 shuffle blocks
validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
}

Assert.assertEquals(
"Row count must match",
400L,
scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget()));
}

@Test
public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() {
createAndInitTable(
@@ -24,7 +24,9 @@
import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
import static org.apache.iceberg.SnapshotSummary.DELETED_FILES_PROP;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
@@ -44,6 +46,7 @@
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
@@ -64,6 +67,7 @@
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.After;
@@ -98,6 +102,71 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS deleted_employee");
}

@Test
public void testSkewUpdate() {
createAndInitTable("id INT, dep STRING");
sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);

String[] records = new String[100];
for (int index = 0; index < 100; index++) {
records[index] = String.format("{ \"id\": %d, \"dep\": \"hr\" }", index);
}
append(tableName, records);
append(tableName, records);
append(tableName, records);
append(tableName, records);

// set the open file cost large enough to produce a separate scan task per file
// use hash distribution to trigger a shuffle
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
UPDATE_DISTRIBUTION_MODE,
DistributionMode.HASH.modeName());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();

// enable AQE and set the advisory partition size small enough to trigger a split
// set the number of shuffle partitions to 2 to only have 2 reducers
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
() -> {
SparkPlan plan =
executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
});

Table table = validationCatalog.loadTable(tableIdent);
Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);

if (mode(table) == COPY_ON_WRITE) {
// CoW UPDATE requests the updated records to be clustered by `_file`
// each task contains only 1 file and therefore writes only 1 shuffle block
// that means 4 shuffle blocks are distributed among 2 reducers
// AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
// otherwise, there would be 2 tasks processing 2 shuffle blocks each
validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
} else {
// MoR UPDATE requests the deleted records to be clustered by `_spec_id` and `_partition`
// all tasks belong to the same partition and therefore write only 1 shuffle block per task
// that means there are 4 shuffle blocks, all assigned to the same reducer
// AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
// otherwise, there would be 1 task processing 4 shuffle blocks
validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
}

Assert.assertEquals(
"Row count must match",
200L,
scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget()));
}

@Test
public void testExplain() {
createAndInitTable("id INT, dep STRING");
@@ -135,6 +135,11 @@ public Distribution requiredDistribution() {
return requiredDistribution;
}

@Override
public boolean distributionStrictlyRequired() {
return false;
Review thread:
Contributor: Should we also check that ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED is true before disabling this requirement? Otherwise it will be a no-op for OptimizeSkewInRebalancePartitions.
@aokolnychyi (Contributor, Author): Is there ever a good reason to return true from this method? We don't require distributions to be strict, and it is up to Spark to either handle the skew or not.
Contributor: Agreed. I was mostly coming from the point that we are overriding this and setting it to false in the hope that Spark will optimize the skew, whereas if the above config is disabled, Spark will never do so. I am fine with keeping it as it is.
@aokolnychyi (Contributor, Author), May 4, 2023: I feel it is better to always return false and leave it up to Spark. It seems the safest way, as Spark may add new configs or logic on when to do that in the future.
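
For context on the API being discussed: distributionStrictlyRequired() comes from Spark's DSv2 write contract. The sketch below is a generic, non-Iceberg illustration of that contract as I understand it in Spark 3.4; the class name and the "dep" column are made up, while the interfaces and factory methods (RequiresDistributionAndOrdering, Distributions, Expressions) are Spark's.

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;

// Illustrative only: a write that asks Spark to cluster rows by a column but does
// not insist on a strict repartition, leaving AQE free to rebalance (split skewed
// shuffle partitions or coalesce small ones).
class ExampleClusteredWrite implements RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // cluster incoming rows by the (hypothetical) partition column "dep"
    return Distributions.clustered(new Expression[] {Expressions.identity("dep")});
  }

  @Override
  public boolean distributionStrictlyRequired() {
    // false: Spark may satisfy the distribution with an AQE-adjustable rebalance;
    // true (the default) would force an exact repartition
    return false;
  }

  @Override
  public SortOrder[] requiredOrdering() {
    return new SortOrder[0]; // no ordering requirement in this sketch
  }
}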

}

@Override
public SortOrder[] requiredOrdering() {
return requiredOrdering;
@@ -137,6 +137,11 @@ public Distribution requiredDistribution() {
return requiredDistribution;
}

@Override
public boolean distributionStrictlyRequired() {
return false;
Review thread:
@aokolnychyi (Contributor, Author): I may actually need to move it to SparkWriteBuilder as SparkWrite is used for compaction. We explicitly disable table distribution/ordering and AQE in shuffling rewriters but not in bin-pack when the output spec mismatches. Thoughts, @RussellSpitzer?
Member: I was going to say I don't really mind our Compaction solution atm. I think disabling AQE is our best bet there.
@aokolnychyi (Contributor, Author): Let's stick to that then, I agree.
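
To make the "disabling AQE" option above concrete, it roughly amounts to running the compaction work in a session where adaptive execution is off. The snippet below is a generic sketch, not the actual Iceberg rewriter code; the only Spark-specific pieces are the standard spark.sql.adaptive.enabled flag and SparkSession.newSession().

import org.apache.spark.sql.SparkSession;

class CompactionSessionExample {
  // Sketch only: obtain a session with AQE turned off so the planner cannot
  // re-split or coalesce the carefully sized compaction write tasks.
  static SparkSession sessionWithoutAqe(SparkSession spark) {
    SparkSession session = spark.newSession(); // isolated SQL conf, shared context
    session.conf().set("spark.sql.adaptive.enabled", "false");
    return session;
  }
}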

}

@Override
public SortOrder[] requiredOrdering() {
return requiredOrdering;