diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index ff982fe96f5f..ec17bd0aac94 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
@@ -48,6 +49,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -326,11 +328,17 @@ public void deleteWhere(Filter[] filters) {
       return;
     }
 
-    icebergTable
-        .newDelete()
-        .set("spark.app.id", sparkSession().sparkContext().applicationId())
-        .deleteFromRowFilter(deleteExpr)
-        .commit();
+    DeleteFiles deleteFiles =
+        icebergTable
+            .newDelete()
+            .set("spark.app.id", sparkSession().sparkContext().applicationId())
+            .deleteFromRowFilter(deleteExpr);
+
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(deleteFiles::set);
+    }
+
+    deleteFiles.commit();
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 60dd716c631e..06e4965a068a 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -40,18 +39,20 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -59,7 +60,7 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
 
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
@@ -440,12 +441,46 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+  }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d84528451348..8fb583123cbc 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -52,6 +52,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -374,7 +375,9 @@ public void deleteWhere(Filter[] filters) {
     if (branch != null) {
       deleteFiles.toBranch(branch);
     }
-
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(deleteFiles::set);
+    }
     deleteFiles.commit();
   }
 
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 9f4eab5bb961..342d8085b178 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -40,18 +39,20 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -59,7 +60,7 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
 
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
@@ -440,12 +441,46 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+  }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d84528451348..240a9df0a88c 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -52,6 +52,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -375,6 +376,10 @@ public void deleteWhere(Filter[] filters) {
       deleteFiles.toBranch(branch);
     }
 
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(deleteFiles::set);
+    }
+
     deleteFiles.commit();
   }
 
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index a14e7b500e9c..5e819200f56c 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
@@ -39,18 +38,20 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -59,7 +60,7 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
 
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
@@ -437,12 +438,46 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+  }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 }
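For reference, the caller-side pattern this patch enables looks roughly like the sketch below: wrap a SQL DELETE in CommitMetadata.withCommitProperties so the extra properties are bound to the current thread and, with this change, copied onto the DeleteFiles commit that SparkTable.deleteWhere produces. This mirrors the new tests; the catalog/table name ("demo.db.events"), the property key, and the SparkSession setup are illustrative assumptions, not part of the patch.

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.spark.sql.SparkSession;

public class DeleteWithCommitProperties {
  public static void main(String[] args) {
    // Assumes a SparkSession already configured with an Iceberg catalog (illustrative).
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Properties to stamp into the snapshot summary of the delete commit.
    Map<String, String> properties = Maps.newHashMap();
    properties.put("writer-thread", Thread.currentThread().getName());

    // withCommitProperties binds the properties to the current thread for the duration of
    // the callable; deleteWhere reads CommitMetadata.commitProperties() and sets each one
    // on the DeleteFiles operation before committing.
    CommitMetadata.withCommitProperties(
        properties,
        () -> {
          spark.sql("DELETE FROM demo.db.events WHERE id = 1"); // hypothetical table
          return 0;
        },
        RuntimeException.class);
  }
}

As in the tests, the delete must take the metadata-delete path (SparkTable.deleteWhere) for the properties to land on the snapshot; row-level rewrites go through a different write path.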