From 6d6093985c6f16b4e1399777432f6341f684a883 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Tue, 23 Aug 2022 16:17:58 -0700 Subject: [PATCH 1/4] Removed deprecations for overwrite files and rollback --- .../org/apache/iceberg/OverwriteFiles.java | 26 --------- .../java/org/apache/iceberg/Rollback.java | 55 ------------------- .../java/org/apache/iceberg/RowDelta.java | 28 ---------- .../main/java/org/apache/iceberg/Table.java | 9 --- .../org/apache/iceberg/BaseMetadataTable.java | 5 -- .../java/org/apache/iceberg/BaseTable.java | 5 -- .../org/apache/iceberg/BaseTransaction.java | 23 +++----- .../apache/iceberg/RollbackToSnapshot.java | 38 ------------- .../org/apache/iceberg/SerializableTable.java | 5 -- .../iceberg/TestIncrementalDataTableScan.java | 4 +- .../java/org/apache/iceberg/TestRowDelta.java | 33 ++++++----- .../source/TestIcebergSourceTablesBase.java | 16 +++--- 12 files changed, 38 insertions(+), 209 deletions(-) delete mode 100644 api/src/main/java/org/apache/iceberg/Rollback.java delete mode 100644 core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java diff --git a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java index c51f96485ee1..20f61ac591a7 100644 --- a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java +++ b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java @@ -105,32 +105,6 @@ public interface OverwriteFiles extends SnapshotUpdate { */ OverwriteFiles caseSensitive(boolean caseSensitive); - /** - * Enables validation that data files added concurrently do not conflict with this commit's - * operation. - * - *

This method should be called while committing non-idempotent overwrite operations. If a - * concurrent operation commits a new file after the data was read and that file might contain - * rows matching the specified conflict detection filter, the overwrite operation will detect this - * and fail. - * - *

Calling this method with a correct conflict detection filter is required to maintain - * serializable isolation for overwrite operations. Otherwise, the isolation level will be - * snapshot isolation. - * - *

Validation applies to files added to the table since the snapshot passed to {@link - * #validateFromSnapshot(long)}. - * - * @param conflictDetectionFilter an expression on rows in the table - * @return this for method chaining - * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link - * #conflictDetectionFilter(Expression)} and {@link #validateNoConflictingData()} instead. - */ - @Deprecated - default OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter) { - return conflictDetectionFilter(conflictDetectionFilter).validateNoConflictingData(); - } - /** * Sets a conflict detection filter used to validate concurrently added data and delete files. * diff --git a/api/src/main/java/org/apache/iceberg/Rollback.java b/api/src/main/java/org/apache/iceberg/Rollback.java deleted file mode 100644 index 48060beef35b..000000000000 --- a/api/src/main/java/org/apache/iceberg/Rollback.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import org.apache.iceberg.exceptions.CommitFailedException; - -/** - * API for rolling table data back to the state at an older table {@link Snapshot snapshot}. - * - *

This API does not allow conflicting calls to {@link #toSnapshotId(long)} and {@link - * #toSnapshotAtTime(long)}. - * - *

When committing, these changes will be applied to the current table metadata. Commit conflicts - * will not be resolved and will result in a {@link CommitFailedException}. - */ -public interface Rollback extends PendingUpdate { - - /** - * Roll this table's data back to a specific {@link Snapshot} identified by id. - * - * @param snapshotId long id of the snapshot to roll back table data to - * @return this for method chaining - * @throws IllegalArgumentException If the table has no snapshot with the given id - * @deprecated Replaced by {@link ManageSnapshots#setCurrentSnapshot(long)} - */ - @Deprecated - Rollback toSnapshotId(long snapshotId); - - /** - * Roll this table's data back to the last {@link Snapshot} before the given timestamp. - * - * @param timestampMillis a long timestamp, as returned by {@link System#currentTimeMillis()} - * @return this for method chaining - * @throws IllegalArgumentException If the table has no old snapshot before the given timestamp - * @deprecated Replaced by {@link ManageSnapshots#rollbackToTime(long)} - */ - @Deprecated - Rollback toSnapshotAtTime(long timestampMillis); -} diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java index b8a44a602004..624f6c15d20b 100644 --- a/api/src/main/java/org/apache/iceberg/RowDelta.java +++ b/api/src/main/java/org/apache/iceberg/RowDelta.java @@ -94,34 +94,6 @@ public interface RowDelta extends SnapshotUpdate { */ RowDelta validateDeletedFiles(); - /** - * Enables validation that data files added concurrently do not conflict with this commit's - * operation. - * - *

This method should be called when the table is queried to determine which files to - * delete/append. If a concurrent operation commits a new file after the data was read and that - * file might contain rows matching the specified conflict detection filter, the overwrite - * operation will detect this during retries and fail. - * - *

Calling this method with a correct conflict detection filter is required to maintain - * serializable isolation for update/delete operations. Otherwise, the isolation level will be - * snapshot isolation. - * - *

Validation applies to files added to the table since the snapshot passed to {@link - * #validateFromSnapshot(long)}. - * - * @param conflictDetectionFilter an expression on rows in the table - * @return this for method chaining - * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link - * #conflictDetectionFilter(Expression)} and {@link #validateNoConflictingDataFiles()} - * instead. - */ - @Deprecated - default RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter) { - conflictDetectionFilter(conflictDetectionFilter); - return validateNoConflictingDataFiles(); - } - /** * Sets a conflict detection filter used to validate concurrently added data and delete files. * diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 7964bd22c076..ef8fbc22ea82 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -270,15 +270,6 @@ default AppendFiles newFastAppend() { */ ExpireSnapshots expireSnapshots(); - /** - * Create a new {@link Rollback rollback API} to roll back to a previous snapshot and commit. - * - * @return a new {@link Rollback} - * @deprecated Replaced by {@link #manageSnapshots()} - */ - @Deprecated - Rollback rollback(); - /** * Create a new {@link ManageSnapshots manage snapshots API} to manage snapshots in this table and * commit. diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index c6615862de57..920551b67f16 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -228,11 +228,6 @@ public ExpireSnapshots expireSnapshots() { throw new UnsupportedOperationException("Cannot expire snapshots from a metadata table"); } - @Override - public Rollback rollback() { - throw new UnsupportedOperationException("Cannot roll back a metadata table"); - } - @Override public ManageSnapshots manageSnapshots() { throw new UnsupportedOperationException("Cannot manage snapshots in a metadata table"); diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 48a78cce3b65..550dc39a18b7 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -214,11 +214,6 @@ public ExpireSnapshots expireSnapshots() { return new RemoveSnapshots(ops); } - @Override - public Rollback rollback() { - return new RollbackToSnapshot(name, ops); - } - @Override public ManageSnapshots manageSnapshots() { return new SnapshotManager(name, ops); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index b162201cf567..64fb3268fe2f 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -18,15 +18,6 @@ */ package org.apache.iceberg; -import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; -import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; -import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; - import java.io.Serializable; import java.util.List; import java.util.Map; @@ -48,6 +39,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + public class BaseTransaction implements Transaction { private static final Logger LOG = LoggerFactory.getLogger(BaseTransaction.class); @@ -716,11 +716,6 @@ public ExpireSnapshots expireSnapshots() { return BaseTransaction.this.expireSnapshots(); } - @Override - public Rollback rollback() { - throw new UnsupportedOperationException("Transaction tables do not support rollback"); - } - @Override public ManageSnapshots manageSnapshots() { throw new UnsupportedOperationException( diff --git a/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java b/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java deleted file mode 100644 index 499f6898a92f..000000000000 --- a/core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -class RollbackToSnapshot extends SnapshotManager implements Rollback { - - RollbackToSnapshot(String tableName, TableOperations ops) { - super(tableName, ops); - } - - @Override - public Rollback toSnapshotId(long snapshotId) { - super.setCurrentSnapshot(snapshotId); - return this; - } - - @Override - public Rollback toSnapshotAtTime(long timestampMillis) { - super.rollbackToTime(timestampMillis); - return this; - } -} diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index ddffdae14edd..7d3eb32ae6bb 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -337,11 +337,6 @@ public ExpireSnapshots expireSnapshots() { throw new UnsupportedOperationException(errorMsg("expireSnapshots")); } - @Override - public Rollback rollback() { - throw new UnsupportedOperationException(errorMsg("rollback")); - } - @Override public ManageSnapshots manageSnapshots() { throw new UnsupportedOperationException(errorMsg("manageSnapshots")); diff --git a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java index 1492070edeea..05f10ab61f15 100644 --- a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java +++ b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java @@ -190,7 +190,7 @@ public void testRollbacks() { add(table.newAppend(), files("B")); add(table.newAppend(), files("C")); // 3 // Go back to snapshot "B" - table.rollback().toSnapshotId(2).commit(); // 2 + table.manageSnapshots().rollbackTo(2).commit(); // 2 Assert.assertEquals(2, table.currentSnapshot().snapshotId()); filesMatch(Lists.newArrayList("B"), appendsBetweenScan(1, 2)); filesMatch(Lists.newArrayList("B"), appendsAfterScan(1)); @@ -201,7 +201,7 @@ public void testRollbacks() { add(transaction.newAppend(), files("F")); transaction.commitTransaction(); // Go back to snapshot "E" - table.rollback().toSnapshotId(5).commit(); + table.manageSnapshots().rollbackTo(2).commit(); Assert.assertEquals(5, table.currentSnapshot().snapshotId()); filesMatch(Lists.newArrayList("B", "D", "E"), appendsBetweenScan(1, 5)); filesMatch(Lists.newArrayList("B", "D", "E"), appendsAfterScan(1)); diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index e2929b470994..8b50e1a25394 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -18,15 +18,6 @@ */ package org.apache.iceberg; -import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP; -import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP; -import static org.apache.iceberg.SnapshotSummary.ADDED_POS_DELETES_PROP; -import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP; -import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_PREFIX; -import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; -import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; -import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP; - import java.util.List; import java.util.Map; import java.util.Set; @@ -40,6 +31,15 @@ import org.junit.Assert; import org.junit.Test; +import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.ADDED_POS_DELETES_PROP; +import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP; +import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_PREFIX; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP; + public class TestRowDelta extends V2TableTestBase { @Test public void testAddDeleteFile() { @@ -350,7 +350,8 @@ public void testValidateNoConflicts() { .newRowDelta() .addDeletes(FILE_A_DELETES) .validateFromSnapshot(validateFromSnapshotId) - .validateNoConflictingAppends(Expressions.equal("data", "u")) // bucket16("u") -> 0 + .conflictDetectionFilter(Expressions.equal("data", "u")) // bucket16("u") -> 0 + .validateNoConflictingDataFiles() .commit()); Assert.assertEquals( @@ -383,7 +384,8 @@ public void testValidateNoConflictsFromSnapshot() { .validateDeletedFiles() .validateFromSnapshot(validateFromSnapshotId) .validateDataFilesExist(ImmutableList.of(FILE_A.path())) - .validateNoConflictingAppends(Expressions.equal("data", "u")) // bucket16("u") -> 0 + .conflictDetectionFilter(Expressions.equal("data", "u")) // bucket16("u") -> 0 + .validateNoConflictingDataFiles() .commit(); Snapshot snap = table.currentSnapshot(); @@ -741,7 +743,8 @@ public void testValidateDataFilesExistWithConflictDetectionFilter() { .validateDataFilesExist(ImmutableList.of(dataFile1.path())) .validateDeletedFiles() .validateFromSnapshot(baseSnapshot.snapshotId()) - .validateNoConflictingAppends(conflictDetectionFilter); + .conflictDetectionFilter(conflictDetectionFilter) + .validateNoConflictingDataFiles(); // concurrently delete the file for partition B table.newDelete().deleteFile(dataFile2).commit(); @@ -803,7 +806,8 @@ public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() { .validateDataFilesExist(ImmutableList.of(dataFile1.path())) .validateDeletedFiles() .validateFromSnapshot(baseSnapshot.snapshotId()) - .validateNoConflictingAppends(conflictDetectionFilter); + .conflictDetectionFilter(conflictDetectionFilter) + .validateNoConflictingDataFiles(); // concurrently delete the file for partition A table.newDelete().deleteFile(dataFile1).commit(); @@ -1077,7 +1081,8 @@ public void testConcurrentConflictingRowDelta() { .newRowDelta() .addDeletes(FILE_A_DELETES) .validateFromSnapshot(firstSnapshot.snapshotId()) - .validateNoConflictingAppends(conflictDetectionFilter) + .conflictDetectionFilter(conflictDetectionFilter) + .validateNoConflictingDataFiles() .commit(); AssertHelpers.assertThrows( diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 3ded9471fe9c..36f647ad5689 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -18,11 +18,6 @@ */ package org.apache.iceberg.spark.source; -import static org.apache.iceberg.ManifestContent.DATA; -import static org.apache.iceberg.ManifestContent.DELETES; -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; - import java.io.IOException; import java.io.UncheckedIOException; import java.util.Comparator; @@ -80,6 +75,11 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import static org.apache.iceberg.ManifestContent.DATA; +import static org.apache.iceberg.ManifestContent.DELETES; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + public abstract class TestIcebergSourceTablesBase extends SparkTestBase { private static final Schema SCHEMA = @@ -782,7 +782,7 @@ public void testHistoryTable() { long secondSnapshotId = table.currentSnapshot().snapshotId(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis(); inputDf @@ -869,7 +869,7 @@ public void testSnapshotsTable() { String secondManifestList = table.currentSnapshot().manifestListLocation(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); List actual = spark @@ -942,7 +942,7 @@ public void testPrunedSnapshotsTable() { long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); Dataset actualDf = spark From f1145f512050f95b32d7e376f3c0020fc247009b Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Tue, 23 Aug 2022 16:20:03 -0700 Subject: [PATCH 2/4] Spotless --- .../org/apache/iceberg/BaseTransaction.java | 18 +++++++++--------- .../java/org/apache/iceberg/TestRowDelta.java | 18 +++++++++--------- .../source/TestIcebergSourceTablesBase.java | 10 +++++----- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 64fb3268fe2f..6f45390ce7ea 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -18,6 +18,15 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + import java.io.Serializable; import java.util.List; import java.util.Map; @@ -39,15 +48,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; -import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; -import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; - public class BaseTransaction implements Transaction { private static final Logger LOG = LoggerFactory.getLogger(BaseTransaction.class); diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 8b50e1a25394..8e15f1f6576c 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -18,6 +18,15 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.ADDED_POS_DELETES_PROP; +import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP; +import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_PREFIX; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP; + import java.util.List; import java.util.Map; import java.util.Set; @@ -31,15 +40,6 @@ import org.junit.Assert; import org.junit.Test; -import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP; -import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP; -import static org.apache.iceberg.SnapshotSummary.ADDED_POS_DELETES_PROP; -import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP; -import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_PREFIX; -import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; -import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; -import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP; - public class TestRowDelta extends V2TableTestBase { @Test public void testAddDeleteFile() { diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 36f647ad5689..5f01f89ebe1f 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -18,6 +18,11 @@ */ package org.apache.iceberg.spark.source; +import static org.apache.iceberg.ManifestContent.DATA; +import static org.apache.iceberg.ManifestContent.DELETES; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.Comparator; @@ -75,11 +80,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.apache.iceberg.ManifestContent.DATA; -import static org.apache.iceberg.ManifestContent.DELETES; -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; - public abstract class TestIcebergSourceTablesBase extends SparkTestBase { private static final Schema SCHEMA = From 7b28ddcd031f730ba765b2c66c093cb9dcabb0ac Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 26 Aug 2022 09:10:13 -0700 Subject: [PATCH 3/4] Fix tests and older spark versions --- .../org/apache/iceberg/TestIncrementalDataTableScan.java | 2 +- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 6 +++--- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 6 +++--- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 6 +++--- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 6 +++--- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java index 05f10ab61f15..1e6678fc339c 100644 --- a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java +++ b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java @@ -201,7 +201,7 @@ public void testRollbacks() { add(transaction.newAppend(), files("F")); transaction.commitTransaction(); // Go back to snapshot "E" - table.manageSnapshots().rollbackTo(2).commit(); + table.manageSnapshots().rollbackTo(5).commit(); Assert.assertEquals(5, table.currentSnapshot().snapshotId()); filesMatch(Lists.newArrayList("B", "D", "E"), appendsBetweenScan(1, 5)); filesMatch(Lists.newArrayList("B", "D", "E"), appendsAfterScan(1)); diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index d70e3dcbf181..5f3e84f61276 100644 --- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -778,7 +778,7 @@ public void testHistoryTable() { long secondSnapshotId = table.currentSnapshot().snapshotId(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis(); inputDf @@ -864,7 +864,7 @@ public void testSnapshotsTable() { String secondManifestList = table.currentSnapshot().manifestListLocation(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); List actual = spark @@ -937,7 +937,7 @@ public void testPrunedSnapshotsTable() { long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); Dataset actualDf = spark diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 386e57b7877f..d97fa3d27a28 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -782,7 +782,7 @@ public void testHistoryTable() { long secondSnapshotId = table.currentSnapshot().snapshotId(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis(); inputDf @@ -868,7 +868,7 @@ public void testSnapshotsTable() { String secondManifestList = table.currentSnapshot().manifestListLocation(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); List actual = spark @@ -941,7 +941,7 @@ public void testPrunedSnapshotsTable() { long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); Dataset actualDf = spark diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 386e57b7877f..d97fa3d27a28 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -782,7 +782,7 @@ public void testHistoryTable() { long secondSnapshotId = table.currentSnapshot().snapshotId(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis(); inputDf @@ -868,7 +868,7 @@ public void testSnapshotsTable() { String secondManifestList = table.currentSnapshot().manifestListLocation(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); List actual = spark @@ -941,7 +941,7 @@ public void testPrunedSnapshotsTable() { long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); Dataset actualDf = spark diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 3ded9471fe9c..5f01f89ebe1f 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -782,7 +782,7 @@ public void testHistoryTable() { long secondSnapshotId = table.currentSnapshot().snapshotId(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis(); inputDf @@ -869,7 +869,7 @@ public void testSnapshotsTable() { String secondManifestList = table.currentSnapshot().manifestListLocation(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); List actual = spark @@ -942,7 +942,7 @@ public void testPrunedSnapshotsTable() { long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis(); // rollback the table state to the first snapshot - table.rollback().toSnapshotId(firstSnapshotId).commit(); + table.manageSnapshots().rollbackTo(firstSnapshotId).commit(); Dataset actualDf = spark From 3e21355cccd6772332d29fd7c5a632583ea8e0fd Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 26 Aug 2022 21:09:24 -0700 Subject: [PATCH 4/4] Accept breaking api changes --- .palantir/revapi.yml | 50 +++++++++++++------ .../examples/SnapshotFunctionalityTest.java | 2 +- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 1a2757c53501..3f8ec4b333a8 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -3,9 +3,21 @@ versionOverrides: acceptedBreaks: apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: + - code: "java.class.removed" + old: "interface org.apache.iceberg.Rollback" + justification: "Deprecations for 1.0 release" - code: "java.method.addedToInterface" new: "method java.lang.String org.apache.iceberg.expressions.Reference::name()" justification: "All subclasses implement name" + - code: "java.method.removed" + old: "method org.apache.iceberg.OverwriteFiles org.apache.iceberg.OverwriteFiles::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)" + justification: "Deprecations for 1.0 release" + - code: "java.method.removed" + old: "method org.apache.iceberg.Rollback org.apache.iceberg.Table::rollback()" + justification: "Deprecations for 1.0 release" + - code: "java.method.removed" + old: "method org.apache.iceberg.RowDelta org.apache.iceberg.RowDelta::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)" + justification: "Deprecations for 1.0 release" release-base-0.13.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" @@ -35,6 +47,10 @@ acceptedBreaks: - code: "java.method.addedToInterface" new: "method ThisT org.apache.iceberg.SnapshotUpdate::toBranch(java.lang.String)" justification: "Adding toBranch API for supporting committing to a branch" + - code: "java.method.addedToInterface" + new: "method boolean org.apache.iceberg.Scan>::isCaseSensitive()" + justification: "Move a method to the parent interface" - code: "java.method.addedToInterface" new: "method boolean org.apache.iceberg.expressions.BoundTerm::isEquivalentTo(org.apache.iceberg.expressions.BoundTerm)" justification: "new API method" @@ -54,20 +70,25 @@ acceptedBreaks: new: "method java.util.List org.apache.iceberg.Snapshot::deleteManifests(org.apache.iceberg.io.FileIO)" justification: "Allow adding a new method to the interface - old method is deprecated" - code: "java.method.addedToInterface" - new: "method java.util.Map org.apache.iceberg.Table::refs()" + new: "method java.util.Map\ + \ org.apache.iceberg.Table::refs()" justification: "Adding new refs method to Table API for easier access" - - code: "java.method.addedToInterface" - new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedEqualityDeleteFilesCount()" - justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface" - - code: "java.method.addedToInterface" - new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedPositionDeleteFilesCount()" - justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface" - code: "java.method.addedToInterface" new: "method long org.apache.iceberg.actions.DeleteReachableFiles.Result::deletedEqualityDeleteFilesCount()" - justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface" + justification: "Interface is backward compatible, very unlikely anyone implements\ + \ this Result bean interface" - code: "java.method.addedToInterface" new: "method long org.apache.iceberg.actions.DeleteReachableFiles.Result::deletedPositionDeleteFilesCount()" - justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface" + justification: "Interface is backward compatible, very unlikely anyone implements\ + \ this Result bean interface" + - code: "java.method.addedToInterface" + new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedEqualityDeleteFilesCount()" + justification: "Interface is backward compatible, very unlikely anyone implements\ + \ this Result bean interface" + - code: "java.method.addedToInterface" + new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedPositionDeleteFilesCount()" + justification: "Interface is backward compatible, very unlikely anyone implements\ + \ this Result bean interface" - code: "java.method.addedToInterface" new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::planWith(java.util.concurrent.ExecutorService)" justification: "Accept all changes prior to introducing API compatibility checks" @@ -126,6 +147,11 @@ acceptedBreaks: - code: "java.method.addedToInterface" new: "method org.apache.iceberg.ReplacePartitions org.apache.iceberg.ReplacePartitions::validateNoConflictingDeletes()" justification: "Accept all changes prior to introducing API compatibility checks" + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.expressions.Expression org.apache.iceberg.Scan>::filter()" + justification: "Move a method to the parent interface" - code: "java.method.numberOfParametersChanged" old: "method void org.apache.iceberg.events.IncrementalScanEvent::(java.lang.String,\ \ long, long, org.apache.iceberg.expressions.Expression, org.apache.iceberg.Schema)" @@ -134,9 +160,3 @@ acceptedBreaks: \ boolean)" justification: "IncrementalScanEvent should only be constructed by Iceberg code.\ \ Hence the change of constructor params shouldn't affect users" - - code: "java.method.addedToInterface" - new: "method org.apache.iceberg.expressions.Expression org.apache.iceberg.Scan>::filter()" - justification: "Move a method to the parent interface" - - code: "java.method.addedToInterface" - new: "method boolean org.apache.iceberg.Scan>::isCaseSensitive()" - justification: "Move a method to the parent interface" diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java index 3f3ca13c28ae..d6be87caf269 100644 --- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java +++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java @@ -91,7 +91,7 @@ public void before() throws IOException { public void rollbackToPreviousSnapshotAndReadData() { long oldId = table.history().get(0).snapshotId(); - table.rollback().toSnapshotId(oldId).commit(); + table.manageSnapshots().rollbackTo(oldId).commit(); table.refresh(); Dataset results = spark.read().format("iceberg").load(tableLocation.toString());