@@ -27,6 +27,7 @@
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
@@ -48,6 +49,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -326,11 +328,17 @@ public void deleteWhere(Filter[] filters) {
       return;
     }
 
-    icebergTable
-        .newDelete()
-        .set("spark.app.id", sparkSession().sparkContext().applicationId())
-        .deleteFromRowFilter(deleteExpr)
-        .commit();
+    DeleteFiles deleteFiles =
+        icebergTable
+            .newDelete()
+            .set("spark.app.id", sparkSession().sparkContext().applicationId())
+            .deleteFromRowFilter(deleteExpr);
+
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(deleteFiles::set);
+    }
+
+    deleteFiles.commit();
   }
 
   @Override
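For context, a minimal caller-side sketch of the path these deleteWhere hunks enable. CommitMetadata.withCommitProperties and commitProperties() are the APIs exercised above; the table name and the extra "job-id" property are hypothetical. Any Iceberg commit issued inside the callable, including the metadata-only delete committed by deleteWhere, picks up the extra summary entries.

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.spark.sql.SparkSession;

public class DeleteWithCommitProperties {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    Map<String, String> props = Maps.newHashMap();
    props.put("job-id", "nightly-purge"); // hypothetical extra summary entry

    CommitMetadata.withCommitProperties(
        props,
        () -> {
          // If this DELETE is handled as a metadata-only delete, the snapshot it
          // commits carries "job-id" next to engine-set entries like "spark.app.id".
          spark.sql("DELETE FROM db.events WHERE id = 1"); // hypothetical table
          return 0;
        },
        RuntimeException.class);
  }
}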
@@ -24,7 +24,6 @@
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -40,26 +39,28 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
 
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
@@ -440,12 +441,46 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+  }
 }
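The rewritten assertions index into the snapshot log directly and rely on table.snapshots() returning snapshots in commit order: the initial append carries no "writer-thread" entry, while the DELETE commit does. A small sketch of reading such a property back out of the summaries; the helper is illustrative and not part of the change:

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class CommitPropertyReader {
  // Prints each snapshot's operation and its custom summary entry; the Table is
  // assumed to be loaded from whatever catalog the caller uses.
  static void printWriterThreads(Table table) {
    for (Snapshot snapshot : table.snapshots()) {
      System.out.printf(
          "%s: writer-thread=%s%n",
          snapshot.operation(), snapshot.summary().get("writer-thread"));
    }
  }
}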
@@ -52,6 +52,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -374,7 +375,9 @@ public void deleteWhere(Filter[] filters) {
     if (branch != null) {
       deleteFiles.toBranch(branch);
     }
-
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(deleteFiles::set);
+    }
     deleteFiles.commit();
   }
 
@@ -24,7 +24,6 @@
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -40,26 +39,28 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
 
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
@@ -440,12 +441,46 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+  }
 }
@@ -52,6 +52,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -375,6 +376,10 @@ public void deleteWhere(Filter[] filters) {
       deleteFiles.toBranch(branch);
     }
 
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(deleteFiles::set);
+    }
+
     deleteFiles.commit();
   }
 
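One behavioral detail, and the likely reason the tests drive the DELETE from a dedicated, named thread: CommitMetadata appears to scope its properties to the calling thread, so they are visible only to commits made inside the callable on that thread, and the isEmpty() guard in deleteWhere sees an empty map everywhere else. A minimal sketch under that thread-local assumption; the key and value are hypothetical:

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;

class CommitPropertyScope {
  public static void main(String[] args) {
    Map<String, String> props = Maps.newHashMap();
    props.put("writer-thread", "demo"); // hypothetical entry

    // Visible inside the callable ...
    int inside =
        CommitMetadata.withCommitProperties(
            props, () -> CommitMetadata.commitProperties().size(), RuntimeException.class);

    // ... and cleared again afterwards.
    System.out.println(inside + " vs " + CommitMetadata.commitProperties().size()); // "1 vs 0"
  }
}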
@@ -24,7 +24,6 @@
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
@@ -39,18 +38,20 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -59,7 +60,7 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
 
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
@@ -437,12 +438,46 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+  }
 }