From d0e892696a47334d61fbd8dffe9445625782a0ef Mon Sep 17 00:00:00 2001 From: xianyangliu Date: Mon, 14 Aug 2023 14:51:31 +0800 Subject: [PATCH] Port #8072 to Spark 3.1/3.2/3.3 --- .../v2/CreateOrReplaceBranchExec.scala | 17 +++++-- .../spark/extensions/TestBranchDDL.java | 48 +++++++++++++++++-- .../v2/CreateOrReplaceBranchExec.scala | 17 +++++-- .../spark/extensions/TestBranchDDL.java | 48 +++++++++++++++++-- .../v2/CreateOrReplaceBranchExec.scala | 17 +++++-- .../spark/extensions/TestBranchDDL.java | 48 +++++++++++++++++-- 6 files changed, 165 insertions(+), 30 deletions(-) diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala index 651b5c62e159..2ca586838c95 100644 --- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala @@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec( .map(java.lang.Long.valueOf) .orNull - Preconditions.checkArgument(snapshotId != null, - "Cannot complete create or replace branch operation on %s, main has no snapshot", ident) - val manageSnapshots = iceberg.table().manageSnapshots() val refExists = null != iceberg.table().refs().get(branch) + def safeCreateBranch(): Unit = { + if (snapshotId == null) { + manageSnapshots.createBranch(branch) + } else { + manageSnapshots.createBranch(branch, snapshotId) + } + } + if (create && replace && !refExists) { - manageSnapshots.createBranch(branch, snapshotId) + safeCreateBranch() } else if (replace) { + Preconditions.checkArgument(snapshotId != null, + "Cannot complete replace branch operation on %s, main has no snapshot", ident) manageSnapshots.replaceBranch(branch, snapshotId) } else { if (refExists && ifNotExists) { return Nil } - manageSnapshots.createBranch(branch, snapshotId) + safeCreateBranch() } if (branchOptions.numSnapshots.nonEmpty) { diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index f23dc58af60b..914ee403297c 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -100,11 +101,25 @@ public void testCreateBranch() throws NoSuchTableException { @Test public void testCreateBranchOnEmptyTable() { - Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1")) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining( - "Cannot complete create or replace branch operation on %s, main has no snapshot", - tableName); + String branchName = "b1"; + sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1"); + Table table = validationCatalog.loadTable(tableIdent); + + SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH); + Assertions.assertThat(mainRef).isNull(); + + SnapshotRef ref = table.refs().get(branchName); + Assertions.assertThat(ref).isNotNull(); + Assertions.assertThat(ref.minSnapshotsToKeep()).isNull(); + Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull(); + Assertions.assertThat(ref.maxRefAgeMs()).isNull(); + + Snapshot snapshot = table.snapshot(ref.snapshotId()); + Assertions.assertThat(snapshot.parentId()).isNull(); + Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty(); } @Test @@ -529,6 +544,29 @@ public void createOrReplace() throws NoSuchTableException { assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second); } + @Test + public void testCreateOrReplaceBranchOnEmptyTable() { + String branchName = "b1"; + sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, "b1"); + Table table = validationCatalog.loadTable(tableIdent); + + SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH); + Assertions.assertThat(mainRef).isNull(); + + SnapshotRef ref = table.refs().get(branchName); + Assertions.assertThat(ref).isNotNull(); + Assertions.assertThat(ref.minSnapshotsToKeep()).isNull(); + Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull(); + Assertions.assertThat(ref.maxRefAgeMs()).isNull(); + + Snapshot snapshot = table.snapshot(ref.snapshotId()); + Assertions.assertThat(snapshot.parentId()).isNull(); + Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty(); + } + @Test public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException { Table table = insertRows(); diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala index 142ed1357135..ecf1489e0854 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala @@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec( .map(java.lang.Long.valueOf) .orNull - Preconditions.checkArgument(snapshotId != null, - "Cannot complete create or replace branch operation on %s, main has no snapshot", ident) - val manageSnapshots = iceberg.table().manageSnapshots() val refExists = null != iceberg.table().refs().get(branch) + def safeCreateBranch(): Unit = { + if (snapshotId == null) { + manageSnapshots.createBranch(branch) + } else { + manageSnapshots.createBranch(branch, snapshotId) + } + } + if (create && replace && !refExists) { - manageSnapshots.createBranch(branch, snapshotId) + safeCreateBranch() } else if (replace) { + Preconditions.checkArgument(snapshotId != null, + "Cannot complete replace branch operation on %s, main has no snapshot", ident) manageSnapshots.replaceBranch(branch, snapshotId) } else { if (refExists && ifNotExists) { return Nil } - manageSnapshots.createBranch(branch, snapshotId) + safeCreateBranch() } if (branchOptions.numSnapshots.nonEmpty) { diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index 2c3cbac02820..adf82d01cdc4 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -101,11 +102,25 @@ public void testCreateBranch() throws NoSuchTableException { @Test public void testCreateBranchOnEmptyTable() { - Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1")) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining( - "Cannot complete create or replace branch operation on %s, main has no snapshot", - tableName); + String branchName = "b1"; + sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1"); + Table table = validationCatalog.loadTable(tableIdent); + + SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH); + Assertions.assertThat(mainRef).isNull(); + + SnapshotRef ref = table.refs().get(branchName); + Assertions.assertThat(ref).isNotNull(); + Assertions.assertThat(ref.minSnapshotsToKeep()).isNull(); + Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull(); + Assertions.assertThat(ref.maxRefAgeMs()).isNull(); + + Snapshot snapshot = table.snapshot(ref.snapshotId()); + Assertions.assertThat(snapshot.parentId()).isNull(); + Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty(); } @Test @@ -530,6 +545,29 @@ public void createOrReplace() throws NoSuchTableException { assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second); } + @Test + public void testCreateOrReplaceBranchOnEmptyTable() { + String branchName = "b1"; + sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, "b1"); + Table table = validationCatalog.loadTable(tableIdent); + + SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH); + Assertions.assertThat(mainRef).isNull(); + + SnapshotRef ref = table.refs().get(branchName); + Assertions.assertThat(ref).isNotNull(); + Assertions.assertThat(ref.minSnapshotsToKeep()).isNull(); + Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull(); + Assertions.assertThat(ref.maxRefAgeMs()).isNull(); + + Snapshot snapshot = table.snapshot(ref.snapshotId()); + Assertions.assertThat(snapshot.parentId()).isNull(); + Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty(); + } + @Test public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException { Table table = insertRows(); diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala index d4328d4b9227..2be406e7f344 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala @@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec( .map(java.lang.Long.valueOf) .orNull - Preconditions.checkArgument(snapshotId != null, - "Cannot complete create or replace branch operation on %s, main has no snapshot", ident) - val manageSnapshots = iceberg.table().manageSnapshots() val refExists = null != iceberg.table().refs().get(branch) + def safeCreateBranch(): Unit = { + if (snapshotId == null) { + manageSnapshots.createBranch(branch) + } else { + manageSnapshots.createBranch(branch, snapshotId) + } + } + if (create && replace && !refExists) { - manageSnapshots.createBranch(branch, snapshotId) + safeCreateBranch() } else if (replace) { + Preconditions.checkArgument(snapshotId != null, + "Cannot complete replace branch operation on %s, main has no snapshot", ident) manageSnapshots.replaceBranch(branch, snapshotId) } else { if (refExists && ifNotExists) { return Nil } - manageSnapshots.createBranch(branch, snapshotId) + safeCreateBranch() } if (branchOptions.numSnapshots.nonEmpty) { diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index fcc124dee5b9..e6ac4cfc4dac 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -94,11 +95,25 @@ public void testCreateBranch() throws NoSuchTableException { @Test public void testCreateBranchOnEmptyTable() { - Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1")) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining( - "Cannot complete create or replace branch operation on %s, main has no snapshot", - tableName); + String branchName = "b1"; + sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1"); + Table table = validationCatalog.loadTable(tableIdent); + + SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH); + Assertions.assertThat(mainRef).isNull(); + + SnapshotRef ref = table.refs().get(branchName); + Assertions.assertThat(ref).isNotNull(); + Assertions.assertThat(ref.minSnapshotsToKeep()).isNull(); + Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull(); + Assertions.assertThat(ref.maxRefAgeMs()).isNull(); + + Snapshot snapshot = table.snapshot(ref.snapshotId()); + Assertions.assertThat(snapshot.parentId()).isNull(); + Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty(); } @Test @@ -310,6 +325,29 @@ public void createOrReplace() throws NoSuchTableException { assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second); } + @Test + public void testCreateOrReplaceBranchOnEmptyTable() { + String branchName = "b1"; + sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, "b1"); + Table table = validationCatalog.loadTable(tableIdent); + + SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH); + Assertions.assertThat(mainRef).isNull(); + + SnapshotRef ref = table.refs().get(branchName); + Assertions.assertThat(ref).isNotNull(); + Assertions.assertThat(ref.minSnapshotsToKeep()).isNull(); + Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull(); + Assertions.assertThat(ref.maxRefAgeMs()).isNull(); + + Snapshot snapshot = table.snapshot(ref.snapshotId()); + Assertions.assertThat(snapshot.parentId()).isNull(); + Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty(); + Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty(); + } + @Test public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException { Table table = insertRows();