From 6aa5948fe8c8ae88ec3bcad6b6d134909010ecee Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 4 May 2023 14:16:21 -0700 Subject: [PATCH 01/15] fix delete version tag --- .../v2/ExtendedDataSourceV2Strategy.scala | 2 + .../source/SparkPositionDeletesRewrite.java | 1 + .../spark/source/SparkPositionDeltaWrite.java | 2 + .../iceberg/spark/source/SparkTable.java | 11 ++-- .../spark/source/TestDataSourceOptions.java | 53 ++++++++++++++++--- 5 files changed, 57 insertions(+), 12 deletions(-) diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index ae582c958c47..9e8f3c0ed931 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -122,6 +122,8 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi } filter }.toArray + // scalastyle:off + println("DeleteFromIcebergTable") DeleteFromTableExec(r.table.asDeletable, filters, refreshCache(r)) :: Nil case NoStatsUnaryNode(child) => diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 0aebb6bdb2fd..ba9d2122f2c0 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -135,6 +135,7 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { @Override public void commit(WriterCommitMessage[] messages) { PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); + System.out.println("executing SparkPositionDeletesRewrite.commit"); coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index ce4b248e0f54..7b798639b599 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -157,6 +157,8 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { @Override public void commit(WriterCommitMessage[] messages) { + + System.out.println("executing SparkPositionDeltaWrite.commit"); RowDelta rowDelta = table.newRowDelta(); CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index d84528451348..8c9c17a93f21 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -52,11 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import 
org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.Spark3Util; -import org.apache.iceberg.spark.SparkFilters; -import org.apache.iceberg.spark.SparkReadOptions; -import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.*; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; @@ -374,7 +370,10 @@ public void deleteWhere(Filter[] filters) { if (branch != null) { deleteFiles.toBranch(branch); } - + System.out.println("executing deleteWhere"); + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(deleteFiles::set); + } deleteFiles.commit(); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 9f4eab5bb961..ef9dfb023964 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -36,6 +36,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -44,14 +45,12 @@ import org.apache.iceberg.relocated.com.google.common.math.LongMath; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -59,7 +58,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -public class TestDataSourceOptions { +public class TestDataSourceOptions extends SparkTestBaseWithCatalog { private static final Configuration CONF = new Configuration(); private static final Schema SCHEMA = @@ -448,4 +447,46 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx Assert.assertTrue(threadNames.contains(null)); Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); } + + @Test + public void testExtraSnapshotMetadataWithDelete() throws InterruptedException, IOException, NoSuchTableException { + spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1"); + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + List expectedRecords = + Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); + originalDf.repartition(5, new Column("data")) + .select("id", "data").writeTo(tableName).append(); + spark.sql("SELECT * from " + tableName + ".files").show(); + System.out.println(spark.sql("EXPLAIN DELETE FROM " + tableName 
+ " where id = 1").collectAsList().get(0).get(0)); + System.out.println("finished inserting"); + Thread writerThread = + new Thread( + () -> { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CommitMetadata.withCommitProperties( + properties, + () -> { + spark.sql("DELETE FROM " + tableName + " where id = 1"); + return 0; + }, + RuntimeException.class); + }); + writerThread.setName("test-extra-commit-message-delete-thread"); + writerThread.start(); + writerThread.join(); + Set threadNames = Sets.newHashSet(); + spark.sql("SELECT * from " + tableName).show(); + Table table = validationCatalog.loadTable(tableIdent); + for (Snapshot snapshot : table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); + } + for (String t : threadNames) { + System.out.println(t); + } + Assert.assertEquals(2, threadNames.size()); + Assert.assertTrue(threadNames.contains(null)); + Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread")); + } } From e0f366c9ea3ebeffb20b86439e43ad7389f93d79 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 18 May 2023 09:54:13 -0700 Subject: [PATCH 02/15] add all tests --- gradle.properties | 1 + .../iceberg/spark/source/SparkTable.java | 29 +++++------- .../spark/source/TestDataSourceOptions.java | 45 +++++++++++++++--- .../iceberg/spark/source/SparkTable.java | 10 ++-- .../spark/source/TestDataSourceOptions.java | 46 ++++++++++++++++--- 5 files changed, 96 insertions(+), 35 deletions(-) diff --git a/gradle.properties b/gradle.properties index eb0da0ac8547..4b249cf3a4b4 100644 --- a/gradle.properties +++ b/gradle.properties @@ -20,6 +20,7 @@ systemProp.defaultFlinkVersions=1.17 systemProp.knownFlinkVersions=1.15,1.16,1.17 systemProp.defaultHiveVersions=2 systemProp.knownHiveVersions=2,3 +systemProp.sparkVersions=3.2 systemProp.defaultSparkVersions=3.4 systemProp.knownSparkVersions=3.1,3.2,3.3,3.4 systemProp.defaultScalaVersion=2.12 diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index ff982fe96f5f..7e249fafeedf 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -24,18 +24,10 @@ import java.io.IOException; import java.util.Map; import java.util.Set; -import org.apache.iceberg.BaseMetadataTable; + +import org.apache.iceberg.*; import org.apache.iceberg.BaseTable; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.MetadataColumns; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Partitioning; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ExpressionUtil; @@ -48,11 +40,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.Spark3Util; -import org.apache.iceberg.spark.SparkFilters; -import org.apache.iceberg.spark.SparkReadOptions; -import 
org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.*; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; @@ -326,11 +314,16 @@ public void deleteWhere(Filter[] filters) { return; } - icebergTable + DeleteFiles deleteFiles = icebergTable .newDelete() .set("spark.app.id", sparkSession().sparkContext().applicationId()) - .deleteFromRowFilter(deleteExpr) - .commit(); + .deleteFromRowFilter(deleteExpr); + + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(deleteFiles::set); + } + + deleteFiles.commit(); } @Override diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 60dd716c631e..04ffb29dc144 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -44,14 +44,12 @@ import org.apache.iceberg.relocated.com.google.common.math.LongMath; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -59,7 +57,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -public class TestDataSourceOptions { +public class TestDataSourceOptions extends SparkTestBaseWithCatalog { private static final Configuration CONF = new Configuration(); private static final Schema SCHEMA = @@ -448,4 +446,39 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx Assert.assertTrue(threadNames.contains(null)); Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); } + + @Test + public void testExtraSnapshotMetadataWithDelete() throws InterruptedException, IOException, NoSuchTableException { + spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1"); + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + List expectedRecords = + Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); + originalDf.repartition(5, new Column("data")) + .select("id", "data").writeTo(tableName).append(); + Thread writerThread = + new Thread( + () -> { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CommitMetadata.withCommitProperties( + properties, + () -> { + spark.sql("DELETE FROM " + tableName + " where id = 1"); + return 0; + }, + RuntimeException.class); + }); + writerThread.setName("test-extra-commit-message-delete-thread"); + writerThread.start(); + writerThread.join(); + Set threadNames = Sets.newHashSet(); + Table table = 
validationCatalog.loadTable(tableIdent); + for (Snapshot snapshot : table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); + } + Assert.assertEquals(2, threadNames.size()); + Assert.assertTrue(threadNames.contains(null)); + Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread")); + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index d84528451348..9c3214b1384d 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -52,11 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.Spark3Util; -import org.apache.iceberg.spark.SparkFilters; -import org.apache.iceberg.spark.SparkReadOptions; -import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.*; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; @@ -375,6 +371,10 @@ public void deleteWhere(Filter[] filters) { deleteFiles.toBranch(branch); } + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(deleteFiles::set); + } + deleteFiles.commit(); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index a14e7b500e9c..47690d2693d4 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -43,14 +44,12 @@ import org.apache.iceberg.relocated.com.google.common.math.LongMath; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.assertj.core.api.Assertions; import org.junit.AfterClass; import org.junit.Assert; @@ -59,7 +58,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -public class TestDataSourceOptions { +public class TestDataSourceOptions extends SparkTestBaseWithCatalog { private static final Configuration CONF = new Configuration(); private static final Schema SCHEMA = @@ -445,4 +444,39 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx Assert.assertTrue(threadNames.contains(null)); Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); } + + @Test + public 
void testExtraSnapshotMetadataWithDelete() throws InterruptedException, IOException, NoSuchTableException { + spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1"); + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + List expectedRecords = + Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); + originalDf.repartition(5, new Column("data")) + .select("id", "data").writeTo(tableName).append(); + Thread writerThread = + new Thread( + () -> { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CommitMetadata.withCommitProperties( + properties, + () -> { + spark.sql("DELETE FROM " + tableName + " where id = 1"); + return 0; + }, + RuntimeException.class); + }); + writerThread.setName("test-extra-commit-message-delete-thread"); + writerThread.start(); + writerThread.join(); + Set threadNames = Sets.newHashSet(); + Table table = validationCatalog.loadTable(tableIdent); + for (Snapshot snapshot : table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); + } + Assert.assertEquals(2, threadNames.size()); + Assert.assertTrue(threadNames.contains(null)); + Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread")); + } } From 2e8f2edc2aa52255f928e03d37b11d463ec65ca3 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 18 May 2023 10:48:36 -0700 Subject: [PATCH 03/15] yo --- .../iceberg/spark/source/SparkTable.java | 29 ++++++++++---- .../spark/source/TestDataSourceOptions.java | 40 +++++++++++-------- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 7e249fafeedf..ec17bd0aac94 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -24,10 +24,19 @@ import java.io.IOException; import java.util.Map; import java.util.Set; - -import org.apache.iceberg.*; +import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ExpressionUtil; @@ -40,7 +49,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.*; +import org.apache.iceberg.spark.CommitMetadata; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkFilters; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.util.PropertyUtil; import 
org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; @@ -314,10 +328,11 @@ public void deleteWhere(Filter[] filters) { return; } - DeleteFiles deleteFiles = icebergTable - .newDelete() - .set("spark.app.id", sparkSession().sparkContext().applicationId()) - .deleteFromRowFilter(deleteExpr); + DeleteFiles deleteFiles = + icebergTable + .newDelete() + .set("spark.app.id", sparkSession().sparkContext().applicationId()) + .deleteFromRowFilter(deleteExpr); if (!CommitMetadata.commitProperties().isEmpty()) { CommitMetadata.commitProperties().forEach(deleteFiles::set); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 04ffb29dc144..e8e7d045f2a0 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -48,7 +48,12 @@ import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.spark.sql.*; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.junit.AfterClass; import org.junit.Assert; @@ -448,27 +453,28 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx } @Test - public void testExtraSnapshotMetadataWithDelete() throws InterruptedException, IOException, NoSuchTableException { + public void testExtraSnapshotMetadataWithDelete() + throws InterruptedException, IOException, NoSuchTableException { spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1"); sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List expectedRecords = - Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); - originalDf.repartition(5, new Column("data")) - .select("id", "data").writeTo(tableName).append(); + originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append(); Thread writerThread = - new Thread( - () -> { - Map properties = Maps.newHashMap(); - properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); - CommitMetadata.withCommitProperties( - properties, - () -> { - spark.sql("DELETE FROM " + tableName + " where id = 1"); - return 0; - }, - RuntimeException.class); - }); + new Thread( + () -> { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CommitMetadata.withCommitProperties( + properties, + () -> { + spark.sql("DELETE FROM " + tableName + " where id = 1"); + return 0; + }, + RuntimeException.class); + }); writerThread.setName("test-extra-commit-message-delete-thread"); writerThread.start(); writerThread.join(); From 2c1b93f49e2c19b59a4896bc4ada3636bb7fa591 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 18 May 2023 11:02:54 -0700 Subject: [PATCH 04/15] add docs --- docs/spark-configuration.md | 17 +++++++++++++++++ 1 file 
changed, 17 insertions(+) diff --git a/docs/spark-configuration.md b/docs/spark-configuration.md index 866dd765b320..ecac64831a80 100644 --- a/docs/spark-configuration.md +++ b/docs/spark-configuration.md @@ -184,6 +184,23 @@ df.write .insertInto("catalog.db.table") ``` +specifically, if you run SQL statements, you could use `org.apache.iceberg.spark.CommitMetadata` to add entries with custom-keys and corresponding values in the snapshot summary + +```java + +import org.apache.iceberg.spark.CommitMetadata; +Map properties = Maps.newHashMap(); +properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); +CommitMetadata.withCommitProperties(properties, + () -> { + spark.sql("DELETE FROM " + tableName + " where id = 1"); + return 0; + }, + RuntimeException.class); + +``` + + | Spark option | Default | Description | | ---------------------- | -------------------------- | ------------------------------------------------------------ | | write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc | From 977381795e9c49fa4c2e8ce600cbb93e49180cf4 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 18 May 2023 13:41:02 -0700 Subject: [PATCH 05/15] remove position --- docs/spark-configuration.md | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/docs/spark-configuration.md b/docs/spark-configuration.md index ecac64831a80..c0e77a8b235b 100644 --- a/docs/spark-configuration.md +++ b/docs/spark-configuration.md @@ -184,21 +184,7 @@ df.write .insertInto("catalog.db.table") ``` -specifically, if you run SQL statements, you could use `org.apache.iceberg.spark.CommitMetadata` to add entries with custom-keys and corresponding values in the snapshot summary -```java - -import org.apache.iceberg.spark.CommitMetadata; -Map properties = Maps.newHashMap(); -properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); -CommitMetadata.withCommitProperties(properties, - () -> { - spark.sql("DELETE FROM " + tableName + " where id = 1"); - return 0; - }, - RuntimeException.class); - -``` | Spark option | Default | Description | @@ -211,3 +197,20 @@ CommitMetadata.withCommitProperties(properties, | check-ordering | true | Checks if input schema and table schema are same | | isolation-level | null | Desired isolation level for Dataframe overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. | | validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. 
| + + +specifically, if you run SQL statements, you could use `org.apache.iceberg.spark.CommitMetadata` to add entries with custom-keys and corresponding values in the snapshot summary + +```java + +import org.apache.iceberg.spark.CommitMetadata; +Map properties = Maps.newHashMap(); +properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); +CommitMetadata.withCommitProperties(properties, + () -> { + spark.sql("DELETE FROM " + tableName + " where id = 1"); + return 0; + }, + RuntimeException.class); + +``` \ No newline at end of file From 8b39e57b3d870b2cd24a016c5d4c2b6c5fe60f12 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 18 May 2023 13:53:05 -0700 Subject: [PATCH 06/15] 3.3 --- gradle.properties | 2 +- .../source/SparkPositionDeletesRewrite.java | 1 - .../spark/source/SparkPositionDeltaWrite.java | 1 - .../iceberg/spark/source/SparkTable.java | 8 +++- .../spark/source/TestDataSourceOptions.java | 48 +++++++++++-------- 5 files changed, 36 insertions(+), 24 deletions(-) diff --git a/gradle.properties b/gradle.properties index 4b249cf3a4b4..6bbaf62efd12 100644 --- a/gradle.properties +++ b/gradle.properties @@ -20,7 +20,7 @@ systemProp.defaultFlinkVersions=1.17 systemProp.knownFlinkVersions=1.15,1.16,1.17 systemProp.defaultHiveVersions=2 systemProp.knownHiveVersions=2,3 -systemProp.sparkVersions=3.2 +systemProp.sparkVersions=3.3 systemProp.defaultSparkVersions=3.4 systemProp.knownSparkVersions=3.1,3.2,3.3,3.4 systemProp.defaultScalaVersion=2.12 diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index ba9d2122f2c0..0aebb6bdb2fd 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -135,7 +135,6 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { @Override public void commit(WriterCommitMessage[] messages) { PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); - System.out.println("executing SparkPositionDeletesRewrite.commit"); coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 7b798639b599..43635c948396 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -158,7 +158,6 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { @Override public void commit(WriterCommitMessage[] messages) { - System.out.println("executing SparkPositionDeltaWrite.commit"); RowDelta rowDelta = table.newRowDelta(); CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 8c9c17a93f21..8fb583123cbc 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -52,7 +52,12 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.*; +import org.apache.iceberg.spark.CommitMetadata; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkFilters; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; @@ -370,7 +375,6 @@ public void deleteWhere(Filter[] filters) { if (branch != null) { deleteFiles.toBranch(branch); } - System.out.println("executing deleteWhere"); if (!CommitMetadata.commitProperties().isEmpty()) { CommitMetadata.commitProperties().forEach(deleteFiles::set); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index ef9dfb023964..7fc1c04c5b53 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -36,7 +36,6 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -49,7 +48,12 @@ import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.spark.sql.*; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.junit.AfterClass; import org.junit.Assert; @@ -449,30 +453,36 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx } @Test - public void testExtraSnapshotMetadataWithDelete() throws InterruptedException, IOException, NoSuchTableException { + public void testExtraSnapshotMetadataWithDelete() + throws InterruptedException, IOException, NoSuchTableException { spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1"); sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List expectedRecords = - Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); - originalDf.repartition(5, new Column("data")) - .select("id", "data").writeTo(tableName).append(); + originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append(); spark.sql("SELECT * from " + tableName + ".files").show(); - System.out.println(spark.sql("EXPLAIN DELETE FROM " + tableName + " where id = 1").collectAsList().get(0).get(0)); + System.out.println( + spark + .sql("EXPLAIN DELETE FROM " + tableName + " where id = 1") + .collectAsList() + .get(0) 
+ .get(0)); System.out.println("finished inserting"); Thread writerThread = - new Thread( - () -> { - Map properties = Maps.newHashMap(); - properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); - CommitMetadata.withCommitProperties( - properties, - () -> { - spark.sql("DELETE FROM " + tableName + " where id = 1"); - return 0; - }, - RuntimeException.class); - }); + new Thread( + () -> { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CommitMetadata.withCommitProperties( + properties, + () -> { + spark.sql("DELETE FROM " + tableName + " where id = 1"); + return 0; + }, + RuntimeException.class); + }); writerThread.setName("test-extra-commit-message-delete-thread"); writerThread.start(); writerThread.join(); From 84d6c91b2bc538a79ab0fc0521a4f909af76bf02 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 18 May 2023 14:04:34 -0700 Subject: [PATCH 07/15] 3.4 --- gradle.properties | 1 - .../iceberg/spark/source/SparkTable.java | 7 +++- .../spark/source/TestDataSourceOptions.java | 41 +++++++++++-------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/gradle.properties b/gradle.properties index 6bbaf62efd12..eb0da0ac8547 100644 --- a/gradle.properties +++ b/gradle.properties @@ -20,7 +20,6 @@ systemProp.defaultFlinkVersions=1.17 systemProp.knownFlinkVersions=1.15,1.16,1.17 systemProp.defaultHiveVersions=2 systemProp.knownHiveVersions=2,3 -systemProp.sparkVersions=3.3 systemProp.defaultSparkVersions=3.4 systemProp.knownSparkVersions=3.1,3.2,3.3,3.4 systemProp.defaultScalaVersion=2.12 diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 9c3214b1384d..240a9df0a88c 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -52,7 +52,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.*; +import org.apache.iceberg.spark.CommitMetadata; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkFilters; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 47690d2693d4..fc96554276b7 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -48,7 +47,12 @@ import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.spark.sql.*; +import org.apache.spark.sql.Column; 
+import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.assertj.core.api.Assertions; import org.junit.AfterClass; @@ -446,27 +450,28 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx } @Test - public void testExtraSnapshotMetadataWithDelete() throws InterruptedException, IOException, NoSuchTableException { + public void testExtraSnapshotMetadataWithDelete() + throws InterruptedException, IOException, NoSuchTableException { spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1"); sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List expectedRecords = - Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); - originalDf.repartition(5, new Column("data")) - .select("id", "data").writeTo(tableName).append(); + originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append(); Thread writerThread = - new Thread( - () -> { - Map properties = Maps.newHashMap(); - properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); - CommitMetadata.withCommitProperties( - properties, - () -> { - spark.sql("DELETE FROM " + tableName + " where id = 1"); - return 0; - }, - RuntimeException.class); - }); + new Thread( + () -> { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CommitMetadata.withCommitProperties( + properties, + () -> { + spark.sql("DELETE FROM " + tableName + " where id = 1"); + return 0; + }, + RuntimeException.class); + }); writerThread.setName("test-extra-commit-message-delete-thread"); writerThread.start(); writerThread.join(); From a5d61b1917220b511b624c1bfefc179459d840be Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Thu, 18 May 2023 14:32:55 -0700 Subject: [PATCH 08/15] Update spark-configuration.md --- docs/spark-configuration.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/spark-configuration.md b/docs/spark-configuration.md index c0e77a8b235b..1b9207c93787 100644 --- a/docs/spark-configuration.md +++ b/docs/spark-configuration.md @@ -184,9 +184,6 @@ df.write .insertInto("catalog.db.table") ``` - - - | Spark option | Default | Description | | ---------------------- | -------------------------- | ------------------------------------------------------------ | | write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc | @@ -213,4 +210,4 @@ CommitMetadata.withCommitProperties(properties, }, RuntimeException.class); -``` \ No newline at end of file +``` From 518a0eeca22ddbd9d943df4772a1eb995eccfb5e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 24 May 2023 20:05:46 -0700 Subject: [PATCH 09/15] addr comments #1 --- .../spark/source/TestDataSourceOptions.java | 4 +++- .../v2/ExtendedDataSourceV2Strategy.scala | 2 -- .../spark/source/SparkPositionDeltaWrite.java | 1 - .../spark/source/TestDataSourceOptions.java | 16 +++------------- .../spark/source/TestDataSourceOptions.java | 4 +++- 5 files changed, 9 insertions(+), 18 deletions(-) diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index e8e7d045f2a0..dc481619cb44 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -443,6 +443,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx writerThread.setName("test-extra-commit-message-writer-thread"); writerThread.start(); writerThread.join(); + Set threadNames = Sets.newHashSet(); for (Snapshot snapshot : table.snapshots()) { threadNames.add(snapshot.summary().get("writer-thread")); @@ -454,7 +455,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx @Test public void testExtraSnapshotMetadataWithDelete() - throws InterruptedException, IOException, NoSuchTableException { + throws InterruptedException, NoSuchTableException { spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1"); sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List expectedRecords = @@ -478,6 +479,7 @@ public void testExtraSnapshotMetadataWithDelete() writerThread.setName("test-extra-commit-message-delete-thread"); writerThread.start(); writerThread.join(); + Set threadNames = Sets.newHashSet(); Table table = validationCatalog.loadTable(tableIdent); for (Snapshot snapshot : table.snapshots()) { diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 9e8f3c0ed931..ae582c958c47 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -122,8 +122,6 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi } filter }.toArray - // scalastyle:off - println("DeleteFromIcebergTable") DeleteFromTableExec(r.table.asDeletable, filters, refreshCache(r)) :: Nil case NoStatsUnaryNode(child) => diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 43635c948396..ce4b248e0f54 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -157,7 +157,6 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { @Override public void commit(WriterCommitMessage[] messages) { - RowDelta rowDelta = table.newRowDelta(); CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 7fc1c04c5b53..db3bd819b3e2 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -443,6 +443,7 @@ public void 
testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx writerThread.setName("test-extra-commit-message-writer-thread"); writerThread.start(); writerThread.join(); + Set threadNames = Sets.newHashSet(); for (Snapshot snapshot : table.snapshots()) { threadNames.add(snapshot.summary().get("writer-thread")); @@ -454,7 +455,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx @Test public void testExtraSnapshotMetadataWithDelete() - throws InterruptedException, IOException, NoSuchTableException { + throws InterruptedException, NoSuchTableException { spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1"); sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List expectedRecords = @@ -462,14 +463,6 @@ public void testExtraSnapshotMetadataWithDelete() new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append(); - spark.sql("SELECT * from " + tableName + ".files").show(); - System.out.println( - spark - .sql("EXPLAIN DELETE FROM " + tableName + " where id = 1") - .collectAsList() - .get(0) - .get(0)); - System.out.println("finished inserting"); Thread writerThread = new Thread( () -> { @@ -486,15 +479,12 @@ public void testExtraSnapshotMetadataWithDelete() writerThread.setName("test-extra-commit-message-delete-thread"); writerThread.start(); writerThread.join(); + Set threadNames = Sets.newHashSet(); - spark.sql("SELECT * from " + tableName).show(); Table table = validationCatalog.loadTable(tableIdent); for (Snapshot snapshot : table.snapshots()) { threadNames.add(snapshot.summary().get("writer-thread")); } - for (String t : threadNames) { - System.out.println(t); - } Assert.assertEquals(2, threadNames.size()); Assert.assertTrue(threadNames.contains(null)); Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread")); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index fc96554276b7..669caa92ab59 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -440,6 +440,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx writerThread.setName("test-extra-commit-message-writer-thread"); writerThread.start(); writerThread.join(); + Set threadNames = Sets.newHashSet(); for (Snapshot snapshot : table.snapshots()) { threadNames.add(snapshot.summary().get("writer-thread")); @@ -451,7 +452,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx @Test public void testExtraSnapshotMetadataWithDelete() - throws InterruptedException, IOException, NoSuchTableException { + throws InterruptedException, NoSuchTableException { spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1"); sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List expectedRecords = @@ -475,6 +476,7 @@ public void testExtraSnapshotMetadataWithDelete() writerThread.setName("test-extra-commit-message-delete-thread"); writerThread.start(); writerThread.join(); + Set threadNames = Sets.newHashSet(); Table table = validationCatalog.loadTable(tableIdent); for 
(Snapshot snapshot : table.snapshots()) { From e86f18199a9e383027ed50e4c1bd51474f9da2c3 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 24 May 2023 20:11:32 -0700 Subject: [PATCH 10/15] addr comments #2 --- .../spark/source/TestDataSourceOptions.java | 22 +++++++---------- .../spark/source/TestDataSourceOptions.java | 22 +++++++---------- .../spark/source/TestDataSourceOptions.java | 24 +++++++------------ 3 files changed, 25 insertions(+), 43 deletions(-) diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index dc481619cb44..a2a1070a41ea 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -444,13 +444,10 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx writerThread.start(); writerThread.join(); - Set threadNames = Sets.newHashSet(); - for (Snapshot snapshot : table.snapshots()) { - threadNames.add(snapshot.summary().get("writer-thread")); - } - Assert.assertEquals(2, threadNames.size()); - Assert.assertTrue(threadNames.contains(null)); - Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + List snapshots = Lists.newArrayList(table.snapshots()); + Assert.assertEquals(2, snapshots.size()); + Assert.assertNull(snapshots.get(0).summary().get("writer-thread")); + Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); } @Test @@ -480,13 +477,10 @@ public void testExtraSnapshotMetadataWithDelete() writerThread.start(); writerThread.join(); - Set threadNames = Sets.newHashSet(); Table table = validationCatalog.loadTable(tableIdent); - for (Snapshot snapshot : table.snapshots()) { - threadNames.add(snapshot.summary().get("writer-thread")); - } - Assert.assertEquals(2, threadNames.size()); - Assert.assertTrue(threadNames.contains(null)); - Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread")); + List snapshots = Lists.newArrayList(table.snapshots()); + Assert.assertEquals(2, snapshots.size()); + Assert.assertNull(snapshots.get(0).summary().get("writer-thread")); + Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); } } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index db3bd819b3e2..4929b13108ea 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -444,13 +444,10 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx writerThread.start(); writerThread.join(); - Set threadNames = Sets.newHashSet(); - for (Snapshot snapshot : table.snapshots()) { - threadNames.add(snapshot.summary().get("writer-thread")); - } - Assert.assertEquals(2, threadNames.size()); - Assert.assertTrue(threadNames.contains(null)); - Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + List snapshots = Lists.newArrayList(table.snapshots()); + Assert.assertEquals(2, snapshots.size()); + Assert.assertNull(snapshots.get(0).summary().get("writer-thread")); + 
Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); } @Test @@ -480,13 +477,10 @@ public void testExtraSnapshotMetadataWithDelete() writerThread.start(); writerThread.join(); - Set threadNames = Sets.newHashSet(); Table table = validationCatalog.loadTable(tableIdent); - for (Snapshot snapshot : table.snapshots()) { - threadNames.add(snapshot.summary().get("writer-thread")); - } - Assert.assertEquals(2, threadNames.size()); - Assert.assertTrue(threadNames.contains(null)); - Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread")); + List snapshots = Lists.newArrayList(table.snapshots()); + Assert.assertEquals(2, snapshots.size()); + Assert.assertNull(snapshots.get(0).summary().get("writer-thread")); + Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 669caa92ab59..683468bb76b8 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -441,13 +441,10 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx writerThread.start(); writerThread.join(); - Set threadNames = Sets.newHashSet(); - for (Snapshot snapshot : table.snapshots()) { - threadNames.add(snapshot.summary().get("writer-thread")); - } - Assert.assertEquals(2, threadNames.size()); - Assert.assertTrue(threadNames.contains(null)); - Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + List snapshots = Lists.newArrayList(table.snapshots()); + Assert.assertEquals(2, snapshots.size()); + Assert.assertNull(snapshots.get(0).summary().get("writer-thread")); + Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); } @Test @@ -476,14 +473,11 @@ public void testExtraSnapshotMetadataWithDelete() writerThread.setName("test-extra-commit-message-delete-thread"); writerThread.start(); writerThread.join(); - - Set threadNames = Sets.newHashSet(); + Table table = validationCatalog.loadTable(tableIdent); - for (Snapshot snapshot : table.snapshots()) { - threadNames.add(snapshot.summary().get("writer-thread")); - } - Assert.assertEquals(2, threadNames.size()); - Assert.assertTrue(threadNames.contains(null)); - Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread")); + List snapshots = Lists.newArrayList(table.snapshots()); + Assert.assertEquals(2, snapshots.size()); + Assert.assertNull(snapshots.get(0).summary().get("writer-thread")); + Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); } } From 8c7f206efbd72acb7d1657ae0edfd340e4463df6 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 24 May 2023 20:12:57 -0700 Subject: [PATCH 11/15] rm doc updates --- docs/spark-configuration.md | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/docs/spark-configuration.md b/docs/spark-configuration.md index 1b9207c93787..879411e8fffd 100644 --- a/docs/spark-configuration.md +++ b/docs/spark-configuration.md @@ -195,19 +195,3 @@ df.write | isolation-level | null | Desired isolation level for Dataframe overwrite operations. 
`null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. | | validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. | - -specifically, if you run SQL statements, you could use `org.apache.iceberg.spark.CommitMetadata` to add entries with custom-keys and corresponding values in the snapshot summary - -```java - -import org.apache.iceberg.spark.CommitMetadata; -Map properties = Maps.newHashMap(); -properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); -CommitMetadata.withCommitProperties(properties, - () -> { - spark.sql("DELETE FROM " + tableName + " where id = 1"); - return 0; - }, - RuntimeException.class); - -``` From 24937af1f124d55452e1692d12fb876a5f19cc14 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 24 May 2023 20:35:52 -0700 Subject: [PATCH 12/15] stylistic fixes --- .../iceberg/spark/source/TestDataSourceOptions.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 683468bb76b8..826ed43ceb06 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -24,7 +24,6 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -39,7 +38,6 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; @@ -444,7 +442,8 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx List snapshots = Lists.newArrayList(table.snapshots()); Assert.assertEquals(2, snapshots.size()); Assert.assertNull(snapshots.get(0).summary().get("writer-thread")); - Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); + Assert.assertEquals( + "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); } @Test @@ -478,6 +477,7 @@ public void testExtraSnapshotMetadataWithDelete() List snapshots = Lists.newArrayList(table.snapshots()); Assert.assertEquals(2, snapshots.size()); Assert.assertNull(snapshots.get(0).summary().get("writer-thread")); - Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); + Assert.assertEquals( + "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread")); } } From 2fc5d9c22050bc1665bde4d90825867851fd6a3f Mon Sep 17 00:00:00 2001 From: CodingCat 
Date: Wed, 24 May 2023 20:37:20 -0700
Subject: [PATCH 13/15] stylistic fixes

---
 .../iceberg/spark/source/TestDataSourceOptions.java | 8 ++++----
 .../iceberg/spark/source/TestDataSourceOptions.java | 8 ++++----
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index a2a1070a41ea..e94ab094c3b0 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -40,7 +39,6 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -447,7 +445,8 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 
   @Test
@@ -481,6 +480,7 @@ public void testExtraSnapshotMetadataWithDelete()
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 4929b13108ea..9a1d4b5d1d22 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -40,7 +39,6 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -447,7 +445,8 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 
   @Test
@@ -481,6 +480,7 @@ public void testExtraSnapshotMetadataWithDelete()
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 }

From 5a33ad340eae6be2ed6670cdfa870aaf41e1d7f5 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Thu, 25 May 2023 08:51:30 -0700
Subject: [PATCH 14/15] fix the tests

---
 .../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 2 +-
 .../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 2 +-
 .../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index e94ab094c3b0..06e4965a068a 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -446,7 +446,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
     Assert.assertEquals(
-        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 
   @Test
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 9a1d4b5d1d22..342d8085b178 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -446,7 +446,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
     Assert.assertEquals(
-        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 826ed43ceb06..5e819200f56c 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -443,7 +443,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
     Assert.assertEquals(
-        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 
   @Test

From 6e32872914a8ee3c8372f31b53daea3c29642df9 Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Thu, 25 May 2023 10:50:48 -0700
Subject: [PATCH 15/15] Update spark-configuration.md

---
 docs/spark-configuration.md | 1 -
 1 file changed, 1 deletion(-)

diff --git a/docs/spark-configuration.md b/docs/spark-configuration.md
index 879411e8fffd..866dd765b320 100644
--- a/docs/spark-configuration.md
+++ b/docs/spark-configuration.md
@@ -194,4 +194,3 @@ df.write
 | check-ordering | true | Checks if input schema and table schema are same |
 | isolation-level | null | Desired isolation level for Dataframe overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
 | validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |
-
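For reference, a minimal sketch of the `CommitMetadata.withCommitProperties` usage that these patches exercise, adapted from the documentation snippet removed in PATCH 11/15. The Spark session, table name, and `id = 1` predicate are illustrative assumptions, not part of any patch.

```java
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;

// Illustrative fragment: assumes an existing SparkSession `spark` and an Iceberg table `db.table`.
Map<String, String> properties = Maps.newHashMap();
properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));

// Snapshots committed inside the callable carry the extra key/value entries in their summaries.
CommitMetadata.withCommitProperties(
    properties,
    () -> {
      spark.sql("DELETE FROM db.table WHERE id = 1");
      return 0;
    },
    RuntimeException.class);
```

The tests updated above assert exactly this behavior: the snapshot produced by the DELETE carries the `writer-thread` summary entry, while the preceding append snapshot does not.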